
MQTT Collector Overview (version 22)

Within MQTT, the MQTT Collector acts as a "subscriber", consuming data from an MQTT server (or broker) to which devices "publish" data. Currently, the collector supports two message types:

  1. SparkplugB (3.1.1)
  2. JSON (utf-8, utf-16, us-ascii, utf-16BE, utf-32, utf-7)

The MQTT Collector for SparkplugB supports both GZIP and Deflate compression.

Status

The 'Status' screen displays all configured 'Connection Groups' within the collector. Each connection group consists of at least one MQTT server the collector is connected to. Each connection group can then log the data to one or more historian servers.

  • NAME - the name of the Connection Group along with the MQTT server(s) it is connected to
  • ENABLED - True or False, whether the session is running and if the servers are online
  • CONNECTED - True or False, whether or not the collector is connected to the server(s)
  • SUBSCRIBED TAGS - the number of tags the connection group is subscribed to
  • LOGGING TAGS - the number of tags the connection group is currently logging
  • TVQ/SEC - the number of updates (TVQ = timestamp, value, quality) the collector is logging per second
  • ERRORS - the number of errors reported since the connection group was last enabled
  • RECONNECTS - the number of reconnects the collector has established with the MQTT server since the connection group was enabled
  • CONNECTED SINCE - the date and time when the collector initially connected to the MQTT server

Configuration

The 'Configuration' screen allows the user to subscribe to multiple topics residing within an MQTT server. The collector is able to connect to multiple servers simultaneously if high availability or redundancy is needed. A connection group must be disabled on the 'Status' screen before it can be modified.

Some configuration settings only apply to SparkplugB, while others only apply to JSON.

Connection Groups

A new Connection Group can be added by clicking the '+' button.

Name - the name of the connection group

MQTT Client ID - a unique name identifying the collector within the MQTT server. If the name is NOT unique, the last client connecting to the server with that ID wins and kicks the other client out. The refresh button next to the field generates a randomized client ID.

SparkplugB Primary Application - configures the collector to be the primary subscriber, publishing a STATE/<clientID> ONLINE message when it connects. If a publishing client has the ability to buffer data and is configured to identify the primary subscriber, data will only be published when the primary subscriber is connected. (*Note: If the MQTT Collector is designated as the primary application and disconnects, no other client will receive data.)

SparkplugB Request Device Births - sends NCMD (Node Command) messages to publishing devices when messages arrive out of sequence. This should almost always be enabled, as it prompts the publishing device(s) to send their tag list along with value updates whenever the collector connects to the server. It is essential when the publishing device(s) use aliasing or templating; otherwise, the collector will have no context when it connects to the server.
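
The out-of-sequence check can be pictured with a short Python sketch. It assumes the Sparkplug B convention that sequence numbers run from 0 to 255 and then wrap back to 0. The function names are hypothetical and this is not the collector's actual implementation.

```python
SEQ_MODULUS = 256  # Sparkplug B sequence numbers run 0..255, then wrap to 0


def next_expected_seq(last_seq: int) -> int:
    """Return the sequence number expected after last_seq."""
    return (last_seq + 1) % SEQ_MODULUS


def needs_rebirth(last_seq, received_seq: int) -> bool:
    """True when a rebirth request (NCMD) would be warranted.

    last_seq is None until a birth message has been seen. Treating that
    case as "needs rebirth" is a simplifying assumption of this sketch.
    """
    if last_seq is None:
        return True  # no context yet for aliases or templates
    return received_seq != next_expected_seq(last_seq)
```

For example, `needs_rebirth(4, 5)` is false, `needs_rebirth(4, 7)` is true because messages 5 and 6 were missed, and `needs_rebirth(255, 0)` is false because the counter wraps.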

LWT Message - last will and testament message to be published when the collector loses connection (e.g. "OFFLINE")

LWT On Connect Message - last will and testament message to be published upon connecting (e.g. "ONLINE")

LWT Topic - the MQTT topic for the 'LWT Message' and 'LWT On Connect Message'

LWT Retain - whether or not the 'LWT Message' and 'LWT On Connect Message' are retained messages

LWT QoS - the quality of service for the LWT messages

  • AT MOST ONCE (QoS = 0)
  • AT LEAST ONCE (QoS = 1)
  • EXACTLY ONCE (QoS = 2)

 

Servers

 

Multiple MQTT servers can be added to the connection group by clicking the '+' button next to 'Servers' to support a high availability or redundant infrastructure.

URI - address of the MQTT server in the format tcp://<serverName>:<port>. Typically, the default port is 1883 for an unencrypted connection and 8883 for a secure (SSL/TLS) connection.
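
As an illustration of the URI format, the following Python sketch splits a server URI into host and port. The helper name and the host `broker.example.com` are hypothetical, and falling back to port 1883 when no port is given is an assumption of this sketch, not documented collector behavior.

```python
from urllib.parse import urlsplit


def parse_server_uri(uri: str):
    """Split a tcp://<serverName>:<port> URI into (host, port)."""
    parts = urlsplit(uri)
    if parts.scheme != "tcp" or not parts.hostname:
        raise ValueError(f"expected tcp://<serverName>:<port>, got {uri!r}")
    # Assumption: use the conventional unencrypted MQTT port if none is given.
    port = parts.port if parts.port is not None else 1883
    return parts.hostname, port
```

Under these assumptions, `parse_server_uri("tcp://broker.example.com:8883")` returns `("broker.example.com", 8883)`.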

Username/Password - credentials used to make a secure connection to the MQTT server

Store Configured Password in Plain Text - this should only be enabled when needing to integrate directly with the collector configuration database

Reconnect Interval - how frequently the collector tries to reconnect to the server after a disconnect

Keep Alive - (seconds) how frequently the server is notified that the session is still open

Clean Session - if true, the server creates a new session when the collector connects, discarding any previous session settings it may have saved

Enable SSL/TLS - the collector will attempt to connect securely (8883) expecting a server certificate

Self Signed Certificate - only visible if 'Enable SSL/TLS' is true, the collector will ignore any certificate chain warnings (not recommended for production environments). By default, Canary generates a self-signed certificate.

Client Certificate - if true, the collector presents the configured certificate within the 'Settings' screen to the MQTT server when connecting

 

Subscriptions

The collector can subscribe to multiple topics by clicking the '+' button next to 'Subscriptions'. The subscriptions apply to all servers within the connection group. As stated above, two message types are supported: SparkplugB and JSON. A single connection group can subscribe to both message types simultaneously.

Message Type - SparkplugB

Topic - the MQTT topic the collector is subscribing to

By default, the collector is set to subscribe to ALL topics within the server, denoted by spBv1.0/#. All SparkplugB topics must start with spBv1.0/. The '#' is a wildcard that includes 0 or more levels. Similarly, a '+' can be used to include exactly 1 level. Topics are defined in SparkplugB as follows:

spBv1.0/<GroupName>/<MessageType>/<NodeName>/<DeviceName>

The third level of the topic path, <MessageType>, refers to a specific command sent. Please ensure this is set to a ‘+’ so that the collector can receive all message types. (Ex: spBv1.0/Group/+/Node/#)

When subscribing to a topic, please ensure that you do NOT subscribe to the device level. The collector needs contextual information from the node level to ensure correct template processing.

  • Good example: spBv1.0/Group1/+/Node3/#
  • Bad example: spBv1.0/Group1/+/Node3/Device4/#

Note: Topics are case-sensitive. Group1 != group1.
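
The wildcard rules above can be sketched in Python. This illustrates standard MQTT topic-filter matching rather than the collector's own code, and `topic_matches` is a hypothetical helper name.

```python
def topic_matches(subscription: str, topic: str) -> bool:
    """Return True if a published topic matches an MQTT subscription filter.

    '+' matches exactly one level; '#' matches zero or more trailing levels.
    Comparison is case-sensitive, as MQTT topics are.
    """
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(sub_levels):
        if level == "#":
            return True  # everything from here on matches, including nothing
        if i >= len(topic_levels):
            return False  # the filter is deeper than the topic
        if level != "+" and level != topic_levels[i]:
            return False
    return len(sub_levels) == len(topic_levels)
```

With this helper, the filter spBv1.0/Group1/+/Node3/# matches both spBv1.0/Group1/NBIRTH/Node3 and spBv1.0/Group1/DDATA/Node3/Device4. The device-level filter spBv1.0/Group1/+/Node3/Device4/# does not match the node-level topic spBv1.0/Group1/NBIRTH/Node3, which is the contextual information the collector would miss.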

QOS - the quality of service setting which guarantees the reliability of message delivery under different network environments

  • AT MOST ONCE (QoS = 0) - sometimes referred to as "Fire and Forget", this QoS level puts the least strain on system resources, but data may occasionally be lost. This is preferable for client-server connections that are stable.
  • AT LEAST ONCE (QoS = 1) - guarantees that a message is delivered at least one time to the receiver. This QoS level puts more strain on system resources than QoS 0 but not as much as QoS 2. The downside is that duplicate messages may be delivered if no confirmation response is received from the MQTT Collector.
  • EXACTLY ONCE (QoS = 2) - Guarantees that a message is delivered exactly once, but puts the most strain on system resources.

Enabled - if enabled, the collector will subscribe to this topic

Dataset Prefix - the dataset the data will be logged to in the historian. If the dataset does not already exist, it will be created when logging begins.

Tags Per Dataset - the max number of tags that will be logged into the dataset. If the max number is exceeded, a new dataset will be created with an incrementing numeric suffix. (e.g. MQTTData, MQTTData1, MQTTData2, ...)
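
The naming pattern can be expressed as a small Python sketch. It assumes tags fill datasets in the order they are discovered, which the description above does not confirm, and `dataset_name` is a hypothetical helper.

```python
def dataset_name(prefix: str, tag_index: int, tags_per_dataset: int) -> str:
    """Dataset for the tag at zero-based position tag_index.

    Assumption: the first dataset carries no suffix and each overflow
    dataset appends an incrementing integer to the prefix.
    """
    overflow = tag_index // tags_per_dataset
    return prefix if overflow == 0 else f"{prefix}{overflow}"
```

With a prefix of MQTTData and 1000 tags per dataset, tag positions 0 through 999 map to MQTTData, position 1000 maps to MQTTData1, and position 2500 maps to MQTTData2.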

Remove Tag Name Prefix (# of Branches) - the collector will remove this number of branches from the beginning of the auto-generated tag name before logging to the historian. If set to 0, the full tag path will include the Group, Node, Device, and tag name.
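
A rough Python sketch of how the tag path could be assembled and trimmed is shown here. It reuses the example from the 'Filters' section of this article. The helper name is hypothetical, and the trimming behavior for values other than 0 is an assumption based on the description above.

```python
def build_tag_path(topic: str, metric_name: str, remove_branches: int = 0) -> str:
    """Build a dotted tag path from a SparkplugB topic and a tag name.

    The namespace (spBv1.0) and the message type are not part of the path.
    The device level is optional because node-level topics have no device.
    """
    _namespace, group, _message_type, node, *device = topic.split("/")
    branches = [group, node, *device, *metric_name.split("/")]
    # Drop the requested number of leading branches before joining.
    return ".".join(branches[remove_branches:])
```

For the topic spBv1.0/Texas/DBIRTH/West/PLC1 and the tag name Compressor1/Tank/Pressure/PSI, a setting of 0 yields Texas.West.PLC1.Compressor1.Tank.Pressure.PSI, and a setting of 2 would yield PLC1.Compressor1.Tank.Pressure.PSI under the assumptions of this sketch.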

Automatically Log All Tags - if true, any subscribed tags will automatically be logged into the historian. If false, subscribed tags will be discovered, but then need to be manually enabled in the 'Tags' table.

Sync Historian Tags - automatically removes tags from the historian based on the messages the collector receives from the broker. If collecting JSON data, each message MUST contain the full list of tag names.

Message Type - JSON

 

Topic - similar to SparkplugB, '#' and '+' can be used to subscribe to '0 or more levels' or '1 level' respectively. Unlike SparkplugB, though, the topic is not confined to 5 distinct levels. A topic can have any number of levels and does not include a message type. (Ex: Region4/Area1/Facility1/Wellpad5/Well23/Pump2/#)

Note: Topics are case-sensitive. Group1 != group1.

Sync Historian Tags - automatically removes tags from the historian based on the messages the collector receives from the broker. If collecting JSON data, each message MUST contain the full list of tag names.

Message Encoding - encoding to use when deserializing the JSON message from bytes

  • utf-8
  • utf-16
  • us-ascii
  • utf-16BE
  • utf-32
  • utf-7

Tag Field - name of the JSON field that contains the tag name

Tag Path Delimiter - Canary will replace the tag path delimiter with a period in the historian. Leave blank if no tag path delimiter exists in the tag name.

Timestamp Type - the timestamp used in the JSON payload. If no timestamp is contained within the message, select 'CollectorTimestamped' from the drop-down list and a timestamp will be applied at the time the value is received.

  • EpochMilliseconds
  • EpochSeconds
  • Iso8601String
  • CustomDateTimeString
  • CollectorTimestamped

Timestamp Field - name of the JSON field that contains the timestamp

Custom DateTime Format - if 'CustomDateTimeString' is selected for the Timestamp Type, the user can enter the custom datetime format. (Ex: DD-MM-yyyy HH:mm:ss)

Custom Format in UTC - check to enable if the custom datetime is in UTC

Value Field - name of the JSON field that contains the value

Quality Field - name of the JSON field that contains the quality. If left blank, the collector assumes the value is Good.
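
To show how the JSON settings relate to one another, here is a hedged Python sketch that turns a message into (tag, timestamp, value, quality) tuples. The payload shape (a single object or a list of objects), the field names "id", "v", "t", and "q", and both function names are assumptions for illustration. This is not the collector's parser, and 'CustomDateTimeString' is left out because its format syntax is not Python's.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_timestamp(raw, timestamp_type: str) -> datetime:
    """Convert a raw JSON timestamp to a timezone-aware UTC datetime."""
    if timestamp_type == "EpochMilliseconds":
        return EPOCH + timedelta(milliseconds=raw)
    if timestamp_type == "EpochSeconds":
        return EPOCH + timedelta(seconds=raw)
    if timestamp_type == "Iso8601String":
        # Older Python versions reject a trailing "Z" in fromisoformat.
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive = UTC
        return parsed.astimezone(timezone.utc)
    if timestamp_type == "CollectorTimestamped":
        return datetime.now(timezone.utc)  # stamp on receipt
    raise ValueError(f"unsupported timestamp type: {timestamp_type}")


def extract_tvqs(payload: bytes, *, encoding="utf-8", tag_field="id",
                 value_field="v", timestamp_field="t",
                 timestamp_type="EpochMilliseconds", quality_field="",
                 delimiter=""):
    """Decode a JSON payload and return a list of (tag, time, value, quality)."""
    records = json.loads(payload.decode(encoding))  # Message Encoding
    if isinstance(records, dict):
        records = [records]
    tvqs = []
    for record in records:
        tag = record[tag_field]
        if delimiter:
            tag = tag.replace(delimiter, ".")  # Tag Path Delimiter -> period
        timestamp = parse_timestamp(record.get(timestamp_field), timestamp_type)
        # A blank Quality Field means the value is assumed to be Good.
        quality = record[quality_field] if quality_field else "Good"
        tvqs.append((tag, timestamp, record[value_field], quality))
    return tvqs
```

Calling `extract_tvqs(b'[{"id": "Line1/Pump2/Speed", "v": 51, "t": 1640188770305}]', delimiter="/")` would produce one tuple for the tag Line1.Pump2.Speed with the value 51, a UTC timestamp of 2021-12-22 15:59:30.305, and a quality of "Good".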

 

Sender Sessions

Once the collector is subscribed to one or more topics, data can be sent to a historian (or proxy) server. To send duplicate streams of data to more than one historian, click the '+' button next to 'Sender Sessions'.

Historian - the host name or IP address of the destination historian (or proxy) server

Enabled - if enabled, data will be sent

External Property Storage - if true, properties will be stored externally from the historian. Further configuration is needed if wishing to store properties in an external database. See How to Configure External Property Storage.

 

Filters

Although not commonly used since tags can be manually enabled in the 'Tags' table, filters can be applied prior to populating the 'Tags' table. Filters use RegEx (Regular Expressions) rules to match string values that will be found within the tag path. The tag path includes all topic branches along with the tag name. For example, the tag name ‘Compressor1/Tank/Pressure/PSI’ published on the SparkplugB topic ‘spBv1.0/Texas/DBIRTH/West/PLC1’ would be logged as 'Texas.West.PLC1.Compressor1.Tank.Pressure.PSI'.

Once a rule is created, the user has the option of including or excluding any tags that match the expression. Inclusion rules are applied first. If any inclusive filters are created, then none of the tags from the topic are included by default. If no inclusive filters are created, then all of the tags from the topic are logged by default. Exclusive filters are applied last. If tags match any of the exclusive rules, they are excluded from being logged (even if an inclusive rule was matched).
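
The include/exclude order described above can be sketched in Python. The function name is hypothetical, and Python's `re` module stands in for whatever RegEx engine the collector uses, so pattern syntax may differ in places.

```python
import re


def apply_filters(tag_paths, include_patterns=(), exclude_patterns=()):
    """Return the tag paths that survive the inclusion and exclusion rules."""
    includes = [re.compile(p) for p in include_patterns]
    excludes = [re.compile(p) for p in exclude_patterns]
    kept = []
    for path in tag_paths:
        # With any inclusive filter defined, a tag must match at least one.
        if includes and not any(rx.search(path) for rx in includes):
            continue
        # Exclusive filters are applied last and always win.
        if any(rx.search(path) for rx in excludes):
            continue
        kept.append(path)
    return kept
```

For instance, with an inclusive rule of `Compressor1` and an exclusive rule of `PSI$`, the tag Texas.West.PLC1.Compressor1.Tank.Level is kept, while Texas.West.PLC1.Compressor1.Tank.Pressure.PSI is dropped even though it matches the inclusive rule.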

 

Tag Properties

In addition to properties that are logged from the publishing clients, the collector has the ability to include static properties that apply to ALL tags within the subscription group.

Name - name of the property to assign to tags

Value - the value of the given property

 

Tags

The 'Tags' table consists of all the tags the collector is subscribed to for that particular subscription group. This list is not populated until the connection group is enabled from the 'Status' screen and any applicable filters are applied. Not all subscribed tags are necessarily logged, only those which are 'Enabled'. If tags are NOT set to be automatically logged from the 'Subscriptions' menu, they can be manually enabled from this list.

DELETE ALL - removes all tags from the list or the tags that are selected. Multiple tags can be selected using Ctrl+Click or Shift+Click.

DISABLE ALL - disables all tags in the list or the tags that are selected. Multiple tags can be selected using Ctrl+Click or Shift+Click.

ENABLE ALL - enables all tags in the list or the tags that are selected. Multiple tags can be selected using Ctrl+Click or Shift+Click.

FILTER - allows the user to filter the list based upon the Published Topic, Published Name, Canary Dataset, whether or not the tag is enabled, the Initial Birth, or the Latest Birth.

 

Note: This filter is separate from the 'Filters' section mentioned above. The 'Filters' from above are applied before the tag list is generated.

 

Settings

The 'Settings' screen allows the user to configure the certificate used to make a secure connection to the MQTT server. To enable a secure connection, the user will need to ensure 'Enable SSL/TLS' is checked from the 'Configuration' screen.

  

Store Name - the file location of the client certificate within the machine's certificate store

Find Type - criteria by which the Collector searches for the certificate

  • FindBySubjectName
  • FindByThumbprint

Subject Name - the value of the Subject Name or Thumbprint

8 replies
  • It's not immediately clear to me what the JSON format should be exactly to function with the MQTT collector.  Could I get an example?  There apparently was an example in the Data Generator page but the zip file leads to a broken link now.

    • george. tian 

      We initially built this functionality off of what Kepware's IoT Gateway was producing. It doesn't have to match this exactly but here is a snippet of what that JSON payload looks like:

      {
          "timestamp": 1640188770477,
          "values": [
              {
                  "id": "Simulation Examples.Functions.Ramp1",
                  "v": 51,
                  "q": true,
                  "t": 1640188770305
              },
              {
                  "id": "Simulation Examples.Functions.Ramp2",
                  "v": 180,
                  "q": true,
                  "t": 1640188770305
              },
              {
                  "id": "Simulation Examples.Functions.Ramp3",
                  "v": 104,
                  "q": true,
                  "t": 1640188770305
              },
              {
                  "id": "Simulation Examples.Functions.Ramp4",
                  "v": 940,
                  "q": true,
                  "t": 1640188770305
              }
          ]
      }
      
  • I am unsure how to point the MQTT Collector to my own certificates.  I have generated my own key and certificate using openssl.  The Store Name in the settings tab shows 'Personal' as the default but I have no idea where that points to.

    • george. tian That is the personal folder in the computer's certificate store.  

    • Steve Mason I added my certificate that I generated using openSSL into my personal store and used FindByThumbprint and input the thumbprint but it does not seem to be finding it.

    • george. tian Does it find it if you search by SubjectName?

    • Steve Mason I tried various parts of the subject name but no success.

    • george. tian Could you open a ticket with support@canarylabs.com? I'm not sure what is preventing the collector from seeing the cert.
