kafkaconsumer
Create a Kafka consumer.
Syntax
cobj = kafkaconsumer(bootstrap_servers,group_id)
cobj = kafkaconsumer(bootstrap_servers,group_id,[Name, value])
Inputs
- bootstrap_servers
- Kafka server address.
- Type: string | cell
- group_id
- Group ID that associates the consumer with a given group.
- Type: string
- Name, value
- Name
- Type: string
- Valid options are listed below as Name followed by Value:
- connection_timeout
- Numeric value in milliseconds; the time allowed for the connection before it times out.
- Type: scalar
- request_timeout
- Numeric value in milliseconds; the maximum amount of time the client will wait for the response to a request. If the response is not received before the timeout elapses, the client will resend the request if necessary, or fail the request if retries are exhausted.
- Type: scalar
- offset
- Valid values are 'earliest' and 'latest' (default).
- Type: string
- client_id
- Client ID, a label that names the consumer.
- Type: string
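The two syntax forms above can be sketched as follows. This is a minimal illustration, not a definitive usage: the broker address, group name, and client name are hypothetical, and it assumes each option Name is passed as a string followed directly by its value.

```oml
% Hypothetical broker address and group name; replace with your own.
bootstrap_servers = 'localhost:9092';
group_id = 'demo_group';

% Minimal form: only the two required inputs.
cobj = kafkaconsumer(bootstrap_servers, group_id);

% Form with options (assumption: each Name is followed by its value).
% Timeouts are in milliseconds; 'offset' is 'earliest' or 'latest'.
cobj2 = kafkaconsumer(bootstrap_servers, group_id, 'offset', 'earliest', 'request_timeout', 30000, 'client_id', 'demo_client');
```

A cell of addresses may also be passed as bootstrap_servers, since its type is listed as string | cell.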
Outputs
- cobj
- Kafka consumer object.
- Type:
- Function Name, Syntax
- Type: string
- Valid options are listed below as Function Name followed by Description, Syntax, and Arguments:
- subscribe
- Subscribe to a single topic, a list of topics, or a regex pattern.
- subscribe(topic). topic is a string, a cell array of strings, or a regex pattern to match against the available topics.
- unsubscribe
- Unsubscribe from all topics.
- unsubscribe()
- topics
- Get the list of topics available to view.
- R = topics(). Output R is a struct with topics as keys and partitions as values.
- poll
- Fetch data for the topics or partitions specified using one of the subscribe APIs.
- R = poll() or R = poll(timeout). timeout is the duration in seconds to wait for a record (data) on the subscribed topic. If the time expires, an empty record is returned. Output R is a struct with topics as keys and data as values.
- commit
- Commit offsets to Kafka asynchronously and trigger a callback once the commit completes.
- commit() or commit(offsets, 'callback', function_name). offsets is a struct with topics as keys and, as values, cells containing the partition and offset. function_name is the name of an OML callback function, given as a string; the function must take four arguments: topic, partition, offset, and message. poll() must be called frequently to trigger the callback function.
- commited
- Gets the last committed offsets for the given partitions.
- R = commited(topic_partition, timeout). topic_partition is a struct with topics as keys and partitions as values. Output R is the last committed offset for each given partition.
- getoffsets
- Get the first, last, or next offset in a given partition.
- R = getoffsets(topic_partition, option) or R = getoffsets(topic_partition, option, timeout_duration). topic_partition is a struct with topics as keys and partitions as values. option is a string; valid values are 'first', 'last', and 'next'. 'first' gets the first offset for the given partitions, 'last' gets the end offsets for the given partitions, and 'next' gets the offset of the next record that will be fetched. timeout_duration is the maximum amount of time to await determination of the selected position. Output R is the selected offset for the given partition or partitions.
- assign
- Manually assign a list of partitions to this consumer.
- assign(topic_partition). topic_partition is a struct with topics as keys and partitions as values.
- assignment
- Get the list of topic partitions assigned to this consumer.
- R = assignment(). Output R is a cell array of structs with topics as keys and partitions as values.
- partitionsfor
- Get partition metadata for the given topic.
- R = partitionsfor(topic). topic is a string naming the topic whose partition metadata is requested. Output R is a cell array of partitions.
- close
- Close the consumer.
- close() or close(timeout). timeout is the duration in seconds within which the consumer tries to close cleanly.
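A subscribe, poll, commit, and close cycle might look like the sketch below. Several things here are assumptions rather than documented behavior: that the listed functions are invoked as methods on cobj using dot notation, that the offsets struct maps a topic to a {partition, offset} cell, and the broker address, group, topic, and offset values, which are hypothetical. The callback name on_commit is also illustrative.

```oml
% Callback taking the four arguments that commit requires.
function on_commit(topic, partition, offset, message)
    disp(topic);
    disp(partition);
    disp(offset);
    disp(message);
end

% Hypothetical broker, group, and topic names.
cobj = kafkaconsumer('localhost:9092', 'demo_group');
cobj.subscribe('demo_topic');

% Poll a few times, waiting up to 1 second for records on each call.
for k = 1:10
    R = cobj.poll(1);   % empty record if nothing arrives before the timeout
    disp(R);            % struct with topics as keys and data as values
end

% One reading of the offsets layout: topic -> {partition, offset}.
offsets.demo_topic = {0, 42};
cobj.commit(offsets, 'callback', 'on_commit');

% The callback only fires while poll() is being called.
cobj.poll(1);

cobj.unsubscribe();
cobj.close(5);   % try to close cleanly within 5 seconds
```

Calling commit() with no arguments is the simpler form when no completion callback is needed.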
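Manual partition assignment and offset queries can be sketched in the same way. The example assumes dot-notation method calls on cobj and a hypothetical broker, group, and topic. The text above does not state the unit of the timeout for commited, so the value passed here is only a placeholder.

```oml
% Hypothetical broker and group names.
cobj = kafkaconsumer('localhost:9092', 'demo_group');

% topic_partition: struct with the topic as key and the partition as value.
tp = struct('demo_topic', 0);

% Assign partition 0 of demo_topic directly instead of subscribing.
cobj.assign(tp);
disp(cobj.assignment());   % cell array of topic/partition structs

% Query offsets for the assigned partition.
first_off = cobj.getoffsets(tp, 'first');   % first offset in the partition
last_off  = cobj.getoffsets(tp, 'last');    % end offset of the partition
next_off  = cobj.getoffsets(tp, 'next');    % offset of the next record to fetch

% Last committed offset; the timeout unit is not stated, 5 is a placeholder.
committed = cobj.commited(tp, 5);

cobj.close();
```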