-
Notifications
You must be signed in to change notification settings - Fork 2
/
arduino-mkr-nb-1500-cumulocity-mqtt.ino
373 lines (295 loc) · 8.53 KB
/
arduino-mkr-nb-1500-cumulocity-mqtt.ino
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
#include "arduino_secrets.h"
#include <ArduinoMqttClient.h>
#include <MKRNB.h>
// MQTT broker host passed to mqttClient.connect(); SECRET_BROKER is presumably
// defined in arduino_secrets.h (TODO confirm).
const char broker[] = SECRET_BROKER;
// Modem IMEI, filled in by getImei() and used as the MQTT client id.
String imei;
// Device credentials, filled in by handleDeviceCredsMessage() and used by
// createC8yDevice() to log in to the broker.
String deviceTenant;
String deviceUsername;
String devicePassword;
// Topic subscribed to in registerC8yDevice() to receive the device creds.
const char* c8yDownstreamDeviceCredsTopic = "s/dcr";
// Topic the (empty) register device message is published to.
const char* c8yUpstreamDeviceCredsTopic = "s/ucr";
// Topic the create device and measurement messages are published to.
const char* c8yUpstreamPublishTopic = "s/us";
// Interval between measurements; compared against elapsed millis() in loop(),
// so the unit is milliseconds.
int measurementMsgFreq = 60000;
// Set to true by handleDeviceCredsMessage(); ends the wait loop in registerC8yDevice().
bool receivedDeviceCreds = false;
// millis() value recorded in loop() when the last measurement was published.
unsigned long lastMillis = 0;
// Network access object; begin()/status() are used in connectNB() and loop().
NB nbAccess;
// Data attach object; attachGPRS()/status() are used in connectNB() and loop().
GPRS gprs;
// Client handed to the MQTT client as its transport.
NBSSLClient sslClient;
// MQTT client used for every connect/subscribe/publish/poll in this sketch.
MqttClient mqttClient(sslClient);
// Modem handle used by getImei() to read the IMEI.
NBModem modem;
/*
 * One-time start-up sequence: opens the serial port, reads the modem IMEI,
 * joins the LTE network, then registers and creates the device on Cumulocity
 */
void setup() {
  // Block until a serial monitor is attached. Remove this wait if not wanted
  Serial.begin(115200);
  while (!Serial) {
  }

  // The IMEI is used as the MQTT client id
  getImei();

  // Bring up the cellular connection
  connectNB();

  // Cumulocity bootstrap, step 1: obtain the device creds
  Serial.println("Registering device");
  registerC8yDevice();
  Serial.println("Device registered\n");

  // Cumulocity bootstrap, step 2: create the device using those creds
  Serial.println("Creating device");
  createC8yDevice();
  Serial.println("Device created");
}
/*
 * Main loop:
 *  - re-joins the LTE network when it has dropped
 *  - re-connects to the MQTT broker when that connection has dropped
 *  - polls for incoming MQTT messages
 *  - publishes a measurement once more than measurementMsgFreq ms have elapsed
 */
void loop() {
  // The network counts as up only when both layers report ready
  const bool networkUp = (nbAccess.status() == NB_READY) && (gprs.status() == GPRS_READY);
  if (!networkUp) {
    connectNB();
  }

  // Restore the broker connection if it was lost
  if (!mqttClient.connected()) {
    connectMQTT();
  }

  // Let the MQTT client process incoming traffic
  mqttClient.poll();

  // Nothing more to do until the measurement interval has elapsed
  if (millis() - lastMillis <= measurementMsgFreq) {
    return;
  }

  lastMillis = millis();
  publishMeasurementMessage();
}
/*
 * Register the device with Cumulocity by requesting the device creds.
 *
 * Connects to the broker with the bootstrap credentials (SECRET_USER /
 * SECRET_PASSWORD), subscribes to the device creds topic and re-publishes the
 * register device message every 2 seconds until onMessageReceived() reports
 * the creds (receivedDeviceCreds == true) or 60 seconds have elapsed.
 * On time-out the sketch halts in an endless loop.
 */
