
MQTT Collector Overview (version 23)

Within MQTT, the MQTT Collector acts as a "subscriber", consuming data from an MQTT server (or broker) to which devices "publish" data. The collector supports three payload types:

  1. SparkplugB (2.2)
  2. SparkplugB (3.0.0)
  3. JSON (utf-8, utf-16, us-ascii, utf-16BE, utf-32, utf-7)

As of version 23.0 Canary supports SparkplugB 3.0. For more information, please see SparkplugB 3.0 Compliance.

Configuration settings that apply to only SparkplugB appear in blue, while those that pertain to only JSON appear in orange.

Status

The 'Status' screen displays all configured 'Connection Groups' within the collector. Each connection group consists of at least one MQTT server the collector is connected to. Each connection group can then log the data to one or more historian servers.

  • NAME - the name of the Connection Group along with the MQTT server(s) it is connected to
  • ENABLED - True or False, whether the session is running and if the servers are online
  • CONNECTED - True or False, whether or not the collector is connected to the server(s)
  • SUBSCRIBED TAGS - the number of tags the connection group is subscribed to
  • LOGGING TAGS - the number of tags the connection group is currently logging
  • TVQ/SEC - the number of updates (TVQ = timestamp, value, quality) the collector is logging per second
  • ERRORS - the number of errors reported since the connection group was last enabled
  • RECONNECTS - the number of reconnects the collector has established with the MQTT server since the connection group was enabled
  • CONNECTED SINCE - the date and time when the collector initially connected to the MQTT server

Configuration

The 'Configuration' screen allows the user to create a logging session and subscribe to multiple topics residing within an MQTT server. A connection group must be disabled on the 'Status' screen if wishing to modify it.

Configuration settings vary depending on the 'Payload Type': SparkplugB 2.2, SparkplugB 3.0.0, or Other.

Connection Groups (SparkplugB)

A new Connection Group can be added by clicking the '+' button.

Name - the name of the connection group

MQTT Client ID - a unique name identifying the collector within the MQTT server. If the name is NOT unique, the last client connecting to the server with that ID wins and kicks the other client out. The refresh button next to the field generates a randomized client ID.

Payload Type - the type of payload being used across the connection group. Options include SparkplugB 2.2, SparkplugB 3.0.0, and Other for JSON or other payload types. Currently, JSON is the only option for Other.

Is Primary Application - (only visible with SparkplugB 2.2 payload) enables the collector to be the primary subscriber and publishes a STATE/<clientID> ONLINE message when connecting. If a publishing client has the ability to buffer data and is configured to identify the primary subscriber, data will only be published when the primary subscriber is connected. With SparkplugB 3.0.0, the publishing device dictates who the primary application is. (*Note: If the MQTT Collector is designated as the primary application and disconnects, no other client is receiving data.)

Request Device Births - sends NCMD (Node Command) messages to publishing devices when out of sequence. This should almost always be enabled as it prompts the publishing device(s) to send its tag list along with value updates whenever the collector connects to the server. Furthermore, it is imperative it is enabled if the publishing device(s) are using aliasing or templating. Otherwise, the collector will have no context when it connects to the server.

SparkplugB Reordering Queue Size - the number of messages the collector will buffer if the messages received are out of order. Only used if 'Request Device Births' is enabled. The max value is 255. Leave at 0 if unsure whether the MQTT broker sends messages out of order.

SparkplugB Log No Data On Deaths - stores a 'NoData' quality value in the historian for any node or device tags it receives a DEATH message for

Store No Data On Disconnect - stores a 'NoData' quality value for ALL tags when the collector is disabled or disconnects from the MQTT broker. Leave disabled if publishing devices are configured to buffer data when the collector is disconnected.
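
The idea behind the 'SparkplugB Reordering Queue Size' setting can be illustrated with a small sketch. This is not the collector's implementation. It assumes SparkplugB sequence numbers that run from 0 to 255 and wrap around, one queue per publishing node, and a hypothetical ReorderQueue class whose requestRebirth flag stands in for the rebirth request sent when 'Request Device Births' is enabled.

```javascript
// Illustrative only: buffers out-of-order messages up to maxSize and
// releases them in sequence order. Names and behavior are assumptions.
class ReorderQueue {
  constructor(maxSize) {
    this.maxSize = maxSize;   // corresponds to the queue size setting (0..255)
    this.expected = null;     // next sequence number we are waiting for
    this.pending = new Map(); // seq -> buffered message
  }

  // Returns the messages that can now be processed in order, plus a flag
  // telling the caller whether the gap could not be bridged.
  push(msg) {
    if (this.expected === null) {
      this.expected = msg.seq; // first message seen sets the starting point
    }
    this.pending.set(msg.seq, msg);

    const deliver = [];
    while (this.pending.has(this.expected)) {
      deliver.push(this.pending.get(this.expected));
      this.pending.delete(this.expected);
      this.expected = (this.expected + 1) % 256; // sequence wraps after 255
    }

    // More messages waiting than the queue may hold: give up on reordering.
    const requestRebirth = this.pending.size > this.maxSize;
    if (requestRebirth) {
      this.pending.clear();
      this.expected = null;
    }
    return { deliver, requestRebirth };
  }
}

const queue = new ReorderQueue(2);
queue.push({ seq: 0 });                  // delivered immediately
queue.push({ seq: 2 });                  // held back, seq 1 is missing
const result = queue.push({ seq: 1 });   // releases seq 1 and seq 2 together
console.log(result.deliver.map((m) => m.seq));
```

With a queue size of 0, the first out-of-order message in this sketch immediately sets requestRebirth, which mirrors the advice to leave the setting at 0 when the broker is not known to deliver messages out of order.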

Connection Groups (JSON)

Name - the name of the connection group

MQTT Client ID - a unique name identifying the collector within the MQTT server. If the name is NOT unique, the last client connecting to the server with that ID wins and kicks the other client out. The refresh button next to the field generates a randomized client ID.

Payload Type - the type of payload being used across the connection group. Options include SparkplugB 2.2, SparkplugB 3.0.0, and Other for JSON or other payload types. Currently, JSON is the only option for Other.

LWT Message - last will and testament message to be published when the collector loses connection (e.g. "OFFLINE")

LWT On Connect Message - last will and testament message to be published upon connecting (e.g. "ONLINE")

LWT Topic - the MQTT topic for the 'LWT Message' and 'LWT On Connect Message'

LWT Retain - whether or not the 'LWT Message' and 'LWT On Connect Message' are retained messages

LWT QoS - the quality of service for the LWT messages

  • AT MOST ONCE (QoS = 0)
  • AT LEAST ONCE (QoS = 1)
  • EXACTLY ONCE (QoS = 2)

Store No Data On Disconnect - stores a 'NoData' quality value for ALL tags when the collector is disabled or disconnects from the MQTT broker. Leave disabled if publishing devices are configured to buffer data when the collector is disconnected.

 

Servers

The collector is able to connect to multiple servers simultaneously if high availability or redundancy is needed. To add multiple MQTT servers click the '+' icon next to 'Servers'.

URI - address of the MQTT server in the format tcp://<serverName>:<port>. Typically, the default port for an anonymous connection is 1883 and a secure connection is 8883.

Username/Password - credentials used to make a secure connection to the MQTT server

Store Configured Password in Plain Text - this should only be enabled when needing to integrate directly with the collector configuration database

Reconnect Interval - how frequently the collector tries to reconnect to the server after a disconnect

Keep Alive - (seconds) how frequently the server is notified that the session is still open

Clean Session - if true, the server creates a new session when the collector connects, discarding any previous session settings it may have saved

Enable SSL/TLS - the collector will attempt to connect securely (8883) expecting a server certificate

Self Signed Certificate - only visible if 'Enable SSL/TLS' is true. If enabled, the collector will ignore any certificate chain warnings (not recommended for production environments). By default, Canary generates a self-signed certificate.

Client Certificate - if true, the collector presents the configured certificate within the 'Settings' screen to the MQTT server when connecting

 

Subscriptions

The collector can subscribe to multiple topics by clicking the '+' button next to 'Subscriptions'. The subscriptions apply to all servers within the connection group. As stated above, three message types are supported: SparkplugB 2.2, SparkplugB 3.0.0, and JSON. A single connection group can subscribe to different message types simultaneously.

The screenshot above depicts a SparkplugB subscription.

Message Type - SparkplugB

Topic - the MQTT topic the collector is subscribing to

By default, the collector is set to subscribe to ALL topics within the server denoted by spBv1.0/#. All SparkplugB topics must start with spBv1.0/. The '#' is a wildcard to include 0 or more levels. Similarly, a '+' can be used to include 1 level. Topics are defined in SparkplugB as follows:

spBv1.0/<GroupName>/<MessageType>/<NodeName>/<DeviceName>

The third level of the topic path, <MessageType>, refers to a specific command sent. Please ensure this is set to a ‘+’ so that the collector can receive all message types. (Ex: spBv1.0/Group/+/Node/#)

When subscribing to a topic, please ensure that you do NOT subscribe to the device level. The collector needs contextual information from the node level to ensure correct template processing.

  • Good example: spBv1.0/Group1/+/Node3/#
  • Bad example: spBv1.0/Group1/+/Node3/Device4/#

Note: Topics are case-sensitive. Group1 != group1.
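
The wildcard rules above can be checked with a short sketch. The topicMatches function below is a hypothetical helper written for illustration, not part of the collector. It implements the standard MQTT meaning of '+' (exactly one level) and '#' (zero or more trailing levels) and, for brevity, ignores special cases such as topics beginning with '$'.

```javascript
// Returns true when an MQTT topic matches a subscription filter.
// '+' matches exactly one level; '#' matches zero or more trailing levels.
function topicMatches(filter, topic) {
  const f = filter.split("/");
  const t = topic.split("/");
  for (let i = 0; i < f.length; i++) {
    if (f[i] === "#") return true;        // everything from here on matches
    if (i >= t.length) return false;      // topic is shorter than the filter
    if (f[i] !== "+" && f[i] !== t[i]) return false; // comparison is case-sensitive
  }
  return f.length === t.length;           // no wildcard left: lengths must agree
}

const good = "spBv1.0/Group1/+/Node3/#";
const bad = "spBv1.0/Group1/+/Node3/Device4/#";

// Node-level birth message: seen by the good filter, missed by the bad one.
console.log(topicMatches(good, "spBv1.0/Group1/NBIRTH/Node3"));
console.log(topicMatches(bad, "spBv1.0/Group1/NBIRTH/Node3"));
```

The second call shows why subscribing at the device level is discouraged: the node-level messages never reach the collector, so it lacks the context it needs.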

QOS - the quality of service setting which guarantees the reliability of message delivery under different network environments. There may be unique circumstances where QoS 1 and 2 can be used with SparkplugB which allows the broker to buffer messages if the publishing device cannot, but in general, SparkplugB needs to operate on QoS = 0.

  • AT MOST ONCE (QoS = 0) - sometimes referred to as "Fire and Forget", this QoS level puts the least strain on system resources, but data may occasionally be lost. This is preferable for client-server connections that are stable.
  • AT LEAST ONCE (QoS = 1) - guarantees that a message is delivered at least one time to the receiver. This QoS level puts more strain on system resources than QoS 0 but not as much as QoS 2. The downside is that duplicate messages may be delivered if no confirmation response is received from the MQTT Collector.
  • EXACTLY ONCE (QoS = 2) - Guarantees that a message is delivered exactly once, but puts the most strain on system resources.

Enabled - if enabled, the collector will subscribe to this topic

Dataset Prefix - the dataset the data will be logged to in the historian. If the dataset does not already exist, it will be created when logging begins.

The dataset name is locked in once logging begins. If wishing to change the dataset prefix at a later time, the session must be disabled and the 'Tags' table deleted. When the session resumes, tags will be logged to the newly defined dataset prefix.

Tags Per Dataset - the max number of tags that will be logged into the dataset. If the max number is exceeded, a new dataset will be created with an incremented numeric suffix (e.g. MQTTData, MQTTData2, MQTTData3, ...).

Remove Tag Name Prefix (# of Branches) - the collector will remove this number of branches from the beginning of the auto-generated tag name before logging to the historian. If set to 0, the full tag path will include the Group, Node, Device, and tag name.

Automatically Log All Tags - if true, any subscribed tags will automatically be logged into the historian. If false, subscribed tags will be discovered, but then need to be manually enabled in the 'Tags' table.

Sync Historian Tags - automatically removes tags from the historian based on the messages the collector receives from the broker. If collecting JSON data, each message MUST contain the full list of tag names.
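
The effect of 'Remove Tag Name Prefix (# of Branches)' can be sketched as follows. The buildTagPath function is a hypothetical helper, and the way the collector builds tag names internally is an assumption based on the example given in the 'Filters' section of this article, where Group, Node, Device, and the published tag name are joined with periods.

```javascript
// Builds a historian tag path from a SparkplugB topic and a published
// tag name, then drops the requested number of leading branches.
function buildTagPath(topic, publishedName, branchesToRemove = 0) {
  // Topic layout: spBv1.0/<GroupName>/<MessageType>/<NodeName>/<DeviceName>
  const [, group, , node, device] = topic.split("/");
  const branches = [group, node, device]
    .filter(Boolean)                      // node-level topics have no device
    .concat(publishedName.split("/"));    // '/' in the tag name starts a new branch
  return branches.slice(branchesToRemove).join(".");
}

const topic = "spBv1.0/Texas/DBIRTH/West/PLC1";
const name = "Compressor1/Tank/Pressure/PSI";

console.log(buildTagPath(topic, name, 0)); // Texas.West.PLC1.Compressor1.Tank.Pressure.PSI
console.log(buildTagPath(topic, name, 1)); // West.PLC1.Compressor1.Tank.Pressure.PSI
```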

 

The screenshot above depicts a JSON subscription using the KepwareIOT message structure.

Message Type - JSON

Topic - similar to SparkplugB, '#' and '+' can be used to subscribe to '0 or more levels' or '1 level' respectively. Unlike SparkplugB though, the topic is not confined to 5 distinct levels. A topic could have any number of levels and does not include a message type. (Ex: Region4/Area1/Facility1/Wellpad5/Well23/Pump2/#)

Note: Topics are case-sensitive. Group1 != group1.

Message Encoding - encoding to use when deserializing the JSON message from bytes

  • utf-8
  • utf-16
  • us-ascii
  • utf-16BE
  • utf-32
  • utf-7

Message Parser - parser to be used when processing the JSON message. There are two options: KepwareIOT or Script.

  1. KepwareIOT - the collector expects the following structure in order to be parsed correctly. (An export of this structure is also attached at the top of this article - KepwareIOT JSON Payload.txt.)
    {
        "timestamp": 1640188770477,
        "values": [
            {
                "id": "Simulation Examples.Functions.Ramp1",
                "v": 51,
                "q": true,
                "t": 1640188770305
            },
            {
                "id": "Simulation Examples.Functions.Ramp2",
                "v": 180,
                "q": true,
                "t": 1640188770305
            },
            {
                "id": "Simulation Examples.Functions.Ramp3",
                "v": 104,
                "q": true,
                "t": 1640188770305
            },
            {
                "id": "Simulation Examples.Functions.Ramp4",
                "v": 940,
                "q": true,
                "t": 1640188770305
            }
        ]
    }
     

        Tag Field - name of the JSON field that contains the tag name
        Tag Path Delimiter - the symbol used to indicate a new branch in the tag path. Canary will replace the tag path delimiter with a period in the historian. Leave blank if no tag path delimiter exists in the tag name.
        Timestamp Type - the timestamp used in the JSON payload. If no timestamp is contained within the message, select 'CollectorTimestamped' from the drop-down list and a timestamp will be applied at the time the value is received.
        Timestamp Field - name of the JSON field that contains the timestamp
        Custom DateTime Format - if 'CustomDateTimeString' is selected for the Timestamp Type, the user will be able to input the custom datetime. (Ex: DD-MM-yyyy HH:mm:ss)
        Custom Format in UTC - check to enable if the custom datetime is in UTC
        Value Field - name of the JSON field that contains the value
        Quality Field - name of the JSON field that contains the quality. If left blank, the collector assumes the value is Good.

  2. Script - allows the user to parse a custom JSON payload using JavaScript. The U/I is not intended to be a text editor. It is advised to create the script outside of the U/I then paste it into the admin.

    Script - the JavaScript used to parse the JSON structure. (An example of a script used in the screenshot above is attached at the top of this article - MQTTScriptTest.txt. This example will parse the JSON structure found in the KepwareIOT JSON Payload.txt file. Also attached is the MQTT JSON JavaScript Parsing for Publisher.txt file. This can be used if wishing to parse the JSON message the Canary Publisher service exports when using the JSONMQTT option.)

    There are 2 variables that the collector passes to the JavaScript function. The first is message which contains the json message contents. The second is publishedTopic which contains the topic on which the message was received by the collector. Both of these variables can be used in the JavaScript function for building the structure to return to the collector for logging into the historian.

    Sample Topic - the topic that the sample message is published on
    Sample Message - the example JSON message to be processed by the given script. (The screenshot above uses the sample message from the KepwareIOT JSON Payload.txt file attached at the top of this article.)
    Test Results - displays a message stating if the returned JSON payload has the expected structure and is successful or not
    Returned Message - the message that is returned after the example payload has been processed by the script. Below is the structure the collector expects the script to return:

    {
          "data": [
          {
                "tagName": "mytag",
                "values": [
                      {
                            "datetime": "2022-04-11 09:45:00-0500",
                            "value": 23.3435,
                            "stringQuality": "Good",
                            "numericQuality": 192
                      },
                      {
                            "datetime": "2022-04-12 09:45:00-0500",
                            "value": 25.3435
                      }
                ],
                "properties": [
                      {
                            "propertyName": "EngUnit",
                            "records": [
                                  {
                                        "datetime": "2022-04-12 09:45:00-0500",
                                        "value": "psi",
                                        "stringQuality": "Good",
                                        "numericQuality": 192
                                  }
                            ]
                      },
                      {
                            "propertyName": "Business Unit",
                            "records": [
                                  {
                                        "datetime": "2022-04-12 09:45:00-0500",
                                        "value": "20"
                                  }
                            ]
                      }
                ],
                "annotations": [
                      {
                            "user": "jwolf",
                            "annotationTime": "2018-01-09T12:00:00.0000000-05:00",
                            "annotation": "This is a basic note regarding something that happened at this time",
                            "entryTime": "2018-01-09T12:00:00.0000000-05:00"
                      },
                      {
                            "user": "grover",
                            "annotationTime": "2019-01-09T12:00:00.0000000-05:00",
                            "annotation": "Gail's note"
                      }
                ]
          }
          ]
    }

    Annotations and properties are optional, as is the entryTime field of an annotation. Either string or numeric qualities can be passed through. Valid string qualities are "Good" (192), "Bad" (0), and "NoData" (32768).

    For more information on the supported JavaScript features and the library used for parsing, visit https://github.com/sebastienros/jint.
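
    As an illustration of the Script option, the sketch below converts the KepwareIOT payload shown earlier into the structure the collector expects. It is not the attached MQTTScriptTest.txt. The function name parseMessage, the assumption that message may arrive either as a string or as an already parsed object, the mapping of q to "Good"/"Bad", and the choice to return an object rather than a JSON string are all assumptions made for this example. The datetime is formatted to resemble the "2022-04-11 09:45:00-0500" style used above, which drops milliseconds.

```javascript
// Hypothetical parser script; name and calling convention are assumptions.
// message: the JSON message contents, publishedTopic: the topic it arrived on.
function parseMessage(message, publishedTopic) {
  const payload = typeof message === "string" ? JSON.parse(message) : message;

  // Throwing here is one way to reject malformed input instead of
  // returning a structure that fails validation later.
  if (!payload || !Array.isArray(payload.values)) {
    throw new Error(
      "Unexpected payload on topic " + publishedTopic + ": missing 'values' array"
    );
  }

  return {
    data: payload.values.map(function (item) {
      return {
        tagName: item.id,
        values: [
          {
            datetime: formatUtc(item.t),
            value: item.v,
            stringQuality: item.q ? "Good" : "Bad" // assumed meaning of the q flag
          }
        ]
      };
    })
  };
}

// Converts epoch milliseconds to "yyyy-MM-dd HH:mm:ss+0000" (UTC, no milliseconds).
function formatUtc(epochMs) {
  return new Date(epochMs).toISOString().slice(0, 19).replace("T", " ") + "+0000";
}

const sample = {
  timestamp: 1640188770477,
  values: [
    { id: "Simulation Examples.Functions.Ramp1", v: 51, q: true, t: 1640188770305 }
  ]
};
console.log(JSON.stringify(parseMessage(sample, "iotgateway"), null, 2));
```

    The publishedTopic argument is only used in the error text here. In a payload that does not carry tag names, it could instead be used to build the tagName.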

Sender Sessions

Once subscribed to a topic(s), data can be sent to a historian (or proxy) server. If wishing to send duplicate streams of data to more than one historian, click the '+' button next to 'Sender Sessions'.

Historian - the host name or IP address of the destination historian (or proxy) server

Enabled - if enabled, data will be sent

External Property Storage - if true, properties will be stored externally from the Historian. Further configuration is needed if wishing to store properties in an external database. See How to Configure External Property Storage.

 

Filters

Filters can be applied to automatically include or exclude tags to be logged. Note, filters do NOT prevent the tag from appearing on the 'Tags' table, they simply override the 'Automatically Log All Tags' setting in the 'Subscriptions' screen. Inclusive rules will set tags to be enabled. Exclusive rules will disable the tags from logging. Inclusion rules are applied first. If any inclusive filters are created, then none of the tags from the topic are included by default. If no inclusive filters are created, then all of the tags from the topic are logged by default. Exclusive filters are applied last. If tags match any of the exclusive rules, they are excluded from being logged (even if an inclusive rule was matched).

Filters use RegEx (Regular Expressions) rules to match string values that will be found within the tag path. These rules are applied after branches have been removed using the 'Remove Tag Name Prefix (# of Branches)' setting in the 'Subscriptions' screen. If no branches are removed the tag path includes all topic branches along with the tag name. For example, the tag name ‘Compressor1/Tank/Pressure/PSI’ published on the SparkplugB topic ‘spBv1.0/Texas/DBIRTH/West/PLC1’ would be logged as 'Texas.West.PLC1.Compressor1.Tank.Pressure.PSI'.
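
The order in which inclusive and exclusive rules are evaluated can be sketched as follows. The shouldLog function is a hypothetical helper, and it uses JavaScript regular expressions, which may differ in detail from the RegEx dialect the collector uses.

```javascript
// Decides whether a tag path would be logged, following the rule order
// described above: inclusive rules first, exclusive rules last.
function shouldLog(tagPath, includePatterns, excludePatterns) {
  const matchesAny = (patterns) =>
    patterns.some((pattern) => new RegExp(pattern).test(tagPath));

  // With no inclusive rules, every tag is included by default.
  const included = includePatterns.length === 0 || matchesAny(includePatterns);

  // An exclusive match always wins, even over an inclusive match.
  return included && !matchesAny(excludePatterns);
}

const tag = "Texas.West.PLC1.Compressor1.Tank.Pressure.PSI";

console.log(shouldLog(tag, [], []));                          // no rules
console.log(shouldLog(tag, ["Compressor2"], []));             // inclusive rule not matched
console.log(shouldLog(tag, ["Compressor1"], ["\\.PSI$"]));    // exclusive rule overrides
```

The three calls return true, false, and false respectively: the tag is logged when no rules exist, dropped when inclusive rules exist but none match, and dropped when an exclusive rule matches.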

 

Tag Properties

The 'Tag Properties' screen allows the user to add static properties to all tags within the subscription group or translate an existing property that is being published by the device. Select the '+' icon to add a static or translated property.

Property Type - StaticValue or NameTranslation

  • StaticValue - a static property applied to all tags in the subscription group
    • Name - name of the property to assign to tags
    • Value - the value of the static property
  • NameTranslation - translates the existing property name to a new name to be stored in the historian
    • Name - the name of the property as it currently exists in the MQTT message
    • Translated Name - the new name the property will be translated to
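
The two property types can be illustrated with a small sketch. The applyPropertyRules function and the rule object shape are hypothetical, and letting a static value overwrite a published property of the same name is an arbitrary choice made for this example, not documented collector behavior.

```javascript
// Applies NameTranslation and StaticValue rules to the properties
// published for a tag and returns the properties as they would be stored.
function applyPropertyRules(published, rules) {
  const result = {};

  // Rename published properties that have a NameTranslation rule.
  for (const [name, value] of Object.entries(published)) {
    const rule = rules.find(
      (r) => r.type === "NameTranslation" && r.name === name
    );
    result[rule ? rule.translatedName : name] = value;
  }

  // Add static properties (assumed here to win on a name clash).
  for (const rule of rules) {
    if (rule.type === "StaticValue") {
      result[rule.name] = rule.value;
    }
  }
  return result;
}

const rules = [
  { type: "NameTranslation", name: "engUnit", translatedName: "EngUnit" },
  { type: "StaticValue", name: "Site", value: "Texas" }
];
console.log(applyPropertyRules({ engUnit: "psi", Description: "Tank pressure" }, rules));
```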

 

Tags

The 'Tags' table consists of all the tags the collector is subscribed to for that particular subscription group. This list is not populated until the connection group is enabled from the 'Status' screen. Not all subscribed tags are necessarily logged; only those which are 'Enabled' are. If tags are NOT set to be automatically logged from the 'Subscriptions' menu, they can be manually enabled from this list. If 'Filters' are being used, any tags matching on inclusive rules will be enabled. Any tags matching on exclusive rules will be disabled.

DELETE ALL - removes all tags from the list or the tags that are selected. Multiple tags can be selected using Ctrl+Click or Shift+Click.

DISABLE ALL - disables all tags in the list or the tags that are selected. Multiple tags can be selected using Ctrl+Click or Shift+Click.

ENABLE ALL - enables all tags in the list or the tags that are selected. Multiple tags can be selected using Ctrl+Click or Shift+Click.

FILTER - allows the user to filter the list based upon the Published Topic, Published Name, Canary Dataset, whether or not the tag is enabled, the Initial Birth, or the Latest Birth.

Note: This filter is separate from the 'Filters' section mentioned above. The 'Filters' from above are applied prior to the tag list being generated.

 

Settings

The 'Settings' screen allows the user to configure the certificate used to make a secure connection to the MQTT server. To enable a secure connection, the user will need to ensure 'Enable SSL/TLS' is checked from the 'Configuration' screen.

  

Store Name - the file location of the client certificate within the machine's certificate store

Find Type - criteria by which the Collector searches for the certificate

  • FindBySubjectName
  • FindByThumbprint
  • FindByTemplateName

Subject Name - the value of the Subject Name or Thumbprint

8 replies

    • vikan
    • 11 mths ago

    Do you have an example of connecting to the AWS IoT Core? In particular, the way to upload the Certificate, CA1, and Private key to the Windows certificate store

      • smason
      • 7 mths ago

      vikan We would recommend using OpenSSL to combine the public and private key cert files into a single file.

    • kullmann
    • 7 mths ago

    Is it possible to log to Message from within the javascript?
    How to debug incoming messages when the topic is unknown?

      • smason
      • 7 mths ago

      kullmann The intention of the javascript function is to convert an incoming json message to the predefined JSON message that Canary can ingest. The javascript function can be as simple or as complicated as you would like. You can do whatever you need to do in the javascript function from a conversion standpoint using the topic and message. 

      • kullmann
      • 7 mths ago

      Steve Mason Thats clear :-)
      But if message is containing rubbish, do i have a possibility to report that into the messagetile, like log (debug, "Myscript: MQTT message was misaligned!")

      • smason
      • 7 mths ago

      kullmann If the script is designed to throw an error then some of those error details would most likely appear in the message log. Similarly, if there's an issue with script executing, you would see:

      Script used to process messages threw an error. <exception message details>
      

      If the script works but we encounter an error with the JSON structure you would see:

      We failed to validate the script result against the expected JSON structure. <exception message details>
      
    • aris_macaballug
    • 2 mths ago

    How to connect the hivemq broker to canary if the MQTT message itself is not in SparkplugB format but in a MQTT JSON format?

      • smason
      • 2 mths ago

      You would need to use the JSON message type setting at the top of the Subscription window then choose the Script option for Message Parser. You would need to create a javascript which tells the Collector how to parse the JSON messages that come through. There are two examples of javascript attached at the top of this article.