Creating Apache Kafka Input Data Source
Allows Panopticon Streams to subscribe to Kafka topics on an external cluster.
Steps:
1. In the New Data Source page, select Input > Kafka in the Connector drop-down list.

2. Enter the connection details:
| Property | Description |
| --- | --- |
| Zookeeper Host | Where the Zookeeper server is located. Default is localhost. |
| Zookeeper Port | The port number of Zookeeper. Default is 2181. |
| Bootstrap Server | Lists the host/port pairs of Kafka servers used to bootstrap connections to the Kafka cluster. By default, the value is localhost:9092,broker:29092. However, this can be overridden by specifying another bootstrap server in the External Settings text box (as described in step 3). |
| Schema Registry Host | Where the Schema Registry is located. This can be in a different location from the Kafka cluster. |
| Schema Registry Port | The port number of the Schema Registry, which provides the serving layer for the metadata. Default is 8081. |
3. Enter the External Settings to support authentication (i.e., username and password). Note that if the bootstrap server is not secured, there is no need to authenticate and you may leave this text box blank.
Below is an example of system settings for SASL authentication:
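A minimal sketch, assuming SASL/PLAIN over plaintext; the property names are standard Kafka client settings, and the username and password shown are placeholders:

```properties
# Standard Kafka client SASL/PLAIN settings (placeholder credentials)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="myuser" \
  password="mypassword";
```

If the cluster uses TLS as well, `security.protocol` would typically be `SASL_SSL` instead.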

4. Click Fetch Topics to populate the Topic drop-down list.

By default, the Hide Internal Topics toggle is enabled. Tap the slider to turn it off; the internal Kafka topics will then also be displayed in the drop-down list.

Click the drop-down list to search and select the desired topic.
For Avro topics, the generated columns are displayed.

For non-Avro topics, select the Message Type: Fix, JSON, Text, or XML.
· If Text is selected, confirm the Text Qualifier, Column Delimiter, and whether the first row of the message includes column headings.

· If JSON is selected, enter the Record Path which allows the identification of multiple records within the JSON document (e.g., myroot.items.item).
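The record path can be sketched as a simple dot-separated walk through the JSON document. The sample message and field names below are hypothetical, chosen only to illustrate the `myroot.items.item` example above:

```python
import json

# Hypothetical sample message; the record path "myroot.items.item"
# walks from the root object down to the list of records.
message = json.loads("""
{"myroot": {"items": {"item": [
    {"symbol": "AAPL", "price": 150.1},
    {"symbol": "MSFT", "price": 250.2}
]}}}
""")

def resolve_record_path(doc, path):
    """Follow a dot-separated record path and return the list of records."""
    node = doc
    for part in path.split("."):
        node = node[part]
    # A path pointing at a single object still yields one record.
    return node if isinstance(node, list) else [node]

records = resolve_record_path(message, "myroot.items.item")
```

Each element of the resolved list becomes one row in the data source.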

5. Check the From Beginning box to subscribe from the earliest available message through the latest. If unchecked, you are subscribed to the latest messages only.
6. Select either the period (.) or comma (,) as the Decimal Separator.
NOTE: Prepend 'default:' for elements falling under the default namespace.
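The effect of the Decimal Separator setting can be sketched as follows; the helper function and its behaviour for the comma case (treating '.' as a thousands separator) are assumptions for illustration, not the connector's actual implementation:

```python
def parse_number(text, decimal_separator="."):
    """Parse a numeric field using the configured decimal separator.

    When the separator is ',', '.' is assumed to be a thousands
    separator and is stripped before parsing.
    """
    if decimal_separator == ",":
        text = text.replace(".", "").replace(",", ".")
    return float(text)

# "1.234,56" read with a comma separator is one thousand
# two hundred thirty-four point five six.
value = parse_number("1.234,56", decimal_separator=",")
```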
7. Click the fetch icon to fetch the schema based on the connection details. Consequently, the list of columns, with the data type found from inspecting the first ‘n’ rows of the input data source, is populated and the Save button is enabled.
8. You can also opt to load or save a copy of the column definition.
9. For non-Avro message types, click the add column icon to add columns to the Kafka connection that represent sections of the message. Then enter or select:
| Property | Description |
| --- | --- |
| Name | The column name of the source schema. |
| Fix Tag/JsonPath/Text Column Index/XPath | The Fix Tag, JsonPath, Text Column Index, or XPath of the source schema. |
| Type | The data type of the column: Text, Numeric, or Time. |
| Date Format | The format when the data type is Time. |
| Filter | Defined parameters that can be used as filters. Only available for Avro, JSON, Text, and XML message types. |
| Enabled | Determines whether the message field should be processed. |

NOTE: To parse and format times with higher than millisecond precision, the format string must end with a period followed by a sequence of lower-case Fs, with no additional characters after them. For example: yyyy-MM-dd HH:mm:ss.ffffff
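For comparison, the equivalent of the `yyyy-MM-dd HH:mm:ss.ffffff` pattern in Python's `strptime` is the `%f` directive, which captures up to six fractional-second digits:

```python
from datetime import datetime

# "yyyy-MM-dd HH:mm:ss.ffffff" maps to "%Y-%m-%d %H:%M:%S.%f" in Python;
# %f parses the microsecond (sub-millisecond) component.
ts = datetime.strptime("2024-01-15 09:30:00.123456", "%Y-%m-%d %H:%M:%S.%f")
```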
10. Columns whose Type is Text are listed in the Id Column drop-down list, where a key column can be selected to manage data updates and inserts.
Note: Every message definition needs a text column defined as the Id column. By default, only the latest data is loaded into memory.
Furthermore, a streaming time series window can be generated by creating a compound key from the Id Column plus a separately specified Time Id column. This Time Id column can come from the source dataset or be automatically generated.
If a Time Id column is selected, a scrolling time window can be specified.

For an Automatic Time Id, define the Time Id Column Name.
As new data arrives from the subscription, new time slices are automatically added and old ones are deleted.
If a new Id is received, a new row is added to the in-memory data set representing the Kafka topic subscription; if an existing Id is received, the existing row is updated instead.
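The update/insert behaviour above can be sketched as an in-memory table keyed by the Id column. The column name "symbol" and the record contents are assumptions for illustration:

```python
# In-memory data set representing the topic subscription,
# keyed by the configured Id column (here assumed to be "symbol").
table = {}

def on_message(record, id_column="symbol"):
    """Upsert a record: a new id adds a row, an existing id updates it."""
    table[record[id_column]] = record

on_message({"symbol": "AAPL", "price": 150.0})
on_message({"symbol": "MSFT", "price": 250.0})
on_message({"symbol": "AAPL", "price": 151.5})  # existing id -> row updated
```

With a Time Id column configured, the key would instead be the compound (Id, Time Id) pair, so each time slice keeps its own row.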
11. Check the Reset Data on Reconnect box to flush out the stale data and reload data after reconnection.
12. Click Save. The new data source is added to the Data Sources list.