void registerC8yDevice() {
  // Use IMEI as client id for MQTT
  mqttClient.setId(imei);
  mqttClient.setUsernamePassword(SECRET_USER, SECRET_PASSWORD);

  // Set the MQTT message callback
  mqttClient.onMessage(onMessageReceived);

  // Connect to device creds broker, retrying every 5 seconds
  Serial.print("Attempting to connect to MQTT broker: ");
  Serial.print(broker);
  Serial.println(" ");
  while (!mqttClient.connect(broker, 8883)) {
    // failed, retry
    Serial.print(".");
    Serial.println(mqttClient.connectError());
    delay(5000);
  }
  Serial.println();
  Serial.println("You're connected to the MQTT broker");
  Serial.println();

  // Subscribe to topic to receive device creds
  mqttClient.subscribe(c8yDownstreamDeviceCredsTopic);

  // Wait to receive device creds.
  // All timing is done with unsigned "now - start" subtraction, which stays
  // correct across a millis() rollover (an absolute deadline does not).
  const unsigned long publishRegisterMsgFreq = 2000;
  const unsigned long registerTimeout = 60000;
  const unsigned long waitStartMillis = millis();
  unsigned long lastMillisRegister = 0;
  while (!receivedDeviceCreds) {
    mqttClient.poll();

    const unsigned long now = millis();
    if (now - waitStartMillis > registerTimeout) { // Time out
      break;
    }
    if (now - lastMillisRegister > publishRegisterMsgFreq) {
      lastMillisRegister = now;
      publishRegisterDeviceMessage(); // Send again
    }
  }

  // Disconnect from broker
  mqttClient.stop();

  if (!receivedDeviceCreds) {
    Serial.println("Timed out waiting to receive device creds.");
    while (true) {
      // Do nothing: without creds the device cannot continue
    }
  }
}
/*
* Create device in Cumulocity
*/
void createC8yDevice() {
// Use IMEI as client id for MQTT
mqttClient.setId(imei);
Serial.print("Using username: ");
Serial.println(deviceTenant + "/" + deviceUsername);
Serial.print("and password: ");
Serial.println(devicePassword);
mqttClient.setUsernamePassword(deviceTenant + "/" + deviceUsername, devicePassword);
// Set the MQTT message callback
mqttClient.onMessage(onMessageReceived);
// Connect to device creds broker
Serial.print("Attempting to connect to MQTT broker: ");
Serial.print(broker);
Serial.println(" ");
while (!mqttClient.connect(broker, 8883)) {
// Failed, retry
Serial.print(".");
Serial.println(mqttClient.connectError());
delay(5000);
}
Serial.println();
Serial.println("You're connected to the MQTT broker");
Serial.println();
// Create device
publishCreateDeviceMessage();
}
/*
 * Gets the IMEI of the modem and stores it in the global `imei`.
 * Logs an error if the modem does not answer or if no IMEI was read
 * (`imei` is left empty in that case).
 */
void getImei() {
  // Start the modem; a failure is only logged, the IMEI read is still attempted
  if (!modem.begin()) {
    Serial.println("ERROR, no modem answer.");
  }

  imei = modem.getIMEI();

  // An empty String means no IMEI was read. Check the length explicitly
  // instead of comparing a String object against NULL.
  if (imei.length() > 0) {
    Serial.println("Modem's IMEI: " + imei);
  } else {
    Serial.println("Error: Could not get IMEI");
  }
}
/*
 * Joins (or re-joins) the LTE network.
 * Retries once per second until both the network and the data attach are ready
 */
void connectNB() {
  Serial.println("\nConnecting to LTE Network");

  for (;;) {
    // attachGPRS() is only attempted once begin() reports NB_READY
    if (nbAccess.begin() == NB_READY && gprs.attachGPRS() == GPRS_READY) {
      break;
    }
    Serial.print(".");
    delay(1000);
  }

  Serial.println("Connected to LTE Network");
  Serial.println();
}
/*
 * Connects (or re-connects) to the MQTT broker on port 8883.
 * Retries every 5 seconds and prints the error code of each failed attempt;
 * see ArduinoMqttClient/src/MqttClient.h for the meaning of the codes
 */
void connectMQTT() {
  Serial.print("Attempting to connect to MQTT broker: ");
  Serial.print(broker);
  Serial.println(" ");

  for (;;) {
    if (mqttClient.connect(broker, 8883)) {
      break;
    }
    // Attempt failed: report why, wait, then try again
    Serial.print(".");
    Serial.println(mqttClient.connectError());
    delay(5000);
  }

  Serial.println();
  Serial.println("You're connected to the MQTT broker");
  Serial.println();
}
/*
 * Publishes one empty message on the upstream device creds topic.
 * registerC8yDevice() calls this repeatedly while waiting for the creds
 */
void publishRegisterDeviceMessage() {
  Serial.println("Publishing register device message");

  // The request carries no payload
  mqttClient.beginMessage(c8yUpstreamDeviceCredsTopic);
  mqttClient.print("");
  mqttClient.endMessage();
}
/*
* Creates the device on Cumulocity
*/
void publishCreateDeviceMessage() {
Serial.println("Publishing create device message");
String createDeviceMessage = "100,";
createDeviceMessage += imei;
createDeviceMessage += ",c8y_MQTTdevice";
mqttClient.beginMessage(c8yUpstreamPublishTopic);
mqttClient.print(createDeviceMessage);
mqttClient.endMessage();
}
/*
* Publishes measurement to Cumulocity
*/
void publishMeasurementMessage() {
Serial.println("Publishing measurement message");
String newMessage = getMeasurementMessage();
mqttClient.beginMessage(c8yUpstreamPublishTopic);
mqttClient.print(newMessage);
mqttClient.endMessage();
}
/*
 * Builds the measurement message, logs it to Serial and returns it.
 * The value is a random number from random(10,30); real sensor code would go here
 */
