The Strimzi operator installs and manages Kafka clusters on Kubernetes. It’s been pre-installed for you, so all you have to do is create a Kafka cluster inside your namespace.
First, open the {{OCP_CONSOLE}}/topology[OpenShift Console], go to the Developer console and navigate to your project. Once there, on the left menu click on Add > Developer Catalog and type in kafka
in the keyword filter box:
These are all of the Kafka cluster elements you can install. Click on Kafka, and then click on Create. This will open a yaml file for you to configure the cluster before it’s installed. Use the following manifest.
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: names-cluster namespace: <myuser-namespace> spec: entityOperator: topicOperator: reconciliationIntervalSeconds: 30 watchedNamespace: <myuser-namespace> userOperator: reconciliationIntervalSeconds: 30 watchedNamespace: <myuser-namespace> kafka: listeners: external: type: route authentication: type: tls plain: {} tls: {} replicas: 3 storage: type: ephemeral zookeeper: replicas: 3 storage: type: ephemeral
This will create a new Kafka Kubernetes object in your namespace, triggering the Operator to deploy Kafka. After clicking Create you will be taken to the list of objects created by the Kafka operator.
Go to Developer Catalog again, and enter topic
into the search box. Click on the Kafka Topic box, then click Create:
We’ll need to create a topic for our application to stream to and from, so in the YAML:
-
Change the metadata > name value from
my-topic
tonames
. -
Change the vale of the
strimzi.io/cluster
label frommy-cluster
tonames-cluster
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaTopic metadata: name: names labels: strimzi.io/cluster: names-cluster spec: partitions: 10 replicas: 2
This will cause the Operator to provision a new Topic in the Kafka cluster.
Verify that the Kafka and Zookeeper pods are starting up by executing this command in a Terminal in Che:
oc get pods -lstrimzi.io/cluster=names-cluster
You’ll see something like:
names-cluster-entity-operator-78686cdd4d-rfkwd 3/3 Running 0 6m50s
names-cluster-kafka-0 2/2 Running 0 7m41s
names-cluster-kafka-1 2/2 Running 0 7m41s
names-cluster-kafka-2 2/2 Running 1 7m41s
names-cluster-zookeeper-0 2/2 Running 0 8m31s
names-cluster-zookeeper-1 2/2 Running 0 8m31s
names-cluster-zookeeper-2 2/2 Running 0 8m31s
Don’t worry if they’re not all in the Running status, they will eventually complete and we’ll use them later on in this exercise.
In the Kafka cluster we created, we have configured no authentication for the plain
and tls
listeners and mTLS authentication for the external
listener.
plain
and tls
listeners are used for internal communication, while the as the name implies the external
listener is externally exposed using an OpenShift Route.
Recall the Kafka CR:
listeners: external: type: route authentication: type: tls plain: {} tls: {}
In the next two sections we will producing and receiving messages both inside the cluster using internal listener, as well as external communication the cluster using mTLS.
Launch Producer
oc -n <myuser-namespace> run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list names-cluster-kafka-bootstrap:9092 --topic my-topic
Launch Consumer
oc -n <myuser-namespace> run kafka-consumer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server names-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
To test producing and consuming messages outside cluster, we will need to install Kafka binaries.
curl -O http://www.pirbot.com/mirrors/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz
tar zxvf kafka_2.12-2.3.1.tgz
ln -s kafka_2.12-2.3.1 kafka
Create a truststore containing Kafka CA certificates
oc extract secret/names-cluster-cluster-ca-cert
keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass 123456 -noprompt
Create a Kafka user
cat <<EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaUser
metadata:
name: test
labels:
strimzi.io/cluster: names-cluster
spec:
authentication:
type: tls
EOF
Create a keystore with the client certificates required by mTLS
oc extract secret/test --keys=user.crt --to=- > user.crt
oc extract secret/test --keys=user.key --to=- > user.key
openssl pkcs12 -export -in user.crt -inkey user.key -name test -password pass:123456 -out user.p12
Obtain the Kafka bootstrap route
KAFKA_BOOTSTRAP=$(oc get route |awk '/boot/ {print $2}')
Run a producer
kafka/bin/kafka-console-producer.sh --broker-list $KAFKA_BOOTSTRAP:443 --producer-property security.protocol=SSL --producer-property ssl.truststore.password=123456 --producer-property ssl.truststore.location=./truststore.jks --producer-property ssl.keystore.password=123456 --producer-property ssl.keystore.location=./user.p12 --topic orders
Run a consumer (in a separate window)
kafka/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_BOOTSTRAP:443 --consumer-property security.protocol=SSL --consumer-property ssl.truststore.password=123456 --consumer-property ssl.truststore.location=./truststore.jks --consumer-property ssl.keystore.password=123456 --consumer-property ssl.keystore.location=./user.p12 --topic orders --from-beginning