The purpose of this module is to solve the problem of multiple keystores using Spring Boot + Kafka + Schema Registry + SSL
- Compatible with JDK 8, 11, 15, 16 and 17
- Compatible with schema-registry version 5.3.1 or later
- Compatible with avro version 1.9.1 or later
-
When
- We have a Spring Boot application exposing SSL end-points with a first distinct certificate;
- We have communication with Kafka via SSL with a second distinct certificate;
- We have the communication with Schema Registry with the same certificate used for communication with Kafka or a separate third party certificate;
-
Scenarios
Protocol Spring Boot Kafka Schema Registry Result SSL Yes Not Not Ok SSL Yes Yes Not Ok SSL Yes Yes Yes Fail SSL Not Yes Yes Ok SSL Not Not Yes Ok SSL Not Not Not Ok
The failure happens in a scenario where we would expect it to be fully functional, where the application uses one certificate to securely expose endpoints, and uses other certificates to communicate with Schema Registry and Kafka.
+-------------------+ +------------------------------+
| |<---json--->| Schema Registry + SSL + mTLS |
| | +------------------------------+
| Spring Boot + SSL |
| | +------------------------------+
| |<--binary-->| Kafka + SSL + mTLS |
+-------------------+ +------------------------------+
The problem happens because the kafka-avro-serializer
component uses the JVM variables javax.net.ssl.trustStore
, javax.net.ssl.keyStore
, javax.net.ssl.trustStorePassword
and javax.net.ssl.keyStorePassword
, and these variables apply to the whole application. As a consequence, if we use a certificate to publish the application API, it will be used by the kafka-avro-serializer
component.
It is intended that the application uses a certificate to expose its API and use a second certificate to communicate with the Schema Registry.
This chapter will show you how to use Kafka + Schema Registry over SSL.
1.1 Create certificates generate-certificates.sh
Run
./generate-certificates.sh
Expected output:
$./generate-certificates.sh
=> ROOT and CA
=> Generate the private keys (for root and ca)
=> Generate the root certificate
=> Generate certificate for ca signed by root (root -> ca)
=> Import ca cert chain into ca.jks
=> Kafka Server
=> Generate the private keys (for the server)
=> Generate certificate for the server signed by ca (root -> ca -> kafka-server)
=> Import the server cert chain into kafka.server.keystore.jks
=> Import the server cert chain into kafka.server.truststore.jks
=> Schema Registry Server
=> Generate the private keys (for schema-registry-server)
=> Generate certificate for the server signed by ca (root -> ca -> schema-registry-server)
=> Import the server cert chain into schema-registry.server.keystore.jks
=> Import the server cert chain into schema-registry.server.truststore.jks
=> Control Center Server
=> Generate the private keys (for control-center-server)
=> Generate certificate for the server signed by ca (root -> ca -> control-center-server)
=> Import the server cert chain into control-center.server.keystore.jks
=> Import the server cert chain into control-center.server.truststore.jks
=> Appplication Client
=> Generate the private keys (for application-client)
=> Generate certificate for the client signed by ca (root -> ca -> application-client)
=> Import the client cert chain into application.client.keystore.jks
=> Import the client cert chain into application.client.truststore.jks
=> Clean up
=> Move files
1.2 Running stack docker-compose-yml
Run
docker-compose up -d
Expected output:
Creating network "schema" with the default driver
Creating zookeeper ... done
Creating kafka ... done
Creating schema-registry ... done
Creating control-center ... done
You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>spring-schema-registry</artifactId>
<version>2.1.0</version>
</dependency>
If you want to try a snapshot version, add the following repository:
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshots</name>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
implementation 'com.github.mvallim:spring-schema-registry:2.1.0'
If you want to try a snapshot version, add the following repository:
repositories {
maven {
url "https://oss.sonatype.org/content/repositories/snapshots"
}
}
This Avro event deserializer is intended to deserialize events from a topic that have events registered in the Schema Registry and with the Avros imported into the application.
This deserializer does not handle events that are not registered in Schema Registry and/or Avros with compatibility problem, in these cases an exception will be thrown.
Attention: You must use YAML or PROPERTIES.
spring:
kafka:
bootstrap-servers: localhost:9092
properties:
security.protocol: SSL
auto.register.schemas: true
value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
schema.registry.url: https://localhost:8082
ssl:
protocol: SSL
key-password: changeit
key-store-location: file:certificates/application/application.client.keystore.jks
key-store-password: changeit
key-store-type: JKS
trust-store-location: file:certificates/application/application.client.truststore.jks
trust-store-password: changeit
trust-store-type: JKS
consumer:
properties:
max.poll.interval.ms: 3000
specific.avro.reader: true
group-id: people
auto-offset-reset: earliest
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.schemaregistry.deserializer.SpecificKafkaAvroDeserializer
producer:
acks: all
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.schemaregistry.serializer.SpecificKafkaAvroSerializer
server:
port: 8443
ssl:
key-store: classpath:keystore.jks
key-store-password: changeit
keyStoreType: JKS
trust-store: classpath:truststore.jks
trust-store-password: changeit
trustStoreType: JKS
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.properties.auto.register.schemas=false
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
spring.kafka.properties.schema.registry.url=https://localhost:8082
spring.kafka.properties.security.protocol=SSL
spring.kafka.properties.auto.register.schemas=true
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
spring.kafka.ssl.protocol=SSL
spring.kafka.ssl.key-password=changeit
spring.kafka.ssl.key-store-location=file:certificates/application/application.client.keystore.jks
spring.kafka.ssl.key-store-password=changeit
spring.kafka.ssl.key-store-type=JKS
spring.kafka.ssl.trust-store-location=file:certificates/application/application.client.truststore.jks
spring.kafka.ssl.trust-store-password=changeit
spring.kafka.ssl.trust-store-type=JKS
spring.kafka.consumer.properties.max.poll.interval.ms=3000
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.group-id=people
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.schemaregistry.deserializer.SpecificKafkaAvroDeserializer
spring.kafka.producer.acks=all
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.schemaregistry.serializer.SpecificKafkaAvroSerializer
server.port=8443
server.ssl.key-store=classpath:keystore.jks
server.ssl.key-store-password=changeit
server.ssl.keyStoreType=JKS
server.ssl.trust-store=classpath:truststore.jks
server.ssl.trust-store-password=changeit
server.ssl.trustStoreType=JKS
This Avro event deserializer is intended to deserialize each and every event from a topic that is registered in the Schema Registry, with or without Avros imported into the application.
For example, currently the application is prepared to consume and deserialize events of type A
, but if some other event that the application is not prepared to consume occurs in this topic, it will be transformed into a GenericRecord
(provided that this type of event is registered in the Schema Registry). So it is possible to consume all events without deserialization errors. This prevents the application that wants to consume only certain types of events from a topic, from not forcing the import of all Avros (.svc) into its application.
This deserializer does not handle events that are not registered in Schema Registry and/or Avros with compatibility problem, in these cases an exception will be thrown.
Attention: You must use YAML or PROPERTIES.
spring:
kafka:
bootstrap-servers: localhost:9092
properties:
security.protocol: SSL
auto.register.schemas: true
value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
schema.registry.url: https://localhost:8082
ssl:
protocol: SSL
key-password: changeit
key-store-location: file:certificates/application/application.client.keystore.jks
key-store-password: changeit
key-store-type: JKS
trust-store-location: file:certificates/application/application.client.truststore.jks
trust-store-password: changeit
trust-store-type: JKS
consumer:
properties:
max.poll.interval.ms: 3000
specific.avro.reader: true
group-id: people
auto-offset-reset: earliest
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.schemaregistry.deserializer.GenericKafkaAvroDeserializer
producer:
acks: all
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.schemaregistry.serializer.SpecificKafkaAvroSerializer
server:
port: 8443
ssl:
key-store: classpath:keystore.jks
key-store-password: changeit
keyStoreType: JKS
trust-store: classpath:truststore.jks
trust-store-password: changeit
trustStoreType: JKS
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.properties.auto.register.schemas=false
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
spring.kafka.properties.schema.registry.url=https://localhost:8082
spring.kafka.properties.security.protocol=SSL
spring.kafka.properties.auto.register.schemas=true
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
spring.kafka.ssl.protocol=SSL
spring.kafka.ssl.key-password=changeit
spring.kafka.ssl.key-store-location=file:certificates/application/application.client.keystore.jks
spring.kafka.ssl.key-store-password=changeit
spring.kafka.ssl.key-store-type=JKS
spring.kafka.ssl.trust-store-location=file:certificates/application/application.client.truststore.jks
spring.kafka.ssl.trust-store-password=changeit
spring.kafka.ssl.trust-store-type=JKS
spring.kafka.consumer.properties.max.poll.interval.ms=3000
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.group-id=people
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.schemaregistry.deserializer.GenericKafkaAvroDeserializer
spring.kafka.producer.acks=all
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.schemaregistry.serializer.SpecificKafkaAvroSerializer
server.port=8443
server.ssl.key-store=classpath:keystore.jks
server.ssl.key-store-password=changeit
server.ssl.keyStoreType=JKS
server.ssl.trust-store=classpath:truststore.jks
server.ssl.trust-store-password=changeit
server.ssl.trustStoreType=JKS
@Configuration
public class ProducerConfig {
@Bean
public ProducerFactory<String, GenericRecord> producerFactory(final KafkaProperties kafkaProperties) {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
@Bean
public KafkaTemplate<String, GenericRecord> kafkaTemplate(final ProducerFactory<String, GenericRecord> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
@EnableKafka
@Configuration
public class ConsumerConfig {
@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory(final KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>> containerFactory(final ConsumerFactory<String, GenericRecord> consumerFactory) {
final ConcurrentKafkaListenerContainerFactory<String, GenericRecord> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(consumerFactory);
containerFactory.setConcurrency(20);
return containerFactory;
}
}
Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.
We use GitHub for versioning. For the versions available, see the tags on this repository.
- Marcos Vallim - Founder, Author, Development, Test, Documentation - mvallim
- Carlos Batistão - Development, Test, Documentation - cezbatistao
- Ricardo Comar - Development, Test - ricardo-comar
See also the list of contributors who participated in this project.
This project is licensed under the Apache License - see the LICENSE file for details