This repository contains the source code for a MySQL loadable function library (previously called UDF - User Defined Functions), which provides some additonal SQL functions to interact with your MQTT server for publish and subscribe MQTT topics.
If you like lib_mysqludf_mqtt give it a star or fork it:
Ensure the Eclipse Paho C Client Library for the MQTT Protocol is installed.
Also install libjsonparser:
sudo apt install libjsonparser-dev
From the base directory run:
make
sudo make install
This will build and install the library file.
To active the loadable function within your MySQL server run the follwoing SQL queries:
CREATE FUNCTION mqtt_info RETURNS STRING SONAME 'lib_mysqludf_mqtt.so';
CREATE FUNCTION mqtt_lasterror RETURNS STRING SONAME 'lib_mysqludf_mqtt.so';
CREATE FUNCTION mqtt_connect RETURNS INTEGER SONAME 'lib_mysqludf_mqtt.so';
CREATE FUNCTION mqtt_disconnect RETURNS INTEGER SONAME 'lib_mysqludf_mqtt.so';
CREATE FUNCTION mqtt_publish RETURNS INTEGER SONAME 'lib_mysqludf_mqtt.so';
CREATE FUNCTION mqtt_subscribe RETURNS STRING SONAME 'lib_mysqludf_mqtt.so';
To uninstall first deactive the loadable function within your MySQL server running the SQL queries:
DROP FUNCTION IF EXISTS mqtt_info;
DROP FUNCTION IF EXISTS mqtt_lasterror;
DROP FUNCTION IF EXISTS mqtt_connect;
DROP FUNCTION IF EXISTS mqtt_disconnect;
DROP FUNCTION IF EXISTS mqtt_publish;
DROP FUNCTION IF EXISTS mqtt_subscribe;
Then uninstall the library file using command line:
sudo make uninstall
Connect to a mqtt server and returns a handle.
mqtt_connect(server {,[username]} {,[password] {,[options]}}})
Parameter in {}
are optional an can be omit.
Parameter in []
can be NULL
- in this case a default value is used.
To use optional parameters after omitting other optional parameters, use NULL
.
server
String- Specifying the server to which the client will connect. It takes the form
protocol://host:port
.
Currentlyprotocol
must betcp
orssl
.
Forhost
you can specify either an IP address or a hostname.
For instance, to connect to a server running on the local machines with the default MQTT port, specifytcp://localhost:1883
orssl://localhost:8883
for an SSL connection. username
String- Username for authentification. If
username
should remain unused, omit the parameter or set it toNULL
password
String- Password for authentification. If the
password
should remain unused, omit the parameter or set it toNULL
options
String- JSON string containing additonal options or NULL if unused. The following JSON objects are accept:
CApath
: String- Points to a directory containing CA certificates in PEM format
CAfile
: String- The file in PEM format containing the public digital certificates trusted by the client.
keyStore
: String- The file in PEM format containing the public certificate chain of the client. It may also include the client's private key.
privateKey
: String- If not included in the sslKeyStore, this setting points to the file in PEM format containing the client's private key.
privateKeyPassword
: String- The password to load the client's privateKey if encrypted.
enabledCipherSuites
: String- The list of cipher suites that the client will present to the server during the SSL handshake.
verify
: boolean- Whether to carry out post-connect checks, including that a certificate matches the given host name.
enableServerCertAuth
: boolean- True/False option to enable verification of the server certificate.
sslVersion
: integer- The SSL/TLS version to use. Specify one of:
0 = MQTT_SSL_VERSION_DEFAULT
1 = MQTT_SSL_VERSION_TLS_1_0
2 = MQTT_SSL_VERSION_TLS_1_1
3 = MQTT_SSL_VERSION_TLS_1_2 keepAliveInterval
: integer- The "keep alive" interval, measured in seconds, defines the maximum time that should pass without communication between the client and the server.
cleansession
: boolean- The cleansession setting controls the behaviour of both the client and the server at connection and disconnection time.
reliable
: boolean- This is a boolean value that controls how many messages can be in-flight simultaneously. Setting reliable to true means that a published message must be completed (acknowledgements received) before another can be sent.
MQTTVersion
: String- Sets the version of MQTT to be used on the connect:
0 = MQTTVERSION_DEFAULT start with 3.1.1, and if that fails, fall back to 3.1
3 = MQTTVERSION_3_1
4 = MQTTVERSION_3_1_1
5 = MQTTVERSION_5 maxInflightMessages
: integer- The maximum number of messages in flight
willTopic
: String- The LWT topic to which the LWT message will be published.
willMessage
: String- The LWT payload.
willRetained
: boolean- The retained flag for the LWT message.
willQos
: String- The quality of service setting for the LWT message
Returns a valid handle or 0 on error
Examples:
SET @client = (SELECT mqtt_connect('tcp://localhost:1883', NULL, NULL));
SET @client = (SELECT mqtt_connect('tcp://localhost:1883', 'myuser', 'mypasswd'));
SET @client = (SELECT mqtt_connect('ssl://mqtt.eclipseprojects.io:8883', NULL, NULL, '{"verify":true,"CApath":"/etc/ssl/certs"}'));
SET @client = (SELECT mqtt_connect('ssl://mqtt.eclipseprojects.io:8883', 'myuser', 'mypasswd', '{"verify":true,"CAfile":"/etc/ssl/certs/ISRG_Root_X1.pem"}'));
Disconnect from a mqtt server using previous requested handle by mqtt_connect()
.
mqtt_disconnect(handle, {timeout})
Parameter in {}
are optional an can be omit.
handle
BIGINT- Handle previously got from
mqtt_connect
. timeout
INT- Optional timeout in ms
Returns 0 if successful.
Example:
SELECT mqtt_disconnect(@client);
Publish a mqtt payload and returns its status.
Parameter in {}
are optional an can be omit.
Parameter in []
can be NULL
- in this case a default value is used.
To use optional parameters after omitting other optional parameters, use NULL
.
Possible call variants:
(1) mqtt_publish(server, [username], [password], topic, [payload] {,[qos] {,[retained] {,[timeout] {,[options]}}}})
(2) mqtt_publish(client, topic, [payload] {,[qos] {,[retained] {,[timeout] {,[options]}}}})
Variant (1) connects to MQTT, publish the payload and disconnnect after publish. This variant is provided for individual single mqtt_publish()
calls.
Because this variant may slow down when a lot of publishing should be done, you can do publish using variant (2) using a client handle from a previous mqtt_connect()
.
Variant (2) should be used for multiple mqtt_publish()
calls with a preceding mqtt_connect()
and a final mqtt_disconnect()
:
- Call
mqtt_connect()
to get a valid mqtt client connection handle - Call
mqtt_publish()
usinghandle
parameter - Repeat step 2. for your needs
- Call
mqtt_disconnect()
usinghandle
to free the client connection handle
server
String- Specifying the server to which the client will connect. It takes the form
protocol://host:port
.
Currentlyprotocol
must betcp
orssl
.
Forhost
you can specify either an IP address or a hostname.
For instance, to connect to a server running on the local machines with the default MQTT port, specifytcp://localhost:1883
orssl://localhost:8883
for an SSL connection. username
String- Username for authentification or
NULL
if unused password
String- Password for authentification or
NULL
if unused client
BIGINT- A valid handle returned from mqtt_connect() call.
topic
String- The topic to be published
payload
String- The message published for the topic
qos
INT [0..2] (default 0)- The QOS (Quality Of Service) number
retained
INT [0,1] (default 0)- Flag if message should be retained (1) or not (0)
timeout
INT- Timeout value for connecting to MQTT server (in ms)
options
String- JSON string containing additonal options or NULL if unused.
For details seemqtt_connect()
Returns 0 for success, otherwise error code from MQTTClient_connect() (see also http://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html).
You can also retrieve the error code and description using mqtt_lasterror()
.
Examples:
SELECT mqtt_publish('tcp://localhost:1883', 'myuser', 'mypasswd', 'mytopic/time', NOW());
SET @client = (SELECT mqtt_connect('ssl://localhost:8883', 'myuser', 'mypasswd', '{"verify":true}'));
SELECT IF(@client IS NOT NULL, mqtt_publish(@client, 'mytopic/LWT', "Online", NULL, 1), NULL);
SELECT IF(@client IS NOT NULL, mqtt_publish(@client, 'mytopic/time', NOW()));
SELECT IF(@client IS NOT NULL, mqtt_publish(@client, 'mytopic/hello', "world", 1, 1));
SELECT IF(@client IS NOT NULL, mqtt_publish(@client, 'mytopic/LWT', "Offline", NULL, 1), NULL);
SELECT IF(@client IS NOT NULL, mqtt_disconnect(@client), NULL);
Subsribe to a mqtt topic and returns the payload if any..
Parameter in {}
are optional an can be omit.
Parameter in []
can be NULL
- in this case a default value is used.
To use optional parameters after omitting other optional parameters, use NULL
.
Possible call variants:
(1) mqtt_subscribe(server, [username], [password], topic, {,[qos] {,[timeout] {,[options]}}})
(2) mqtt_subscribe(client, topic, [payload] {,[qos] {,[timeout] {,[options]}}})
Variant (1) connects to MQTT, subscribes to a topic and disconnnect after subscribe. This variant is provided for individual single mqtt_subscribe()
calls.
Because this variant may slow down when a lot of subscribes should be done, you can do subscribes using variant (2) using a client handle from a previous mqtt_connect()
.
Vvariant (2) should be used for multiple mqtt_subscribe()
calls with a preceding mqtt_connect()
and a final mqtt_disconnect()
:
- Call
mqtt_connect()
to get a valid mqtt client connection handle - Call
mqtt_subscribe()
usinghandle
parameter - Repeat step 2. for your needs
- Call
mqtt_disconnect()
usinghandle
to free the client connection handle
server
String- Specifying the server to which the client will connect. It takes the form
protocol://host:port
.
Currentlyprotocol
must betcp
orssl
.
Forhost
you can specify either an IP address or a hostname.
For instance, to connect to a server running on the local machines with the default MQTT port, specifytcp://localhost:1883
orssl://localhost:8883
for an SSL connection. username
String- Username for authentification or
NULL
if unused password
String- Password for authentification or
NULL
if unused client
BIGINT- A valid handle returned from mqtt_connect() call.
topic
String- The topic to be published
payload
String- The message published for the topic
qos
INT [0..2] (default 0)- The QOS (Quality Of Service) number
retained
INT [0,1] (default 0)- Flag if message should be retained (1) or not (0)
timeout
INT- Timeout value for connecting to MQTT server (in ms)
options
String- JSON string containing additonal options or NULL if unused.
For details seemqtt_connect()
Returns 0 for success, otherwise error code from MQTTClient_connect() (see also http://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html).
You can also retrieve the error code and description using mqtt_lasterror()
.
Examples:
SELECT mqtt_subscribe('tcp://localhost:1883', 'myuser', 'mypasswd', 'mytopic/time');
2021-11-01 14:35:25
SET @client = (SELECT mqtt_connect('ssl://localhost:8883', 'myuser', 'mypasswd', '{"verify":true}'));
SELECT IF(@client IS NOT NULL, mqtt_subscribe(@client, 'mytopic/time', NULL, 1000));
SELECT IF(@client IS NOT NULL, mqtt_subscribe(@client, 'mytopic/hello', 1, 5000));
SELECT IF(@client IS NOT NULL, mqtt_disconnect(@client), NULL);
Returns last error as JSON string
SELECT mqtt_lasterror();
Examples:
> SELECT mqtt_lasterror();
+----------------------------------------------------------------------+
| mqtt_lasterror() |
+----------------------------------------------------------------------+
| {"func":"MQTTClient_connect","rc":5, "desc": "Unknown error code 5"} |
+----------------------------------------------------------------------+
> SELECT
JSON_UNQUOTE(JSON_VALUE(mqtt_lasterror(),'$.rc')) AS rc,
JSON_UNQUOTE(JSON_VALUE(mqtt_lasterror(),'$."func"')) AS 'func',
JSON_UNQUOTE(JSON_VALUE(mqtt_lasterror(),'$."desc"')) AS 'desc';
+--------------------+------+----------------------+
| func | rc | desc |
+--------------------+------+----------------------+
| MQTTClient_connect | 5 | Unknown error code 5 |
+--------------------+------+----------------------+
Returns library info as JSON string
Examples:
> SELECT
JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$.Name')) AS Name,
JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Version"')) AS Version,
JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Build"')) AS Build;
+-------------------+---------+----------------------+
| Name | Version | Build |
+-------------------+---------+----------------------+
| lib_mysqludf_mqtt | 1.0.0 | Nov 1 2021 09:31:40 |
+-------------------+---------+----------------------+
> SELECT
JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Product name"')) AS Library,
JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Version"')) AS `Library Version`,
JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Build level"')) AS `Library Build`;
+------------------------------------------------+-----------------+-------------------------------+
| Library | Library Version | Library Build |
+------------------------------------------------+-----------------+-------------------------------+
| Eclipse Paho Synchronous MQTT C Client Library | 1.3.9 | Sa 23. Okt 11:09:35 CEST 2021 |
+------------------------------------------------+-----------------+-------------------------------+
> SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Product name"')) AS Library;
+------------------------------------------------+
| Library |
+------------------------------------------------+
| Eclipse Paho Synchronous MQTT C Client Library |
+------------------------------------------------+
1 row in set (0.001 sec)
> SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Version"')) AS `Library Version`;
+-----------------+
| Library Version |
+-----------------+
| 1.3.9 |
+-----------------+
1 row in set (0.001 sec)
> SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."Build level"')) AS `Library Build`;
+-------------------------------+
| Library Build |
+-------------------------------+
| Sa 23. Okt 15:59:53 CEST 2021 |
+-------------------------------+
1 row in set (0.001 sec)
> SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."OpenSSL platform"')) AS `OpenSSL platform`;
+------------------------+
| OpenSSL platform |
+------------------------+
| platform: debian-amd64 |
+------------------------+
1 row in set (0.000 sec)
> SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."OpenSSL build timestamp"')) AS `OpenSSL build`;
+----------------------------------------+
| OpenSSL build |
+----------------------------------------+
| built on: Mon Aug 23 17:02:39 2021 UTC |
+----------------------------------------+
1 row in set (0.001 sec)
> SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."OpenSSL version"')) AS `OpenSSL version`;
+-----------------------------+
| OpenSSL version |
+-----------------------------+
| OpenSSL 1.1.1f 31 Mar 2020 |
+-----------------------------+
1 row in set (0.000 sec)
> SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."OpenSSL directory"')) AS `OpenSSL directory`;
+----------------------------+
| OpenSSL directory |
+----------------------------+
| OPENSSLDIR: '/usr/lib/ssl' |
+----------------------------+
1 row in set (0.000 sec)
> SELECT JSON_UNQUOTE(JSON_VALUE(mqtt_info(),'$."Library"."OpenSSL flags"')) AS `OpenSSL flags`;
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| OpenSSL flags |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| compiler: gcc -fPIC -pthread -m64 -Wa,--noexecstack -Wall -Wa,--noexecstack -g -O2 -fdebug-prefix-map=/build/openssl-JWge0V/openssl-1.1.1f=. -fstack-protector-strong -Wformat -Werror=format-security -DOPENSSL_TLS_SECURITY_LEVEL=2 -DOPENSSL_USE_NODELETE -DL_ENDIAN -DOPENSSL_PIC -DOPENSSL_CPUID_OBJ -DOPENSSL_IA32_SSE2 -DOPENSSL_BN_ASM_MONT -DOPENSSL_BN_ASM_MONT5 -DOPENSSL_BN_ASM_GF2m -DSHA1_ASM -DSHA256_ASM -DSHA512_ASM -DKECCAK1600_ASM -DRC4_ASM -DMD5_ASM -DAESNI_ASM -DVPAES_ASM -DGHASH_ASM -DECP_NISTZ256_ASM -DX25519_ASM -DPOLY1305_ASM -DNDEBUG -Wdate-time -D_FORTIFY_SOURCE=2 |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.000 sec)
If you get some errors that system does not find the MQTT Paho library, make a copy from local lib dir to lib dir:
sudo cp /usr/local/lib/libpaho-mqtt3* /usr/lib/
Function like mqtt_lasterror()
returns a hex string instead of a JSON like 0x7B2266756E63223A224D5154544...
.
This is a client setting.
To prevent this start your client with --binary-as-hex=0
If this is not possible, convert the hex-string into a readable one using CONVERT():
SELECT CONVERT(mqtt_info() USING utf8);