Skip to content

Commit

Permalink
Publish sensor data to MQTT (#15)
Browse files Browse the repository at this point in the history
* Add MQTT/Home Assistant integration
  • Loading branch information
stefanthoss authored Nov 10, 2022
1 parent 899a70b commit 7ab1fb7
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 46 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/docker-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ jobs:
# Login against a Docker registry except on PR
# https://github.com/docker/login-action
- name: Log into registry ${{ env.REGISTRY }}
if: github.event_name != 'pull_request'
uses: docker/login-action@28218f9b04b4f3f62068d7b6ce6ca5b26e35336c
with:
registry: ${{ env.REGISTRY }}
Expand All @@ -48,6 +47,6 @@ jobs:
uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc
with:
context: .
push: ${{ github.event_name != 'pull_request' }}
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
68 changes: 41 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,51 +1,72 @@
# Air Quality Bridge

This Flask-based server accepts particulate matter/temperature/humidity data from a [sensor.community sensor](https://sensor.community/en/sensors/airrohr) and writes it to a InfluxDB 2.0 server. It also calculates the Air Quality Index (AQI) with [hrbonz/python-aqi](https://github.com/hrbonz/python-aqi).
This Flask-based server accepts air quality/temperature/humidity data from a [sensor.community sensor](https://sensor.community/en/sensors/airrohr) and forwards it to an InfluxDB 2.0 server or a Home Assistant instance through an MQTT broker. It also calculates the Air Quality Index (AQI) with [hrbonz/python-aqi](https://github.com/hrbonz/python-aqi).

## Requirements
The bridge supports receiving data from multiple air quality sensors at the same time and is configured using environment variables.

This project uses Python 3. Install the required dependencies with
## Deployment

```shell
pip install -r requirements.txt
```
The Docker image from the repository gets automatically build and published to the GitHub Container Registry as [ghcr.io/stefanthoss/air-quality-bridge](https://github.com/stefanthoss/air-quality-bridge/pkgs/container/air-quality-bridge).

The best way to deploy the application is with Docker Compose. Download the `docker-compose.yml` file, change the environment variables according to your local setup, and start the Docker service with `docker-compose up`:

If your InfluxDB server doesn't use a trusted SSL certificate, you'll have to add the environment variable `INFLUXDB_V2_VERIFY_SSL=False`.

## InfluxDB Configuration

Set `ENABLE_INFLUXDB=true` to enable writing data to InfluxDB. The InfluxDB connection is configured using the environment variables `INFLUXDB_V2_URL`, `INFLUXDB_V2_TOKEN`, and `INFLUXDB_V2_ORG`. Other configuration parameters for InfluxDB are documented in the [influxdb-client-python README](https://github.com/influxdata/influxdb-client-python#via-environment-properties). Use `INFLUXDB_BUCKET` to configure the bucket (default: `sensors`) and `INFLUXDB_MEASUREMENT` to configure the measurement name (default: `air_quality`).

## Home Assistant / MQTT Integration

Set `ENABLE_MQTT=true` to enable writing data to MQTT and exposing the data as a sensor in Home Assistant. The MQTT broker is configured using the environment variables `MQTT_BROKER_URL`, `MQTT_BROKER_PORT`, `MQTT_USERNAME`, `MQTT_PASSWORD`, and `MQTT_CLIENT_ID`. Other configuration parameters for MQTT are documented in the [flask-mqtt README](https://flask-mqtt.readthedocs.io/en/latest/configuration.html#configuration-keys).

In Home Assistant, add the [MQTT integration](https://www.home-assistant.io/integrations/mqtt/) and enable *Enable newly added entities* in the integration's system options. Once the Air Quality Bridge is running and receives data, it will publish data to MQTT which Home Assistant will use to create devices and entities for the air quality sensor through MQTT's auto discovery feature.

## Sensor Configuration

In the *Configuration* / *APIs* section of your air quality sensor:

## Configuration
* Enable *Send data to custom API*.
* Set *Server* to the IP of your Docker deployment.
* Set *Pathth* to `/upload_measurement`.
* Set *Port* to 5000.

The InfluxDB connection is configured via the environment variables `INFLUXDB_V2_URL`, `INFLUXDB_V2_TOKEN`, and `INFLUXDB_V2_ORG`. Other configuration parameters for InfluxDB are documented in the [influxdb-client-python README](https://github.com/influxdata/influxdb-client-python#via-environment-properties).
## Development / Non-Docker Usage

Use `INFLUXDB_BUCKET` to configure the bucket (default: `sensors`) and `INFLUXDB_MEASUREMENT` to configure the measurement name (default: `air_quality`).
This project uses Python 3. Install the required dependencies with

## Development
```shell
pip install -r requirements.txt
```

Launch the app locally in development mode and access it at <http://localhost:5000>:

```shell
export ENABLE_INFLUXDB=true
export INFLUXDB_V2_URL="https://localhost:8086"
export INFLUXDB_V2_TOKEN="my-token"
export INFLUXDB_V2_ORG="my-org"
export INFLUXDB_BUCKET="my-bucket"
export INFLUXDB_MEASUREMENT="air_quality"
export ENABLE_INFLUXDB=true
export MQTT_BROKER_URL="localhost"
export MQTT_BROKER_PORT=1883
export MQTT_USERNAME="my-username"
export MQTT_PASSWORD="my-password"
python main.py
```
Add the environment variable `FLASK_DEBUG=1` for further debugging.

If everything is configured correctly, executing `curl -X GET http://localhost:5000/info` should return a JSON object that indicates the InfluxDB client is ready.
If everything is configured correctly, executing `curl -X GET http://localhost:5000/info` should return a JSON object that indicates the application is ready. This endpoint is also used for the Docker health check.

If you want to build the Docker image locally, execute:

```shell
docker build -t air-quality-bridge:devel .
```

## Deployment

The Docker image from the repository gets automatically build and published to the GitHub Container Registry as [ghcr.io/stefanthoss/air-quality-bridge](https://github.com/stefanthoss/air-quality-bridge/pkgs/container/air-quality-bridge).

The best way to deploy the application is with Docker Compose. Download the `docker-compose.yml` file, change the environment variables according to your local setup, and start the Docker service with `docker-compose up`:

If your InfluxDB server doesn't use a trusted SSL certificate, you'll have to add the environment variable `INFLUXDB_V2_VERIFY_SSL=False`.

## Sample Payload
## Sample Sensor Payload

From the sensor firmware version `NRZ-2020-129`:

Expand Down Expand Up @@ -94,11 +115,4 @@ From the sensor firmware version `NRZ-2020-129`:
}
```

## Sensor Configuration

In the *Configuration* section of your air quality sensor:

* Set *Server* to the IP of your deployment.
* Activate *Send data to custom API* and *HTTPS*.
* Set */path* to `/upload_measurement`.
* Set *Port* to 5443.
Use `curl -X POST -H "Content-Type: application/json" -d @test/measurement.json http://127.0.0.1:5000/upload_measurement` to use this test file locally.
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ services:
image: ghcr.io/stefanthoss/air-quality-bridge:latest
container_name: air-quality-bridge
restart: unless-stopped
stop_signal: SIGINT
ports:
- "5000:5000"
environment:
- ENABLE_INFLUXDB=false
- INFLUXDB_V2_URL=https://localhost:8086
- INFLUXDB_V2_TOKEN=my-token
- INFLUXDB_V2_ORG=my-org
- INFLUXDB_BUCKET=my-bucket
- INFLUXDB_MEASUREMENT=air_quality
- ENABLE_MQTT=false
- MQTT_BROKER_URL=my-mqtt-broker
- MQTT_BROKER_PORT=1883
- MQTT_USERNAME=my-username
- MQTT_PASSWORD=my-password
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5000/info"]
142 changes: 125 additions & 17 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#!/usr/bin/python3

import json
import os

import aqi
from flask import Flask, jsonify, request
from flask_mqtt import Mqtt
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from waitress import serve

AQI_CATEGORIES = {
(-1, 50): "Good",
Expand All @@ -16,13 +19,93 @@
(300, 500): "Hazardous",
}

influxdb_bucket = os.environ.get("INFLUXDB_BUCKET", "sensors")
influxdb_measurement = os.environ.get("INFLUXDB_MEASUREMENT", "air_quality")

app = Flask(__name__)

client = InfluxDBClient.from_env_properties()
write_api = client.write_api(write_options=SYNCHRONOUS)
ENABLE_INFLUXDB = os.environ.get("ENABLE_INFLUXDB", "false").lower() == "true"
app.logger.debug(f"InfluxDB enabled: {ENABLE_INFLUXDB}")
ENABLE_MQTT = os.environ.get("ENABLE_MQTT", "false").lower() == "true"
app.logger.debug(f"MQTT enabled: {ENABLE_MQTT}")

if not ENABLE_INFLUXDB and not ENABLE_MQTT:
app.logger.warning("No data destination configured, the bridge will not forward incoming data.")

if ENABLE_INFLUXDB:
app.logger.info("Creating InfluxDB connection...")
influxdb_client = InfluxDBClient.from_env_properties()
write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
influxdb_bucket = os.environ.get("INFLUXDB_BUCKET", "sensors")
influxdb_measurement = os.environ.get("INFLUXDB_MEASUREMENT", "air_quality")

if ENABLE_MQTT:
app.logger.info("Creating MQTT connection...")
app.config["MQTT_BROKER_URL"] = os.environ.get("MQTT_BROKER_URL")
app.config["MQTT_BROKER_PORT"] = int(os.environ.get("MQTT_BROKER_PORT", 1883))
app.config["MQTT_USERNAME"] = os.environ.get("MQTT_USERNAME")
app.config["MQTT_PASSWORD"] = os.environ.get("MQTT_PASSWORD")
app.config["MQTT_CLIENT_ID"] = os.environ.get("MQTT_CLIENT_ID", "air-quality-bridge")
mqtt = Mqtt(app)


def register_mqtt_sensor(device_name, sensor_name, device_info_dict):
device_class = None # https://developers.home-assistant.io/docs/core/entity/sensor/#available-device-classes
sensor_name_readable = sensor_name
unit_of_measurement = None
enabled_by_default = "true"

if sensor_name.endswith("P0"):
device_class = "pm1"
sensor_name_readable = "PM 1"
unit_of_measurement = "µg/m³"
elif sensor_name.endswith("P1"):
device_class = "pm10"
sensor_name_readable = "PM 10"
unit_of_measurement = "µg/m³"
elif sensor_name.endswith("P2"):
device_class = "pm25"
sensor_name_readable = "PM 2.5"
unit_of_measurement = "µg/m³"
elif sensor_name.endswith("temperature"):
device_class = "temperature"
sensor_name_readable = "Temperature"
unit_of_measurement = "°C"
elif sensor_name.endswith("humidity"):
device_class = "humidity"
sensor_name_readable = "Humidity"
unit_of_measurement = "%"
elif sensor_name.endswith("pressure"):
device_class = "pressure"
sensor_name_readable = "Pressure"
unit_of_measurement = "Pa"
elif sensor_name.endswith("lux"):
device_class = "illuminance"
sensor_name_readable = "Light"
unit_of_measurement = "lx"
elif sensor_name == "AQI_value":
device_class = "aqi"
sensor_name_readable = "AQI"
elif sensor_name == "AQI_category":
sensor_name_readable = "AQI Category"
else:
enabled_by_default = "false"

ha_sensor_config = {
"availability_topic": f"homeassistant/sensor/{device_name}/status",
"device": device_info_dict,
"enabled_by_default": enabled_by_default,
"name": f"{device_name} {sensor_name_readable}",
"state_class": "measurement",
"state_topic": f"homeassistant/sensor/{device_name}/state",
"unique_id": f"{device_name}_{sensor_name}",
"value_template": f"{{{{ value_json.{sensor_name} }}}}",
}
if device_class is not None:
ha_sensor_config["device_class"] = device_class
if unit_of_measurement is not None:
ha_sensor_config["unit_of_measurement"] = unit_of_measurement

# Publish configuration
app.logger.debug(f"Configuring MQTT sensor: {ha_sensor_config}")
mqtt.publish(f"homeassistant/sensor/{device_name}/{sensor_name}/config", json.dumps(ha_sensor_config))


def transform_data(data):
Expand All @@ -39,14 +122,17 @@ def get_aqi_category(aqi_value):


@app.route("/info", methods=["GET"])
def root():
return jsonify({"app_name": app.name, "influxdb_client": client.ready().status})
def info():
response = {"app_name": app.name}
if ENABLE_INFLUXDB:
response["influxdb_client"] = influxdb_client.ready().status
return jsonify(response)


@app.route("/upload_measurement", methods=["POST"])
def upload_measurement():
data = request.json
app.logger.debug(f"Received data: {data}")
app.logger.debug(f"Received data from sensor: {data}")
data_points = transform_data(data["sensordatavalues"])

node_tag = "unknown"
Expand All @@ -57,28 +143,50 @@ def upload_measurement():

aqi_value = None
if "SDS_P1" in data_points and "SDS_P2" in data_points:
aqi_value = int(
aqi_value = float(
aqi.to_aqi([(aqi.POLLUTANT_PM10, data_points["SDS_P1"]), (aqi.POLLUTANT_PM25, data_points["SDS_P2"])])
)
elif "PMS_P1" in data_points and "PMS_P2" in data_points:
aqi_value = int(
aqi_value = float(
aqi.to_aqi([(aqi.POLLUTANT_PM10, data_points["PMS_P1"]), (aqi.POLLUTANT_PM25, data_points["PMS_P2"])])
)
else:
app.logger.warn("Measurement for {node_tag} does not contain pollutant data.")
app.logger.warn("Measurement for {node_tag} does not contain air pollutant data.")

if aqi_value is not None:
data_points["AQI_value"] = aqi_value
data_points["AQI_category"] = get_aqi_category(aqi_value)

app.logger.debug(f"Writing data: {data_points}")
write_api.write(
bucket=influxdb_bucket,
record=[{"measurement": influxdb_measurement, "tags": {"node": node_tag}, "fields": data_points}],
)
app.logger.debug(f"Parsed and transformed data: {data_points}")

if ENABLE_INFLUXDB:
app.logger.debug("Writing data to InfluxDB...")
write_api.write(
bucket=influxdb_bucket,
record=[{"measurement": influxdb_measurement, "tags": {"node": node_tag}, "fields": data_points}],
)

if ENABLE_MQTT:
app.logger.debug("Writing data to MQTT...")

ip_addr = request.environ.get("HTTP_X_FORWARDED_FOR", request.remote_addr)
device_info_dict = {
"configuration_url": f"http://{ip_addr}",
"identifiers": node_tag,
"manufacturer": "Sensor.Community",
"name": f"Air Sensor {node_tag}",
"sw_version": data["software_version"],
"via_device": "air-quality-bridge",
}

# Publish HA sensor data to MQTT
for sensor_name in data_points:
register_mqtt_sensor(node_tag, sensor_name, device_info_dict)
mqtt.publish(f"homeassistant/sensor/{node_tag}/status", "online")
mqtt.publish(f"homeassistant/sensor/{node_tag}/state", json.dumps(data_points))

return jsonify({"success": "true"})


if __name__ == "__main__":
app.run(host="0.0.0.0")
serve(app, host="0.0.0.0", port=5000)
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
Flask-MQTT~=1.1.1
Flask~=2.2.2
influxdb-client~=1.31.0
python-aqi~=0.6.1
waitress~=2.1.2
1 change: 1 addition & 0 deletions test/measurement.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"esp8266id":"9372054","software_version":"NRZ-2020-129","sensordatavalues":[{"value_type":"SDS_P1","value":"18.83"},{"value_type":"SDS_P2","value":"10.60"},{"value_type":"BME280_temperature","value":"17.00"},{"value_type":"BME280_pressure","value":"101001.28"},{"value_type":"BME280_humidity","value":"66.66"},{"value_type":"samples","value":"4314326"},{"value_type":"min_micro","value":"33"},{"value_type":"max_micro","value":"20201"},{"value_type":"signal","value":"-46"}]}

0 comments on commit 7ab1fb7

Please sign in to comment.