Device Driver Example
This example walks through a pseudocode setup and publishing flow for a device driver.
Global Variables and Environment Used For Configuration
Each of the RabbitMQ queue setups below expects the following global variables. Any that are blank are read from the environment:
Global variable | Environment variable name |
---|---|
minimumLogLevel | MINIMUM_LOG_LEVEL |
errorLogLevel | ERROR_LOG_LEVEL |
amqpHost | AMQP_HOST |
amqpPort | AMQP_PORT |
amqpProtocol | AMQP_PROTOCOL |
cacert | AMQP_CACERT |
clientCert | AMQP_CLIENT_CERT |
clientKey | AMQP_CLIENT_KEY |
n/a (username) | TELEMETRY_USER |
n/a (password) | TELEMETRY_PASSWORD |
n/a (username) | API_USER |
n/a (password) | API_PASSWORD |
n/a (username) | DRIVER_USER |
n/a (password) | DRIVER_PASSWORD |
The connectionURL can be constructed as follows:
{amqpProtocol}://{username}:{password}@{amqpHost}:{amqpPort}
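As a minimal Go sketch of the template above: the helper names `fromEnvIfBlank` and `buildConnectionURL` are hypothetical and not part of the driver library, and the host, port, and credentials in `main` are made up. It assumes the standard `net/url` package is acceptable for assembling the URL, which has the side benefit of percent-encoding special characters in the password.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"os"
)

// fromEnvIfBlank returns value unchanged when it is set, and otherwise
// falls back to the named environment variable (hypothetical helper).
func fromEnvIfBlank(value, envName string) string {
	if value != "" {
		return value
	}
	return os.Getenv(envName)
}

// buildConnectionURL assembles
// {amqpProtocol}://{username}:{password}@{amqpHost}:{amqpPort}.
// url.UserPassword percent-encodes characters such as '@' and '/'.
func buildConnectionURL(protocol, user, password, host, port string) string {
	u := url.URL{
		Scheme: protocol,
		User:   url.UserPassword(user, password),
		Host:   net.JoinHostPort(host, port),
	}
	return u.String()
}

func main() {
	// A blank global, so the environment is consulted.
	amqpHost := fromEnvIfBlank("", "AMQP_HOST")
	if amqpHost == "" {
		amqpHost = "rabbit.example" // made-up host so the sketch runs anywhere
	}
	fmt.Println(buildConnectionURL("amqps", "driver", "p@ss", amqpHost, "5671"))
}
```

The same function can be called once per user (telemetry, API, driver), since only the username and password differ between the three connections.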
Setup API Communication
At startup, the first thing that a driver needs to do is request an array of WoT Thing Descriptions (TDs) from the API via the endpoint:
GET /things
Connecting to the API via RabbitMQ uses the username and password defined by environment variables, and does not require logging in and storing an access token.
RabbitMQ
amqp.SetConnConfig("API", connectionURL, cacert, clientCert, clientKey)
amqp.SetAPIidentifier("API") // first time setup
response = amqp.CallAPI("GET", "/things")
HTTP/REST
GET /things
Via AMQP
Exchange | Headers | Body |
---|---|---|
ase.exchange.api | requestMethod=GET | |
This will return an array of all TD objects in the system.
Retain the TD Objects of Interest
- Filter List of Thing Descriptions
  - Apply a filter to retain only the TDs this driver will handle. One method of doing this is to match one of the semantic types listed in the @type field.
- Process
  - Retrieve all TDs using an API call. Loop over the returned list and check for specific keywords in the @type field (for example, "Virtual" and "Meter"). Store each TD that matches the criteria. Note: the {thing-id} (the unique ULID) has to be extracted from the TD's "id" field.
  - With the list of known TDs, retrieve the property values of each from the API.
    - Use the API call GET /things/<thing-id>/properties to retrieve all values. Store the values.
  - Interaction with the devices described in the TDs can now begin.
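The filtering steps above can be sketched in Go roughly as follows. Several things here are assumptions rather than facts from this document: the sample TDs are made up, the keywords are matched exactly and must all be present, and the `id` field is assumed to end in the ULID after a final `:` or `/`. The names `parseTDs`, `filterTDs`, and `thingIDFrom` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// typeList accepts "@type" given either as one string or as an array.
type typeList []string

func (t *typeList) UnmarshalJSON(data []byte) error {
	var single string
	if err := json.Unmarshal(data, &single); err == nil {
		*t = typeList{single}
		return nil
	}
	var many []string
	if err := json.Unmarshal(data, &many); err != nil {
		return err
	}
	*t = many
	return nil
}

// ThingDescription holds only the TD fields this sketch needs.
type ThingDescription struct {
	ID    string   `json:"id"`
	Types typeList `json:"@type"`
}

// sampleResponse stands in for the body returned by GET /things (made-up data).
const sampleResponse = `[
  {"id": "urn:example:01ARZ3NDEKTSV4RRFFQ69G5FAV", "@type": ["Virtual", "Meter"]},
  {"id": "urn:example:01BX5ZZKBKACTAV9WEVGEMMVRZ", "@type": "Light"}
]`

func parseTDs(body []byte) ([]ThingDescription, error) {
	var tds []ThingDescription
	err := json.Unmarshal(body, &tds)
	return tds, err
}

// hasAll reports whether every keyword appears in types (exact match).
func hasAll(types []string, keywords []string) bool {
	for _, kw := range keywords {
		found := false
		for _, t := range types {
			if t == kw {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// filterTDs keeps the TDs whose @type contains every keyword.
func filterTDs(tds []ThingDescription, keywords ...string) []ThingDescription {
	var kept []ThingDescription
	for _, td := range tds {
		if hasAll(td.Types, keywords) {
			kept = append(kept, td)
		}
	}
	return kept
}

// thingIDFrom assumes the ULID is the last ':' or '/' separated segment of id.
func thingIDFrom(id string) string {
	if i := strings.LastIndexAny(id, ":/"); i >= 0 {
		return id[i+1:]
	}
	return id
}

func main() {
	tds, err := parseTDs([]byte(sampleResponse))
	if err != nil {
		panic(err)
	}
	for _, td := range filterTDs(tds, "Virtual", "Meter") {
		// The next request per retained TD would be the properties call.
		fmt.Printf("GET /things/%s/properties\n", thingIDFrom(td.ID))
	}
}
```

If the real `id` values use a different layout, only `thingIDFrom` needs to change.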
Connect to the Driver Queue
Connect as the driver user for driver communication:
amqp.SetConnConfig("driver", connectionURL, cacert, clientCert, clientKey)
Next, the driver should create an AMQP queue to serve as this driver's Driver Queue, which is the ECP's only means of pushing messages to the driver.
queueName = "ase.queue.driver" + name salting (i.e. random 12 character string & current time)
// e.g.: queueName = "ase.queue.driver.abcdefABCDEF.11215061"
amqp.SetupAMQPqueue(
    driverIdentifier, // identifier used in library code
    queueName,        // queue name
    false,            // durable
    true,             // delete when unused
    true,             // exclusive
    false,            // noWait
    nil,              // arguments
)
Then the driver can bind this unique queue to the ECP's Driver Exchange, specifying which keywords (routing keys) it should receive.
Bind to messages relevant to the driver, as indicated in section Exchange ase.exchange.driver.
For each message or thingId:
args["x-match"] = "all"
args["messageType"] = {message to process}
-or-
args["thingID"] = {thingID}
amqp.BindAMQPQueue("driver", queueName, "", "ase.exchange.driver", false, args)
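The per-binding argument tables can be prepared up front, as in the Go sketch below. With `x-match` set to `all`, a headers-style binding only matches when every listed header matches, so this sketch keeps `messageType` and `thingID` in separate bindings, following the "-or-" in the pseudocode. The helper name `bindingArgs` and the sample values are hypothetical; each returned table would be passed to one bind call.

```go
package main

import "fmt"

// bindingArgs returns one header-match table per binding:
// one for each message type, then one for each thing ID.
func bindingArgs(messageTypes, thingIDs []string) []map[string]interface{} {
	var all []map[string]interface{}
	for _, mt := range messageTypes {
		all = append(all, map[string]interface{}{
			"x-match":     "all",
			"messageType": mt,
		})
	}
	for _, id := range thingIDs {
		all = append(all, map[string]interface{}{
			"x-match": "all",
			"thingID": id,
		})
	}
	return all
}

func main() {
	messageTypes := []string{"requestAction", "setProperty"}
	thingIDs := []string{"01ARZ3NDEKTSV4RRFFQ69G5FAV"} // made-up ULID
	for _, args := range bindingArgs(messageTypes, thingIDs) {
		// Each table corresponds to one bind call on the driver queue.
		fmt.Println(args)
	}
}
```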
Install a callback, with a reference to the message processing function. This callback will receive all messages specified by the previous binding steps.
amqp.SetupAMQPsubscriber("driver", queueName, callbackFunction)
with a callback function along the lines of:
processAMQPmessageDriver(msg) {
    thingID = msg.Headers["thingId"]
    switch msg.Headers["messageType"] {
    case "requestAction":
        // start running action for thing
    case "setProperty":
        // parse new property key/value in msg.Body
    case "addThing":
        // process incoming TD and if matches criteria, store and connect
    case "updateThing":
        // update thingID's model
    case "deleteThing":
        // delete thing and stop processing
    ... etc.
    }
}
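A compilable Go version of such a dispatcher might look like the sketch below. The `Message` type is a hypothetical stand-in for whatever the library passes to the callback, and the `setProperty` body is assumed to be a JSON object of key/value pairs. The pseudocode in this document spells the header both `thingId` and `thingID`, so the sketch checks both. The handlers only return a description of what they would do.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a hypothetical stand-in for the delivery type of the library.
type Message struct {
	Headers map[string]interface{}
	Body    []byte
}

// headerString returns the first header among keys that holds a string.
func headerString(h map[string]interface{}, keys ...string) string {
	for _, k := range keys {
		if s, ok := h[k].(string); ok {
			return s
		}
	}
	return ""
}

// processDriverMessage dispatches on the messageType header.
func processDriverMessage(msg Message) string {
	thingID := headerString(msg.Headers, "thingId", "thingID")
	switch headerString(msg.Headers, "messageType") {
	case "requestAction":
		return "run action for " + thingID
	case "setProperty":
		// Assumption: the body is a JSON object of property key/value pairs.
		var props map[string]interface{}
		if err := json.Unmarshal(msg.Body, &props); err != nil {
			return "invalid setProperty body for " + thingID
		}
		return fmt.Sprintf("set %d property value(s) on %s", len(props), thingID)
	case "addThing":
		return "inspect incoming TD " + thingID
	case "updateThing":
		return "update model of " + thingID
	case "deleteThing":
		return "stop processing " + thingID
	default:
		return "ignored"
	}
}

func main() {
	msg := Message{
		Headers: map[string]interface{}{"messageType": "setProperty", "thingId": "T1"},
		Body:    []byte(`{"level": 5}`),
	}
	fmt.Println(processDriverMessage(msg))
}
```

Returning a value from the handler keeps the dispatch logic testable without a broker connection.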
Connect to the Telemetry Queue
amqp.SetConnConfig("telemetry", connectionURL, cacert, clientCert, clientKey)
Start Publishing Data
For a given body of data in the format of a stringified JSON object:
{
    "thingID" : {thingID},
    "timestamp" : {timestamp}, // epoch in seconds or microseconds
    "messageType": "propertyStatus",
    "data": {
        "key":"value",
        "key2":"value2"
    }
}
Add the following telemetry AMQP headers and send the message to the ECP's telemetry exchange:
headers["publishTag"] = "raw"
headers["thingID"] = "{thing-id}"
amqp.PublishAMQP("telemetry", "ase.exchange.telemetry", "", body, headers, "", "")