
STANDARD: MQTT

Last modified by Wim Verheirstraeten on 2023/10/06 18:41

On this page, you can find our standards for publishing data via MQTT. For variations on these standards, please contact us at info@eniris.be.

Introduction

These standards are used for sending time series data. Before we delve into the details of sending data via the standards themselves, we briefly explain how this data is stored in our time series database. Behind the scenes, we use InfluxDB, a database that is optimized for efficiently storing and querying time series.

One particular aspect that plays an important role in this, and that we need to explain within the context of the API, is the idea of so-called "retention policies." Data within a certain retention policy is stored for a certain period of time. Once this time has passed, it is automatically rotated out of the database. The Eniris retention policies have names that indicate how frequently a certain measurement value stored within that policy can be updated. We have the following retention policies:

'rp_one_s' (one second)
'rp_one_m' (one minute)
'rp_ten_m' (ten minutes)
'rp_one_h' (one hour)
'rp_six_h' (six hours)

Data stored in a high-frequency retention policy is deleted within 10 days, while data stored in a lower-frequency retention policy is kept for several decades. For more information, see: https://wiki.eniris.be/wiki/publicinformation/view/Eniris%20Insights/Insights%20EMS/Retention%20Policies/. The motivation is simple: the amount of data stored per series in the database is the product of the measurement frequency and the storage duration, so limiting the duration limits the amount of data per measurement point. Before high-frequency data is rotated out, we aggregate it into a retention policy with a lower frequency, so that a user with frequent measurements who looks back a few days in their history does not suddenly lose all values. This is why, when sending data via the standards, you must indicate how often each series is updated by explicitly specifying the retention policy.
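As an illustration of the reasoning above, the following minimal Python sketch picks a retention policy from the time between two updates of a series and estimates how many points a series accumulates. The helper names are hypothetical. The selection rule (take the slowest policy whose interval does not exceed the update interval) is an assumption, as is the inclusion of 'rp_ten_s', which is not in the list above but is mentioned in the comments of the sample script further down this page.

```python
# Nominal update interval (in seconds) per retention policy.
# "rp_ten_s" is taken from the sample script comments (assumption).
POLICY_INTERVAL_S = {
    "rp_one_s": 1,
    "rp_ten_s": 10,
    "rp_one_m": 60,
    "rp_ten_m": 600,
    "rp_one_h": 3600,
    "rp_six_h": 21600,
}


def choose_retention_policy(update_interval_s):
    """Return the slowest policy whose interval does not exceed the update interval."""
    if update_interval_s <= 0:
        raise ValueError("update interval must be positive")
    candidates = [
        name for name, step in POLICY_INTERVAL_S.items() if step <= update_interval_s
    ]
    if not candidates:
        # Faster than once per second: fall back to the fastest policy.
        return "rp_one_s"
    return max(candidates, key=POLICY_INTERVAL_S.get)


def points_stored(update_interval_s, retention_days):
    """Points per series = update frequency x storage duration."""
    return int(retention_days * 86400 / update_interval_s)


print(choose_retention_policy(30))   # updates every 30 seconds
print(points_stored(1, 10))          # one point per second, kept for 10 days
```

For unpredictable update patterns such as alarms, the sample scripts on this page use "autogen" instead of one of the policies above.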

Timeseries

In the previous paragraph, we informally talked about a series, but we haven't explained exactly what we mean by that yet. That's why we provide more information in this paragraph on how data is stored within a retention policy.

Within each retention policy, data is organized into a number of measurements (the term "measurement" comes from Influx terminology). You can think of a measurement as a table in Excel (one with not too many columns, but possibly a lot of rows), or as a SQL table. Within a measurement, there are three types of columns.

The first type of column is easy to understand: it holds a timestamp for each row (which we call a data point in this context). This timestamp doesn't have to be unique: multiple data points can share the same timestamp.

The second type of column is the "tag set". This is a group of columns whose values, taken together within a single measurement, uniquely identify a specific meter that measures a limited number of things. An example makes this clearer: say we are monitoring solar installations, each installation has one or more inverters numbered 0 to N-1, and for each inverter we measure the current power and the additional energy produced since our last update. In that case, our table of inverter data has a column called "installationId" and a column called "inverterId", which together uniquely identify an inverter (for SQL experts: think of a composite key). For each inverter, we still want to store two values: the power and the energy delta. This is where the last type of column comes in: the fields. These contain the actual measurement values, which depending on the context can be a float, an int, a boolean, or a string. In our example, two fields together form the so-called "field set": actualPower and energyDelta.

Now we can return to the definition of a series, which we used informally in the introduction: a series is a unique combination of a measurement, values for all tags within that measurement, and one field. In the example, one series would be: all actualPower values for the inverter with id 0 in an installation named 'pvEniris'. If you retrieve and visualize all the values within a specific numerical series from the database, you will normally get a curve that makes sense (in the example, the current power of a specific inverter over time). How often such a curve is updated is what you should look at when choosing the retention policy.

We now also see why there can sometimes be multiple data points for one timestamp: in the example, these points would correspond to updates from different inverters for the same timestamp. However, within one series, there can be at most one value for one timestamp: if we write down twice what the actualPower value is for the inverter with id 0 in an installation named 'pvEniris' at noon on April 9, 2021, the second written value will automatically overwrite the earlier value.
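The series concept and the overwrite behaviour described above can be mimicked with a small in-memory model. This is only a toy sketch to make the idea concrete, not how Influx is implemented. The class name and the measurement name "inverterData" are hypothetical.

```python
class ToySeriesStore:
    """Toy model: one dict of {timestamp: value} per series."""

    def __init__(self):
        # A series is identified by measurement + full tag set + one field key.
        self._series = {}

    def write(self, measurement, tags, fields, timestamp):
        tag_set = tuple(sorted(tags.items()))
        for field_key, value in fields.items():
            series = self._series.setdefault((measurement, tag_set, field_key), {})
            # Same series and same timestamp: the later write replaces the earlier one.
            series[timestamp] = value

    def read(self, measurement, tags, field_key):
        key = (measurement, tuple(sorted(tags.items())), field_key)
        return dict(self._series.get(key, {}))


store = ToySeriesStore()
noon = 1617969600  # 2021-04-09 12:00:00 UTC
inverter0 = {"installationId": "pvEniris", "inverterId": "0"}
inverter1 = {"installationId": "pvEniris", "inverterId": "1"}

# Two data points share a timestamp because they belong to different inverters.
store.write("inverterData", inverter0, {"actualPower": 1500.0, "energyDelta": 0.4}, noon)
store.write("inverterData", inverter1, {"actualPower": 900.0}, noon)

# Writing the same series at the same timestamp again overwrites the value.
store.write("inverterData", inverter0, {"actualPower": 1550.0}, noon)

print(store.read("inverterData", inverter0, "actualPower"))
```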

If this paragraph is a bit unclear, this link may be useful: https://docs.influxdata.com/influxdb/v1.8/concepts/glossary/

MQTT standard (incoming data)

Now that we understand how the data is written to Influx, and can make the right choices in that regard, the actual implementation of the MQTT standard is a breeze. If data needs to be sent encrypted, it can be sent via MQTT over TLS. To do this, you need to connect to 'mqtt.eniris.be' (to correctly validate the certificates, the domain name must be used, not the IP address) on port 8883. If a device does not support this, an unencrypted connection can also be used via port 1883. To do this, you can connect to mqtt.eniris.be or 188.166.18.198.

We provide a unique username and password combination for each of your devices to authenticate with the server.

To which topic should I send data?

For a device, after logging in, you can publish to any topic of the form:

standard1/$rp/$mea/$user

Here, $rp is a retention policy, as described earlier, and $mea is a name for a measurement (you can choose this freely; we only ask that you use camelCase for measurement names). $user is the username you use to log in.
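A small helper can build the topic and catch the most common mistakes before publishing. This is a sketch: the function name is hypothetical, and the set of accepted policies combines the list from the introduction with "autogen" and "rp_ten_s", which only appear in the sample scripts on this page. The check on "/", "+" and "#" reflects that "/" separates topic levels and that MQTT does not allow wildcards in publish topics.

```python
# Policies from the introduction, plus "autogen" and "rp_ten_s" as mentioned
# in the sample scripts (assumption that both are accepted by the server).
RETENTION_POLICIES = {
    "autogen", "rp_one_s", "rp_ten_s", "rp_one_m", "rp_ten_m", "rp_one_h", "rp_six_h",
}


def build_topic(retention_policy, measurement, username):
    """Return a topic of the form standard1/$rp/$mea/$user."""
    if retention_policy not in RETENTION_POLICIES:
        raise ValueError("unknown retention policy: %r" % retention_policy)
    for level in (measurement, username):
        if not level or any(ch in level for ch in "/+#"):
            raise ValueError("invalid topic level: %r" % level)
    # No leading "/" in front of the standard1 prefix.
    return "standard1/%s/%s/%s" % (retention_policy, measurement, username)


print(build_topic("rp_one_m", "evChargerMetrics", "eniris-0"))
```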

What should the payload look like?

Each message encodes one data point and is a JSON string of the form:

{"time": $ts, "extraTags": $extraTags, "fields": $fields}

Here, $ts is a Unix timestamp (in seconds). The variable $extraTags is a dictionary (with keys and values of type string), to which an element with key "id" and value equal to the username is always added. The resulting dictionary specifies the tag set of the data point. If no extra tags are needed, this variable can be omitted. Note that it does not make sense to add an element with the key "id" to extraTags, as its value will always be overwritten with the username. Finally, there is $fields, a dictionary that specifies the fields.

Additional information on encoding the fields can be found in a sample Python file (the only dependency is paho-mqtt). You can request this from us by filling out the form below.

Example of data packet

{
 "time": 1661506491,
 "extraTags": {
   "subId": "T19"
  },
 "fields": {
   "temperature_degC": 16.76
  }
}
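The packet above could be produced with a helper along the following lines. This is a sketch under stated assumptions: the function name is hypothetical, converting integers to floats is our own choice (the page only states that integers can no longer be pushed), and dropping an "id" tag is done because the server overwrites it anyway.

```python
import json
import math


def build_payload(timestamp_s, fields, extra_tags=None):
    """Encode one data point as a JSON string."""
    clean_fields = {}
    for key, value in fields.items():
        if isinstance(value, (bool, str)):
            clean_fields[key] = value
        elif isinstance(value, (int, float)):
            number = float(value)  # assumption: send all numbers as floats
            if not math.isfinite(number):
                raise ValueError("field %r is NaN or infinite" % key)
            clean_fields[key] = number
        else:
            # None and other types are invalid: leave the field out instead.
            raise ValueError("field %r has unsupported type" % key)

    tags = {}
    for key, value in (extra_tags or {}).items():
        if key == "id":
            continue  # always overwritten with the username on the server
        if not isinstance(key, str) or not isinstance(value, str):
            raise ValueError("tag keys and values must be strings")
        tags[key] = value

    message = {"time": int(timestamp_s)}
    if tags:
        message["extraTags"] = tags
    message["fields"] = clean_fields
    return json.dumps(message)


print(build_payload(1661506491, {"temperature_degC": 16.76}, {"subId": "T19"}))
```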

Data reliability

To give information about the quality of data, an extraTag 'reliable' can be used. It should be of type string and either 'true' or 'false'.
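Because the tag value must be the string 'true' or 'false', a Python boolean cannot be passed through directly (str(True) yields "True" with a capital letter). A minimal sketch, with a hypothetical helper name:

```python
def with_reliability(extra_tags, reliable):
    """Return a copy of extra_tags with the 'reliable' tag set to 'true' or 'false'."""
    tags = dict(extra_tags)
    tags["reliable"] = "true" if reliable else "false"
    return tags


print(with_reliability({"subId": "T19"}, False))
```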

Creating MQTT credentials

To create credentials, you need to have an account on Eniris Insights.

Under "Add", you will find an option to create new MQTT credentials.

The username should be in the format yourcompanyname-index (e.g. eniris-0).


Checking if data is received correctly

You can connect to our broker via MQTT Explorer, which can be downloaded here: http://mqtt-explorer.com/. Once you have installed MQTT Explorer, you first need to establish a connection. Select "+" to add a new connection.

Working via TLS:

Subscribe to a topic via -> ADD:


Subscribe to the topic to which data is being pushed.

e.g. "standard1/autogen/messages/username"

Then click on ADD.


MQTT standard (outgoing data)

/standard1/outbound/$installationId/$mea/$username/$field

Here, $field is a key-value pair whose key is one of the keys allowed by the metric data collection standard: https://wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/#

$mea stands for the measurement, again in accordance with https://wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/#

No timestamp is included.

Where $installationId corresponds to deviceId from data.json and in the Neo4j database.
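A subscriber could split an outbound topic into its parts as sketched below. The function and type names are hypothetical, and so are the example values. The sketch only separates the topic levels; how the value belonging to $field is encoded is not specified here, so it makes no assumption about that.

```python
from typing import NamedTuple


class OutboundTopic(NamedTuple):
    installation_id: str
    measurement: str
    username: str
    field: str


def parse_outbound_topic(topic):
    """Split /standard1/outbound/$installationId/$mea/$username/$field into its levels."""
    levels = topic.split("/")
    if levels and levels[0] == "":
        levels = levels[1:]  # drop the empty level caused by the leading "/"
    if len(levels) != 6 or levels[:2] != ["standard1", "outbound"]:
        raise ValueError("not an outbound topic: %r" % topic)
    return OutboundTopic(*levels[2:])


# Hypothetical example values
print(parse_outbound_topic("/standard1/outbound/inst42/solarMetrics/eniris-0/actualPower"))
```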

QoS should be 1 or 2

If the MQTT server cannot accept data or is not reachable, the internal memory of your data logger or PLC must be used as a buffer. To support this, set QoS (Quality of Service) to at least 1. Also check that the buffer capacity can cover outages of up to 18 hours.
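To get a feeling for the required buffer size, the sketch below computes how many messages accumulate during an 18-hour outage and shows a simple bounded in-memory buffer. All names are hypothetical, and the publish argument is a placeholder for whatever function wraps your MQTT client and reports success or failure. Note that a bounded deque silently drops the oldest message when it is full, which is a design choice of this sketch.

```python
import math
from collections import deque


def buffer_capacity(message_interval_s, outage_hours=18.0):
    """Number of messages produced during an outage of the given length."""
    return math.ceil(outage_hours * 3600 / message_interval_s)


class OfflineBuffer:
    """Keeps unsent (topic, payload) pairs in memory until they can be published."""

    def __init__(self, capacity):
        self._queue = deque(maxlen=capacity)  # oldest entries are dropped when full

    def push(self, topic, payload):
        self._queue.append((topic, payload))

    def flush(self, publish):
        """Send buffered messages in order; stop at the first failure.

        publish(topic, payload) must return True on success.
        """
        sent = 0
        while self._queue:
            topic, payload = self._queue[0]
            if not publish(topic, payload):
                break
            self._queue.popleft()
            sent += 1
        return sent

    def __len__(self):
        return len(self._queue)


print(buffer_capacity(10))  # one message every 10 seconds
```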

Python script

When data is sent via MQTT, you are not allowed to send an extraTag "id"!
Behind the scenes we add "id", which is equal to the MQTT username.

Update: we no longer allow pushing integers.

import time, json
import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
   print("Connected with result code "+str(rc))

username = ""
password = ""
qos = 1 # QoS should be 1 or 2 to guarantee that the data is stored

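client = mqtt.Client() # With paho-mqtt 2.0 or later, use mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) to keep the on_connect signature used above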
client.tls_set() # When connecting on port 8883, tls support should be turned on. In this case a device must connect using the domain name (rather than via the ip address).
                # Devices which do not support tls can connect via port 1883 (in this case an ip address is allowed)
client.username_pw_set(username, password)
client.on_connect = on_connect

client.connect("mqtt.eniris.be", 8883) # Host should be mqtt.eniris.be or 188.166.18.198 . Port should be 8883 (TLS) or 1883 (unencrypted)
client.loop_start()
for i in range(1000):
  retention_policy = "rp_one_s" # Valid values are: "autogen" (unknown), "rp_one_s" (1 second), "rp_ten_s" (ten seconds), "rp_one_m" (one minute), "rp_ten_m" (ten minutes), "rp_one_h" (one hour), "rp_six_h" (six hours)
                               # A retention policy should be chosen such that its duration is close to the time between different measurements. Autogen is for cases where the duration is highly unpredictable (e.g. alarms)
  measurement = "evChargerMetrics"
  topic = "standard1/" + retention_policy + "/" + measurement + "/" + username # The "standard1/" prefix is obligatory. Make sure not to start the topic name with a / character.
  ts = time.time() # The time of the measurement should be communicated as a unix timestamp. If second precision is sufficient, please round to a second. The same applies to millisecond or microsecond precision.
                  # Two measurements cannot be closer together than one nanosecond. If 'time' is omitted, the time at which the data is processed on the server will be used.
                  # Note that this may differ significantly from the measurement time if the client network connection is poor, or if there is high load on the server. Therefore we recommend explicitly including the time.
  # Publish data of the EV charger
  extraTags = {"subId": "laadpaalAlfen"} # A dictionary with extra tags. Both the dictionary keys and values should be strings. If there are no extra tags, this can be omitted from the message.
  fields = {"voltageL1N_V": 400.0} # A dictionary with the measurement fields. The keys should be strings. The value can be either:
                                                 #    - a number or a string representing a float and ending with the letter 'f': in this case the value will be stored as a float
                                                 #    - a string representing an integer and ending with the letter 'i': in this case the value will be stored as an integer -- update, we no longer allow pushing integers.
                                                 #    - True or False: in this case the value will be stored as a bool
                                                 #    - other strings will be stored as plain strings
                                                 #    - other values, such as None, float('nan'), float('inf'), float('-inf') are invalid. If a value is not defined, just do not include it in the measurement, rather than using None.
                                                 # Please ensure that the data type corresponding to a specific field key is always the same.
  client.publish(topic, json.dumps({"time": int(ts), "extraTags": extraTags, "fields": fields}), qos=qos)

  time.sleep(10)

client.loop_stop()

Example script for sending alarms:

import time, json, random
import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
   print("Connected with result code "+str(rc))

username = ""
password = ""
qos = 1 # QoS should be 1 or 2 to guarantee that the data is stored

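client = mqtt.Client() # With paho-mqtt 2.0 or later, use mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) to keep the on_connect signature used above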
client.tls_set() # When connecting on port 8883 or 443, tls support should be turned on. In this case a device must connect using the domain name (rather than via the ip address).
                # Devices which do not support tls can connect via port 1883 or 80 (in this case connecting using an ip address is allowed, but still deprecated)
client.username_pw_set(username, password)
client.on_connect = on_connect

client.connect("mqtt.eniris.be", 8883) # Host should be mqtt.eniris.be (recommended) or 188.166.18.198 (deprecated). Port should be 8883|443 (TLS) or 1883|80 (unencrypted)
client.loop_start()

## As an example, we simulate the occurrence of ten alarms
for i in range(10):
 ## First, we define some standard-related parameters that indicate where the alarm data should be stored
  retention_policy = "autogen" # Valid values are: "autogen" (unknown), "rp_one_s" (1 second), "rp_ten_s" (ten seconds), "rp_one_m" (one minute), "rp_ten_m" (ten minutes), "rp_one_h" (one hour), "rp_six_h" (six hours)
                              # A retention policy should be chosen such that its duration is close to the time between different measurements. Autogen is for cases where the duration is highly unpredictable (e.g. alarms)
  measurement = "messages"
  topic = "standard1/" + retention_policy + "/" + measurement + "/" + username # The "standard1/" prefix is obligatory. Make sure not to start the topic name with a / character.
 
 
 ## Let's say an alarm occurs right now. We need the Unix timestamp at which the alarm starts:
  startTs = time.time()
  extraTags = {"subId": "laadpaalAlfen"} # A dictionary with extra tags. Both the dictionary keys and values should be strings. If there are no extra tags, this can be omitted from the message.
                                        # If the MQTT credentials are used by a single device, it is ok to not include any extra tags; however if there are multiple devices sending data via the same credentials, including a subId is recommended, to indicate to which specific device the message corresponds.
  fields = {"text": "My alarm message"} # A dictionary with the measurement fields. The keys should be strings. The value can be either:
                                                 #    - a number or a string representing a float and ending with the letter 'f': in this case the value will be stored as a float
                                                 #    - a string representing an integer and ending with the letter 'i': in this case the value will be stored as an integer -- update, we no longer allow pushing integers.
                                                 #    - True or False: in this case the value will be stored as a bool
                                                 #    - other strings will be stored as plain strings
                                                 #    - other values, such as None, float('nan'), float('inf'), float('-inf') are invalid. If a value is not defined, just do not include it in the measurement, rather than using None.
                                                 # Please ensure that the data type corresponding to a specific field key is always the same.
 ## Let's send the message
  client.publish(topic, json.dumps({"time": int(startTs), "extraTags": extraTags, "fields": fields}), qos=qos)
 
 ## Wait until the alarm is over
  time.sleep(10 + random.random()*50)  
 
 ## Once the alarm is no longer present, send a new message to store the end time
  endTs = time.time()
  extraTags = {"subId": "laadpaalAlfen"}
  fields = {"endTime": endTs} ## Feel free to include the text field as well, if the contents of the alarm text should be updated
  client.publish(topic, json.dumps({"time": int(startTs), "extraTags": extraTags, "fields": fields}), qos=qos) ## NOTE THAT THE TIME OF THE MESSAGE SHOULD CORRESPOND TO THE START TIME OF THE ALARM!!
 
 ## Now nothing has to be done until a new alarm occurs...
  time.sleep(10 + random.random()*50)

client.loop_stop()

Common issues and FAQs:

  • The topic is missing or incorrectly formulated. It should be in the form:    

   topic = "standard1/" + retention_policy + "/" + measurement + "/" + username

  • Is it possible to send measurements in multiple messages instead of one?

Yes, measurements can be sent message by message. For example:

{"time":1669042670,"extraTags":{"subId":"W1"},"fields":{"absVolume_m3":0, "temperature_degC":20}}

would become:

{"time":1669042670,"extraTags":{"subId":"W1"},"fields":{"absVolume_m3":0}}

{"time":1669042670,"extraTags":{"subId":"W1"},"fields":{"temperature_degC":20}}
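The split shown above can be automated. The sketch below (with a hypothetical function name) turns one message with several fields into one message per field, keeping the timestamp and extraTags identical so that all parts end up in the same data point.

```python
import json


def split_per_field(message):
    """Return one message per field, each with the same time and extraTags."""
    base = {key: value for key, value in message.items() if key != "fields"}
    return [dict(base, fields={key: value}) for key, value in message["fields"].items()]


combined = {
    "time": 1669042670,
    "extraTags": {"subId": "W1"},
    "fields": {"absVolume_m3": 0.0, "temperature_degC": 20.0},
}
for part in split_per_field(combined):
    print(json.dumps(part))
```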

  • How can I verify that the data is being received correctly?

This can be done via MQTT Explorer.

  • Which retention policy should I choose?

The retention policy should be chosen based on the frequency of data transmission and the corresponding storage period. More information about retention policies and their relationship to data frequency can be found in the introduction. More information about the storage period can be found here: https://wiki.eniris.be/wiki/publicinformation/view/Eniris%20Insights/Insights%20EMS/Retention%20Policies/

  • If the topic is standard1/$rp/$mea/$user, how is the device associated with the data determined? Or should $mea always be unique?

The device can be uniquely identified through the extraTags. $mea is not chosen to be unique per device, but according to the type of data: https://wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/

  • How can I verify the timestamp?

Go to https://www.unixtimestamp.com/ and enter the timestamp. Check that the date is correct and that the format is "Seconds".
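The same check can be done locally in Python. The sketch below converts a timestamp to a readable UTC date and rejects values that are most likely in milliseconds. The function name and the threshold of 1e11 are our own heuristic, not part of the standard.

```python
from datetime import datetime, timezone


def describe_timestamp(ts):
    """Return the UTC date for a Unix timestamp given in seconds."""
    if ts > 1e11:
        # 1e11 seconds lies thousands of years in the future, so a value this
        # large is almost certainly expressed in milliseconds or finer.
        raise ValueError("timestamp looks like milliseconds, expected seconds")
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()


print(describe_timestamp(1661506491))  # 2022-08-26T09:34:51+00:00
```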


(c) Eniris, 2024