Skip to content

Commit

Permalink
Add option to publish status message via MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
oseiler2 committed Sep 16, 2022
1 parent 35e9986 commit aeaec2a
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 61 deletions.
1 change: 1 addition & 0 deletions include/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace mqtt {

void publishSensors(uint16_t mask);
void publishConfiguration();
void publishStatusMsg(const char* statusMessage);

void mqttLoop(void* pvParameters);

Expand Down
162 changes: 102 additions & 60 deletions src/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ namespace mqtt {
struct MqttMessage {
uint8_t cmd;
uint16_t mask;
const char* statusMessage;
};

const uint8_t X_CMD_PUBLISH_SENSORS = bit(0);
const uint8_t X_CMD_PUBLISH_CONFIGURATION = bit(1);
const uint8_t X_CMD_PUBLISH_STATUS_MSG = bit(2);

TaskHandle_t mqttTask;
QueueHandle_t mqttQueue;
Expand All @@ -45,6 +47,9 @@ namespace mqtt {
getSPS30StatusCallback_t getSPS30StatusCallback;

uint32_t lastReconnectAttempt = 0;
uint16_t connectionAttempts = 0;

StaticJsonDocument<CONFIG_SIZE> doc;

void publishSensors(uint16_t mask) {
if (!WiFi.isConnected() || !mqtt_client->connected()) return;
Expand All @@ -60,26 +65,26 @@ namespace mqtt {
sprintf(topic, "%s/%u/up/sensors", config.mqttTopic, config.deviceId);

char buf[8];
StaticJsonDocument<512> json;
if (mask & M_CO2) json["co2"] = model->getCo2();
doc.clear();
if (mask & M_CO2) doc["co2"] = model->getCo2();
if (mask & M_TEMPERATURE) {
sprintf(buf, "%.1f", model->getTemperature());
json["temperature"] = buf;
doc["temperature"] = buf;
}
if (mask & M_HUMIDITY) {
sprintf(buf, "%.1f", model->getHumidity());
json["humidity"] = buf;
doc["humidity"] = buf;
}
if (mask & M_PRESSURE) json["pressure"] = model->getPressure();
if (mask & M_IAQ) json["iaq"] = model->getIAQ();
if (mask & M_PM0_5) json["pm0.5"] = model->getPM0_5();
if (mask & M_PM1_0) json["pm1"] = model->getPM1();
if (mask & M_PM2_5) json["pm2.5"] = model->getPM2_5();
if (mask & M_PM4) json["pm4"] = model->getPM4();
if (mask & M_PM10) json["pm10"] = model->getPM10();
if (mask & M_PRESSURE) doc["pressure"] = model->getPressure();
if (mask & M_IAQ) doc["iaq"] = model->getIAQ();
if (mask & M_PM0_5) doc["pm0.5"] = model->getPM0_5();
if (mask & M_PM1_0) doc["pm1"] = model->getPM1();
if (mask & M_PM2_5) doc["pm2.5"] = model->getPM2_5();
if (mask & M_PM4) doc["pm4"] = model->getPM4();
if (mask & M_PM10) doc["pm10"] = model->getPM10();

// Serialize JSON to file
if (serializeJson(json, msg) == 0) {
if (serializeJson(doc, msg) == 0) {
ESP_LOGW(TAG, "Failed to serialise payload");
return;
}
Expand All @@ -88,76 +93,99 @@ namespace mqtt {
return;
}
ESP_LOGD(TAG, "Publishing sensor values: %s:%s", topic, msg);
if (!mqtt_client->publish(topic, msg)) ESP_LOGE(TAG, "publish failed!");
if (!mqtt_client->publish(topic, msg)) ESP_LOGE(TAG, "publish sensors failed!");
}

void publishConfiguration() {
MqttMessage msg;
msg.cmd = X_CMD_PUBLISH_CONFIGURATION;
msg.mask = 0;
msg.statusMessage = nullptr;
if (mqttQueue) xQueueSendToBack(mqttQueue, (void*)&msg, pdMS_TO_TICKS(100));
}

void publishConfigurationInternal() {
char buf[384];
char msg[CONFIG_SIZE];
StaticJsonDocument<CONFIG_SIZE> json;
json["appVersion"] = APP_VERSION;
json["altitude"] = config.altitude;
json["co2YellowThreshold"] = config.co2YellowThreshold;
json["co2RedThreshold"] = config.co2RedThreshold;
json["co2DarkRedThreshold"] = config.co2DarkRedThreshold;
json["iaqYellowThreshold"] = config.iaqYellowThreshold;
json["iaqRedThreshold"] = config.iaqRedThreshold;
json["iaqDarkRedThreshold"] = config.iaqDarkRedThreshold;
json["brightness"] = config.brightness;
doc.clear();
doc["appVersion"] = APP_VERSION;
doc["altitude"] = config.altitude;
doc["co2YellowThreshold"] = config.co2YellowThreshold;
doc["co2RedThreshold"] = config.co2RedThreshold;
doc["co2DarkRedThreshold"] = config.co2DarkRedThreshold;
doc["iaqYellowThreshold"] = config.iaqYellowThreshold;
doc["iaqRedThreshold"] = config.iaqRedThreshold;
doc["iaqDarkRedThreshold"] = config.iaqDarkRedThreshold;
doc["brightness"] = config.brightness;
sprintf(buf, "%s", WifiManager::getMac().c_str());
json["mac"] = buf;
doc["mac"] = buf;
sprintf(buf, "%s", WiFi.localIP().toString().c_str());
json["ip"] = buf;
doc["ip"] = buf;
if (I2C::scd30Present())
json["scd30"] = true;
doc["scd30"] = true;
if (I2C::scd40Present())
json["scd40"] = true;
doc["scd40"] = true;
if (I2C::bme680Present())
json["bme680"] = true;
doc["bme680"] = true;
if (I2C::lcdPresent())
json["lcd"] = true;
doc["lcd"] = true;
if (I2C::sps30Present()) {
json["sps30"] = true;
json["sps30AutoCleanInt"] = getSPS30AutoCleanIntervalCallback();
json["sps30Status"] = getSPS30StatusCallback();
doc["sps30"] = true;
doc["sps30AutoCleanInt"] = getSPS30AutoCleanIntervalCallback();
doc["sps30Status"] = getSPS30StatusCallback();
}
json["ssd1306Rows"] = config.ssd1306Rows;
json["greenLed"] = config.greenLed;
json["yellowLed"] = config.yellowLed;
json["redLed"] = config.redLed;
json["neopixelData"] = config.neopixelData;
json["neopixelNumber"] = config.neopixelNumber;
json["featherMatrixData"] = config.featherMatrixData;
json["featherMatrixClock"] = config.featherMatrixClock;
json["hub75R1"] = config.hub75R1;
json["hub75G1"] = config.hub75G1;
json["hub75B1"] = config.hub75B1;
json["hub75R2"] = config.hub75R2;
json["hub75G2"] = config.hub75G2;
json["hub75B2"] = config.hub75B2;
json["hub75ChA"] = config.hub75ChA;
json["hub75ChB"] = config.hub75ChB;
json["hub75ChC"] = config.hub75ChC;
json["hub75ChD"] = config.hub75ChD;
json["hub75Clk"] = config.hub75Clk;
json["hub75Lat"] = config.hub75Lat;
json["hub75Oe"] = config.hub75Oe;
doc["ssd1306Rows"] = config.ssd1306Rows;
doc["greenLed"] = config.greenLed;
doc["yellowLed"] = config.yellowLed;
doc["redLed"] = config.redLed;
doc["neopixelData"] = config.neopixelData;
doc["neopixelNumber"] = config.neopixelNumber;
doc["featherMatrixData"] = config.featherMatrixData;
doc["featherMatrixClock"] = config.featherMatrixClock;
doc["hub75R1"] = config.hub75R1;
doc["hub75G1"] = config.hub75G1;
doc["hub75B1"] = config.hub75B1;
doc["hub75R2"] = config.hub75R2;
doc["hub75G2"] = config.hub75G2;
doc["hub75B2"] = config.hub75B2;
doc["hub75ChA"] = config.hub75ChA;
doc["hub75ChB"] = config.hub75ChB;
doc["hub75ChC"] = config.hub75ChC;
doc["hub75ChD"] = config.hub75ChD;
doc["hub75Clk"] = config.hub75Clk;
doc["hub75Lat"] = config.hub75Lat;
doc["hub75Oe"] = config.hub75Oe;
sprintf(buf, "%.1f", getTemperatureOffsetCallback());
json["tempOffset"] = buf;
if (serializeJson(json, msg) == 0) {
doc["tempOffset"] = buf;
if (serializeJson(doc, msg) == 0) {
ESP_LOGW(TAG, "Failed to serialise payload");
return;
}
sprintf(buf, "%s/%u/up/config", config.mqttTopic, config.deviceId);
ESP_LOGI(TAG, "Publishing configuration: %s:%s", buf, msg);
if (!mqtt_client->publish(buf, msg)) ESP_LOGE(TAG, "publish failed!");
if (!mqtt_client->publish(buf, msg)) ESP_LOGE(TAG, "publish configuration failed!");
}

void publishStatusMsg(const char* statusMessage) {
MqttMessage msg;
msg.cmd = X_CMD_PUBLISH_STATUS_MSG;
msg.mask = 0;
msg.statusMessage = statusMessage;
if (mqttQueue) xQueueSendToBack(mqttQueue, (void*)&msg, pdMS_TO_TICKS(100));
}

void publishStatusMsgInternal(const char* statusMessage) {
if (strlen(statusMessage) > 200) return;
char buf[256];
sprintf(buf, "%s/%u/up/status", config.mqttTopic, config.deviceId);
char msg[256];
doc.clear();
doc["msg"] = statusMessage;
if (serializeJson(doc, msg) == 0) {
ESP_LOGW(TAG, "Failed to serialise payload");
return;
}
if (!mqtt_client->publish(buf, msg)) ESP_LOGE(TAG, "publish status msg failed!");
}

void callback(char* topic, byte* payload, unsigned int length) {
Expand Down Expand Up @@ -201,7 +229,7 @@ namespace mqtt {
} else if (strncmp(buf, "getConfig", strlen(buf)) == 0) {
publishConfiguration();
} else if (strncmp(buf, "setConfig", strlen(buf)) == 0) {
StaticJsonDocument<CONFIG_SIZE> doc;
doc.clear();
DeserializationError error = deserializeJson(doc, msg);
if (error) {
ESP_LOGW(TAG, "Failed to parse message: %s", error.f_str());
Expand Down Expand Up @@ -239,7 +267,7 @@ namespace mqtt {
if (doc.containsKey("hub75Oe")) { config.hub75Oe = doc["hub75Oe"].as<uint8_t>(); rebootRequired = true; }
if (saveConfiguration(config) && rebootRequired) {
sprintf(buf, "%s/%u/up/status", config.mqttTopic, config.deviceId);
mqtt_client->publish(buf, "{\"msg\":\"configuration updated, rebooting shortly\"}");
publishStatusMsgInternal("configuration updated, rebooting shortly");
delay(1000);
esp_restart();
}
Expand All @@ -263,14 +291,26 @@ namespace mqtt {
lastReconnectAttempt = millis();
if (!mqtt_client->connected()) {
ESP_LOGD(TAG, "Attempting MQTT connection...");
connectionAttempts++;
if (mqtt_client->connect(buf, config.mqttUsername, config.mqttPassword)) {
ESP_LOGD(TAG, "MQTT connected");
sprintf(buf, "%s/%u/down/#", config.mqttTopic, config.deviceId);
mqtt_client->subscribe(buf);
sprintf(buf, "%s/down/#", config.mqttTopic);
mqtt_client->subscribe(buf);
sprintf(buf, "%s/%u/up/status", config.mqttTopic, config.deviceId);
mqtt_client->publish(buf, "{\"online\":true}");
char msg[256];
doc.clear();
doc["online"] = true;
doc["connectionAttempts"] = connectionAttempts;
if (serializeJson(doc, msg) == 0) {
ESP_LOGW(TAG, "Failed to serialise payload");
return;
}
if (mqtt_client->publish(buf, msg))
connectionAttempts = 0;
else
ESP_LOGE(TAG, "publish connect msg failed!");
} else {
ESP_LOGW(TAG, "MQTT connection failed, rc=%i", mqtt_client->state());
vTaskDelay(pdMS_TO_TICKS(1000));
Expand All @@ -288,7 +328,7 @@ namespace mqtt {
cleanSPS30Callback_t _cleanSPS30Callback,
getSPS30StatusCallback_t _getSPS30StatusCallback
) {
mqttQueue = xQueueCreate(2, sizeof(struct MqttMessage*));
mqttQueue = xQueueCreate(2, sizeof(struct MqttMessage));
if (mqttQueue == NULL) {
ESP_LOGE(TAG, "Queue creation failed!");
}
Expand Down Expand Up @@ -347,6 +387,8 @@ namespace mqtt {
publishConfigurationInternal();
} else if (msg.cmd == X_CMD_PUBLISH_SENSORS) {
publishSensorsInternal(msg.mask);
} else if (msg.cmd == X_CMD_PUBLISH_STATUS_MSG) {
publishStatusMsgInternal(msg.statusMessage);
}
}
if (!mqtt_client->connected()) {
Expand Down
6 changes: 5 additions & 1 deletion src/ota.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Arduino.h>
#include <config.h>
#include <mqtt.h>
#include <ota.h>
#include <esp32fota.h>
#include <Ticker.h>
Expand Down Expand Up @@ -36,6 +37,7 @@ namespace OTA {
if (shouldExecuteFirmwareUpdate) {
ESP_LOGD(TAG, "Firmware update available");
if (preUpdateCallback) preUpdateCallback();
mqtt::publishStatusMsg("Starting OTA update");
esp32FOTA.execOTA();
} else {
ESP_LOGD(TAG, "No firmware update available");
Expand All @@ -52,9 +54,11 @@ namespace OTA {
ESP_LOGD(TAG, "Beginning forced OTA");
if (preUpdateCallback) preUpdateCallback();
esp32FOTA esp32FOTA(OTA_APP, APP_VERSION, LittleFS, false, false);
mqtt::publishStatusMsg("Starting forced OTA update");
esp32FOTA.forceUpdate(forceUpdateURL, false);
forceUpdateURL = "";
ESP_LOGD(TAG, "Forced OTA done"); forceUpdateURL = "";
ESP_LOGD(TAG, "Forced OTA done");
forceUpdateURL = "";
}

void otaLoop(void* pvParameters) {
Expand Down

0 comments on commit aeaec2a

Please sign in to comment.