String getMeasurementMessage() {
  const long reading = random(10,30);

  // Prefix, value and unit joined in one expression
  String payload = String("200,myCustomTemperatureMeasurement,celcius,") + reading + ",C";

  Serial.println(payload);
  return payload;
}
/*
 * Handler for receiving messages from subscribed topics.
 *
 * Copies the payload into a temporary NUL-terminated buffer, logs it and
 * dispatches it based on the topic. The buffer is released before returning,
 * so handlers must copy anything they want to keep.
 *
 * messageSize: payload size in bytes as reported by the MQTT client
 */
void onMessageReceived(int messageSize) {
  String topic = mqttClient.messageTopic();
  Serial.print("Received a message on topic: ");
  Serial.println(topic);

  // Guard against a negative size before using it for the allocation
  if (messageSize < 0) {
    messageSize = 0;
  }

  // Store message (+1 for the terminating NUL)
  char* message = (char*) malloc((size_t) messageSize + 1);
  if (message == NULL) {
    Serial.println("Error: Out of memory, dropping received message");
    return;
  }

  // Never write past the buffer, even if more bytes than announced are available
  int received = 0;
  while (received < messageSize && mqttClient.available()) {
    message[received] = (char) mqttClient.read();
    received++;
  }
  // Terminate after the bytes actually read so no uninitialised memory is printed
  message[received] = '\0';
  Serial.println(message);

  // Handle based on topic
  if (topic == c8yDownstreamDeviceCredsTopic) {
    handleDeviceCredsMessage(message);
  } else {
    Serial.print("Warning: Received message on an unhandled topic: ");
    Serial.println(topic);
  }

  // The handler copies what it needs; release the buffer to avoid leaking it
  free(message);
}
/*
 * Handler for received device creds message.
 *
 * Expects a comma-separated message "70,<tenant>,<username>,<password>".
 * On success the three values are copied into deviceTenant, deviceUsername
 * and devicePassword and receivedDeviceCreds is set to true.
 * Messages with another id, or with missing fields, are only logged.
 *
 * message: NUL-terminated message text; it is not modified or retained
 */
void handleDeviceCredsMessage(char* message) {
  // Check message id. getValueInCsv() returns a malloc'd copy (or NULL),
  // so every result is freed once it has been copied into a String.
  char* rawMessageId = getValueInCsv(message, 0);
  String messageId = (rawMessageId != NULL) ? rawMessageId : "";
  free(rawMessageId);

  if (messageId == "70") {
    Serial.println("Received device creds");

    // Extract creds
    char* rawTenant = getValueInCsv(message, 1);
    char* rawUsername = getValueInCsv(message, 2);
    char* rawPassword = getValueInCsv(message, 3);

    if (rawTenant != NULL && rawUsername != NULL && rawPassword != NULL) {
      deviceTenant = rawTenant;
      deviceUsername = rawUsername;
      devicePassword = rawPassword;

      // Stop requesting device creds (only once a complete set was stored)
      receivedDeviceCreds = true;
    } else {
      Serial.println("Warning: Device creds message is missing fields");
    }

    // free(NULL) is a no-op, so this is safe for missing fields too
    free(rawTenant);
    free(rawUsername);
    free(rawPassword);
  } else {
    Serial.print("Warning: Received unhandled message id on device creds topic: ");
    Serial.println(messageId);
  }
}
/*
 * Returns the element at the provided index of a comma-separated char array.
 *
 * message: NUL-terminated comma-separated text; it is not modified
 * index:   zero-based position of the wanted element
 *
 * Returns a newly malloc'd, NUL-terminated copy of the element, which the
 * caller must free(). An empty element yields an empty string.
 * Returns NULL if message is NULL, if index is negative or out of bounds
 * (i.e. beyond the number of commas), or if the allocation fails.
 */
char* getValueInCsv(char* message, int index) {
  if (message == NULL || index < 0) {
    return NULL;
  }

  // Skip `index` commas to reach the start of the wanted element
  const char* start = message;
  for (int skipped = 0; skipped < index; skipped++) {
    start = strchr(start, ',');
    if (start == NULL) { // Less items than provided index
      return NULL;
    }
    start++; // Step over the comma
  }

  // The element ends at the next comma or at the end of the message
  const char* end = strchr(start, ',');
  size_t len = (end != NULL) ? (size_t) (end - start) : strlen(start);

  // Alloc (+1 for the terminating NUL)
  char* subMessage = (char*) malloc(len + 1);
  if (subMessage == NULL) {
    return NULL;
  }

  // Copy exactly len chars and terminate
  memcpy(subMessage, start, len);
  subMessage[len] = '\0';
  return subMessage;
}