0

MQTT Collector Tile (version 24)

Within MQTT, the MQTT Collector acts as a "subscriber", consuming data from an MQTT server (or broker) to which devices "publish" data to. The collector supports three payload types:

  1. SparkplugB (2.2)
  2. SparkplugB (3.0.0)
  3. JSON (utf-8, utf-16, us-ascii, utf-16BE, utf-32, utf-7)

As of version 23.0 Canary supports SparkplugB 3.0. For more information, please see SparkplugB 3.0 Compliance.

Configuration settings that apply to only SparkplugB appear in blue, while those that pertain to only JSON appear in orange.

Status

The 'Status' screen displays all configured 'Connection Groups' within the Collector. Each connection group consists of at least one MQTT server the Collector is connected to. Each connection group can then log the data to one or more Historian servers.

  • NAME - name of the Connection Group along with the MQTT server(s) it is connected to
  • ENABLED - True or False, whether the session is running and if the servers are online
  • CONNECTED - True or False, whether or not the collector is connected to the server(s)
  • SUBSCRIBED TAGS - number of tags the connection group is subscribed to
  • LOGGING TAGS - number of tags the connection group is currently logging
  • TVQ/SEC - rate of updates or TVQs per second (timestamp, value, quality) the Collector is logging
  • ERRORS - number of errors reported since the connection group was last enabled
  • RECONNECTS - number of reconnects the Collector has established with the MQTT server since the connection group was enabled
  • CONNECTED SINCE - datetime of when the Collector initially connected to the MQTT server

Configuration

The 'Configuration' screen allows the user to create a logging session and subscribe to multiple topics residing within an MQTT server. A connection group must be disabled on the 'Status' screen if wishing to modify it.

Configuration settings vary depending on the 'Payload Type': SparkplugB 2.2, Sparkplug3.0.0, or Other.

Connection Groups (SparkplugB)

A new Connection Group can be added by clicking the '+' button.

Name - name of the connection group

MQTT Client ID - a unique name identifying the Collector within the MQTT server. If the name is NOT unique, the last client connecting to the server with that ID wins and kicks the other client out. The refresh button next to the field generates a randomized client ID.

Payload Type - type of payload being used across the connection group. Options include SparkplugB 2.2, SparkplugB 3.0.0, and Other for JSON or other payload types. Currently, JSON is the only option for Other.

Is Primary Application - (only visible with SparkplugB 2.2 payload) enables the collector to be the primary subscriber and publishes a STATE/<clientID> ONLINE message when connecting. If a publishing client has the ability to buffer data and is configured to identify the primary subscriber, data will only be published when the primary subscriber is connected. With SparkplugB 3.0.0, the publishing device dictates who the primary application is. (*Note: If the MQTT Collector is designated as the primary application and disconnects, no other client will receive data unless the Collector is connected.)

Request Device Births - sends NCMD (Node Command) messages to publishing devices when out of sequence. This should almost always be enabled as it prompts the publishing device(s) to send its tag list along with value updates whenever the Collector connects to the server. Furthermore, it is imperative it is enabled if the publishing device(s) are using aliasing or templating; otherwise, the Collector will have no context of the data being published when it connects to the server.

SparkplugB Reordering Queue Size - number of messages the Collector will buffer if the messages received are out of order. Only used if 'Request Device Births' is enabled. The max value is 255. Leave at 0 if unsure the MQTT broker sends messages out of order.

SparkplugB Log No Data On Deaths - stores a 'NoData' quality value in the Historian for any node or device tags it receives a DEATH message for

Store No Data On Disconnect - stores a 'NoData' quality value for ALL tags when the Collector is disabled or disconnects from the MQTT broker. Leave disabled if publishing devices are configured to buffer data when the Collector is disconnected.

Connection Groups (JSON)

Name - name of the connection group

MQTT Client ID - a unique name identifying the Collector within the MQTT server. If the name is NOT unique, the last client connecting to the server with that ID wins and kicks the other client out. The refresh button next to the field generates a randomized client ID.

