
Wiki source code of MQTT

Last modified by Wim Verheirstraeten on 2024/05/05 16:29

On this page, you can find our standards for publishing data via MQTT. For variations on these standards, please contact us at [[info@eniris.be>>mailto:info@eniris.be]].

== **Introduction** ==

These standards are used for sending time-related data. Before we go into the details of sending data via the standards themselves, we briefly explain how this data is stored in our time series database. Behind the scenes, we use InfluxDB, a database optimized for efficiently storing and querying time series.

One aspect that plays an important role here, and that we need to explain in the context of the API, is the idea of "retention policies". Data within a retention policy is stored for a fixed period of time. Once this period has passed, the data is automatically rotated out of the database. The Eniris retention policies have names that indicate how frequently a measurement value stored within that policy can be updated. We have the following retention policies:

{{{'rp_one_s' (one second)
'rp_one_m' (one minute)
'rp_ten_m' (ten minutes)
'rp_one_h' (one hour)
'rp_six_h' (six hours)}}}

Data stored in a retention policy with a high frequency is deleted within 10 days, while data stored in a retention policy with a lower frequency is kept for several decades. For more information, see: [[https:~~/~~/wiki.eniris.be/wiki/publicinformation/view/Eniris%20Insights/Insights%20EMS/Retention%20Policies/>>url:https://wiki.eniris.be/wiki/publicinformation/view/Eniris%20Insights/Insights%20EMS/Retention%20Policies/]]. The motivation is simple: the amount of data stored per series in the database is the product of the frequency of the measurement point and the duration for which the data is stored. By limiting the duration, we limit the amount of data per measurement point. Before frequent data is rotated out, we aggregate it into a retention policy with a lower frequency, so that a user with frequent measurements who looks back a few days in their history does not suddenly lose all values. This is why, when passing data via the standards, you must indicate how often each series is updated by explicitly specifying the retention policy.
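
As a rough illustration of picking a retention policy, the sketch below chooses the coarsest policy whose nominal period does not exceed the time between two updates of a series. The helper name `choose_retention_policy` and the selection rule are assumptions made for this example; only the policy names and their periods come from the list above.

```python
# Retention policies from the list above, with their nominal update period in seconds.
RETENTION_POLICIES = [
    ("rp_one_s", 1),
    ("rp_one_m", 60),
    ("rp_ten_m", 600),
    ("rp_one_h", 3600),
    ("rp_six_h", 21600),
]


def choose_retention_policy(update_interval_s):
    """Return the coarsest policy whose period is <= the update interval.

    Hypothetical helper: the selection rule is an assumption, not an Eniris API.
    Intervals shorter than one second fall back to the finest policy.
    """
    if update_interval_s <= 0:
        raise ValueError("update_interval_s must be positive")
    chosen = RETENTION_POLICIES[0][0]
    for name, period_s in RETENTION_POLICIES:
        if period_s <= update_interval_s:
            chosen = name
    return chosen


print(choose_retention_policy(300))  # a series updated every five minutes
```

With this rule, a series updated every five minutes lands in 'rp_one_m', since 'rp_ten_m' would be too coarse for it.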


=== **Timeseries** ===

In the previous paragraph, we informally talked about a series without explaining exactly what we mean by that. This paragraph therefore describes how data is stored within a retention policy.

Within each retention policy, there is a set of tables, known as "measurements" in Influx terminology. You can think of a measurement as a table in Excel (one with not too many columns, but possibly a lot of rows), or as a SQL table. Within a measurement, there are three types of columns.

The first type of column is easy to understand: it holds a timestamp for each row (which we call a data point in this context). This timestamp doesn't have to be unique: there can be multiple data points that share the same timestamp.

The second type of column is the "tag set". This is a group of columns whose values, within a single measurement, uniquely identify a specific meter that measures a limited number of things. An example makes this clearer: suppose we are monitoring solar installations, each installation has one or more inverters numbered 0 to N-1, and for each inverter we measure the current power and the additional energy produced since our last update. In such a situation, our table of inverter data will have a column called "installationId" and a column called "inverterId", which together uniquely identify an inverter (for SQL experts: think of a composite key).

For each inverter, we still want to store two values: the power and the energy delta. This is where the last type of column comes in: the fields. These contain the actual measurement values, which depending on the context can be a float, an int, a boolean, or a string. In our example, we have two fields that together form the so-called "field set": actualPower and energyDelta.

Now we can go back to the definition of a series: a series is a unique combination of a measurement, values for all tags within that measurement, and one field. In the example, a series could be: all actualPower values for the inverter with id 0 in an installation named 'pvEniris'. If you retrieve and visualize all the values within a specific numerical series from the database, you will normally get a curve that makes sense (in the example, the current power of a specific inverter over time). How often such a curve is updated is what you should look at when choosing the retention policy.

We now also see why there can sometimes be multiple data points for one timestamp: in the example, these points would correspond to updates from different inverters for the same timestamp. However, within one series, there can be at most one value per timestamp: if we write the actualPower value for the inverter with id 0 in an installation named 'pvEniris' at noon on April 9, 2021 twice, the second value automatically overwrites the first.
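
The series and overwrite rules can be mimicked with a plain Python dictionary. This is only a toy model for building intuition, not how InfluxDB is implemented; the measurement name `inverterData` and the helper names are made up for the example.

```python
def series_key(measurement, tags, field):
    """A series = measurement + all tag values + one field (toy model)."""
    return (measurement, tuple(sorted(tags.items())), field)


store = {}  # series key -> {timestamp: value}


def write_point(measurement, tags, fields, timestamp):
    """Write one data point; each field ends up in its own series."""
    for field, value in fields.items():
        store.setdefault(series_key(measurement, tags, field), {})[timestamp] = value


noon = 1617969600  # 12:00 UTC on April 9, 2021
inverter0 = {"installationId": "pvEniris", "inverterId": "0"}
inverter1 = {"installationId": "pvEniris", "inverterId": "1"}

write_point("inverterData", inverter0, {"actualPower": 1500.0}, noon)
write_point("inverterData", inverter1, {"actualPower": 900.0}, noon)   # same timestamp, other series
write_point("inverterData", inverter0, {"actualPower": 1520.0}, noon)  # overwrites the first write

print(store[series_key("inverterData", inverter0, "actualPower")])
```

The two inverters share a timestamp without conflict because they belong to different series, while the repeated write for inverter 0 replaces the earlier value.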

If this paragraph is unclear, the InfluxDB glossary may be useful: [[https:~~/~~/docs.influxdata.com/influxdb/v1.8/concepts/glossary/>>url:https://docs.influxdata.com/influxdb/v1.8/concepts/glossary/]]


== **MQTT standard (incoming data)** ==

Now that we understand how the data is written to Influx and can make the right choices in that regard, using the MQTT standard itself is straightforward. To send data encrypted, use MQTT over TLS: connect to 'mqtt.eniris.be' on port 8883 (to validate the certificates correctly, the domain name must be used, not the IP address). If a device does not support TLS, an unencrypted connection can be used via port 1883, connecting to either mqtt.eniris.be or 188.166.18.198.

We provide a unique username and password combination for each of your devices to authenticate with the server.

>To which topic should I send data?

After logging in, a device can publish to any topic of the form:

{{{standard1/$rp/$mea/$user}}}

Here, $rp is a retention policy as described earlier, $mea is the name of a measurement (you can choose this freely; we only ask that you use camelCase for measurement names), and $user is the username you use to log in.
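
A small helper can assemble the topic and catch the most common formatting mistakes. This is a sketch: the function name `build_topic` and its checks are assumptions for illustration; the topic layout itself is the one given above.

```python
def build_topic(retention_policy, measurement, username):
    """Assemble 'standard1/$rp/$mea/$user' (hypothetical helper)."""
    parts = [retention_policy, measurement, username]
    for part in parts:
        # Empty levels, slashes and MQTT wildcards would change the topic structure.
        if not part or any(ch in part for ch in "/+#"):
            raise ValueError(f"invalid topic level: {part!r}")
    # No leading '/': the topic must start with the 'standard1/' prefix.
    return "standard1/" + "/".join(parts)


print(build_topic("rp_one_m", "evChargerMetrics", "eniris-0"))
```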

>What should the payload look like?

Each message encodes one data point and is a JSON string of the form:

{"time": $ts, "extraTags": $extraTags, "fields": $fields}

Here, $ts is a Unix timestamp (in seconds). $extraTags is a dictionary (with keys and values of type string) to which an element with key "id" and value equal to the username is always added. The resulting dictionary specifies the tag set of the data point. If no extra tags are needed, "extraTags" can be omitted. Note that it makes no sense to add an element with the key "id" to extraTags, as its value will always be overwritten with the username. Finally, $fields is a dictionary that specifies the fields.
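
The payload rules above can be wrapped in a small function. The sketch below is illustrative only: `build_payload` is a hypothetical name, and the checks (dropping an "id" tag, requiring string tags, rejecting None and non-finite floats) are a client-side reading of the rules on this page rather than a description of what the server does.

```python
import json
import math
import time


def build_payload(fields, extra_tags=None, ts=None):
    """Return the JSON string for one data point (hypothetical helper)."""
    for key, value in fields.items():
        # None, NaN and +/-inf are invalid: leave the field out instead.
        if value is None or (isinstance(value, float) and not math.isfinite(value)):
            raise ValueError(f"invalid value for field {key!r}: {value!r}")
    # "id" is always set to the username on the server side, so sending it is pointless.
    tags = {k: v for k, v in (extra_tags or {}).items() if k != "id"}
    for k, v in tags.items():
        if not isinstance(k, str) or not isinstance(v, str):
            raise TypeError("extraTags keys and values must be strings")
    message = {"time": int(time.time() if ts is None else ts)}
    if tags:  # extraTags may be omitted when there are none
        message["extraTags"] = tags
    message["fields"] = dict(fields)
    return json.dumps(message)


print(build_payload({"temperature_degC": 16.76}, {"subId": "T19"}, ts=1661506491))
```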

Additional information on encoding the fields can be found in a sample Python file (the only dependency is paho-mqtt). You can request this from us by filling out the form below.

=== **Example of data packet** ===

{{code language="JSON"}}
{
  "time": 1661506491,
  "extraTags": {
    "subId": "T19"
  },
  "fields": {
    "temperature_degC": 16.76
  }
}
{{/code}}

=== //Data reliability// ===

To indicate the quality of the data, an extra tag 'reliable' can be used. Its value must be a string, either 'true' or 'false'.
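
Because the tag value must be the string 'true' or 'false' rather than a JSON boolean, a Python bool needs an explicit conversion. A minimal sketch, where `reliability_tag` is a hypothetical helper and the other values are taken from the example data packet:

```python
import json


def reliability_tag(is_reliable):
    """Map a Python bool to the string tag value expected by the standard."""
    return {"reliable": "true" if is_reliable else "false"}


extra_tags = {"subId": "T19", **reliability_tag(False)}
payload = json.dumps({
    "time": 1661506491,
    "extraTags": extra_tags,
    "fields": {"temperature_degC": 16.76},
})
print(payload)
```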


= **Creating MQTT credentials** =

To create credentials, you need an account on Eniris Insights.

Under "Add", you will find an option to create new MQTT credentials.

The username should be in the format yourcompanyname-index (e.g. eniris-0).

[[image:image-20221201000652-1.png]]


= **Checking if data is received correctly** =

You can connect to our broker with MQTT Explorer, which can be downloaded here: [[http:~~/~~/mqtt-explorer.com/>>url:http://mqtt-explorer.com/]]. Once MQTT Explorer is installed, you first need to set up a connection. Select "+" to add a new connection.

Working via TLS:

Subscribe to a topic via -> ADD:

[[image:1643019460831-821.png]]

Subscribe to the topic to which data is being pushed, e.g. "standard1/autogen/messages/username".

Then click on ADD.

[[image:1668700037452-247.png||height="594" width="1105"]]


== **MQTT standard (outgoing data)** ==

Outgoing data uses topics of the form:

{{code language="none"}}
/standard1/outbound/$installationId/$mea/$username/$field
{{/code}}

Here:

* $installationId corresponds to the deviceId from data.json and in the Neo4j database.
* $mea is the measurement, in accordance with [[https:~~/~~/wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/#>>url:https://wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/#]].
* $field is a key-value pair whose key is one of the allowed keys from the metric data collection standard: [[https:~~/~~/wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/#>>url:https://wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/#]].

No timestamp is included.
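
On the receiving side, such a topic can be split back into its parts. The parser below is a sketch under assumptions: `parse_outbound_topic` is a hypothetical helper, the leading slash is treated as optional, and the sample values (`1234`, `solarMetrics`, `actualPower_W`) are invented placeholders.

```python
def parse_outbound_topic(topic):
    """Split an outbound topic into its named levels (hypothetical helper)."""
    parts = topic.lstrip("/").split("/")
    if len(parts) != 6 or parts[0] != "standard1" or parts[1] != "outbound":
        raise ValueError(f"not an outbound topic: {topic!r}")
    _, _, installation_id, measurement, username, field = parts
    return {
        "installationId": installation_id,
        "measurement": measurement,
        "username": username,
        "field": field,
    }


print(parse_outbound_topic("/standard1/outbound/1234/solarMetrics/eniris-0/actualPower_W"))
```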


**QoS should be 1 or 2**

If the MQTT server cannot accept data or is not reachable, the internal memory of your data logger or PLC must be used as a buffer. To make this work, set the QoS (Quality of Service) to at least 1. Also check that the buffer capacity can cover outages of up to 18 hours.
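
To get a feeling for the buffer capacity this implies, the number of messages produced during an outage can be estimated from the publishing interval. The sketch below is a back-of-the-envelope calculation; `required_buffer_messages` is a hypothetical helper, and the bounded `deque` is just one possible in-memory buffer, not a prescribed design.

```python
import math
from collections import deque


def required_buffer_messages(interval_s, outage_hours=18):
    """Messages produced during an outage when publishing every interval_s seconds."""
    if interval_s <= 0:
        raise ValueError("interval_s must be positive")
    return math.ceil(outage_hours * 3600 / interval_s)


capacity = required_buffer_messages(10)  # one message every 10 seconds
buffer = deque(maxlen=capacity)          # once full, the oldest message is dropped first
buffer.append('{"time": 1661506491, "fields": {"temperature_degC": 16.76}}')
print(capacity, len(buffer))
```

Multiplying the message count by the typical payload size gives the memory the device needs to reserve for one series of messages.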


== **Python script** ==

When data is sent via MQTT, **you are not allowed to send an extraTag "id"!**
Behind the scenes we add "id", which is equal to the MQTT username.

Update: we no longer allow pushing integers.

{{code language="python"}}
import time, json
import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

username = ""
password = ""
qos = 1  # QoS should be 1 or 2 to guarantee that the data is stored

# Note: with paho-mqtt 2.x, pass the callback API version explicitly, e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
client = mqtt.Client()
client.tls_set()  # When connecting on port 8883, TLS support should be turned on. In this case a device must connect using the domain name (rather than via the IP address).
# Devices which do not support TLS can connect via port 1883 (in this case an IP address is allowed)
client.username_pw_set(username, password)
client.on_connect = on_connect

client.connect("mqtt.eniris.be", 8883)  # Host should be mqtt.eniris.be or 188.166.18.198. Port should be 8883 (TLS) or 1883 (unencrypted)
client.loop_start()
for i in range(1000):
    retention_policy = "rp_one_s"  # Valid values are: "autogen" (unknown), "rp_one_s" (1 second), "rp_ten_s" (ten seconds), "rp_one_m" (one minute), "rp_ten_m" (ten minutes), "rp_one_h" (one hour), "rp_six_h" (six hours)
    # A retention policy should be chosen such that its duration is close to the time between different measurements. Autogen is for cases where the duration is highly unpredictable (e.g. alarms)
    measurement = "evChargerMetrics"
    topic = "standard1/" + retention_policy + "/" + measurement + "/" + username  # The "standard1/" prefix is obligatory. Make sure not to start the topic name with a / character.
    ts = time.time()  # The time of the measurement should be communicated as a Unix timestamp. If second precision is sufficient, please round to a second. The same goes for millisecond or microsecond precision.
    # Two measurements cannot be closer together than one nanosecond. If 'time' is omitted, the time at which the data is processed on the server will be used.
    # Note that this may differ significantly from the measurement time if the client network connection is poor, or if there is high load on the server. Therefore we recommend explicitly including the time.
    # Publish one data point for the EV charger
    extraTags = {"subId": "laadpaalAlfen"}  # A dictionary with extra tags. Both the dictionary keys and values should be strings. If there are no extra tags, this can be omitted from the message.
    fields = {"voltageL1N_V": 400.0}  # A dictionary with the measurement fields. The keys should be strings. The value can be either:
    #  - a number or a string representing a float and ending with the letter 'f': in this case the value will be stored as a float
    #  - a string representing an integer and ending with the letter 'i': in this case the value will be stored as an integer (update: we no longer allow pushing integers)
    #  - True or False: in this case the value will be stored as a bool
    #  - other strings will be stored as plain strings
    #  - other values, such as None, float('nan'), float('inf'), float('-inf') are invalid. If a value is not defined, just do not include it in the measurement, rather than using None.
    # Please ensure that the data type corresponding to a specific field key is always the same.
    client.publish(topic, json.dumps({"time": int(ts), "extraTags": extraTags, "fields": fields}), qos=qos)

    time.sleep(10)

client.loop_stop()
{{/code}}

Example script for sending alarms:

{{code language="python"}}
import time, json, random
import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

username = ""
password = ""
qos = 1  # QoS should be 1 or 2 to guarantee that the data is stored

# Note: with paho-mqtt 2.x, pass the callback API version explicitly, e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
client = mqtt.Client()
client.tls_set()  # When connecting on port 8883 or 443, TLS support should be turned on. In this case a device must connect using the domain name (rather than via the IP address).
# Devices which do not support TLS can connect via port 1883 or 80 (in this case connecting using an IP address is allowed, but still deprecated)
client.username_pw_set(username, password)
client.on_connect = on_connect

client.connect("mqtt.eniris.be", 8883)  # Host should be mqtt.eniris.be (recommended) or 188.166.18.198 (deprecated). Port should be 8883|443 (TLS) or 1883|80 (unencrypted)
client.loop_start()

## As an example, we simulate the occurrence of ten alarms
for i in range(10):
    ## First, we define some standard-related parameters that indicate where the alarm data should be stored
    retention_policy = "autogen"  # Valid values are: "autogen" (unknown), "rp_one_s" (1 second), "rp_ten_s" (ten seconds), "rp_one_m" (one minute), "rp_ten_m" (ten minutes), "rp_one_h" (one hour), "rp_six_h" (six hours)
    # A retention policy should be chosen such that its duration is close to the time between different measurements. Autogen is for cases where the duration is highly unpredictable (e.g. alarms)
    measurement = "messages"
    topic = "standard1/" + retention_policy + "/" + measurement + "/" + username  # The "standard1/" prefix is obligatory. Make sure not to start the topic name with a / character.

    ## Let's say an alarm occurs right now. We need the Unix timestamp at which the alarm starts:
    startTs = time.time()
    extraTags = {"subId": "laadpaalAlfen"}  # A dictionary with extra tags. Both the dictionary keys and values should be strings. If there are no extra tags, this can be omitted from the message.
    # If the MQTT credentials are used by a single device, it is ok to not include any extra tags; however, if there are multiple devices sending data via the same credentials, including a subId is recommended, to indicate to which specific device the message corresponds.
    fields = {"text": "My alarm message"}  # A dictionary with the measurement fields. The keys should be strings. The value can be either:
    #  - a number or a string representing a float and ending with the letter 'f': in this case the value will be stored as a float
    #  - a string representing an integer and ending with the letter 'i': in this case the value will be stored as an integer (update: we no longer allow pushing integers)
    #  - True or False: in this case the value will be stored as a bool
    #  - other strings will be stored as plain strings
    #  - other values, such as None, float('nan'), float('inf'), float('-inf') are invalid. If a value is not defined, just do not include it in the measurement, rather than using None.
    # Please ensure that the data type corresponding to a specific field key is always the same.
    ## Let's send the message
    client.publish(topic, json.dumps({"time": int(startTs), "extraTags": extraTags, "fields": fields}), qos=qos)

    ## Wait until the alarm is over
    time.sleep(10 + random.random()*50)

    ## Once the alarm is no longer present, send a new message to store the end time
    endTs = time.time()
    extraTags = {"subId": "laadpaalAlfen"}
    fields = {"endTime": endTs}  ## Feel free to include the text field as well, if the contents of the alarm text should be updated
    client.publish(topic, json.dumps({"time": int(startTs), "extraTags": extraTags, "fields": fields}), qos=qos)  ## NOTE THAT THE TIME OF THE MESSAGE SHOULD CORRESPOND TO THE START TIME OF THE ALARM!!

    ## Now nothing has to be done until a new alarm occurs...
    time.sleep(10 + random.random()*50)

client.loop_stop()
{{/code}}


= **Common issues and FAQs:** =

* The topic is missing or incorrectly formulated. It should be in the form:

{{code language="none"}}topic = "standard1/" + retention_policy + "/" + measurement + "/" + username{{/code}}


* Is it possible to send measurements in multiple messages instead of one?

Yes, the fields of a data point can be spread over several messages. For example:

{"time":1669042670,"extraTags":{"subId":"W1"},"fields":{"absVolume_m3":0, "temperature_degC":20}}

would become:

{"time":1669042670,"extraTags":{"subId":"W1"},"fields":{"absVolume_m3":0}}

{"time":1669042670,"extraTags":{"subId":"W1"},"fields":{"temperature_degC":20}}
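
The split shown above can be automated. In this sketch, `split_fields` is a hypothetical helper that turns one message into one message per field, keeping the timestamp and tags identical so that all parts still describe the same data point.

```python
import json


def split_fields(message):
    """Return one message per field, each with the same time and extraTags."""
    base = {key: value for key, value in message.items() if key != "fields"}
    return [{**base, "fields": {name: value}} for name, value in message["fields"].items()]


combined = {
    "time": 1669042670,
    "extraTags": {"subId": "W1"},
    "fields": {"absVolume_m3": 0, "temperature_degC": 20},
}
for part in split_fields(combined):
    print(json.dumps(part))
```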


* How can I verify that the data is being received correctly?

This can be done via MQTT Explorer, as described under "Checking if data is received correctly".

* Which retention policy should I choose?

The retention policy should be chosen based on the frequency of data transmission and the corresponding storage period. More information about retention policies and their relationship to data frequency can be found in the introduction. More information about the storage period can be found here: [[https:~~/~~/wiki.eniris.be/wiki/publicinformation/view/Eniris%20Insights/Insights%20EMS/Retention%20Policies/>>url:https://wiki.eniris.be/wiki/publicinformation/view/Eniris%20Insights/Insights%20EMS/Retention%20Policies/]]

* If the topic is standard1/$rp/$mea/$user, how is the device associated with the data determined? Or should $mea always be unique?

The device can be uniquely identified through the extraTags. $mea does not have to be unique; it is chosen according to the type of data: [[https:~~/~~/wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/>>url:https://wiki.eniris.be/wiki/publicinformation/view/Metrics%20for%20solar%20%26%20batteries/]]


* How can I verify the timestamp?

Go to [[https:~~/~~/www.unixtimestamp.com/>>url:https://www.unixtimestamp.com/]] and enter the timestamp. Check that the date is correct and that the format is "Seconds".

[[image:1677761859720-123.png]]
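
The same check can be done offline in Python. The function below is a sketch: `describe_timestamp` is a hypothetical helper, and the threshold used to tell seconds from milliseconds is a heuristic assumption, not part of the standard.

```python
from datetime import datetime, timezone


def describe_timestamp(ts):
    """Guess the unit of a Unix timestamp and return (unit, UTC date as ISO string)."""
    # Heuristic: values of 1e11 and above are far in the future when read as seconds,
    # so they are most likely milliseconds.
    if ts >= 10**11:
        unit, seconds = "milliseconds", ts / 1000
    else:
        unit, seconds = "seconds", ts
    return unit, datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()


print(describe_timestamp(1661506491))     # timestamp from the example data packet
print(describe_timestamp(1661506491000))  # same moment, mistakenly sent in milliseconds
```

If the unit reported is not "seconds", the value should be divided down before it is sent.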


(c) Eniris, 2024