This repository has been archived by the owner on Jul 27, 2023. It is now read-only.

java-spring-boot2: add kafka template (#742)
* java-spring-boot2 - add kafka template

* parameterize KafkaTemplate and some clean up

* java-spring-boot2 - changes in README.md

* java-spring-boot2 - update readme with TLS support

* java-spring-boot2 - clean up

Co-authored-by: Sandy Koh <sandykoh99@gmail.com>
yharish991 and skoh7645 authored Apr 15, 2020
1 parent 1ed4340 commit 516d028
Showing 13 changed files with 515 additions and 1 deletion.
2 changes: 1 addition & 1 deletion incubator/java-spring-boot2/stack.yaml
@@ -1,5 +1,5 @@
name: Spring Boot®
version: 0.3.28
version: 0.3.29
description: Spring Boot using OpenJ9 and Maven
license: Apache-2.0
language: java
28 changes: 28 additions & 0 deletions incubator/java-spring-boot2/templates/kafka/.gitignore
@@ -0,0 +1,28 @@
/target/*
.appsody-spring-trigger

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/

### VS Code ###
.vscode/
169 changes: 169 additions & 0 deletions incubator/java-spring-boot2/templates/kafka/README.md
@@ -0,0 +1,169 @@
# Kafka Template

The java-spring-boot2 `kafka` template provides a consistent way of developing Spring Boot applications that connect to Kafka. This template extends the `default` template and uses [spring-kafka](https://spring.io/projects/spring-kafka#overview) to connect to a Kafka instance running on Kubernetes, managed by the [Strimzi](https://strimzi.io/) Kafka operator.

The `kafka` template provides a `pom.xml` file that references the parent POM defined by the stack, dependencies that enable the Spring Boot application to connect to Kafka, a simple producer that publishes a message to a Kafka topic, and a simple consumer that consumes the messages the producer publishes to that topic. It also provides a basic liveness endpoint and a set of unit tests that ensure the enabled actuator endpoints work properly: `/actuator/health`, `/actuator/metrics`, `/actuator/prometheus`, and `/actuator/liveness`.

## Getting Started

1. Create a new folder in your local directory and initialize it using the Appsody CLI, e.g.:

```
mkdir my-project
cd my-project
appsody init java-spring-boot2 kafka
```
This will initialize a Spring Boot 2 project using the `kafka` template.

2. Once your project has been initialized, you can run your application with the following command:

```
appsody run --network ${DOCKER_NETWORK_NAME} --docker-options "--env KAFKA_BOOTSTRAP_SERVERS=${KAFKA_BOOTSTRAP_SERVERS}"
```
E.g.:
```
appsody run --network kafka_default --docker-options "--env KAFKA_BOOTSTRAP_SERVERS=kafka:9092"
```
`DOCKER_NETWORK_NAME` is the name of the Docker network in which the Kafka container is running.

This template expects the `KAFKA_BOOTSTRAP_SERVERS` environment variable to be set to the addresses of the Kafka bootstrap servers.

This launches a Docker container that runs your application in the foreground, exposing it on port 8080. You should see the producer publish a message to the Kafka topic and the consumer read it back. The application is restarted automatically when changes are detected.
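
Given the producer and consumer included in this template, the application log should contain lines similar to the following (the UUID suffix will differ on each run):

```
Sending message to kafka = testData:<random-uuid>
Receiving message = testData:<random-uuid>
```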

3. You should be able to access the following endpoints, as they are exposed by your template application by default:

* Health endpoint: http://localhost:8080/actuator/health
* Liveness endpoint: http://localhost:8080/actuator/liveness
* Metrics endpoint: http://localhost:8080/actuator/metrics
* Prometheus endpoint: http://localhost:8080/actuator/prometheus

4. To deploy the application to Kubernetes run the following command:
```
appsody deploy
```
Make sure to add the `KAFKA_BOOTSTRAP_SERVERS` environment variable to the `app-deploy.yaml` before running the above command:

```
env:
  - name: KAFKA_BOOTSTRAP_SERVERS
    value: ${KAFKA_BOOTSTRAP_SERVERS}
```

If you are connecting to a Kafka instance managed by the Strimzi Kafka operator, the value of `KAFKA_BOOTSTRAP_SERVERS` should be a fully qualified service hostname, e.g. `my-cluster-kafka-bootstrap.strimzi.svc.cluster.local:9092`, where:

* `my-cluster` is the Kafka resource name.
* `kafka-bootstrap` is the broker load balancer name.
* `strimzi` is the namespace in which the Kafka instance is deployed.
* `9092` is the PLAINTEXT port.

5. To deploy an application that connects to Kafka managed by the Strimzi operator, where the brokers require TLS client authentication, add the following properties to `application.properties`:

```
spring.kafka.properties.security.protocol=SSL
spring.kafka.properties.ssl.protocol=SSL
spring.kafka.properties.ssl.truststore.location=/etc/secrets/keystores/truststore.p12
spring.kafka.properties.ssl.truststore.password=${TRUSTSTORE_PASSWORD}
spring.kafka.properties.ssl.truststore.type=PKCS12
spring.kafka.properties.ssl.keystore.location=/etc/secrets/keystores/keystore.p12
spring.kafka.properties.ssl.keystore.password=${KEYSTORE_PASSWORD}
spring.kafka.properties.ssl.keystore.type=PKCS12
spring.kafka.properties.ssl.key.password=${KEYSTORE_PASSWORD}
spring.kafka.properties.ssl.endpoint.identification.algorithm=
```
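
Note that leaving `ssl.endpoint.identification.algorithm` empty disables hostname verification of the broker certificate, which is typically needed when the certificate's subject does not match the service hostname the client uses.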

`TRUSTSTORE_PASSWORD` is the password that you used when creating the truststore.

`KEYSTORE_PASSWORD` is the password that you used when creating the keystore.

Next, add the following to the `app-deploy.yaml` under the `spec` section:

* Add the following volumes

```
volumes:
  # emptyDir volume to store the keystore and truststore files so that the application container can read them
  - emptyDir: {}
    name: keystore-volume
  # the secret that is created when the Kafka user is created
  - name: my-user-credentials
    secret:
      secretName: my-user
  # secret that holds the CA certificate created by the operator for the brokers
  - name: my-cluster-cluster-ca-cert
    secret:
      secretName: my-cluster-cluster-ca-cert
```
* Mount the `keystore-volume` into the application container:

```
volumeMounts:
  - mountPath: /etc/secrets/keystores
    name: keystore-volume
```
* Add the `KAFKA_BOOTSTRAP_SERVERS` environment variable, e.g.:

```
env:
  - name: KAFKA_BOOTSTRAP_SERVERS
    value: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9093
```
`9093` is the TLS port.

* Add `initContainers` that generate the keystore and truststore that the application container will use:

```
initContainers:
  - args:
      - -c
      - echo $ca_bundle && csplit -z -f crt- $ca_bundle '/-----BEGIN CERTIFICATE-----/'
        '{*}' && for file in crt-*; do keytool -import -noprompt -keystore $truststore_jks
        -file $file -storepass $password -storetype PKCS12 -alias service-$file; done
    command:
      - /bin/bash
    env:
      - name: ca_bundle
        value: /etc/secrets/my-cluster-cluster-ca-cert/ca.crt
      - name: truststore_jks
        value: /etc/secrets/keystores/truststore.p12
      - name: password
        value: ${TRUSTSTORE_PASSWORD}
    image: registry.access.redhat.com/redhat-sso-7/sso71-openshift:1.1-16
    name: pem-to-truststore
    volumeMounts:
      - mountPath: /etc/secrets/keystores
        name: keystore-volume
      - mountPath: /etc/secrets/my-user
        name: my-user-credentials
        readOnly: true
      - mountPath: /etc/secrets/my-cluster-cluster-ca-cert
        name: my-cluster-cluster-ca-cert
        readOnly: true
  - args:
      - -c
      - openssl pkcs12 -export -inkey $keyfile -in $crtfile -out $keystore_pkcs12 -password
        pass:$password -name "name"
    command:
      - /bin/bash
    env:
      - name: keyfile
        value: /etc/secrets/my-user/user.key
      - name: crtfile
        value: /etc/secrets/my-user/user.crt
      - name: keystore_pkcs12
        value: /etc/secrets/keystores/keystore.p12
      - name: password
        value: ${KEYSTORE_PASSWORD}
    image: registry.access.redhat.com/redhat-sso-7/sso71-openshift:1.1-16
    name: pem-to-keystore
    volumeMounts:
      - mountPath: /etc/secrets/keystores
        name: keystore-volume
      - mountPath: /etc/secrets/my-user
        name: my-user-credentials
        readOnly: true
```
Here, `my-user` is the Kafka user and `my-cluster` is the Kafka cluster name.
46 changes: 46 additions & 0 deletions incubator/java-spring-boot2/templates/kafka/pom.xml
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent><!--required parent POM-->
<groupId>{{.stack.parentpomgroup}}</groupId>
<artifactId>{{.stack.parentpomid}}</artifactId>
<version>{{.stack.parentpomrange}}</version>
<relativePath/>
</parent>

<groupId>dev.appsody</groupId>
<artifactId>default-kafka-application</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<!-- versions will come from the parent pom (and included bom):
mvn dependency:tree
mvn dependency:display-ancestors
mvn help:effective-pom | grep '\.version>'
-->

<dependencies>
<!-- From parent:
org.springframework.boot:spring-boot-starter
org.springframework.boot:spring-boot-starter-actuator
org.springframework.boot:spring-boot-starter-test
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
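Note that `spring-kafka` and `spring-kafka-test` are declared without explicit versions: the stack's parent POM (and the Spring Boot BOM it brings in) manages them, as the comments above indicate.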
22 changes: 22 additions & 0 deletions incubator/java-spring-boot2/templates/kafka/src/main/java/application/KafkaConsumer.java
@@ -0,0 +1,22 @@
package application;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.CountDownLatch;

// A simple Kafka consumer that listens on the "orders" topic and counts down a latch so callers can wait for a message to arrive
@Service
public class KafkaConsumer {
private final CountDownLatch countDownLatch = new CountDownLatch(1);

@KafkaListener(topics = "orders", groupId = "orders-service")
public void receiveString(String message) {
System.out.println("Receiving message = " + message);
countDownLatch.countDown();
}

public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
}
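
The `CountDownLatch` lets the producer's `CommandLineRunner` (in `KafkaProducer`, below) block until the published message has completed the round trip through the broker back to this listener.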
31 changes: 31 additions & 0 deletions incubator/java-spring-boot2/templates/kafka/src/main/java/application/LivenessEndpoint.java
@@ -0,0 +1,31 @@
/*
* Copyright © 2019 IBM Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package application;

import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.stereotype.Component;

// Simple custom liveness check
@Endpoint(id = "liveness")
@Component
public class LivenessEndpoint {

@ReadOperation
public String testLiveness() {
return "{\"status\":\"UP\"}";
}

}
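
This endpoint is served at `/actuator/liveness` because `liveness` is included in `management.endpoints.web.exposure.include` in the template's `application.properties`, shown below.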
13 changes: 13 additions & 0 deletions incubator/java-spring-boot2/templates/kafka/src/main/java/application/Main.java
@@ -0,0 +1,13 @@
package application;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Main {

public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}

}
31 changes: 31 additions & 0 deletions incubator/java-spring-boot2/templates/kafka/src/main/java/application/config/KafkaProducer.java
@@ -0,0 +1,31 @@
package application.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

import application.KafkaConsumer;

import java.util.UUID;

@Configuration
public class KafkaProducer {

@Autowired
KafkaTemplate<String, String> kafkaTemplate;

private static final String TOPIC_NAME = "orders";

// a simple kafka producer that publishes a message to the "orders" topic after the application is initialized
@Bean
public CommandLineRunner kafkaCommandLineRunner(KafkaConsumer kafkaConsumer) {
return args -> {
String data = "testData:" + UUID.randomUUID();
System.out.println("Sending message to kafka = " + data);
kafkaTemplate.send(TOPIC_NAME, data);
kafkaConsumer.getCountDownLatch().await();
};
}
}
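
`KafkaTemplate.send()` is asynchronous and returns a `ListenableFuture`, so the call above is fire-and-forget. A minimal sketch (not part of the template; `CallbackProducer` is a hypothetical name) of how a variant could surface send failures:

```java
package application.config;

import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical variant of the producer above: attaches a callback to the
// asynchronous send so broker errors are logged instead of silently dropped.
public class CallbackProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public CallbackProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topic, String data) {
        kafkaTemplate.send(topic, data).addCallback(
                result -> System.out.println("Sent to partition "
                        + result.getRecordMetadata().partition()),
                ex -> System.err.println("Failed to send: " + ex.getMessage()));
    }
}
```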
11 changes: 11 additions & 0 deletions incubator/java-spring-boot2/templates/kafka/src/test/resources/application.properties
@@ -0,0 +1,11 @@
# the spring.embedded.kafka.brokers system property is set by the embedded Kafka server to the addresses of the bootstrap servers
spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=orders-service
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# the spring.embedded.kafka.brokers system property is set by the embedded Kafka server to the addresses of the bootstrap servers
spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
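
These `${spring.embedded.kafka.brokers}` placeholders resolve because `spring-kafka-test`'s `@EmbeddedKafka` starts an in-memory broker and sets that system property. A minimal sketch of such a test (hypothetical; assumes JUnit 5, and the template's actual tests are among the files not shown in this diff):

```java
package application;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

// Hypothetical smoke test: @EmbeddedKafka boots an in-memory broker and sets
// spring.embedded.kafka.brokers, which the properties above reference.
@SpringBootTest
@EmbeddedKafka(topics = "orders", partitions = 1)
class EmbeddedKafkaSmokeTest {

    @Test
    void contextLoadsAgainstEmbeddedBroker() {
        // Starting the context runs the producer CommandLineRunner and the
        // "orders" listener against the embedded broker.
    }
}
```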
14 changes: 14 additions & 0 deletions incubator/java-spring-boot2/templates/kafka/src/main/resources/application.properties
@@ -0,0 +1,14 @@
# enable the actuator endpoints for health, metrics, prometheus, and liveness
management.endpoints.web.exposure.include=health,metrics,prometheus,liveness
opentracing.jaeger.log-spans=false

spring.kafka.consumer.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS}
spring.kafka.consumer.group-id=orders-service
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS}
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