Payload Type - type of payload being used across the connection group. Options include SparkplugB 2.2, SparkplugB 3.0.0, and Other for JSON or other payload types. Currently, JSON is the only option for Other.

LWT Message - last will and testament message to be published when the Collector loses connection (e.g. "OFFLINE")

LWT On Connect Message - last will and testament message to be published upon connecting (e.g. "ONLINE")

LWT Topic - the MQTT topic for the 'LWT Message' and 'LWT On Connect Message'

LWT Retain - whether or not the 'LWT Message' and 'LWT On Connect Message' are retained messages

LWT QoS - the quality of service for the LWT messages

  • AT MOST ONCE (QoS = 0)
  • AT LEAST ONCE (QoS = 1)
  • EXACTLY ONCE (QoS = 2)

Store No Data On Disconnect - stores a 'NoData' quality value for ALL tags when the Collector is disabled or disconnects from the MQTT broker. Leave disabled if publishing devices are configured to buffer data when the Collector is disconnected.

 

Servers

The collector is able to connect to multiple servers simultaneously if high availability or redundancy is needed. To add multiple MQTT servers click the '+' icon next to 'Servers'.

URI - address of the MQTT server in the format tcp://<serverName>:<port>. Typically, the default port for an anonymous connection is 1883 and a secure connection is 8883.

Username/Password - credentials used to make a secure connection to the MQTT server

Store Configured Password in Plain Text - this should only be enabled when needing to integrate directly with the Collector configuration database

Reconnect Interval - how frequently the Collector tries to reconnect to the server after a disconnect

Keep Alive - (seconds) how frequently the server is notified that the session is still open

Clean Session - if true, the server creates a new session when the Collector connects, discarding any previous session settings it may have saved

Enable SSL/TLS - the Collector will attempt to connect securely (8883) expecting a server certificate

Self Signed Certificate - only visible if 'Enable SSL/TLS' is true, the Collector will ignore any certificate chain warnings (not recommended for production environments). By default, Canary generates a self-signed certificate.

Client Certificate - if true, the Collector presents the configured certificate within the 'Settings' screen to the MQTT server when connecting

 

Subscriptions

The Collector can subscribe to multiple topics by clicking the '+' button next to 'Subscriptions'. The subscriptions apply to all servers within the connection group. As stated above, there are three messages types that are supported: SparkplugB 2.2, SparkplugB 3.0.0, and JSON. It is possible that one connection group can subscribe to different message types simultaneously.

The screenshot above depicts a SparkplugB subscription.

Message Type - SparkplugB

Topic - the MQTT topic the Collector is subscribing to

By default, the collector is set to subscribe to ALL topics within the server denoted by spBv1.0/#. All SparkplugB topics must start with spBv1.0/. The '#' is a wildcard to include 0 or more levels. Similarly, a '+' can be used to include 1 level. Topics are defined in SparkplugB as follows:

spBv1.0/<GroupName>/<MessageType>/<NodeName>/<DeviceName>

