kafkaconsumer
Create a Kafka consumer.
Attention: Valid only with Altair Communication Extension.
Syntax
cobj = kafkaconsumer(bootstrap_servers,group_id)
cobj = kafkaconsumer(bootstrap_servers,group_id,[Name, value])
Inputs
- bootstrap_servers
- Address of the Kafka bootstrap server, for example '192.168.40.49'.
- group_id
- Group ID that associates the consumer with a given consumer group.
- Name, value
- Name
Outputs
- cobj
- Kafka consumer object.
- Function Name, Syntax
- Type: string
Examples
Consumer creation and assigning partition
kconsumer = kafkaconsumer('192.168.40.49', 'AnyGroup');
%list available consumer offsets and topics
available_topics = kconsumer.topics()
%assign topic partition
kconsumer.assign(struct('compose_topic', 0));
%check assigned topic partition
kconsumer.assignment()
kconsumer.close()
available_topics = struct [
__consumer_offsets:
{
[1,1] 0
[2,1] 1
[3,1] 2
...
[49,1] 48
[50,1] 49
}
activate_test:
{
[1,1] 0
}
compose_topic:
{
[1,1] 0
}
t1:
{
[1,1] 0
}
testx:
{
[1,1] 0
}
topic:
{
[1,1] 0
}
topic1:
{
[1,1] 0
}
]
ans =
{
[1,1] struct [
compose_topic: 0
]
}
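The struct returned by topics() in the output above has one field per topic, each holding a cell of partition numbers. A minimal sketch of walking that structure is shown here; it uses a hand-built stand-in struct (an assumption, so that it runs without a broker) rather than a live kconsumer.topics() call, and it lists only two of the topics.

```octave
% Stand-in for the value returned by kconsumer.topics(); the contents
% are copied from the example output and are assumed, not queried live.
available_topics.compose_topic = {0};
available_topics.t1 = {0};

% Each field name is a topic; each field value is a cell of partition ids.
names = fieldnames(available_topics);
for k = 1:numel(names)
    partitions = getfield(available_topics, names{k});
    printf('%s: %d partition(s)\n', names{k}, numel(partitions));
end
```

With a real consumer, the stand-in assignments would be replaced by available_topics = kconsumer.topics().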
Subscribe to a topic and read data
kconsumer = kafkaconsumer('192.168.40.49', 'AnyGroup');
%assign topic partition
kconsumer.assign(struct('compose_topic', 0));
%check assigned topic partition
assigned_topic = kconsumer.assignment()
%subscribe to a topic
kconsumer.subscribe('compose_topic')
%producer sending data
kproducer = kafkaproducer('192.168.40.49', 1);
kproducer.send('compose_topic','hello')
kproducer.flush()
%consumer reading data
data = kconsumer.poll(5)
kconsumer.close()
kproducer.close()
assigned_topic =
{
[1,1] struct [
compose_topic: 0
]
}
data = struct [
compose_topic: hello
]
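The poll output above is a struct keyed by topic name. A minimal sketch of reading a message from it follows; it uses a stand-in struct in place of a live poll call, and the isfield check is a defensive assumption, since this page does not describe what poll returns when no message arrives.

```octave
% Stand-in for the value returned by kconsumer.poll(5) in the example above.
data.compose_topic = 'hello';

topic = 'compose_topic';
% Guard against a result without this topic (assumed possible; not
% documented on this page).
if isstruct(data) && isfield(data, topic)
    msg = getfield(data, topic);
    printf('Received on %s: %s\n', topic, msg);
else
    printf('No message for %s\n', topic);
end
```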