Device Driver Example

This example walks through a driver's setup and publishing flow in pseudocode.

Global Variables and Environment Used For Configuration

Each of the RabbitMQ queue setups below expects the following global variables. Any that are blank are read from the environment:

Global variable    Environment variable name
minimumLogLevel    MINIMUM_LOG_LEVEL
errorLogLevel      ERROR_LOG_LEVEL
amqpHost           AMQP_HOST
amqpPort           AMQP_PORT
amqpProtocol       AMQP_PROTOCOL
cacert             AMQP_CACERT
clientCert         AMQP_CLIENT_CERT
clientKey          AMQP_CLIENT_KEY
n/a (username)     TELEMETRY_USER
n/a (password)     TELEMETRY_PASSWORD
n/a (username)     API_USER
n/a (password)     API_PASSWORD
n/a (username)     DRIVER_USER
n/a (password)     DRIVER_PASSWORD

The connectionURL can be constructed as follows: {amqpProtocol}://{username}:{password}@{amqpHost}:{amqpPort}
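The fallback to the environment and the URL construction above can be sketched in Go as follows. This is a minimal sketch, not the library's implementation: the helper names valueOrEnv and buildConnectionURL are hypothetical, and percent-encoding of the credentials is an assumption added here so that characters such as "@" in a password do not break the URL.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// valueOrEnv (hypothetical helper) returns value unless it is blank,
// in which case it falls back to the named environment variable.
func valueOrEnv(value, envName string) string {
	if value != "" {
		return value
	}
	return os.Getenv(envName)
}

// buildConnectionURL (hypothetical helper) assembles
// {amqpProtocol}://{username}:{password}@{amqpHost}:{amqpPort}.
// url.UserPassword percent-encodes reserved characters in the credentials.
func buildConnectionURL(protocol, username, password, host, port string) string {
	u := url.URL{
		Scheme: protocol,
		User:   url.UserPassword(username, password),
		Host:   host + ":" + port,
	}
	return u.String()
}

func main() {
	// The globals are passed in blank here, so the environment is consulted.
	amqpProtocol := valueOrEnv("", "AMQP_PROTOCOL")
	amqpHost := valueOrEnv("", "AMQP_HOST")
	amqpPort := valueOrEnv("", "AMQP_PORT")

	// Credentials for the driver connection come from the environment only.
	username := os.Getenv("DRIVER_USER")
	password := os.Getenv("DRIVER_PASSWORD")

	connectionURL := buildConnectionURL(amqpProtocol, username, password, amqpHost, amqpPort)
	fmt.Println(connectionURL)
}
```

The same helper would be called with TELEMETRY_USER/TELEMETRY_PASSWORD or API_USER/API_PASSWORD to build the URL for the other two connections.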

Setup API Communication

At startup, the first thing that a driver needs to do is request an array of WoT Thing Descriptions (TDs) from the API via the endpoint:

GET /things

Connecting to the API via RabbitMQ uses the username/password defined by the environment variables, and does not require logging in and storing an access token.

RabbitMQ

amqp.SetConnConfig("API", connectionURL, cacert, clientCert, clientKey)
amqp.SetAPIidentifier("API")    // first time setup
response = amqp.CallAPI("GET", "/things")
Note: See the API section of the documentation for the available endpoints and how to use them over AMQP with a callback queue.

HTTP/REST

GET /things

Via AMQP

Exchange          Headers            Body
ase.exchange.api  requestMethod=GET
                  href=/things

This will return an array of all TD objects in the system.

Retain the TD Objects of Interest

Filter List of Thing Descriptions
Apply a filter to retain only the TDs this driver will handle. One way to do this is to match one of the semantic types listed in the @type field.
Process
  1. Retrieve all TDs using an API call. Loop over the returned list and check for specific keywords in the @type field (for example, "Virtual" and "Meter"). Store each TD that matches the criteria.
    Note: The {thing-id} (the unique ULID) has to be extracted from the TD's "id" field.
  2. With the list of known TDs, retrieve the property values of each from the API.
  3. Use the API call GET /things/<thing-id>/properties to retrieve all values, and store them.
  4. The driver can now start interacting with the devices described in the TDs.

Connect to the Driver Queue

Connect as the user for driver communication:

amqp.SetConnConfig("driver", connectionURL, cacert, clientCert, clientKey)

Next, the driver should create an AMQP queue to serve as its Driver Queue, which is the ECP's only means of pushing messages to the driver.

queueName = "ase.queue.driver" + name salting (i.e. random 12 character string & current time)
// e.g.: queueName = "ase.queue.driver.abcdefABCDEF.11215061"

amqp.SetupAMQPqueue(
            driverIdentifier, // identifier used in library code
            queueName,        // queue name
            false,            // durable
            true,             // delete when unused
            true,             // exclusive
            false,            // noWait
            nil,              // arguments
        )

Then, the driver can bind this unique queue to the ECP's driver exchange, specifying which keywords (routing keys) it should receive.

Bind to messages relevant to the driver, as indicated in section Exchange ase.exchange.driver.

For each message type or thingID the driver should receive:

args["x-match"] = "all"

args["messageType"] = {message to process}
-or-
args["thingID"] = {thingID}

amqp.BindAMQPQueue("driver", queueName, "", "ase.exchange.driver", false, args)
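To make the effect of "x-match" = "all" concrete, the sketch below builds one argument table per binding and mimics how a headers exchange decides whether a message matches it. Both helpers, bindingArgs and matchesAll, are hypothetical and only illustrate the matching rule. They do not talk to a broker, and the comparison assumes header values are simple comparable types such as strings.

```go
package main

import (
	"fmt"
	"strings"
)

// bindingArgs builds the arguments for a single binding on one header.
func bindingArgs(header, value string) map[string]interface{} {
	return map[string]interface{}{
		"x-match": "all",
		header:    value,
	}
}

// matchesAll mimics an "x-match": "all" binding: every binding argument
// that does not start with "x-" must be present in the message headers
// with an equal value. Extra message headers are ignored.
func matchesAll(args, headers map[string]interface{}) bool {
	for key, want := range args {
		if strings.HasPrefix(key, "x-") {
			continue
		}
		got, ok := headers[key]
		if !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	// One binding per message type the driver wants to process.
	bindings := []map[string]interface{}{
		bindingArgs("messageType", "requestAction"),
		bindingArgs("messageType", "setProperty"),
	}

	incoming := map[string]interface{}{
		"messageType": "setProperty",
		"thingID":     "01ARZ3NDEKTSV4RRFFQ69G5FAV", // made-up ULID
	}

	for _, args := range bindings {
		fmt.Println(args["messageType"], "matches:", matchesAll(args, incoming))
	}
}
```

Since each binding carries a single header, a message reaches the queue as soon as any one binding matches, which is why the bind call is repeated per message type or thingID.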

Install a callback, with a reference to the message processing function. This callback will receive all messages specified by the previous binding steps.

amqp.SetupAMQPsubscriber("driver", queueName, callbackFunction)

with a callback function along the lines of:

processAMQPmessageDriver(msg) {

    thingID = msg.Headers["thingId"]

    switch msg.Headers["messageType"] {
        case "requestAction":
            // start running action for thing
        case "setProperty":
            // parse new property key/value in msg.Body
        case "addThing":
            // process incoming TD and if matches criteria, store and connect
        case "updateThing":
            // update thingID's model
        case "deleteThing":
            // delete thing and stop processing
            // delete thing and stop processing
        ... etc.
    }
}
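A runnable version of this dispatch could look like the sketch below. The message type and the handleDriverMessage function are hypothetical stand-ins for whatever the AMQP library passes to the callback. The returned strings only mark where real handling would go, and the setProperty body is assumed to be a JSON object of property key/value pairs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a stand-in for the delivery type handed to the callback.
type message struct {
	Headers map[string]interface{}
	Body    []byte
}

// handleDriverMessage dispatches on the messageType header and returns a
// short description of the action taken, or an error for unknown types.
func handleDriverMessage(msg message) (string, error) {
	// Type assertions with ", _" leave the value empty if the header is
	// missing or not a string.
	thingID, _ := msg.Headers["thingId"].(string)
	messageType, _ := msg.Headers["messageType"].(string)

	switch messageType {
	case "requestAction":
		return "run action on " + thingID, nil
	case "setProperty":
		// Assumption: the body is a JSON object of property key/value pairs.
		var props map[string]interface{}
		if err := json.Unmarshal(msg.Body, &props); err != nil {
			return "", fmt.Errorf("setProperty body: %w", err)
		}
		return fmt.Sprintf("set %d properties on %s", len(props), thingID), nil
	case "addThing", "updateThing", "deleteThing":
		return messageType + " for " + thingID, nil
	default:
		return "", fmt.Errorf("unhandled messageType %q", messageType)
	}
}

func main() {
	msg := message{
		Headers: map[string]interface{}{
			"thingId":     "01ARZ3NDEKTSV4RRFFQ69G5FAV", // made-up ULID
			"messageType": "setProperty",
		},
		Body: []byte(`{"setpoint": 21}`),
	}
	result, err := handleDriverMessage(msg)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(result)
}
```

Returning an error for unknown message types, rather than ignoring them, makes a binding that is broader than the handler easier to spot in the logs.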

Connect to the Telemetry Queue

amqp.SetConnConfig("telemetry", connectionURL, cacert, clientCert, clientKey)

Start Publishing Data

For a given body of data, formatted as a stringified JSON object:

{
    "thingID": {thingID},
    "timestamp": {timestamp},  // epoch in seconds or microseconds
    "messageType": "propertyStatus",
    "data": {
        "key": "value",
        "key2": "value2"
    }
}

Add the following telemetry AMQP headers and send the message to the ECP's telemetry exchange:

headers["publishTag"]  = "raw" 
headers["thingID"]  = "{thing-id}" 

amqp.PublishAMQP("telemetry", "ase.exchange.telemetry", "", body, headers, "", "")