The third level of the topic path, <MessageType>, refers to a specific command sent. Please ensure this is set to a ‘+’ so that the collector can receive all message types. (Ex: spBv1.0/Group/+/Node/#)

When subscribing to a topic, please ensure that you do NOT subscribe to the device level. The collector needs contextual information from the node level to ensure correct template processing.

  • Good example: spBv1.0/Group1/+/Node3/#
  • Bad example: spBv1.0/Group1/+/Node3/Device4/#

Note: Topics are case-sensitive. Group1 != group1.

QOS - the quality of service setting which guarantees the reliability of message delivery under different network environments. There may be unique circumstances where QoS 1 and 2 can be used with SparkplugB which allows the broker to buffer messages if the publishing device cannot, but in general, SparkplugB needs to operate on QoS = 0.

  • AT MOST ONCE (QoS = 0) - sometimes referred to as "Fire and Forget", this QOS level puts the least strain on system resources, but data may occasionally be lost. This is preferable for client - server connections that are stable.
  • AT LEAST ONCE (QoS = 1) - Guarantees that a message is delivered at least one time to the receiver. With this QOS level more strain is put on system resources than QOS 0 but not as much as QOS 2. The downside is that duplicate messages may be delivered if no confirmation response is received from the MQTT Collector.
  • EXACTLY ONCE (QoS = 2) - Guarantees that a message is delivered exactly once, but puts the most strain on system resources.

Enabled - if enabled, the Collector will subscribe to the defined topic

Dataset Prefix - DataSet the data will be logged to in the Historian. If the DataDet does not already exist, it will be created when logging begins.

The DataSet name is locked in once logging begins. If wishing to change the DataSet prefix at a later time, the session must be disabled and the 'Tags' table deleted. When the session resumes, tags will be logged to the newly defined DataSet prefix.

Tags Per Dataset - max number of tags that will be logged into the DataSet. If this number is exceeded, a new DataSet will be created and incremented by "1". (e.g. MQTTData, MQTTData2, MQTTData3, ...)

Remove Tag Name Prefix (# of Branches) - the Collector will remove this number of branches from the beginning of the auto-generated tag name before logging to the Historian. If set to 0, the full tag path will include the Group, Node, Device, and tag name.

Automatically Log All Tags - if true, any subscribed tags will automatically be logged into the Historian. If false, subscribed tags will be discovered, but need  manually enabled in the 'Tags' table.

Sync Historian Tags - automatically removes tags from the Historian based on the messages the Collector receives from the broker. If collecting JSON data, each message MUST contain the full list of tag names.

 

The screenshot above depicts a JSON subscription using the KepwareIOT message structure.

Message Type - JSON

Topic - similar to SparkplugB, '#' and '+' can be used to to subscribe to '0 or more levels' or '1 level' respectively. Unlike SparkplugB though, the topic is not confined to 5 distinct levels. A topic could have any number of levels and does not include a message type. (Ex: Region4/Area1/Facility1/Wellpad5/Well23/Pump2/#)

Note: Topics are case-sensitive. Group1 != group1.

Message Encoding - encoding to use when deserializing the JSON message from bytes

  • utf-8
  • utf-16
  • us-ascii
  • utf-16BE
  • utf-32
  • utf-7

Message Parser - parser to be used when processing the JSON message. There are two options: KepwareIOT or Script.

  1. KepwareIOT - the Collector expects the following structure in order to be parsed correctly. (An export of this structure is also attached at the top of this article - KepwareIOT JSON Payload.txt.)
    {
        "timestamp": 1640188770477,
        "values": [
            {
                "id": "Simulation Examples.Functions.Ramp1",
                "v": 51,
                "q": true,
                "t": 1640188770305
            },
            {
                "id": "Simulation Examples.Functions.Ramp2",
                "v": 180,
                "q": true,
                "t": 1640188770305
            },
            {
                "id": "Simulation Examples.Functions.Ramp3",
                "v": 104,
                "q": true,
                "t": 1640188770305
            },
            {
                "id": "Simulation Examples.Functions.Ramp4",
                "v": 940,
                "q": true,
                "t": 1640188770305
            }
        ]
    }
    • Tag Field - name of the JSON field that contains the tag name
    • Tag Path Delimiter - symbol used to indicate a new branch in the tag path. Canary will replace the tag path delimiter with a period in the Historian. Leave blank if no tag path delimiter exists in the tag name.
    • Timestamp Type - timestamp used in the JSON payload. If no timestamp is contained within the message, select 'CollectorTimestamped' from the drop-down list and a timestamp will be applied at the time the value is received.
      • Custom DateTime Format - if 'CustomDateTimeString' is selected for the Timestamp Type, the user will be able to input the custom datetime. (Ex: DD-MM-yyyy HH:mm:ss)
      • Custom Format in UTC - check to enable if the custom datetime is in UTC
    • Timestamp Field - name of the JSON field that contains the timestamp
    • Value Field - name of the JSON field that contains the value
    • Quality Field - name of the JSON field that contains the quality. If left blank, the Collector assumes the quality is Good.
  2. Script - allows the user to parse a custom JSON payload using JavaScript. The U/I is not intended to be a text editor. It is advised to create the script outside of the U/I then paste it into the admin.

    • Script - the JavaScript used to parse the JSON structure. (An example of a script used in the screenshot above is attached at the top of this article - MQTTScriptTest.txt. This example will parse the JSON structure found in the KepwareIOT JSON Payload.txt file. Also attached is the MQTT JSON JavaScript Parsing for Publisher.txt file. This can be used if wishing to parse the JSON message the Canary Publisher service exports when using the JSONMQTT option.)

      There are 2 variables that the collector passes to the JavaScript function. The first is message which contains the JSON message contents. The second is publishedTopic which contains the topic on which the message was received by the Collector. Both of these variables can be used in the JavaScript function for building the structure to return to the Collector for logging to the Historian.
    • Sample Topic - the topic that the sample message is published on
    • Sample Message - the example JSON message to be processed by the given script. (The screenshot above uses the sample message from the KepwareIOT JSON Payload.txt file attached at the top of this article.)
    • Test Results - displays a message stating if the returned JSON payload has the expected structure and is successful or not
    • Returned Message - the message that is returned after the example payload has been processed by the script. Below is the structure the collector expects the script to return:
      {
            "data": [
            {
                  "tagName": "mytag",
                  "values": [
                        {
                              "datetime": "2022-04-11 09:45:00-0500",
                              "value": 23.3435,
                              "stringQuality": "Good",
                              "numericQuality": 192
                        },
                        {
                              "datetime": "2022-04-12 09:45:00-0500",
                              "value": 25.3435
                        }
                  ],
                  "properties": [
                        {
                              "propertyName": "EngUnit",
                              "records": [
                                    {
                                          "datetime": "2022-04-12 09:45:00-0500",
                                          "value": "psi",
                                          "stringQuality": "Good",
                                          "numericQuality": 192
                                    }
                              ]
                        },
                        {
                              "propertyName": "Business Unit",
                              "records": [
                                    {
                                          "datetime": "2022-04-12 09:45:00-0500",
                                          "value": "20"
                                    }
                              ]
                        }
                  ],
                  "annotations": [
                        {
                              "user": "jwolf",
                              "annotationTime": "2018-01-09T12:00:00.0000000-05:00",
                              "annotation": "This is a basic note regarding something that happened at this time",
                              "entryTime": "2018-01-09T12:00:00.0000000-05:00" // can be optional
                        },
                        {
                              "user": "grover",
                              "annotationTime": "2019-01-09T12:00:00.0000000-05:00",
                              "annotation": "Gail\'s note",
                        }
                  ]
            }
      }

      Annotations and properties are optional. String or (numeric) qualities can be passed through. Valid string qualities are "Good" (192), "Bad" (0), and "NoData" (32768).

      For more information on the supported JavaScript features and the library used for parsing, visit https://github.com/sebastienros/jint.

  3. CanaryJsonFormat - the collector assumes the JSON payload is already in the expected format shown above where no script is needed to parse it.

SaF Sessions

Once subscribed to a topic(s), data can be sent to a destination (Historian or proxy server). If wishing to send duplicate streams of data to more than one destination, click the '+' button next to 'SaF Sessions'.

Historian - host name or IP address of the destination Historian (or proxy) server

Enabled - if enabled, data will be sent

External Property Storage - if true, properties will be stored in an external SQLite database apart from the Historian. Further configuration is needed if wishing to store properties in an external database. See How to Configure External Property Storage.

Filters

Filters can be applied to automatically include or exclude tags to be logged. Note, filters do NOT prevent the tag from appearing on the 'Tags' table, they simply override the 'Automatically Log All Tags' setting in the 'Subscriptions' screen. Inclusive rules will set tags to be enabled. Exclusive rules will disable the tags from logging. Inclusion rules are applied first. If any inclusive filters are created, then none of the tags from the topic are included by default. If no inclusive filters are created, then all of the tags from the topic are logged by default. Exclusive filters are applied last. If tags match any of the exclusive rules, they are excluded from being logged (even if an inclusive rule was matched).

Filters use RegEx (Regular Expressions) rules to match string values that will be found within the tag path. These rules are applied after branches have been removed using the 'Remove Tag Name Prefix (# of Branches)' setting in the 'Subscriptions' screen. If no branches are removed the tag path includes all topic branches along with the tag name. For example, the tag name ‘Compressor1/Tank/Pressure/PSI’ published on the SparkplugB topic ‘spBv1.0/Texas/DBIRTH/West/PLC1’ would be logged as 'Texas.West.PLC1.Compressor1.Tank.Pressure.PSI'.

Tag Properties

The 'Tag Properties' screen allows the user to add static properties to all tags within the subscription group or translate an existing property that is being published by the device. Select the '+' icon to add a static or translated property.

Property Type - StaticValue or NameTranslation

  • StaticValue - a static property applied to all tags in the subscription group
    • Name - name of the property to assign to tags
    • Value - value of the static property
  • NameTranslation - translates the existing property name to a new name to be stored in the Historian
    • Name - name of the property as it currently exists in the MQTT message
    • Translated Name - new name the property will be translated to

Tags

The 'Tags' table consists of all the tags the Collector is subscribed to for a particular subscription group. This list is not populated until the connection group is enabled from the 'Status' screen. Not all subscribed tags are necessarily logged. Only those which are 'Enabled'. If tags are NOT set to be automatically logged from the 'Subscriptions' menu, they can be manually enabled from this list. If 'Filters' are being used, any tags matching on inclusive rules will be enabled. Any tags matching on exclusive rules will be disabled.

DELETE ALL - removes all tags from the list or the tags that are selected. Multiple tags can be selected using Ctrl+Click or Shift+Click.

DISABLE ALL - disables all tags in the list or the tags that are selected. Multiple tags can be selected using Ctrl+Click or Shift+Click.

ENABLE ALL - enables all tags in the list or the tags that are selected. Multiple tags can be selected using Ctrl+Click or Shift+Click.

FILTER - allows the user to filter the list based upon the Published Topic, Published Name, Canary Dataset, whether or not the tag is enabled, the Initial Birth, or the Latest Birth.

Note: This is filter is separate from the 'Filters' section mentioned above. The 'Filters' from above are generated prior to the tag list being generated.

Settings

Endpoints

  • gRPC Admin (55330) - endpoint used specifically by the local Admin service for configuring settings and requesting status information

Settings

CERTIFICATE (CLIENT)

The certificate is used to make a secure connection to the MQTT server. By default, Canary generates a self-signed certificate. This can be reconfigured if wishing to use a different certificate that has been installed on the server. Clicking the INFO button will open a new window displaying the certificate's details.

To enable a secure connection, the user will need to ensure 'Enable SSL/TLS' is checked from the 'Configuration' screen.

  • Kind - the kind of certificate the Collector is configured to use
    • SelfSignedCertificate - this is the default certificate that Canary uses. It is stored in the Local Computer\Personal store.
    • Certificate - use this option if wishing to upload your own certificate. Doing so will present more options to configure.

      • Store Name - the local computer store name where the certificate is installed
      • Find Type - criteria by which the Collector searches for the certificate
        • FindBySubjectName
        • FindByThumbprint
        • FindByTemplateName
  • Subject Name - set to '[serverName]' if using the SelfSignedCertificate option. Otherwise, the value of the Subject, Thumbprint, or Template name if using the Certificate setting.

Reply

null