kafkaconsumer

Create a Kafka consumer.

Syntax

cobj = kafkaconsumer(bootstrap_servers,group_id)

cobj = kafkaconsumer(bootstrap_servers,group_id,[Name, value])

Inputs

bootstrap_servers
Address, or list of addresses, of the Kafka bootstrap server(s).
Type: string | cell
group_id
Group ID that associates the consumer with a given consumer group.
Type: string
Name, value
Optional name-value pairs; each Name is one of the options listed below.
Type: string
Valid options are:
Name
Value
connection_timeout
Connection timeout, in milliseconds.
Type: scalar
request_timeout
Maximum amount of time, in milliseconds, that the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if retries are exhausted.
Type: scalar
offset
Valid values are 'earliest' and 'latest' (default).
Type: string
client_id
Client ID, a label that names the consumer.
Type: string
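
As an illustration of the inputs above, a consumer might be created as in the following minimal sketch. The broker address 'localhost:9092', the group name, and the client label are placeholder values, and passing the options as flat name-value arguments is an assumption based on the syntax line, not something this page confirms.

```oml
% Placeholder broker address and group name (assumptions, not defaults).
bootstrap_servers = 'localhost:9092';
group_id = 'example_group';

% Basic form: only the two required inputs.
cobj = kafkaconsumer(bootstrap_servers, group_id);

% With options, assuming they are passed as flat name-value arguments.
% 'offset' is set to 'earliest' instead of the default 'latest';
% 'request_timeout' is given in milliseconds.
cobj2 = kafkaconsumer(bootstrap_servers, group_id, 'offset', 'earliest', 'client_id', 'example_consumer', 'request_timeout', 30000);
```

Both calls need a reachable Kafka broker at the given address.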

Outputs

cobj
Kafka consumer object.
Type:
Function Name, Syntax
Type: string
Valid options are:
Function Name
Description/Syntax/Arguments
subscribe
Subscribe to a single topic, a list of topics, or the topics matching a regex pattern.
subscribe(topic)
topic is a string, a cell array of strings, or a regex pattern to match against the available topics.
unsubscribe
Unsubscribe from all topics.
unsubscribe()
topics
Get the list of topics available to view.
R = topics()
Output R is a struct with the topics as keys and their partitions as values.
poll
Fetch data for the topics or partitions specified using one of the subscribe APIs.
R = poll()
R = poll(timeout)
timeout is the duration, in seconds, to wait for a record (data) on the subscribed topics. If the timeout expires, an empty record is returned.
Output R is a struct with the topics as keys and the data as values.
commit
Commit offsets to Kafka asynchronously and trigger a callback once the commit completes.
commit()
commit(offsets, 'callback', function_name)
offsets is a struct with the topic as key and, as value, a cell containing the partition and offset.
function_name is the name of an OML callback function, given as a string; the function must take four arguments: topic, partition, offset, and message.
poll() must be called frequently for the callback to be triggered.
commited
Gets the last committed offsets for the given partitions.
R = commited(topic_partition, timeout)
topic_partition is a struct with the topic as key and the partition as value.
Output R is the last committed offsets for the given partition.
getoffsets
Get the first, last, or next offset in a given partition.
R = getoffsets(topic_partition, option)
R = getoffsets(topic_partition, option, timeout_duration)
topic_partition is a struct with the topic as key and the partition as value.
option is a string; valid values are 'first', 'last', and 'next'.
first: Get the first offset for the given partitions.
last: Get the end offsets for the given partitions.
next: Get the offset of the next record that will be fetched.
timeout_duration is the maximum amount of time to wait for the selected position to be determined.
Output R is the selected offset for the given partition or partitions.
assign
Manually assign a list of partitions to this consumer.
assign(topic_partition)
topic_partition is a struct with the topic as key and the partition as value.
assignment
Get the list of topic partitions assigned to this consumer.
R = assignment()
Output R is a cell array of structs, each with the topic as key and the partition as value.
partitionsfor
Get the list of partitions for a given topic.
R = partitionsfor(topic)
topic is a string naming the topic whose partition metadata is requested.
Output R is a cell array of partitions.
close
Close the consumer.
close()
close(timeout)
timeout is the duration, in seconds, within which the consumer tries to close cleanly.
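
The methods above might be combined as in the following sketch. It assumes that the methods are invoked on the consumer object with dot syntax (for example, cobj.subscribe(...)), which this page does not state explicitly, and that a broker is reachable at the placeholder address. The topic name 'example_topic', the partition and offset values, and the callback name on_commit are hypothetical.

```oml
% Hypothetical commit callback; per the commit description it takes four arguments.
function on_commit(topic, partition, offset, message)
  disp('commit completed for topic:');
  disp(topic);
end

% Placeholder broker address and group name.
cobj = kafkaconsumer('localhost:9092', 'example_group');

% Assumed dot-call syntax on the consumer object.
cobj.subscribe('example_topic');

% Poll a few times, waiting up to 1 second per call.
% R may be an empty record if nothing arrived before the timeout.
for k = 1:10
  R = cobj.poll(1);
  disp(R);
end

% Commit offset 42 for partition 0 (placeholder values).
offsets = struct();
offsets.example_topic = {0, 42};
cobj.commit(offsets, 'callback', 'on_commit');

% Poll again so that the commit callback has a chance to be triggered.
cobj.poll(1);

% Release the subscription and try to close cleanly within 5 seconds.
cobj.unsubscribe();
cobj.close(5);
```

The extra poll call after commit reflects the note that the callback is only triggered while poll() is being called.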