kafkaconsumer

Create a Kafka consumer.

Attention: Valid only with Altair Communication Extension.

Syntax

cobj = kafkaconsumer(bootstrap_servers,group_id)

cobj = kafkaconsumer(bootstrap_servers,group_id,[Name, value])

Inputs

bootstrap_servers
server address tag
Type: string | cell
group_id
Group id to associate consumber to a given group.
Type: string
Name, value
Name
Type: string
Valid options are:
Name
Value
connection_timeout
Numeric value in milliseconds, time given for connection timeout.
Type: scalar
request_timeout
Numeric value in milliseconds, maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
Type: scalar
offset
Valid values are 'earliest' and 'latest' (default).
Type: string
client_id
Client id is a label names the consumer.
Type: string

Outputs

cobj
Kafka consumber object.
Type:
Function Name, Syntax
Type: string
Valid options are:
Function Name
Description/Syntax/Arguments
subscribe
Subscribe to a single or list of topics or a regex pattern.
subscribe(topic) topic is a string or cell array of strings or regex pattern to match in available topics.
unsubscribe
Unsubscribe from all topics.
Unsubscribe()
topics
Get list of topics available to view.
R = topics() Output R is a struct with key as Topics and Value as partition.
poll
Fetch data for the topics or partitions specified using one of the subscribe APIs.
R = poll() R = poll(timeout) Timeout duration in seconds to wait for record (data) on the subscribe topic. If time expires, empty record is returned. Output R is a struct with key as topics and value as data.
commit
Commit offsets to kafka asynchronously and trigger callback once commit completes.
commit() commit(offsets, 'callback',function_name) offsets is struct variable with key as topic and value as cell contains partition and offset. callback function_name oml function name as string, it must take four arguments: topic, partition, offset and message. poll() function must be called frequently to trigger callback function.
commited
Gets the last committed offsets for the given partitions.
R = commited(topic_partition, timeout) topic_partition is struct with key as topic and value as partition. Output R is last committed offsets for given partition.
getoffsets
Get first, last, next offset in a given partition.
R = getoffsets(topic_partition, option) R = getoffsets(topic_partition, option, timeout_duration) topic_partition is struct with key as topic and value as partition. option is string, and valid values are 'first','last' and 'next'. first: Get the first offset for the given partitions. last: Get the end offsets for the given partitions. next: Get the offset of the next record that will be fetched. timeout_duration is the maximum amount of time to await determination of the selected position. Output R is selected offset for a given partition or multiple partitions.
assign
Manually assign list of partitions to this consumer.
assign(topic_partition) topic_partition is struct with key as topic and value as partition.
assignment
Get the list of topic partitions assigned to this consumer.
R = assignment() Output R is cell array of struct with key as topic and value as partition.
partitionsfor
Get the list of topic partitions assigned to this consumer.
R = partitionsfor(topic) topic is string to get partition metadata. Output R is cell array with partitions.
close
Closes Consumer.
close() close(timeout) Timeout duration in seconds, tries to close consumer cleanly within the specified duration.

Examples

Consumer creation and assigning partition
kconsumer = kafkaconsumer('192.168.40.49', 'AnyGroup');
%list available consumer offsets and topics
available_topics = kconsumer.topics()
%assign topic partition
kconsumer.assign(struct('compose_topic', 0));
%check assigned topic partition
kconsumer.assignment()
kconsumer.close()
available_topics = struct [
  __consumer_offsets: 
  {
    [1,1] 0
    [2,1] 1
    [3,1] 2
	...
    [49,1] 48
    [50,1] 49
  }
  activate_test: 
  {
    [1,1] 0
  }
  compose_topic: 
  {
    [1,1] 0
  }
  t1: 
  {
    [1,1] 0
  }
  testx: 
  {
    [1,1] 0
  }
  topic: 
  {
    [1,1] 0
  }
  topic1: 
  {
    [1,1] 0
  }
]
ans = 
{
  [1,1] struct [
    compose_topic: 0
  ]
}
Subscribe to a topic and read data
kconsumer = kafkaconsumer('192.168.40.49', 'AnyGroup');
%assign topic partition
kconsumer.assign(struct('compose_topic', 0));
%check assigned topic partition
assigned_topic=kconsumer.assignment()
%subscribe to a topic
kconsumer.subscribe('compose_topic')

%producer sending data
kproduer = kafkaproducer('192.168.40.49', 1);
kproduer.send('compose_topic','hello')
kproduer.flush()

%consumer reading data
data = kconsumer.poll(5)

kconsumer.close()
kproduer.close()	
assigned_topic = 
{
  [1,1] struct [
	compose_topic: 0
  ]
}
data = struct [
  compose_topic: hello
]