STANDARD: MQTT
Op deze pagina kunt u onze standaarden vinden om data te publiceren via MQTT. Voor variaties hierop, gelieve ons te contacteren via info@eniris.be.
Introductie
De standaarden worden gebruikt voor het versturen van tijdsgerelateerde data. Vooraleer we ingaan op de details van het versturen van de data via de standaarden zelf, geven we kort een woordje uitleg over hoe deze data wordt opgeslagen in onze tijdreeksdatabase. Achter de schermen maken we gebruik van de Influx database. Dit systeem heeft als voordeel dat het geoptimaliseerd is voor het efficiënt opslaan en query-en van tijdsreeksen.
Eén bepaald aspect dat daarin een belangrijke rol speelt, en dat we binnen de context van de API moeten toelichten is het idee van zogenaamde ‘retention policies’. Data binnen een bepaalde retention policy wordt gedurende een zekere termijn opgeslagen. Zodra deze voorbij is, wordt deze automatisch gerouleerd uit de database. De Eniris retention policies hebben namen die aangeven hoe frequent een bepaalde meetwaarde die binnen die policy wordt opgeslagen maximaal updatet. Zo hebben we volgende retention policies:
'rp_one_s' (één seconde) 'rp_one_m' (één minuut) 'rp_ten_m' (tien minuten) 'rp_one_h' (één uur) 'rp_six_h' (zes uren)
Data die wordt opgeslagen in een retention policy met een hoge frequentie wordt binnen de 10 dagen verwijderd, terwijl de data binnen een retention policy met een lagere frequentie tientallen jaren bewaard wordt. Meer info zie: https://wiki.eniris.be/wiki/publicinformation/view/Eniris%20Insights/Insights%20EMS/Retention%20Policies/. De motivatie is eenvoudig: de hoeveelheid data die per reeks wordt opgeslagen in de database is het product van de frequentie van het meetpunt en de duur dat data wordt gestockeerd. Door de duur te beperken, beperken we de hoeveelheid data per meetpunt. Vooraleer frequente data uitgerouleerd wordt, aggregeren we deze binnen een retention policy met een lagere frequentie (zodat het niet zo is dat een gebruiker met frequente metingen die enkele dagen terugklikt in zijn geschiedenis plots geen waardes meer terugvindt). Dit hele verhaal verklaart waarom, als u data via de standaarden doorgeeft, een indicatie dient te geven van de frequentie van de updates per reeks door de retention policy expliciet op te geven.
Tijdsreeksen
In de voorgaande paragraaf hebben we informeel over een reeks gesproken, maar we hebben tot nog toe niet heel duidelijk uitgelegd wat we daarmee precies bedoelen. Daarom geven we in deze alinea meer info over hoe data binnen een retention policy wordt opgeslagen.
Binnen elke retention policy zijn er een reeks metingen (measurements in Influx terminologie). Zo een meting kan u zich voorstellen als een tabel in Excel (ééntje met niet te veel kolommen, maar mogelijks zéér veel rijen), of als een SQL tabel. Binnen een measurement zijn er drie soorten kolommen.
De eerste kolom is eenvoudig te begrijpen: deze geeft per rij (een rij noemen we binnen deze context een meetpunt) een tijdsstip weer. Dit tijdsstip hoeft niet uniek te zijn: er kunnen meerdere meetpunten zijn die dezelfde timestamp delen.
De tweede kolom wordt de ‘tag set’ genoemd. Dit is een groep van kolommen die zo is, dat als we binnen één measurement kijken naar de waardes van deze kolommen, dat deze kolommen uniek een specifieke meter beschrijven die een beperkt aantal zaken meet. Misschien is dit wat onduidelijk, dus daarom een voorbeeldje: stel dat we zonneinstallaties monitoren, en elke installatie heeft één of meerdere inverters met nummers 0 tot N-1 waarvan we telkens het actuele vermogen meten, en de extra energieproductie sinds onze laatste update. In zo een situatie zal onze tabel met inverter data een kolom “installatieId” bevatten, en een kolom “inverterId”, die samen uniek een inverter beschrijven (voor de SQL-kenners: denk aan een aggregate key). Per inverter hebben we dan nog twee waardes die we willen stockeren: het vermogen en de energie delta.
Dit is waar ons laatste type kolom de kop op steekt: de velden. Deze bevatten de eigenlijke meetwaardes, wat afhankelijk van de context een float kan zijn, een int, een boolean, of een string. Binnen ons voorbeeldje hebben we dus twee velden die samen de zogenaamde ‘veld set’ vormen: actualPower en energyDelta.
Nu kunnen we terug itereren naar de definitie van een reeks, waarover we het zo uitgebreid hadden in de vorige alinea: een reeks is een unieke combinatie van een measurement + waardes voor alle tags binnen een measurement + één veld. Binnen het voorbeeld zou een voorbeeld van een reeks kunnen zijn: alle actualPower waardes voor een inverter met id 0 in een installatie met naam ‘pvEniris’. Als u al de waardes binnen een specifieke numerieke reeks uit de database haalt en visualiseert, dan krijgt u normaal gezien een curve die logisch steek houdt (in het voorbeeld het actueel vermogen van een welbepaalde inverter over tijd). Hoe vaak zo’n curve geüpdatet wordt, is dus waar u naar moet kijken bij het kiezen van de retention policy.
We zien nu ook waarom er soms voor één tijdsstip meerdere meetpunten kunnen zijn: in het voorbeeld zouden deze punten overeen komen met updates van verschillende inverters voor hetzelfde tijdsstip. Het is wel zo dat binnen één reeks er voor één tijdsstip hoogstens één waarde is: als we twee keer wegschrijven wat de actualPower waarde is voor de inverter met id 0 in een installatie met naam ‘pvEniris’ op 9 april 2021 op de middag, dan zal de tweede geschreven waarde automatisch de eerdere waarde overschrijven.
Als deze alinea wat onduidelijk is, kan deze link nuttig zijn: https://docs.influxdata.com/influxdb/v1.8/concepts/glossary/
MQTT-standaard (inkomende data)
Nu we inzicht hebben in hoe de data weggeschreven wordt in influx, en op dat vlak de juiste keuzes kunnen maken, is de eigenlijke implementatie van de MQTT standaard een koud kunstje. Als data geëncrypteerd verstuurd dient te worden, dan kan de data verstuurd worden via MQTT over TLS. Hiervoor dient u te connecteren met ‘mqtt.eniris.be’ (om de certificaten correct te valideren moet de domeinnaam gebruikt worden, niet het ip adres) op poort 8883. Als een toestel dit niet ondersteunt, kan een ongeëncrypteerde verbinding ook, via poort 1883. Hiervoor kan u connecteren met mqtt.eniris.be, of 188.166.18.198
Wij voorzien voor elk van uw toestellen een unieke gebruikersnaam en een wachtwoord combinatie om te authenticeren met de server.
Naar welk topic moet ik data sturen?
Voor een toestel kan u na aanmelden publiceren naar eender welk topic van de vorm:
standard1/$rp/$mea/$user
Hierbij is $rp een retention policy, zoals hogerop beschreven, $mea is een naam voor een measurement (dit kan u vrij kiezen, we verzoeken enkel om camelCase te gebruiken bij het definiëren van measurement names). $user is de user name waarmee u inlogt.
Hoe moet de payload eruitzien?
Elk bericht codeert één meetpunt en is een json string van de vorm:
{"time": $ts, "extraTags": $extraTags, "fields": $fields}
Waarbij $ts een unix timestamp is (in secondes). De variabele $extraTags is een dictionary (met key en value van het type string), waaraan steeds een element toegevoegd zal worden met als key “id” en als waarde de username. Het aldus resulterende dictionary specificeert de tag set van het meetpunt. Als er geen extra tags nodig zijn, kan deze variabele weggelaten worden. Merk op dat het niet zinvol is om aan extraTags een element met key “id” toe te voegen, aangezien de waarde daarvan steeds met de username zal overschreven worden. Tot slot is er $fields, een dictionary die de velden specificeert.
Extra informatie over het coderen van de velden vindt u in een voorbeeld python file (de enige dependency is paho-mqtt). Deze kunt u bij ons opvragen door onderstaand formulier in te vullen.
Example of data packet
"time": 1661506491,
"extraTags": {
"subId": "T19"
},
"fields": {
"temperature_degC": 16.76
}
}
Data reliability
Om de kwaliteit van data mee te geven kan er gewerkt worden met een extraTag ‘reliable’. Deze dient ‘true’ of ‘false’ te zijn en van het type string.
Aanmaken van MQTT-credentials
Om credentials aan te maken dient u over een account te beschikken op Eniris Insights.
Onder "Voeg Toe" vindt u bij "Nieuwe MQTT credentials" een mogelijkheid om credentials aan te maken.
De username moet van de vorm zijn uwbedrijfsnaam-index (bv. eniris-0).
Controleren of data goed toekomt
Via MQTT explorer kunt u verbinding maken met onze broker. MQTT explorer is hier te downloaden: http://mqtt-explorer.com/
Als u MQTT explorer heeft geïnstalleerd dan dient u eerst een verbinding tot stand te brengen. Selecteer “+” om een nieuwe verbinding toe te voegen.
Werken via TLS:
Subscriben op een topic via –> ADD:
Subscribe op het topic waarnaar er wordt gepushd.
bv: "standard1/autogen/messages/username"
Klik vervolgens op ADD.
MQTT-standaard (uitgaande data)
/standard1/outbound/$installationId/$mea/$username/$field
Waarbij de field een key value pair is, waarbij de key overeenstemt met de toegelaten keys uit de metric data collection standaard. https://wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/#
Waarbij mea staat voor het measurement. Wederom in overeenstemming met https://wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/#
Er wordt geen timestamp meegestuurd.
Waarbij installationId overeenstemt met deviceId uit data.json en in de Neo4j database.
QoS should be 1 or 2
If the mqtt server cannot accept data, or is not reachable, then the internal memory of your data logger or PLC must be used as a buffer. To ensure this principle, you should set QoS (Quality of Service) to a minimum of 1. Also check whether the buffer capacity can handle periods of up to 18 hours.
Python script
When data is sent via MQTT, you are not allowed to send an extraTag "id"!
Behind the scenes we add "id", which is equal to the MQTT username.
Update, we no longer allow pushing integers.
import paho.mqtt.client as mqtt
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
username = ""
password = ""
qos = 1 # QoS should be 1 or 2 to guarantee that the data is stored
client = mqtt.Client()
client.tls_set() # When connecting on port 8883, tls support should be turned on. In this case a device must connect using the domain name (rather than via the ip address).
# Devices which do not support tls can connect via port 1883 (in this case an ip address is allowed)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect("mqtt.eniris.be", 8883) # Host should be mqtt.eniris.be or 188.166.18.198 . Port should be 8883 (TLS) or 1883 (unencrypted)
client.loop_start()
for i in range(1000):
retention_policy = "rp_one_s" # Valid values are: "autogen" (unknown), "rp_one_s" (1 second), "rp_ten_s" (ten seconds), "rp_one_m" (one minute), "rp_ten_m" (ten minutes), "rp_one_h" (one hour), "rp_six_h" (six hours)
# A retention policy should be chosen such that its duration is close to the time between different measurements. Autogen is for cases where the duration is highly unpredictable (e.g. alarms)
measurement = "evChargerMetrics"
topic = "standard1/" + retention_policy + "/" + measurement + "/" + username # The "standard1/" prefix is obligatory. Make sure not to start the topic name with a / character.
ts = time.time() # The time of the measurement should be communicated as a unix timestamp. If second precision is sufficient, please round to a second. Idem for millisecond, or microsecond precisision.
# Two measurements cannot be closer together than one nanosecond. If 'time' is omitted, the time at which the data is processed on the server will be used.
# Note that this may differ significantly from the measurement time if the client network connection is poor, or if there is high load on the server. Therefore we recommend to explicitely include the time
# Publish data of inverter 0
extraTags = {"subId": "laadpaalAlfen"} # A dictionary with extra tags. Both the dictionary keys and values should be strings. If there are not extra tags, this can be ommitted from the message.
fields = {"voltageL1N_V": 400.0} # A dictionary with the measurement fields. The keys should be strings. The value can be either:
# - a number or a string representing a float and ending with the letter 'f': in this case the value will be stored as a float
# - a string representing an integer and ending with the letter 'i': in this case the value will be stored as an integer -- update, we no longer allow pushing integers.
# - True of False: in this case the value will be stored as a bool
# - other strings will be stored as plain strings
# - other values, such are None, float('nan'), float('inf'), float('-inf') are invalid. If a values is not defined, just do not include it in the measurement, rather than using None.
# Please ensure that the data type corresponding to a specific field key is always the same.
client.publish(topic, json.dumps({"time": int(ts), "extraTags": extraTags, "fields": fields}), qos=qos)
time.sleep(10)
client.loop_stop()
Voorbeeldscript versturen van alarmen:
import paho.mqtt.client as mqtt
import random
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
username = ""
password = ""
qos = 1 # QoS should be 1 or 2 to guarantee that the data is stored
client = mqtt.Client()
client.tls_set() # When connecting on port 8883 or 443, tls support should be turned on. In this case a device must connect using the domain name (rather than via the ip address).
# Devices which do not support tls can connect via port 1883 or 80 (in this case connecting using an ip address is allowed, but still deprecated)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect("mqtt.eniris.be", 8883) # Host should be mqtt.eniris.be (recommended) or 188.166.18.198 (deprecated). Port should be 8883|443 (TLS) or 1883|80 (unencrypted)
client.loop_start()
## As an example, we simulate the occurence of ten alarms
for i in range(10):
## First, we define some standard-related parameters that indicate where the alarm data should be stored
retention_policy = "autogen" # Valid values are: "autogen" (unknown), "rp_one_s" (1 second), "rp_ten_s" (ten seconds), "rp_one_m" (one minute), "rp_ten_m" (ten minutes), "rp_one_h" (one hour), "rp_six_h" (six hours)
# A retention policy should be chosen such that its duration is close to the time between different measurements. Autogen is for cases where the duration is highly unpredictable (e.g. alarms)
measurement = "messages"
topic = "standard1/" + retention_policy + "/" + measurement + "/" + username # The "standard1/" prefix is obligatory. Make sure not to start the topic name with a / character.
## Let's say an alarm occurs right now. We need the Unix timestamp at which the alarm starts:
startTs = time.time()
extraTags = {"subId": "laadpaalAlfen"} # A dictionary with extra tags. Both the dictionary keys and values should be strings. If there are no extra tags, this can be ommitted from the message.
# If the MQTT credentials are used by a single device, it is ok to not include any extra tags; however if there are multiple devices sending data via the same credentials, including a subId is recommended, to indicate to which specific device the message corresponds.
fields = {"text": "My alarm message"} # A dictionary with the measurement fields. The keys should be strings. The value can be either:
# - a number or a string representing a float and ending with the letter 'f': in this case the value will be stored as a float
# - a string representing an integer and ending with the letter 'i': in this case the value will be stored as an integer -- update, we no longer allow pushing integers.
# - True of False: in this case the value will be stored as a bool
# - other strings will be stored as plain strings
# - other values, such are None, float('nan'), float('inf'), float('-inf') are invalid. If a values is not defined, just do not include it in the measurement, rather than using None.
# Please ensure that the data type corresponding to a specific field key is always the same.
## Let's send the message
client.publish(topic, json.dumps({"time": int(startTs), "extraTags": extraTags, "fields": fields}), qos=qos)
## Wait until the alarm is over
time.sleep(10 + random.random()*50)
## Once the alarm is no longer present, send a new message to store the end time
endTs = time.time()
extraTags = {"subId": "laadpaalAlfen"}
fields = {"endTime": endTs} ## Feel free to include the text field as well, if the contents of the alarm text should be updated
client.publish(topic, json.dumps({"time": int(startTs), "extraTags": extraTags, "fields": fields}), qos=qos) ## NOTE THAT THE TIME OF THE MESSAGE SHOULD CORRESPOND TO THE START TIME OF THE ALARM!!
## Now nothing has to be done until a new alarm occurs...
time.sleep(10 + random.random()*50)
client.loop_stop()
Vaak voorkomende problemen en FAQ
- Het topic ontbreekt of is verkeerd geformuleerd. Deze dient van de vorm te zijn:
topic = "standard1/" + retention_policy + "/" + measurement + "/" + username
- De MQTT-verbinding wordt soms verbroken.
Dit kan voorvallen indien bv. NaN wordt gestuurd ipv een getal.
- Lukt het om meetpunten i.p.v. in één bericht in meerdere berichten door te sturen?
Jazeker. Deze kunnen bericht na bericht verstuurd worden. Bv.
{"time":1669042670,"extraTags":{"subId":"W1"},"fields":{"absVolume_m3":0, “temperature_degC”:20}}
wordt:
{"time":1669042670,"extraTags":{"subId":"W1"},"fields":{"absVolume_m3":0}}
{"time":1669042670,"extraTags":{"subId":"W1"},"fields":{“temperature_degC”:20}}
- Hoe controleren of data goed toekomt?
Dit kan via MQTT Explorer.
- Welke retention policy dien ik te kiezen?
Je kiest de retention policy in functie van de frequentie waarmee je data stuurt en bijbehorende bewaartermijn. Meer uitleg over retention policies en de link met de frequentie van data is te vinden in de introductie. Meer info over de bewaartermijn: https://wiki.eniris.be/wiki/publicinformation/view/Eniris%20Insights/Insights%20EMS/Retention%20Policies/
- Als het topic standard1/$rp/$mea/$user is, hoe wordt er dan bepaald voor welk toestel de data is? Of moet $mea altijd uniek zijn?
Via de extraTags kan een toestel uniek gedefinieerd worden. $mea wordt niet uniek gekozen maar i.f.v. het type data: https://wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/
- Hoe controleer ik de timestamp?
Ga naar https://www.unixtimestamp.com/ en vul de timestamp in. Controleer of de datum klopt en het formaat "Seconds" is.