A connector plugin for Kafka's Connect API. It can be configured to map any topic on an MQTT broker to any topic on a Kafka broker, and it can use SSL when communicating with the MQTT broker.
To be able to test the connector, we first need to set up the Kafka infrastructure. For simplicity, we start by configuring single nodes (one Zookeeper and one Kafka).
- Java version >8 installed to run Kafka and this source-connector. Check if Java is installed, and which version by running
java -version
in your terminal. We use openjdk version "11.0.6" 2020-01-14.
- Linux. We are running this setup on Ubuntu 16.04.
- An MQTT broker. We use EMQX.
- Maven. Check that Maven is installed properly by running
mvn -v
in your terminal. We use Maven 3.6.0
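The Java version check above can also be scripted. The following is a minimal sketch, not part of this repository: the function name `java_major_version` is our own, and it assumes the version line has the usual `version "..."` form shown above.

```python
import re


def java_major_version(version_output: str) -> int:
    """Extract the major Java version from `java -version` output.

    Hypothetical helper for illustration; it only looks at the quoted
    version string, e.g. openjdk version "11.0.6" 2020-01-14.
    """
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("no version string found")
    first, second = int(match.group(1)), match.group(2)
    # Java 8 and older report themselves as 1.x (for example "1.8.0_242").
    if first == 1 and second is not None:
        return int(second)
    return first
```

Note that `java -version` writes to standard error, so when calling it through `subprocess.run(["java", "-version"], capture_output=True, text=True)` the text to parse is in `.stderr`.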
- Download EMQX from https://www.emqx.io/downloads
- Extract the download to your desired destination, here termed "path-to-emqx".
- Run the following command in your terminal to start the EMQX broker:
"path-to-emqx"/emqx/bin/emqx start
- Check that EMQX is running with the following terminal command:
"path-to-emqx"/emqx/bin/emqx_ctl status
Download a binary Kafka release from https://kafka.apache.org/downloads. We work with the compressed download:
kafka_2.13-2.4.1.tgz
Extract the download to your desired destination, here termed "path-to-kafka".
About Zookeeper:
"Zookeeper is a top-level software developed by Apache that acts as a centralized service and is used to maintain naming and configuration data and to provide flexible and robust synchronization within distributed systems. Zookeeper keeps track of status of the Kafka cluster nodes and it also keeps track of Kafka topics, partitions etc. Zookeeper itself is allowing multiple clients to perform simultaneous reads and writes and acts as a shared configuration service within the system. The Zookeeper atomic broadcast (ZAB) protocol is the brains of the whole system, making it possible for Zookeeper to act as an atomic broadcast system and issue orderly updates." Cloudkarafka
Start Zookeeper
"path-to-kafka"/kafka_2.13-2.4.1/bin/zookeeper-server-start.sh "path-to-kafka"/kafka_2.13-2.4.1/config/zookeeper.properties
P.S. The default settings in zookeeper.properties work well for this tutorial's purpose. Zookeeper will start on the default port 2181.
As mentioned, we will only start a single-instance Kafka Broker. The Kafka Broker will use "path-to-kafka"/kafka_2.13-2.4.1/config/server.properties, and it is worth checking that this file contains
zookeeper.connect=localhost:2181
or a value that matches your custom configuration in zookeeper.properties.
Start Kafka Broker
"path-to-kafka"/kafka_2.13-2.4.1/bin/kafka-server-start.sh "path-to-kafka"/kafka_2.13-2.4.1/config/server.properties
Create Kafka Topic
"path-to-kafka"/kafka_2.13-2.4.1/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test
The Kafka Connect API is what we utilise as a framework around our connectors, to handle scaling, polling from Kafka, work distribution etc. Kafka Connect can run as connect-standalone or as connect-distributed. The connect-standalone is engineered for demo and test purposes, as it cannot provide fallback in a production environment.
Start Kafka Connect
Follow the respective steps below to start Kafka Connect in your preferred mode.
Connect in general
Build this Java Maven project by navigating to the root directory kafka-mqtt-source-connector
in a terminal and typing:
mvn install
Copy the kafka-mqtt-source-connector-"version".jar
from your maven target directory to the directory /usr/share/java/kafka
:
sudo mkdir -p /usr/share/java/kafka
sudo cp ./target/*with-dependencies.jar /usr/share/java/kafka/.
Insecure - using TCP
Connect Standalone
- Uncomment
plugin.path
in "path-to-kafka"/kafka_2.13-2.4.1/config/connect-standalone.properties
, so that it is set to
plugin.path=/usr/share/java,/usr/local/share/kafka/plugins,/usr/local/share/java/
- Copy the accompanying source connector properties file in this repository, source-connect-mqtt.properties, to
"path-to-kafka"/kafka_2.13-2.4.1/config/
(or create a new properties file with the same name in the given directory).
- Ensure the following configuration in
source-connect-mqtt.properties
:
name=mqtt-source-connector
tasks.max=1
connector.class=com.sintef.asam.MqttSourceConnector
mqtt.connector.broker.uri=tcp://0.0.0.0:1883
mqtt.connector.broker.topic=test/#
mqtt.connector.kafka.topic=test
where mqtt.connector.broker.topic
sets the topic to subscribe to on the MQTT broker, while mqtt.connector.kafka.topic
sets the topic to publish to on the Kafka broker. The mqtt.connector.broker.uri
must be set according to your own MQTT broker, but the value above is the default for mosquitto and EMQX.
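Because a single misspelled key in source-connect-mqtt.properties is easy to overlook, it can help to check the file before starting Connect. The following is a minimal sketch: the helper names and the `REQUIRED_KEYS` set are our own, taken from the example configuration above, and the parser only handles plain `key=value` lines, not the full Java properties syntax.

```python
# Keys taken from the example configuration above. Treating all of them
# as required is an assumption made for this sketch.
REQUIRED_KEYS = {
    "name",
    "tasks.max",
    "connector.class",
    "mqtt.connector.broker.uri",
    "mqtt.connector.broker.topic",
    "mqtt.connector.kafka.topic",
}


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Only a leading # or ! starts a comment, so a value such as
        # test/# is kept intact.
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_keys(props: dict) -> list:
    """Return the expected keys that are absent, in sorted order."""
    return sorted(REQUIRED_KEYS - props.keys())
```

Calling `missing_keys(parse_properties(open(path).read()))` on the properties file should return an empty list. A key with transposed letters shows up as a missing entry under its correct name.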
- Start Connect Standalone with our connector by typing (this may take a minute or two):
"path-to-kafka"/kafka_2.13-2.4.1/bin/connect-standalone.sh "path-to-kafka"/kafka_2.13-2.4.1/config/connect-standalone.properties "path-to-kafka"/kafka_2.13-2.4.1/config/source-connect-mqtt.properties
Connect Distributed
Kafka Connect Distributed does not need properties files to configure connectors. It uses the Kafka Connect REST-interface.
- Uncomment
plugin.path
in "path-to-kafka"/kafka_2.13-2.4.1/config/connect-distributed.properties
, so that it is set to
plugin.path=/usr/share/java,/usr/local/share/kafka/plugins,/usr/local/share/java/
and uncomment rest.port
so that it is set to
rest.port=19005
which helps to avoid some "bind" exceptions. This will be the port for the Connect REST-interface.
- Start Connect Distributed by typing (this may take a minute or two):
"path-to-kafka"/kafka_2.13-2.4.1/bin/connect-distributed.sh "path-to-kafka"/kafka_2.13-2.4.1/config/connect-distributed.properties
- Start our connector by posting the following command to the Connect REST-interface:
curl -s -X POST -H 'Content-Type: application/json' http://127.0.0.1:19005/connectors -d '{"name":"mqtt-source-connector","config":{"connector.class":"com.sintef.asam.MqttSourceConnector","tasks.max":"1","mqtt.connector.broker.uri":"tcp://localhost:1883", "mqtt.connector.broker.topic":"test/#","mqtt.connector.kafka.topic":"test"}}'
- Inspect the terminal where you started Connect Distributed, and once the connector seems to have started successfully, check that it exists by typing:
curl -H 'Content-Type: application/json' http://127.0.0.1:19005/connectors
where the response is an array with connectors by name.
- Test the connector by making a Kafka Consumer subscribing to the topic
test
:
"path-to-kafka"/kafka_2.13-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
- Then publish something to the test topic on your EMQX broker, using e.g. mosquitto:
mosquitto_pub -h 127.0.0.1 -p 1883 -t test -m "Hello, world!"
and see the message appear, base64 encoded, in your Kafka Consumer.
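To read the original text back from what the consumer prints, the base64 value can be decoded. The following is a minimal sketch, assuming the record value is the base64 encoding of a UTF-8 MQTT payload. The function name is our own, and stripping surrounding quotes is a guess at how the value may be printed, not something this repository documents.

```python
import base64


def decode_record_value(encoded: str) -> str:
    """Decode a base64 encoded record value into text.

    Assumes a UTF-8 payload. Surrounding whitespace and double quotes
    are stripped in case the consumer prints the value as a JSON string.
    """
    cleaned = encoded.strip().strip('"')
    return base64.b64decode(cleaned).decode("utf-8")
```

For the message published above, `decode_record_value("SGVsbG8sIHdvcmxkIQ==")` gives back `Hello, world!`.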
Secure - using SSL
Setting up your own certificate authority (for test purposes) and configuring the MQTT broker to use SSL
We first need certificates and keys to encrypt our secure socket layer (SSL) communication to and from the broker. For making your own certificate authority and creating a client certificate and a client key, this guide is thorough and instructive: https://deliciousbrains.com/ssl-certificate-authority-for-local-https-development/
Assuming that you have /home/ca.crt, /home/client.crt and /home/client.key, configure the EMQX broker by finding the configuration file "path-to-emqx"/emqx/etc/emqx.conf
and setting or uncommenting the following properties:
listener.ssl.external = 8883
listener.ssl.external.access.1 = allow all
listener.ssl.external.keyfile = /home/client.key
listener.ssl.external.certfile = /home/client.crt
listener.ssl.external.cacertfile = /home/ca.crt
Then restart your MQTT broker.
Connect Distributed
- Delete the previously created connector using TCP, if one is running, with the following call to the Connect REST-interface:
curl -X DELETE -H 'Content-Type: application/json' http://127.0.0.1:19005/connectors/mqtt-source-connector
if your connector was named mqtt-source-connector. Check running connectors by name using:
curl -H 'Content-Type: application/json' http://127.0.0.1:19005/connectors
- Copy your CA certificate, client certificate and client key to your desired directory. In our test case we have the full paths /home/ca.crt, /home/client.crt and /home/client.key.
- Start our connector by posting the following command to the Connect REST-interface:
curl -s -X POST -H 'Content-Type: application/json' http://127.0.0.1:19005/connectors -d '{"name":"mqtt-source-connector","config":{"connector.class":"com.sintef.asam.MqttSourceConnector","tasks.max":"1","mqtt.connector.broker.uri":"ssl://localhost:8883", "mqtt.connector.broker.topic":"test/#", "mqtt.connector.kafka.topic":"test","mqtt.connector.ssl":true, "mqtt.connector.ssl.ca":"/home/ca.crt","mqtt.connector.ssl.crt":"/home/client.crt","mqtt.connector.ssl.key":"/home/client.key"}}'
- Test the connector by making a Kafka Consumer subscribing to the topic
test
:
"path-to-kafka"/kafka_2.13-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
- Then publish something to the test topic on your EMQX broker, using e.g. mosquitto:
mosquitto_pub --url mqtts://127.0.0.1:8883/test --cafile /home/ca.crt --cert /home/client.crt --key /home/client.key --insecure --tls-version tlsv1.2 -m "Hello, world!"
and see the message appear, base64 encoded, in your Kafka Consumer.
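The one-line JSON bodies posted to the Connect REST-interface above are easy to get wrong by hand. The following is a minimal sketch that builds the same body for both the TCP and the SSL case. The function `connector_payload` and its `ssl_files` parameter are our own conventions for this example; only the configuration keys come from the commands above.

```python
import json

CONNECTOR_CLASS = "com.sintef.asam.MqttSourceConnector"


def connector_payload(name, broker_uri, mqtt_topic, kafka_topic, ssl_files=None):
    """Return the JSON body for POST /connectors as a string.

    ssl_files is an optional (ca, crt, key) tuple of file paths. It is a
    convention of this sketch, not an option of the connector itself.
    """
    config = {
        "connector.class": CONNECTOR_CLASS,
        "tasks.max": "1",
        "mqtt.connector.broker.uri": broker_uri,
        "mqtt.connector.broker.topic": mqtt_topic,
        "mqtt.connector.kafka.topic": kafka_topic,
    }
    if ssl_files is not None:
        for path in ssl_files:
            # A trailing slash turns a file path into a directory-style path.
            if path.endswith("/"):
                raise ValueError(f"expected a file path, got: {path}")
        ca, crt, key = ssl_files
        config.update({
            "mqtt.connector.ssl": True,
            "mqtt.connector.ssl.ca": ca,
            "mqtt.connector.ssl.crt": crt,
            "mqtt.connector.ssl.key": key,
        })
    return json.dumps({"name": name, "config": config})
```

The returned string can be passed to `curl` with `-d`, or sent with any HTTP client, to `http://127.0.0.1:19005/connectors` as in the commands above.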