Creating MQTT Input Data Source
The MQTT connector allows:
• connection to MQTT’s message bus on a real-time streaming basis.
• Panopticon Streams server to subscribe to FIX, JSON, Text or XML based messages that are published on particular topics. The data format itself is arbitrary, and consequently, the connection includes the message definition.
• encrypted/SSL connections using a generated CA certificate file.
Steps:
1. On the New Data Source page, select Input > MQTT in the Connector drop-down list.

2. Enter the following properties:
| Property | Description |
|---|---|
| Broker URL | The location of the message broker. Default is tcp://localhost:1883. |
| Topic | The topic or the queue physical name. |
| User Id | The user Id that will be used to connect to MQTT. |
| Password | The password that will be used to connect to MQTT. |
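As an illustration of what the Broker URL value carries, the sketch below splits a URL of the documented form into scheme, host, and port using Python's standard library. The function name `split_broker_url` is hypothetical and is not part of the product; only the default value comes from the table above.

```python
from urllib.parse import urlsplit

DEFAULT_BROKER_URL = "tcp://localhost:1883"  # default stated in the property table


def split_broker_url(url: str = DEFAULT_BROKER_URL) -> tuple[str, str, int]:
    """Return (scheme, host, port) for a broker URL such as tcp://host:1883."""
    parts = urlsplit(url)
    if not parts.hostname or parts.port is None:
        raise ValueError(f"expected scheme://host:port, got {url!r}")
    return parts.scheme, parts.hostname, parts.port


print(split_broker_url())  # ('tcp', 'localhost', 1883)
```

A check like this can catch a missing port or host before the value is entered in the connector form.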
3. To allow encrypted connections, enter the path of the CA certificate file in CA Certificate.
4. In MQTT, a topic consists of one or more topic levels. Enter the Topic Level Separator to use. Default is / (forward slash).
5. Select the Message Type.
6. Select either the period (.) or comma (,) as the Decimal Separator.
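The effect of the Decimal Separator choice can be sketched as follows. This is a minimal illustration, assuming numeric text contains no thousands separators; `parse_number` is a hypothetical helper, not a product function.

```python
def parse_number(text: str, decimal_separator: str = ".") -> float:
    """Parse a numeric string whose decimal mark is '.' or ','."""
    if decimal_separator not in (".", ","):
        raise ValueError("decimal separator must be '.' or ','")
    # Normalize the chosen decimal mark to '.', which float() expects.
    return float(text.strip().replace(decimal_separator, "."))


print(parse_number("21,5", decimal_separator=","))  # 21.5
print(parse_number("21.5"))                         # 21.5
```

If the separator does not match the incoming data, a value such as "21,5" cannot be read as a number, which is why the setting has to agree with the publisher's format.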
NOTE: Prepend 'default:' to the elements that fall under the default namespace.
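The reason a prefix is needed for default-namespace elements can be shown with Python's standard `xml.etree.ElementTree`, which behaves in a similar way. This is only an analogy: the message, the namespace URI, and the way the prefix is bound here are assumptions made for illustration, and the product's own XPath handling may differ.

```python
import xml.etree.ElementTree as ET

# Hypothetical message; the namespace URI is made up for illustration.
MESSAGE = """
<quote xmlns="http://example.com/quotes">
  <symbol>ABC</symbol>
  <price>101.25</price>
</quote>
""".strip()

root = ET.fromstring(MESSAGE)
namespaces = {"default": "http://example.com/quotes"}

# Without a prefix, the path looks for <price> in no namespace and finds nothing.
print(root.find("price"))                           # None
# With the prefix bound to the default namespace, the element is found.
print(root.find("default:price", namespaces).text)  # 101.25
```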
7. Click the button that fetches the schema based on the connection details. The list of columns is then populated with the data types found by inspecting the first ‘n’ rows of the input data source, and the Save button is enabled.
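The idea of deriving column types from the first ‘n’ rows can be sketched as below. The product's actual inference rules are not described in this guide, so the rules here (try Numeric, then Time, else Text), the function names, and the sample rows are all assumptions.

```python
from datetime import datetime


def infer_type(values: list[str]) -> str:
    """Return 'Numeric', 'Time', or 'Text' for a column's sample values."""
    def all_parse(parser) -> bool:
        try:
            for value in values:
                parser(value)
            return True
        except ValueError:
            return False

    if values and all_parse(float):
        return "Numeric"
    if values and all_parse(datetime.fromisoformat):
        return "Time"
    return "Text"


def infer_schema(rows: list[dict[str, str]], n: int = 10) -> dict[str, str]:
    """Inspect only the first n rows and map each column name to a type."""
    sample = rows[:n]
    # dict.fromkeys keeps first-seen column order while removing duplicates.
    columns = list(dict.fromkeys(key for row in sample for key in row))
    return {c: infer_type([r[c] for r in sample if c in r]) for c in columns}


rows = [
    {"id": "A1", "price": "10.5", "ts": "2024-01-02 03:04:05"},
    {"id": "B2", "price": "11", "ts": "2024-01-02 03:04:06"},
]
print(infer_schema(rows))  # {'id': 'Text', 'price': 'Numeric', 'ts': 'Time'}
```

Because only a sample is inspected, a column whose later rows differ from the first ones can be typed incorrectly, so it is worth reviewing the fetched types before saving.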
8. You can also opt to load or save a copy of the column definition.
9. Optionally, click the button that adds columns to the MQTT connection that represent sections of the message. Then enter or select:
| Property | Description |
|---|---|
| Name | The column name of the source schema. |
| XPath/JsonPath/Fix Tag/Column Index | The XPath/JsonPath/Fix Tag/Column Index of the source schema. |
| Type | The data type of the column. Can be Text, Numeric, or Time. |
| Date Format | The format when the data type is Time. NOTE: To parse and format times with higher than millisecond precision, the format string needs to end with a period followed by a sequence of lower case Fs. There can be no additional characters following them. For example: yyyy-MM-dd HH:mm:ss.ffffff |
| Filter | Defined parameters that can be used as filter. Only available for JSON, Text, and XML message types. |
| Enabled | Determines whether the message field should be processed. |
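The Date Format rule for sub-millisecond precision (a period followed by a run of lower case Fs at the very end) can be checked with a short sketch. The helper `fraction_digits` is hypothetical. The last lines use Python's own `%f` directive, which is a different format syntax from the one in the table and is shown only as a comparable six-digit example.

```python
import re
from datetime import datetime


def fraction_digits(fmt: str) -> int:
    """Count the lower case f's that end the format after a period (0 if none)."""
    match = re.search(r"\.(f+)$", fmt)
    return len(match.group(1)) if match else 0


print(fraction_digits("yyyy-MM-dd HH:mm:ss.ffffff"))  # 6
print(fraction_digits("yyyy-MM-dd HH:mm:ss"))         # 0
print(fraction_digits("yyyy-MM-dd HH:mm:ss.fff z"))   # 0, characters follow the f's

# Python's counterpart of a six-digit fraction is the %f directive.
stamp = datetime.strptime("2024-01-02 03:04:05.123456", "%Y-%m-%d %H:%M:%S.%f")
print(stamp.microsecond)  # 123456
```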
The Add Column display depends on the selected Message Type (Fix, JSON, Text, or XML).

To delete a column, check its box (or, to delete all the column entries, check the topmost box), then click the delete button.
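To make the Fix Tag and Column Index settings from step 9 more concrete, the sketch below pulls one field out of a FIX message and one out of a delimited text message. The sample messages, the comma delimiter, and the 0-based column index are assumptions, and the helper names are hypothetical. The SOH character (0x01) is the standard FIX field delimiter.

```python
from typing import Optional

SOH = "\x01"  # standard FIX field delimiter


def fix_field(message: str, tag: int) -> Optional[str]:
    """Return the value of the first field with the given FIX tag, if present."""
    for field in message.strip(SOH).split(SOH):
        key, _, value = field.partition("=")
        if key == str(tag):
            return value
    return None


def text_field(line: str, index: int, delimiter: str = ",") -> str:
    """Return the column at a 0-based index of a delimited text message."""
    return line.split(delimiter)[index]


fix_message = SOH.join(["8=FIX.4.4", "35=D", "55=ABC", "44=101.25"]) + SOH
print(fix_field(fix_message, 55))       # ABC  (tag 55 is Symbol)
print(fix_field(fix_message, 44))       # 101.25  (tag 44 is Price)
print(text_field("ABC,101.25,500", 1))  # 101.25
```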
10. Text for topic levels can be consumed as additional columns into the data table.
The Topic Columns section lets you define data table columns and map them to topic hierarchy levels (0-based index, counted from the left).
As with columns from message data, add them manually by clicking the add button. A new entry displays.

Name can be any unique topic level within the topic name. The Level is the hierarchy level of the topic column.
Check the Enabled box to enable a topic column.
To delete a topic column, check its box (or, to delete all the topic column entries, check the topmost box), then click the delete button.
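The mapping of topic levels to columns described in step 10 can be sketched as follows, using the default / separator and 0-based levels counted from the left. The topic name, column names, and the function `topic_columns` are hypothetical.

```python
def topic_columns(topic: str, mapping: dict[str, int],
                  separator: str = "/") -> dict[str, str]:
    """Map column names to topic levels, using 0-based indexes from the left."""
    levels = topic.split(separator)
    return {
        name: levels[level]
        for name, level in mapping.items()
        if 0 <= level < len(levels)  # skip levels the topic does not have
    }


print(topic_columns("plant/line3/temperature", {"Site": 0, "Sensor": 2}))
# {'Site': 'plant', 'Sensor': 'temperature'}
```

With a different Topic Level Separator, only the `separator` argument changes; the level numbering stays the same.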
11. Any column whose Type is Text is listed in the Id Column drop-down list box and can be selected as the key column used to manage data updates and inserts.
Note: Every message definition needs a text column to be defined as the ID column. By default, only the latest data will be loaded into memory.
Furthermore, a streaming time series window can be generated by creating a compound key from the Id Column plus a separately specified Time Id column. This Time Id column can come from the source dataset or be generated automatically.
If the Time Id column is selected, then a scrolling time window can be specified.

For Automatic Time Id, define the Time Id Column Name.
As new data arrives from the subscription, new time slices are automatically added and old ones are deleted.
If a new ID is received, a new row is added to the in-memory data set representing the MQTT topic subscription. If an existing ID is received, the existing row is updated.
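The update and insert behavior described in step 11 can be sketched with plain Python dictionaries. This is not the product's implementation: the function names, the column names, and the choice to size the window as a count of time slices are assumptions, and the windowed version also assumes that slices arrive in time order.

```python
from collections import OrderedDict


def upsert(table: dict, row: dict, id_column: str) -> None:
    """Insert the row under a new ID, or overwrite the row of an existing ID."""
    table[row[id_column]] = row


def upsert_windowed(slices: OrderedDict, row: dict, id_column: str,
                    time_column: str, max_slices: int) -> None:
    """Keep one row per (time slice, ID) and drop the oldest slices."""
    slices.setdefault(row[time_column], {})[row[id_column]] = row
    while len(slices) > max_slices:
        slices.popitem(last=False)  # remove the oldest time slice


# Id Column only: the latest row per ID is kept.
table = {}
upsert(table, {"Id": "ABC", "Price": 10.0}, "Id")
upsert(table, {"Id": "XYZ", "Price": 20.0}, "Id")
upsert(table, {"Id": "ABC", "Price": 10.5}, "Id")
print(len(table), table["ABC"]["Price"])  # 2 10.5

# Id Column plus Time Id column: a scrolling window of two time slices.
slices = OrderedDict()
for t, price in [(1, 10.0), (2, 10.5), (3, 11.0)]:
    upsert_windowed(slices, {"Id": "ABC", "T": t, "Price": price},
                    "Id", "T", max_slices=2)
print(list(slices))  # [2, 3]
```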
12. Check the Reset Data on Reconnect box to flush out the stale data and reload data after reconnection.
13. Click the Save button. The new data source is added to the Data Sources list.


