mqttasgi is an ASGI protocol server that implements a complete interface for MQTT for the Django development framework. Built following daphne protocol server.
- Publish / Subscribe to any topic
- Multiple workers to handle different topics / subscriptions.
- Full Django ORM support within consumers.
- Full Channel Layers support.
- Full testing consumer to enable TDD.
- Lightweight.
- Django 3.x, 4.x / Channels 3.x support
To install mqttasgi for Django 3.x, 4.x
pip install mqttasgi
IMPORTANT NOTE: If legacy support for Django 2.x is required install latest 0.x mqttasgi.
Mqttasgi provides a cli interface to run the protocol server.
mqttasgi -H localhost -p 1883 my_application.asgi:application
Parameters:
Parameter | Explanation | Environment variable | Default |
---|---|---|---|
-H / --host | MQTT broker host | MQTT_HOSTNAME | localhost |
-p / --port | MQTT broker port | MQTT_PORT | 1883 |
-c / --cleansession | MQTT Clean Session | MQTT_CLEAN | True |
-v / --verbosity | Logging verbosity | VERBOSITY | 0 |
-U / --username | MQTT Username | MQTT_USERNAME | |
-P / --password | MQTT Password | MQTT_PASSWORD | |
-i / --id | MQTT Client ID | MQTT_CLIENT_ID | |
-C / --cert | TLS Certificate | TLS_CERT | |
-K / --key | TLS Key | TLS_KEY | |
-S / --cacert | TLS CA Certificate | TLS_CA | |
-SSL / --use-ssl | Use ssl (without certificate authentication) | MQTT_USE_SSL | False |
-T / --transport | Transport type (tcp or websockets) | MQTT_TRANSPORT | tcp |
-r / --retries | Num. retries on disconnect | MQTT_RETRIES | 3 |
Last argument | ASGI Apllication |
Environment variables are supported and can be set using a .env
file on the root of the project, but passing a parameter overrides this value.
To add your consumer to the asgi.py
file in your django application:
import os
import django
from channels.routing import ProtocolTypeRouter
from my_application.consumers import MyMqttConsumer
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_application.settings')
django.setup()
application = ProtocolTypeRouter({
'http': get_asgi_application(),
'mqtt': MyMqttConsumer.as_asgi(),
})
Your consumer should inherit from MqttConsumer in mqttasgi.consumers. It implements helper functions such as publish and subscribe. A simple example:
from mqttasgi.consumers import MqttConsumer
class MyMqttConsumer(MqttConsumer):
async def connect(self):
await self.subscribe('my/testing/topic', 2)
async def receive(self, mqtt_message):
print('Received a message at topic:', mqtt_message['topic'])
print('With payload', mqtt_message['payload'])
print('And QOS:', mqtt_message['qos'])
pass
async def disconnect(self):
await self.unsubscribe('my/testing/topic')
The consumer provides a full API to interact with MQTT and with the channel layer:
Publishes a message to the MQTT topic passed as argument with the given payload. The QOS of the message or retain flagged can be also passed as aditional arguments.
await self.publish(topic, payload, qos=1, retain=False)
Subscribes to the MQTT topic passed as argument with the given QOS.
await self.subscribe(topic, qos)
Unsubscribes from the given MQTT topic.
await self.unsubscribe(topic)
This is an advanced functionality of the MQTTAsgi protocol server that allows the user to run multiple consumers on the same mqttasgi instance.
The app_id
is a unique identifier for the worker, the consumer_path
is the dot separated path to the consumer and consumer_params
is the parameter dictonary to pass down to the new consumer.
await self.spawn_worker(app_id, consumer_path, consumer_params)
The consumer can also kill the spawned workers with a specific app_id
:
await self.kill_worker(self, app_id)
MQTTAsgi supports channel layer communications and group messages. It follows the Channel Layers implementation:
Outside of the consumer:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)("my.group", {"type": "my.custom.message", "text":"Hi from outside of the consumer"})
In the consumer:
from mqttasgi.consumers import MqttConsumer
class MyMqttConsumer(MqttConsumer):
async def connect(self):
await self.subscribe('my/testing/topic', 2)
await self.channel_layer.group_add("my.group", self.channel_name)
async def receive(self, mqtt_message):
print('Received a message at topic:', mqtt_message['topic'])
print('With payload', mqtt_message['payload'])
print('And QOS:', mqtt_message['qos'])
pass
async def my_custom_message(self, event):
print('Received a channel layer message')
print(event)
async def disconnect(self):
await self.unsubscribe('my/testing/topic')
MAPER - IIOT Asset Monitoring - Webpage
Predict failures before they happen.
Real time health monitoring to avoid unexpected downtimes and organize maintenance in industrial plants.
Combining IoT Technology and Artificial Intelligence, we deliver a complete view of your assets like never before.
With real time health diagnostics you will increase the reliability of the whole production process, benefitting both the company and it's staff.