diff --git a/include/mqtt.h b/include/mqtt.h index c667e4f..1944cad 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -26,6 +26,7 @@ namespace mqtt { void publishSensors(uint16_t mask); void publishConfiguration(); + void publishStatusMsg(const char* statusMessage); void mqttLoop(void* pvParameters); diff --git a/src/mqtt.cpp b/src/mqtt.cpp index 3819fe6..332f3f1 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -24,10 +24,12 @@ namespace mqtt { struct MqttMessage { uint8_t cmd; uint16_t mask; + const char* statusMessage; }; const uint8_t X_CMD_PUBLISH_SENSORS = bit(0); const uint8_t X_CMD_PUBLISH_CONFIGURATION = bit(1); + const uint8_t X_CMD_PUBLISH_STATUS_MSG = bit(2); TaskHandle_t mqttTask; QueueHandle_t mqttQueue; @@ -45,6 +47,9 @@ namespace mqtt { getSPS30StatusCallback_t getSPS30StatusCallback; uint32_t lastReconnectAttempt = 0; + uint16_t connectionAttempts = 0; + + StaticJsonDocument doc; void publishSensors(uint16_t mask) { if (!WiFi.isConnected() || !mqtt_client->connected()) return; @@ -60,26 +65,26 @@ namespace mqtt { sprintf(topic, "%s/%u/up/sensors", config.mqttTopic, config.deviceId); char buf[8]; - StaticJsonDocument<512> json; - if (mask & M_CO2) json["co2"] = model->getCo2(); + doc.clear(); + if (mask & M_CO2) doc["co2"] = model->getCo2(); if (mask & M_TEMPERATURE) { sprintf(buf, "%.1f", model->getTemperature()); - json["temperature"] = buf; + doc["temperature"] = buf; } if (mask & M_HUMIDITY) { sprintf(buf, "%.1f", model->getHumidity()); - json["humidity"] = buf; + doc["humidity"] = buf; } - if (mask & M_PRESSURE) json["pressure"] = model->getPressure(); - if (mask & M_IAQ) json["iaq"] = model->getIAQ(); - if (mask & M_PM0_5) json["pm0.5"] = model->getPM0_5(); - if (mask & M_PM1_0) json["pm1"] = model->getPM1(); - if (mask & M_PM2_5) json["pm2.5"] = model->getPM2_5(); - if (mask & M_PM4) json["pm4"] = model->getPM4(); - if (mask & M_PM10) json["pm10"] = model->getPM10(); + if (mask & M_PRESSURE) doc["pressure"] = model->getPressure(); + if (mask & M_IAQ) doc["iaq"] = model->getIAQ(); + if (mask & M_PM0_5) doc["pm0.5"] = model->getPM0_5(); + if (mask & M_PM1_0) doc["pm1"] = model->getPM1(); + if (mask & M_PM2_5) doc["pm2.5"] = model->getPM2_5(); + if (mask & M_PM4) doc["pm4"] = model->getPM4(); + if (mask & M_PM10) doc["pm10"] = model->getPM10(); // Serialize JSON to file - if (serializeJson(json, msg) == 0) { + if (serializeJson(doc, msg) == 0) { ESP_LOGW(TAG, "Failed to serialise payload"); return; } @@ -88,76 +93,99 @@ namespace mqtt { return; } ESP_LOGD(TAG, "Publishing sensor values: %s:%s", topic, msg); - if (!mqtt_client->publish(topic, msg)) ESP_LOGE(TAG, "publish failed!"); + if (!mqtt_client->publish(topic, msg)) ESP_LOGE(TAG, "publish sensors failed!"); } void publishConfiguration() { MqttMessage msg; msg.cmd = X_CMD_PUBLISH_CONFIGURATION; msg.mask = 0; + msg.statusMessage = nullptr; if (mqttQueue) xQueueSendToBack(mqttQueue, (void*)&msg, pdMS_TO_TICKS(100)); } void publishConfigurationInternal() { char buf[384]; char msg[CONFIG_SIZE]; - StaticJsonDocument json; - json["appVersion"] = APP_VERSION; - json["altitude"] = config.altitude; - json["co2YellowThreshold"] = config.co2YellowThreshold; - json["co2RedThreshold"] = config.co2RedThreshold; - json["co2DarkRedThreshold"] = config.co2DarkRedThreshold; - json["iaqYellowThreshold"] = config.iaqYellowThreshold; - json["iaqRedThreshold"] = config.iaqRedThreshold; - json["iaqDarkRedThreshold"] = config.iaqDarkRedThreshold; - json["brightness"] = config.brightness; + doc.clear(); + doc["appVersion"] = APP_VERSION; + doc["altitude"] = config.altitude; + doc["co2YellowThreshold"] = config.co2YellowThreshold; + doc["co2RedThreshold"] = config.co2RedThreshold; + doc["co2DarkRedThreshold"] = config.co2DarkRedThreshold; + doc["iaqYellowThreshold"] = config.iaqYellowThreshold; + doc["iaqRedThreshold"] = config.iaqRedThreshold; + doc["iaqDarkRedThreshold"] = config.iaqDarkRedThreshold; + doc["brightness"] = config.brightness; sprintf(buf, "%s", WifiManager::getMac().c_str()); - json["mac"] = buf; + doc["mac"] = buf; sprintf(buf, "%s", WiFi.localIP().toString().c_str()); - json["ip"] = buf; + doc["ip"] = buf; if (I2C::scd30Present()) - json["scd30"] = true; + doc["scd30"] = true; if (I2C::scd40Present()) - json["scd40"] = true; + doc["scd40"] = true; if (I2C::bme680Present()) - json["bme680"] = true; + doc["bme680"] = true; if (I2C::lcdPresent()) - json["lcd"] = true; + doc["lcd"] = true; if (I2C::sps30Present()) { - json["sps30"] = true; - json["sps30AutoCleanInt"] = getSPS30AutoCleanIntervalCallback(); - json["sps30Status"] = getSPS30StatusCallback(); + doc["sps30"] = true; + doc["sps30AutoCleanInt"] = getSPS30AutoCleanIntervalCallback(); + doc["sps30Status"] = getSPS30StatusCallback(); } - json["ssd1306Rows"] = config.ssd1306Rows; - json["greenLed"] = config.greenLed; - json["yellowLed"] = config.yellowLed; - json["redLed"] = config.redLed; - json["neopixelData"] = config.neopixelData; - json["neopixelNumber"] = config.neopixelNumber; - json["featherMatrixData"] = config.featherMatrixData; - json["featherMatrixClock"] = config.featherMatrixClock; - json["hub75R1"] = config.hub75R1; - json["hub75G1"] = config.hub75G1; - json["hub75B1"] = config.hub75B1; - json["hub75R2"] = config.hub75R2; - json["hub75G2"] = config.hub75G2; - json["hub75B2"] = config.hub75B2; - json["hub75ChA"] = config.hub75ChA; - json["hub75ChB"] = config.hub75ChB; - json["hub75ChC"] = config.hub75ChC; - json["hub75ChD"] = config.hub75ChD; - json["hub75Clk"] = config.hub75Clk; - json["hub75Lat"] = config.hub75Lat; - json["hub75Oe"] = config.hub75Oe; + doc["ssd1306Rows"] = config.ssd1306Rows; + doc["greenLed"] = config.greenLed; + doc["yellowLed"] = config.yellowLed; + doc["redLed"] = config.redLed; + doc["neopixelData"] = config.neopixelData; + doc["neopixelNumber"] = config.neopixelNumber; + doc["featherMatrixData"] = config.featherMatrixData; + doc["featherMatrixClock"] = config.featherMatrixClock; + doc["hub75R1"] = config.hub75R1; + doc["hub75G1"] = config.hub75G1; + doc["hub75B1"] = config.hub75B1; + doc["hub75R2"] = config.hub75R2; + doc["hub75G2"] = config.hub75G2; + doc["hub75B2"] = config.hub75B2; + doc["hub75ChA"] = config.hub75ChA; + doc["hub75ChB"] = config.hub75ChB; + doc["hub75ChC"] = config.hub75ChC; + doc["hub75ChD"] = config.hub75ChD; + doc["hub75Clk"] = config.hub75Clk; + doc["hub75Lat"] = config.hub75Lat; + doc["hub75Oe"] = config.hub75Oe; sprintf(buf, "%.1f", getTemperatureOffsetCallback()); - json["tempOffset"] = buf; - if (serializeJson(json, msg) == 0) { + doc["tempOffset"] = buf; + if (serializeJson(doc, msg) == 0) { ESP_LOGW(TAG, "Failed to serialise payload"); return; } sprintf(buf, "%s/%u/up/config", config.mqttTopic, config.deviceId); ESP_LOGI(TAG, "Publishing configuration: %s:%s", buf, msg); - if (!mqtt_client->publish(buf, msg)) ESP_LOGE(TAG, "publish failed!"); + if (!mqtt_client->publish(buf, msg)) ESP_LOGE(TAG, "publish configuration failed!"); + } + + void publishStatusMsg(const char* statusMessage) { + MqttMessage msg; + msg.cmd = X_CMD_PUBLISH_STATUS_MSG; + msg.mask = 0; + msg.statusMessage = statusMessage; + if (mqttQueue) xQueueSendToBack(mqttQueue, (void*)&msg, pdMS_TO_TICKS(100)); + } + + void publishStatusMsgInternal(const char* statusMessage) { + if (strlen(statusMessage) > 200) return; + char buf[256]; + sprintf(buf, "%s/%u/up/status", config.mqttTopic, config.deviceId); + char msg[256]; + doc.clear(); + doc["msg"] = statusMessage; + if (serializeJson(doc, msg) == 0) { + ESP_LOGW(TAG, "Failed to serialise payload"); + return; + } + if (!mqtt_client->publish(buf, msg)) ESP_LOGE(TAG, "publish status msg failed!"); } void callback(char* topic, byte* payload, unsigned int length) { @@ -201,7 +229,7 @@ namespace mqtt { } else if (strncmp(buf, "getConfig", strlen(buf)) == 0) { publishConfiguration(); } else if (strncmp(buf, "setConfig", strlen(buf)) == 0) { - StaticJsonDocument doc; + doc.clear(); DeserializationError error = deserializeJson(doc, msg); if (error) { ESP_LOGW(TAG, "Failed to parse message: %s", error.f_str()); @@ -239,7 +267,7 @@ namespace mqtt { if (doc.containsKey("hub75Oe")) { config.hub75Oe = doc["hub75Oe"].as(); rebootRequired = true; } if (saveConfiguration(config) && rebootRequired) { sprintf(buf, "%s/%u/up/status", config.mqttTopic, config.deviceId); - mqtt_client->publish(buf, "{\"msg\":\"configuration updated, rebooting shortly\"}"); + publishStatusMsgInternal("configuration updated, rebooting shortly"); delay(1000); esp_restart(); } @@ -263,6 +291,7 @@ namespace mqtt { lastReconnectAttempt = millis(); if (!mqtt_client->connected()) { ESP_LOGD(TAG, "Attempting MQTT connection..."); + connectionAttempts++; if (mqtt_client->connect(buf, config.mqttUsername, config.mqttPassword)) { ESP_LOGD(TAG, "MQTT connected"); sprintf(buf, "%s/%u/down/#", config.mqttTopic, config.deviceId); @@ -270,7 +299,18 @@ namespace mqtt { sprintf(buf, "%s/down/#", config.mqttTopic); mqtt_client->subscribe(buf); sprintf(buf, "%s/%u/up/status", config.mqttTopic, config.deviceId); - mqtt_client->publish(buf, "{\"online\":true}"); + char msg[256]; + doc.clear(); + doc["online"] = true; + doc["connectionAttempts"] = connectionAttempts; + if (serializeJson(doc, msg) == 0) { + ESP_LOGW(TAG, "Failed to serialise payload"); + return; + } + if (mqtt_client->publish(buf, msg)) + connectionAttempts = 0; + else + ESP_LOGE(TAG, "publish connect msg failed!"); } else { ESP_LOGW(TAG, "MQTT connection failed, rc=%i", mqtt_client->state()); vTaskDelay(pdMS_TO_TICKS(1000)); @@ -288,7 +328,7 @@ namespace mqtt { cleanSPS30Callback_t _cleanSPS30Callback, getSPS30StatusCallback_t _getSPS30StatusCallback ) { - mqttQueue = xQueueCreate(2, sizeof(struct MqttMessage*)); + mqttQueue = xQueueCreate(2, sizeof(struct MqttMessage)); if (mqttQueue == NULL) { ESP_LOGE(TAG, "Queue creation failed!"); } @@ -347,6 +387,8 @@ namespace mqtt { publishConfigurationInternal(); } else if (msg.cmd == X_CMD_PUBLISH_SENSORS) { publishSensorsInternal(msg.mask); + } else if (msg.cmd == X_CMD_PUBLISH_STATUS_MSG) { + publishStatusMsgInternal(msg.statusMessage); } } if (!mqtt_client->connected()) { diff --git a/src/ota.cpp b/src/ota.cpp index c7c1857..a56775e 100644 --- a/src/ota.cpp +++ b/src/ota.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -36,6 +37,7 @@ namespace OTA { if (shouldExecuteFirmwareUpdate) { ESP_LOGD(TAG, "Firmware update available"); if (preUpdateCallback) preUpdateCallback(); + mqtt::publishStatusMsg("Starting OTA update"); esp32FOTA.execOTA(); } else { ESP_LOGD(TAG, "No firmware update available"); @@ -52,9 +54,11 @@ namespace OTA { ESP_LOGD(TAG, "Beginning forced OTA"); if (preUpdateCallback) preUpdateCallback(); esp32FOTA esp32FOTA(OTA_APP, APP_VERSION, LittleFS, false, false); + mqtt::publishStatusMsg("Starting forced OTA update"); esp32FOTA.forceUpdate(forceUpdateURL, false); forceUpdateURL = ""; - ESP_LOGD(TAG, "Forced OTA done"); forceUpdateURL = ""; + ESP_LOGD(TAG, "Forced OTA done"); + forceUpdateURL = ""; } void otaLoop(void* pvParameters) {