diff --git a/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java b/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java
index c25aa582b..8e01e8c8c 100644
--- a/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java
+++ b/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java
@@ -19,7 +19,7 @@
 import io.confluent.examples.streams.avro.EnrichedOrder;
 import io.confluent.examples.streams.avro.Order;
 import io.confluent.examples.streams.avro.Product;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
@@ -145,7 +145,7 @@ public static KafkaStreams createStreams(final String bootstrapServers,
     // create and configure the SpecificAvroSerdes required in this example
     final SpecificAvroSerde<Order> orderSerde = new SpecificAvroSerde<>();
     final Map<String, String> serdeConfig =
-        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
+        Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                                  schemaRegistryUrl);
     orderSerde.configure(serdeConfig, false);

diff --git a/src/main/java/io/confluent/examples/streams/JsonToAvroExample.java b/src/main/java/io/confluent/examples/streams/JsonToAvroExample.java
index 24766c345..90b4bdb6a 100644
--- a/src/main/java/io/confluent/examples/streams/JsonToAvroExample.java
+++ b/src/main/java/io/confluent/examples/streams/JsonToAvroExample.java
@@ -18,7 +18,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.examples.streams.avro.WikiFeed;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
@@ -98,7 +98,7 @@ static KafkaStreams buildJsonToAvroStream(final String bootstrapServers,
     streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "json-to-avro-stream-conversion-client");
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     // Where to find the Confluent schema registry instance(s)
-    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

diff --git a/src/main/java/io/confluent/examples/streams/JsonToAvroExampleDriver.java b/src/main/java/io/confluent/examples/streams/JsonToAvroExampleDriver.java
index 272efbe9c..e7819d0d0 100644
--- a/src/main/java/io/confluent/examples/streams/JsonToAvroExampleDriver.java
+++ b/src/main/java/io/confluent/examples/streams/JsonToAvroExampleDriver.java
@@ -17,7 +17,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.examples.streams.avro.WikiFeed;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;

 import java.time.Duration;
@@ -73,7 +73,7 @@ private static void produceJsonInputs(final String bootstrapServers, final Strin
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

     try (final KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
       Arrays.stream(users).map(user -> {
@@ -98,7 +98,7 @@ private static void produceJsonInputs(final String bootstrapServers, final Strin
   private static void consumeAvroOutput(final String bootstrapServers, final String schemaRegistryUrl) {
     final Properties consumerProperties = new Properties();
     consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-    consumerProperties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    consumerProperties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "json-to-avro-group");
     consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class);
diff --git a/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java b/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java
index 95b924dad..2e9140732 100644
--- a/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java
+++ b/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java
@@ -15,7 +15,7 @@
  */
 package io.confluent.examples.streams;

-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -39,7 +39,6 @@
 import java.io.InputStream;
 import java.time.Duration;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;

 /**
  * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful
@@ -128,7 +127,7 @@ public static void main(final String[] args) throws Exception {
     // Where to find Kafka broker(s).
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     // Where to find the Confluent schema registry instance(s)
-    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     // Specify default (de)serializers for record keys and for record values.
     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
     streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

diff --git a/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java b/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java
index d6c997472..d8933c5b3 100644
--- a/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java
+++ b/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java
@@ -15,7 +15,7 @@
  */
 package io.confluent.examples.streams;

-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -71,7 +71,7 @@ private static void produceInputs(final String bootstrapServers, final String sc
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
-    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

     final GenericRecordBuilder pageViewBuilder =
         new GenericRecordBuilder(loadSchema("pageview.avsc"));
diff --git a/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java b/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java
index 80c49b544..27d8b1230 100644
--- a/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java
+++ b/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java
@@ -15,7 +15,7 @@
  */
 package io.confluent.examples.streams;

-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -124,7 +124,7 @@ public static void main(final String[] args) throws Exception {
     // Where to find Kafka broker(s).
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     // Where to find the Confluent schema registry instance(s)
-    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     // Specify default (de)serializers for record keys and for record values.
     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
     streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

diff --git a/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java b/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java
index f47b281fd..f3dd47237 100644
--- a/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java
+++ b/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java
@@ -16,7 +16,7 @@
 package io.confluent.examples.streams;

 import io.confluent.examples.streams.avro.PlayEvent;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
@@ -115,7 +115,7 @@ public static void main(final String[] args) {
     final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
     final String schemaRegistryUrl = args.length > 1 ? args[1] : "http://localhost:8081";
     final KafkaStreams streams = new KafkaStreams(
-        buildTopology(singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)),
+        buildTopology(singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)),
         streamsConfig(bootstrapServers, "/tmp/kafka-streams")
     );

diff --git a/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java b/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java
index 0adc87c86..47e868f21 100644
--- a/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java
+++ b/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java
@@ -16,7 +16,7 @@
 package io.confluent.examples.streams;

 import io.confluent.examples.streams.avro.PlayEvent;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -61,7 +61,7 @@ private static void producePlayEvents(final String bootstrapServers, final Strin
     final SpecificAvroSerializer<PlayEvent> playEventSerializer = new SpecificAvroSerializer<>();
     final Map<String, String> serdeConfig = Collections.singletonMap(
-        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     playEventSerializer.configure(serdeConfig, false);

     final Properties producerProperties = new Properties();
diff --git a/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java b/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java
index b54674f89..652adbcb2 100644
--- a/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java
+++ b/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java
@@ -15,7 +15,7 @@
  */
 package io.confluent.examples.streams;

-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
@@ -79,7 +79,7 @@ private static void produceInputs(final String bootstrapServers, final String sc
               StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               io.confluent.kafka.serializers.KafkaAvroSerializer.class);
-    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     final KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

     final GenericRecordBuilder pageViewBuilder =
@@ -108,7 +108,7 @@ private static void consumeOutput(final String bootstrapServers, final String sc
     consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
         StringDeserializer.class);
     consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
         StringDeserializer.class);
-    consumerProperties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    consumerProperties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "top-articles-lambda-example-consumer");
     consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java b/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java
index 74d5990dd..252adccf1 100644
--- a/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java
+++ b/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java
@@ -16,7 +16,7 @@
 package io.confluent.examples.streams;

 import io.confluent.examples.streams.utils.PriorityQueueSerde;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -144,7 +144,7 @@ static KafkaStreams buildTopArticlesStream(final String bootstrapServers,
     // Where to find Kafka broker(s).
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     // Where to find the Confluent schema registry instance(s)
-    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     // Specify default (de)serializers for record keys and for record values.
     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
     streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
@@ -157,7 +157,7 @@ static KafkaStreams buildTopArticlesStream(final String bootstrapServers,
     final Serde<String> stringSerde = Serdes.String();

     final Map<String, String> serdeConfig = Collections.singletonMap(
-        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

     final Serde<GenericRecord> keyAvroSerde = new GenericAvroSerde();
     keyAvroSerde.configure(serdeConfig, true);
diff --git a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java
index dcb2aaf45..d031ce338 100644
--- a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java
+++ b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java
@@ -17,7 +17,7 @@
 import io.confluent.examples.streams.avro.WikiFeed;
 import io.confluent.examples.streams.utils.MonitoringInterceptorUtils;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serde;

@@ -119,7 +119,7 @@ static KafkaStreams buildWikipediaFeed(final String bootstrapServers,
     // Where to find Kafka broker(s).
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     // Where to find the Confluent schema registry instance(s)
-    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     // Specify default (de)serializers for record keys and for record values.
     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
     streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

diff --git a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java
index 15382735d..fbb1e03c2 100644
--- a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java
+++ b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java
@@ -16,7 +16,7 @@
 package io.confluent.examples.streams;

 import io.confluent.examples.streams.avro.WikiFeed;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -28,7 +28,6 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;

-import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
@@ -71,7 +70,7 @@ private static void produceInputs(final String bootstrapServers, final String sc
               StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               io.confluent.kafka.serializers.KafkaAvroSerializer.class);
-    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     final KafkaProducer<String, WikiFeed> producer = new KafkaProducer<>(props);

     final Random random = new Random();
@@ -89,7 +88,7 @@ private static void consumeOutput(final String bootstrapServers, final String sc
     consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    consumerProperties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    consumerProperties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "wikipedia-feed-example-consumer");
     consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     final KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(consumerProperties,
diff --git a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java
index b6b376039..329c54e8e 100644
--- a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java
+++ b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java
@@ -16,7 +16,7 @@
 package io.confluent.examples.streams;

 import io.confluent.examples.streams.avro.WikiFeed;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serde;
@@ -107,7 +107,7 @@ static KafkaStreams buildWikipediaFeed(final String bootstrapServers,
     // Where to find Kafka broker(s).
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     // Where to find the Confluent schema registry instance(s)
-    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     // Specify default (de)serializers for record keys and for record values.
     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
     streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java
index c7f5bf883..337683d29 100644
--- a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java
+++ b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java
@@ -18,7 +18,7 @@
 import io.confluent.examples.streams.avro.PlayEvent;
 import io.confluent.examples.streams.avro.Song;
 import io.confluent.examples.streams.avro.SongPlayCount;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -200,7 +200,7 @@ public static void main(final String[] args) throws Exception {
     System.out.println("REST endpoint at http://" + restEndpointHostname + ":" + restEndpointPort);

     final KafkaStreams streams = new KafkaStreams(
-        buildTopology(singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)),
+        buildTopology(singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)),
         streamsConfig(bootstrapServers, restEndpointPort, "/tmp/kafka-streams", restEndpointHostname)
     );
diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java
index 1d5c8254d..003b1a984 100644
--- a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java
+++ b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java
@@ -17,7 +17,7 @@
 import io.confluent.examples.streams.avro.PlayEvent;
 import io.confluent.examples.streams.avro.Song;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;

@@ -25,7 +25,6 @@
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;

-import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -37,7 +36,6 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import static java.nio.charset.StandardCharsets.UTF_8;
-import java.io.IOException;

@@ -84,7 +82,7 @@ public static void main(final String [] args) throws Exception {
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

     final Map<String, String> serdeConfig = Collections.singletonMap(
-        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     final SpecificAvroSerializer<PlayEvent> playEventSerializer = new SpecificAvroSerializer<>();
     playEventSerializer.configure(serdeConfig, false);
     final SpecificAvroSerializer<Song> songSerializer = new SpecificAvroSerializer<>();
diff --git a/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java b/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java
index 810d02ed0..773c7fda7 100644
--- a/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java
+++ b/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java
@@ -17,7 +17,7 @@
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

@@ -84,7 +84,7 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception {
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config");
     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
     streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
-    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);
+    streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);

     // Write the input data as-is to the output topic.
     //
@@ -102,7 +102,7 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception {
     // This is different from the case of setting default serdes (see `streamsConfiguration`
     // above), which will be auto-configured based on the `StreamsConfiguration` instance.
     genericAvroSerde.configure(
-        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL),
+        Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL),
         /*isKey*/ false);
     final KStream<String, GenericRecord> stream = builder.stream(inputTopic);
     stream.to(outputTopic, Produced.with(stringSerde, genericAvroSerde));
diff --git a/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java b/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java
index 61c10ddf3..92472587a 100644
--- a/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java
+++ b/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java
@@ -17,7 +17,7 @@
 import io.confluent.examples.streams.avro.PlayEvent;
 import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;

@@ -54,7 +54,7 @@ public class SessionWindowsExampleTest {
   private TestInputTopic<String, PlayEvent> input;
   private TestOutputTopic<String, Long> output;
   private final Map<String, String> AVRO_SERDE_CONFIG = Collections.singletonMap(
-      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL
+      AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL
   );

   @Before
diff --git a/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java b/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java
index ddc2b58e0..23b73934f 100644
--- a/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java
+++ b/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java
@@ -17,7 +17,7 @@
 import io.confluent.examples.streams.avro.WikiFeed;
 import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
@@ -79,7 +79,7 @@ public void shouldRoundTripSpecificAvroDataThroughKafka() throws Exception {
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
     streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
-    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
+    streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
     streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

     // Write the input data as-is to the output topic.
@@ -99,7 +99,7 @@ public void shouldRoundTripSpecificAvroDataThroughKafka() throws Exception {
     // above), which will be auto-configured based on the `StreamsConfiguration` instance.
     final boolean isKeySerde = false;
     specificAvroSerde.configure(
-        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl()),
+        Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl()),
         isKeySerde);
     final KStream<String, WikiFeed> stream = builder.stream(inputTopic);
     stream.to(outputTopic, Produced.with(stringSerde, specificAvroSerde));
@@ -116,7 +116,7 @@ public void shouldRoundTripSpecificAvroDataThroughKafka() throws Exception {
     producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
-    producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
+    producerConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
     IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig);

     //
@@ -128,7 +128,7 @@ public void shouldRoundTripSpecificAvroDataThroughKafka() throws Exception {
     consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
-    consumerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
+    consumerConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
     consumerConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
     final List<WikiFeed> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
         consumerConfig,
diff --git a/src/test/java/io/confluent/examples/streams/TopArticlesLambdaExampleTest.java b/src/test/java/io/confluent/examples/streams/TopArticlesLambdaExampleTest.java
index b9335c411..1d23c6a5d 100644
--- a/src/test/java/io/confluent/examples/streams/TopArticlesLambdaExampleTest.java
+++ b/src/test/java/io/confluent/examples/streams/TopArticlesLambdaExampleTest.java
@@ -17,7 +17,7 @@
 import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
 import io.confluent.examples.streams.utils.MonitoringInterceptorUtils;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.kafka.clients.consumer.ConsumerConfig;

@@ -88,7 +88,7 @@ public void shouldProduceTopNArticles() throws Exception {
               StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               io.confluent.kafka.serializers.KafkaAvroSerializer.class);
-    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
+    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());

     MonitoringInterceptorUtils.maybeConfigureInterceptorsProducer(props);
diff --git a/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroExampleTest.java b/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroExampleTest.java
index 6d788630d..409252f19 100644
--- a/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroExampleTest.java
+++ b/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroExampleTest.java
@@ -17,7 +17,7 @@
 import io.confluent.examples.streams.avro.WikiFeed;
 import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -35,7 +35,6 @@
 import org.junit.ClassRule;
 import org.junit.Test;

-import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
@@ -78,7 +77,7 @@ public void shouldRunTheWikipediaFeedExample() {
               StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               io.confluent.kafka.serializers.KafkaAvroSerializer.class);
-    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
+    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
     final KafkaProducer<String, WikiFeed> producer = new KafkaProducer<>(props);

     producer.send(new ProducerRecord<>(WikipediaFeedAvroExample.WIKIPEDIA_FEED,
diff --git a/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExampleTest.java b/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExampleTest.java
index 5c5cc7bd8..f30db610b 100644
--- a/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExampleTest.java
+++ b/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExampleTest.java
@@ -17,7 +17,7 @@
 import io.confluent.examples.streams.avro.WikiFeed;
 import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import kafka.server.KafkaConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -82,7 +82,7 @@ public void shouldRunTheWikipediaFeedLambdaExample() {
               StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               io.confluent.kafka.serializers.KafkaAvroSerializer.class);
-    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
+    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
     final KafkaProducer<String, WikiFeed> producer = new KafkaProducer<>(props);

     producer.send(new ProducerRecord<>(WikipediaFeedAvroExample.WIKIPEDIA_FEED,
diff --git a/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java b/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java
index fe9e4138a..5d45d1968 100644
--- a/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java
+++ b/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java
@@ -21,7 +21,7 @@
 import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
 import io.confluent.examples.streams.microservices.util.MicroserviceTestUtils;
 import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -119,7 +119,7 @@ public static void createTopicsAndProduceDataToInputTopics() throws Exception {
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

     final Map<String, String> serdeConfig = Collections.singletonMap(
-        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL
+        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL
     );
     final SpecificAvroSerializer<PlayEvent> playEventSerializer = new SpecificAvroSerializer<>();
     playEventSerializer.configure(serdeConfig, false);
@@ -170,7 +170,7 @@ private void createStreams(final String host) throws Exception {
     appServerPort = ExampleTestUtils.randomFreeLocalPort();

     final Map<String, String> serdeConfig = Collections.singletonMap(
-        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL
+        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL
     );
     streams = new KafkaStreams(
         KafkaMusicExample.buildTopology(serdeConfig),
diff --git a/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala b/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala
index 8851bf636..968ede999 100644
--- a/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala
+++ b/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala
@@ -18,7 +18,7 @@ package io.confluent.examples.streams
 import java.util.{Collections, Properties}

 import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster
-import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer}
+import io.confluent.kafka.serializers.{AbstractKafkaSchemaSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer}
 import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericData, GenericRecord}
@@ -74,7 +74,7 @@ class GenericAvroScalaIntegrationTest extends AssertionsForJUnit {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-scala-integration-test")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
-   p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
+   p.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    p
  }
@@ -83,7 +83,7 @@ class GenericAvroScalaIntegrationTest extends AssertionsForJUnit {
   implicit val genericAvroSerde: Serde[GenericRecord] = {
     val gas = new GenericAvroSerde
     val isKeySerde: Boolean = false
-    gas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
+    gas.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
     gas
   }

@@ -103,7 +103,7 @@ class GenericAvroScalaIntegrationTest extends AssertionsForJUnit {
      p.put(ProducerConfig.RETRIES_CONFIG, "0")
      p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
      p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer])
-     p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
+     p.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
      p
    }
    import collection.JavaConverters._
@@ -119,7 +119,7 @@ class GenericAvroScalaIntegrationTest extends AssertionsForJUnit {
      p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
      p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer])
-     p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
+     p.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
      p
    }
    val actualValues: java.util.List[GenericRecord] =
diff --git a/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala b/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala
index ba37690f8..f8547ab70 100644
--- a/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala
+++ b/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala
@@ -19,7 +19,7 @@
 import java.util.{Collections, Properties}
 import io.confluent.examples.streams.avro.WikiFeed
 import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster
-import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig, KafkaAvroSerializer}
+import io.confluent.kafka.serializers.{AbstractKafkaSchemaSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig, KafkaAvroSerializer}
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
@@ -66,7 +66,7 @@ class SpecificAvroScalaIntegrationTest extends AssertionsForJUnit {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "specific-avro-scala-integration-test")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
-   p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
+   p.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    p
  }
@@ -75,7 +75,7 @@ class SpecificAvroScalaIntegrationTest extends AssertionsForJUnit {
   implicit val specificAvroSerde: Serde[WikiFeed] = {
     val sas = new SpecificAvroSerde[WikiFeed]
     val isKeySerde: Boolean = false
-    sas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
+    sas.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
     sas
   }

@@ -95,7 +95,7 @@ class SpecificAvroScalaIntegrationTest extends AssertionsForJUnit {
      p.put(ProducerConfig.RETRIES_CONFIG, "0")
      p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
      p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer])
-     p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
+     p.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
      p
    }
    import collection.JavaConverters._
@@ -111,7 +111,7 @@ class SpecificAvroScalaIntegrationTest extends AssertionsForJUnit {
      p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
      p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer])
-     p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
+     p.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl)
      p.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
      p
    }
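
For reference, every hunk in this patch makes the same substitution: the Schema Registry config constant now comes from AbstractKafkaSchemaSerDeConfig rather than the older AbstractKafkaAvroSerDeConfig, and SCHEMA_REGISTRY_URL_CONFIG still resolves to the "schema.registry.url" key, so no configuration values or topologies change. Below is a minimal sketch of the resulting pattern, assuming this repo's generated WikiFeed Avro class; the class and method names in the sketch are ours, not part of the patch:

import io.confluent.examples.streams.avro.WikiFeed;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

import java.util.Collections;
import java.util.Map;

public final class SchemaRegistrySerdeSketch {

  // Configure a value serde against Schema Registry the same way the examples
  // above do, but via the renamed config class. The constant still expands to
  // "schema.registry.url", so existing properties files keep working unchanged.
  static SpecificAvroSerde<WikiFeed> wikiFeedValueSerde(final String schemaRegistryUrl) {
    final Map<String, String> serdeConfig = Collections.singletonMap(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    final SpecificAvroSerde<WikiFeed> serde = new SpecificAvroSerde<>();
    serde.configure(serdeConfig, /*isKey*/ false);  // false: this serde is for record values
    return serde;
  }
}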