kafkaproducer

Create a Kafka producer.

Syntax

pobj = kafkaproducer(bootstrap_servers,acks)

pobj = kafkaproducer(bootstrap_servers,acks,[Name, value])

Inputs

bootstrap_servers
server address tag
Type: string | cell
acks
acks is the number of brokers who need to acknowledge receiving the message before it is considered a successful write.Valid values are 0, 1 or 'all'.
Type: scalar | string
Name, value
Name
Type: string
Valid options are:
Name
Value
request_timeout
Numeric value in milliseconds, controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
Type: scalar
client_id
Client id is a label names the consumer.
Type: string

Outputs

pobj
Kafka consumer object.
Type:
Function Name, Syntax
Type: string
Valid options are:
Function Name
Description/Syntax/Arguments
send
Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
send(topic,value) send(topic,key,value) send(topic,partition,key,value) send(topic,partition,key,value,time_stamp) send(topic,…,'callback',function_name) topic is a string where the message will be published. key is a string. value is a data to be send, supported data type is string or scalar or matrix or complex. partition is a scalar. time_stamp is a scalar. callback function_name oml function name as string, it must take three arguments: partition, offset and message. flush() function must be called frequently to trigger callback function.
partitionsfor
Get the list of topic partitions assigned to this producer.
R = partitionsfor(topic) topic is string to get partition metadata. Output R is cell array with partitions.
flush
All messages in the Producer queue will be delivered.
flush() flush(time_out) Timeout duration in milliseconds.
close
Closes Consumer.
close() close(timeout) Timeout duration in seconds, tries to close producer cleanly within the specified duration.

Examples

Producer creation and sending data
function sendcallback(partition, offset, message)
	disp('start of sendcallback')
	partition
	offset
	message
	disp('end of sendcallback')
end
%producer sending data
kproduer = kafkaproducer('192.168.40.49', 1);
partitions=kproduer.partitionsfor('compose_topic')
kproduer.send('compose_topic','hello','callback','sendcallback')
kproduer.flush()
kproduer.close()
partitions = 
{
  [1,1] 0
}
start of sendcallback
partition = 0
offset = 947
message = hello
end of sendcallback