This directory includes projects demonstrating how to use the Java producer and consumer with Avro and Confluent Schema Registry.
How to use these examples:
- ProducerExample.java: see Confluent Schema Registry tutorial
- ConsumerExample.java: see Confluent Schema Registry tutorial
- Install Confluent Platform locally using Docker: https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html
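Both examples connect to the broker and Schema Registry started by the quickstart. A minimal, hedged sketch of the producer-side wiring (KafkaAvroSerializer and the schema.registry.url property are the standard Confluent serde APIs; the exact properties in ProducerExample.java may differ, and Payment is the class Avro generates from the project's schema):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.examples.clients.basicavro.Payment; // Avro-generated class

// Point the producer at the local broker and Schema Registry; on first send,
// KafkaAvroSerializer auto-registers the record's Avro schema under the
// subject "<topic>-value" (here: transactions-value).
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer<String, Payment> producer = new KafkaProducer<>(props);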
- Produce messages for schema v1 (which is missing the future "region" field):
git checkout schema-v1 && mvn clean package exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample
Note the automatically registered schema:
curl -X GET http://localhost:8081/subjects/
curl -X GET http://localhost:8081/subjects/transactions-value/versions
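Continuing the producer sketch above, roughly what the v1 produce path does (the all-args constructor and getter are assumptions about the Avro-generated Payment class; schema v1 carries only "id" and "amount"):
// The first send triggers automatic registration of schema v1.
Payment payment = new Payment("my-transaction-id", 100.00d);
producer.send(new ProducerRecord<>("transactions", payment.getId().toString(), payment));
producer.flush();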
- Consume messages for schema v1:
git checkout schema-v1 && mvn clean package exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample
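For reference, a hedged sketch of the consumer-side wiring (KafkaAvroDeserializer and specific.avro.reader are the standard Confluent consumer settings; the exact properties in ConsumerExample.java may differ, and getAmount() is an assumption about the generated class):
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.examples.clients.basicavro.Payment; // Avro-generated class

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "transactions-v1"); // group name from the reset step below
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", "true"); // return Payment instead of GenericRecord
        try (KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("transactions"));
            while (true) {
                // The deserializer fetches each record's writer schema from
                // Schema Registry by the schema id embedded in the message.
                for (ConsumerRecord<String, Payment> record : consumer.poll(Duration.ofMillis(100))) {
                    System.out.printf("key=%s, amount=%f%n", record.key(), record.value().getAmount());
                }
            }
        }
    }
}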
- Attempt to change the producer's schema to a new, but incompatible, version (no default value for "region"):
git checkout schema-v2-invalid && mvn clean package exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample
Note the runtime error:
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string","logicalType":"UUID"},{"name":"amount","type":"double"},{"name":"region","type":"string"}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
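The 409 is Schema Registry's compatibility check rejecting the new schema, not a broker error. A hedged sketch of probing compatibility before registering, using the Java client (assuming the Confluent 5.x SchemaRegistryClient API; the schema string is the one from the error above, with the logicalType dropped for brevity):
import org.apache.avro.Schema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class CompatibilityProbe {
    public static void main(String[] args) throws Exception {
        // 20 is the client's schema cache capacity.
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 20);
        Schema v2Invalid = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Payment\","
          + "\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"amount\",\"type\":\"double\"},"
          + "{\"name\":\"region\",\"type\":\"string\"}]}");
        // Prints false under BACKWARD compatibility: a v2 reader could not
        // fill in "region" when decoding records written with schema v1.
        System.out.println(client.testCompatibility("transactions-value", v2Invalid));
    }
}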
- Consume the schema v1 messages using the schema v2 consumer:
git checkout schema-v2 && mvn clean package exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample
Note that the consumer accepts the schema v1 messages, filling in the default value for the missing "region" field.
- Produce messages for schema v2 (which contains the "region" field):
git checkout schema-v2 && mvn clean package exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample
Note that the v1 consumer accepts schema v2 messages, ignoring the new "region" field's value, and that the v2 consumer accepts schema v2 messages, showing the new "region" field's value.
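What makes the real schema v2 registrable is a default value on the new field. A hedged illustration (the exact default used on the schema-v2 branch may differ):
import org.apache.avro.Schema;

// Compatible v2: "region" now has a default, so a v2 reader fills it in when
// decoding old v1 records, while a v1 reader simply ignores the extra field.
Schema v2 = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Payment\","
  + "\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":["
  + "{\"name\":\"id\",\"type\":\"string\"},"
  + "{\"name\":\"amount\",\"type\":\"double\"},"
  + "{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}");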
- To replay consumers over the existing messages (of any schema version), first terminate the consumer processes, wait for the consumer group to become "stable", and then run:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-earliest --all-topics --execute --group transactions-v1
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-earliest --all-topics --execute --group transactions-v2
- To run through the demo again, first reset the Kafka state (topic and schemas):
kafka-topics.sh --zookeeper localhost:2181 --delete --topic transactions
curl -X DELETE http://localhost:8081/subjects/transactions-value
- To view and modify the schema compatibility level that Confluent Schema Registry checks schemas against:
$ curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}
$ curl -X PUT -H "Content-Type: application/json" -d '{"compatibility":"FULL_TRANSITIVE"}' http://localhost:8081/config
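The same config endpoint is reachable from Java. A hedged sketch (assuming the 5.x client API, where passing null as the subject targets the global config, like the curl calls above):
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class CompatibilityConfig {
    public static void main(String[] args) throws Exception {
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 20);
        System.out.println(client.getCompatibility(null)); // e.g. BACKWARD
        client.updateCompatibility(null, "FULL_TRANSITIVE");
    }
}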
- To view Avro-encoded messages using Kafka CLI tools (the Docker container schema-registry has the kafka-avro-console-consumer CLI installed):
docker exec -it schema-registry bash
kafka-avro-console-consumer --bootstrap-server broker:29092 --offset earliest --topic transactions --partition 0