From v4.1, EMQX MQTT broker provides the specified plugin that supports multiple languages emqx_extension_hook. Currently, it is supported that use other programming languages to process the hook events of EMQX. The developer can use Python or Java to quickly develop their plugins or do some expansions based on the official functions to satisfy their business scenarios. For example:
- Verify the client's login permission: when connecting to the client, the corresponding function will be triggered and the client information will be obtained through parameters. Finally, it judges whether it has the login permission after reading the database, comparison, etc.
- Record the online status of client and online and offline history: trigger corresponding functions when the status of the client changes, the client information will be obtained through parameters, and the online status of the client in the database will be rewritten.
- Verify the operation permission for PUB/SUB of the client: trigger corresponding functions when publish or subscribe, and the client information and current topics will be obtained through parameters to judge whether it has the corresponding operation permission.
- Handle session and message events, implement the subscription relation and message processing or storage: trigger corresponding functions when publish messages and status changes, the current client information, information status and message content will be forwarded to Kafka or database for storage.
Note:the message hook is only supported in the enterprise.
Python and Java drivers are based on the processes Erlang/OTP-Port to implement communication, and have very high throughput performance. This article will take Python expansion as an example to introduce how to use EMQX cross-language expansion.
- The broker of EMQX is required to install Python 3.6 or higher version
- Install Python SDK through pip
- Adjust EMQX configurations to ensure that the corresponding configuration items correctly point Python project
- Import SDK to write code
Install SDK locally through pip commands and ensure using pip3 to install:
pip3 install emqx-extension-sdk
Modify the configuration of plugin emqx-extension-hook
and correctly use expansion.
## Setup the supported drivers
##
## Value: python2 | python3 | java
exhook.drivers = python3
## Search path for scripts/library
exhook.drivers.python3.path = data/extension/hooks.py
## Call timeout
##
## Value: Duration
##exhook.drivers.python3.call_timeout = 5s
## Initial module name
## Your filename or module name
exhook.drivers.python3.init_module = hooks
Create file hooks.py
in the directory emqx/data/extension
and imporrt SDK to write business logic. The example is as follows.
## data/extension/hooks.py
from emqx_extension.hooks import EmqxHookSdk, hooks_handler
from emqx_extension.types import EMQX_CLIENTINFO_PARSE_T, EMQX_MESSAGE_PARSE_T
# inherit SDK class HookSdk
class CustomHook(EmqxHookSdk):
# use decorator to register hooks
@hooks_handler()
def on_client_connect(self,
conninfo: EMQX_CLIENTINFO_PARSE_T = None,
props: dict = None,
state: list = None):
print(f'[Python SDK] [on_client_connect] {conninfo.clientid} connecte')
@hooks_handler()
def on_client_connected(self,
clientinfo: EMQX_CLIENTINFO_PARSE_T,
state: list = None):
print(
f'[Python SDK] [on_client_connected] {clientinfo.clientid} connected')
@hooks_handler()
def on_client_check_acl(self, clientinfo: EMQX_CLIENTINFO_PARSE_T,
pubsub: str,
topic: str,
result: bool,
state: tuple) -> bool:
print(
f'[Python SDK] [on_client_check_acl] {clientinfo.username} check ACL: {pubsub} {topic}')
# when the username is empty, ACL verification failed
if clientinfo.username == '':
return False
return True
@hooks_handler()
def on_client_authenticate(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, authresult,
state) -> bool:
print(
f'[Python SDK] [on_client_authenticate] {clientinfo.clientid} authenticate')
# when the clientid is not empty, verification passed
if clientinfo.clientid != '':
return True
return False
# on_message_* only supports the enterprise
@hooks_handler()
def on_message_publish(self, message: EMQX_MESSAGE_PARSE_T, state):
print(
f'[Python SDK] [on_message_publish] {message.topic} {message.payload}')
emqx_hook = CustomHook(hook_module=f'{__name__}.emqx_hook')
def init():
return emqx_hook.start()
def deinit():
return
Enable plugin emqx_extension_hook
. If configuration error or write wrong Python code, it can not be enabled normally. After it is enabled, try to establish the MQTT connection and observer the running situation of the business.
./bin/emqx_ctl plugins load emqx_extension_hook
Currently, the EMQX Python extension SDK is open source, if you have higher requirements for the controllability and performance or you need to use the running environment of Python 2.7, welcome to contribute code or develop based on original examples.
- Code repository: emqx-extension-python-sdk
- You can refer to this example to wrap: emqx-extension-hook main.py