From aff3c114ad15526d1a5c9e0b024135dc1236cde4 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Mon, 16 Jan 2023 11:51:32 +0000 Subject: [PATCH] Kafka ProducerInterceptor Bean --- .../main/docs/kafka/writing-kafka-records.md | 21 ++++ .../ProducerInterceptorBeanExample.java | 37 ++++++ smallrye-reactive-messaging-kafka/revapi.json | 29 +++++ .../messaging/kafka/KafkaConnector.java | 8 +- .../kafka/fault/KafkaDeadLetterQueue.java | 2 +- .../messaging/kafka/i18n/KafkaLogging.java | 12 ++ .../kafka/impl/ConfigurationCleaner.java | 1 + .../messaging/kafka/impl/KafkaSink.java | 10 +- .../kafka/impl/ReactiveKafkaProducer.java | 83 +++++++++++-- .../messaging/kafka/KafkaSinkTest.java | 114 +++++++++++++++++- .../KafkaSinkWithLegacyMetadataTest.java | 14 ++- .../ce/KafkaSinkWithCloudEventsTest.java | 53 +++++--- .../kafka/client/LazyInitializedTest.java | 4 +- .../client/ReactiveKafkaProducerTest.java | 6 +- .../serde/SerializerConfigurationTest.java | 10 +- 15 files changed, 351 insertions(+), 53 deletions(-) create mode 100644 documentation/src/main/java/kafka/outbound/ProducerInterceptorBeanExample.java diff --git a/documentation/src/main/docs/kafka/writing-kafka-records.md b/documentation/src/main/docs/kafka/writing-kafka-records.md index 2b5ae0525e..c3be9ffcd3 100644 --- a/documentation/src/main/docs/kafka/writing-kafka-records.md +++ b/documentation/src/main/docs/kafka/writing-kafka-records.md @@ -284,6 +284,27 @@ can also be used for producing messages: This is an advanced feature. The `ProducerRecord` is sent to Kafka as is. Any possible metadata attached through `Message>` are ignored and lost. +## Producer Interceptors + +Producer interceptors can be configured for Kafka producer clients using the standard `interceptor.classes` property. +Configured classes will be instantiated by the Kafka producer on client creation. + +Alternatively, producer clients can be configured with a CDI managed-bean implementing {{ javadoc('org.apache.kafka.clients.producer.ProducerInterceptor', True, 'org.apache.kafka/kafka-clients') }} interface: + +To achieve this, the bean must be exposed with the `@Identifier` qualifier specifying the name of the bean: + + +``` java +{{ insert('kafka/outbound/ProducerInterceptorBeanExample.java') }} +``` + +Then, in the channel configuration, specify the following attribute: +`mp.messaging.incoming.$channel.interceptor-bean=my-producer-interceptor`. + +!!!warning + The `onSend` method will be called on the producer *sending thread* and `onAcknowledgement` will be called on the *Kafka producer I/O thread*. + In both cases if implementations are not fast, sending of messages could be delayed. + ## Configuration Reference {{ insert('../../../target/connectors/smallrye-kafka-outgoing.md') }} diff --git a/documentation/src/main/java/kafka/outbound/ProducerInterceptorBeanExample.java b/documentation/src/main/java/kafka/outbound/ProducerInterceptorBeanExample.java new file mode 100644 index 0000000000..a321feed84 --- /dev/null +++ b/documentation/src/main/java/kafka/outbound/ProducerInterceptorBeanExample.java @@ -0,0 +1,37 @@ +package kafka.outbound; + +import java.util.Map; + +import javax.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import io.smallrye.common.annotation.Identifier; + +@ApplicationScoped +@Identifier("my-producer-interceptor") +public class ProducerInterceptorBeanExample implements ProducerInterceptor { + + @Override + public ProducerRecord onSend(ProducerRecord producerRecord) { + // called before send + return producerRecord; + } + + @Override + public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { + // called on send acknowledgement callback + } + + @Override + public void close() { + // called on client close + } + + @Override + public void configure(Map map) { + // called on client configuration + } +} diff --git a/smallrye-reactive-messaging-kafka/revapi.json b/smallrye-reactive-messaging-kafka/revapi.json index 74eb15d976..f9f05927dd 100644 --- a/smallrye-reactive-messaging-kafka/revapi.json +++ b/smallrye-reactive-messaging-kafka/revapi.json @@ -51,6 +51,35 @@ "code": "java.class.removed", "old": "class io.smallrye.reactive.messaging.kafka.tracing.HeaderInjectAdapter", "justification": "Replaced by otel instrumentation api" + }, + { + "code": "java.annotation.attributeValueChanged", + "old": "class io.smallrye.reactive.messaging.kafka.KafkaConnector", + "new": "class io.smallrye.reactive.messaging.kafka.KafkaConnector", + "annotationType": "io.smallrye.reactive.messaging.annotations.ConnectorAttributes", + "attribute": "value", + "justification": "Added interceptor-bean to connector attributes" + }, + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.reactive.messaging.kafka.impl.KafkaSink::(io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration, io.smallrye.reactive.messaging.kafka.KafkaCDIEvents, javax.enterprise.inject.Instance>)", + "new": "method void io.smallrye.reactive.messaging.kafka.impl.KafkaSink::(io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration, io.smallrye.reactive.messaging.kafka.KafkaCDIEvents, javax.enterprise.inject.Instance>, javax.enterprise.inject.Instance>)", + "justification": "Added ProducerInterceptor instances to constructor" + }, + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer::(io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration, javax.enterprise.inject.Instance>, java.util.function.Consumer, java.util.function.BiConsumer, java.util.Map>)", + "new": "method void io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer::(io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration, javax.enterprise.inject.Instance>, javax.enterprise.inject.Instance>, java.util.function.Consumer, java.util.function.BiConsumer, java.util.Map>)", + "justification": "Added ProducerInterceptor instances to constructor" + }, + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer::(java.util.Map, java.lang.String, int, boolean, io.smallrye.reactive.messaging.kafka.SerializationFailureHandler, io.smallrye.reactive.messaging.kafka.SerializationFailureHandler, java.util.function.BiConsumer, java.util.Map>)", + "new": "method void io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer::(java.util.Map, java.lang.String, int, boolean, org.apache.kafka.clients.producer.ProducerInterceptor, io.smallrye.reactive.messaging.kafka.SerializationFailureHandler, io.smallrye.reactive.messaging.kafka.SerializationFailureHandler, java.util.function.BiConsumer, java.util.Map>)", + "justification": "Added ProducerInterceptor instances to constructor" } ] } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java index 1f58f77adc..240b63fe54 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java @@ -17,6 +17,7 @@ import javax.enterprise.inject.Instance; import javax.inject.Inject; +import org.apache.kafka.clients.producer.ProducerInterceptor; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; @@ -114,6 +115,7 @@ @ConnectorAttribute(name = "propagate-headers", direction = Direction.OUTGOING, description = "A comma-separating list of incoming record headers to be propagated to the outgoing record", type = "string", defaultValue = "") @ConnectorAttribute(name = "key-serialization-failure-handler", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.SerializationFailureHandler`. If set, serialization failure happening when serializing keys are delegated to this handler which may provide a fallback value.") @ConnectorAttribute(name = "value-serialization-failure-handler", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.SerializationFailureHandler`. If set, serialization failure happening when serializing values are delegated to this handler which may provide a fallback value.") +@ConnectorAttribute(name = "interceptor-bean", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `org.apache.kafka.clients.producer.ProducerInterceptor`. If set, the identified bean will be used as the producer interceptor.") public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter { public static final String CONNECTOR_NAME = "smallrye-kafka"; @@ -133,6 +135,10 @@ public SpanBuilder spanBuilder(final String spanName) { @Any Instance consumerRebalanceListeners; + @Inject + @Any + Instance> producerInterceptors; + @Inject @Any Instance> deserializationFailureHandlers; @@ -256,7 +262,7 @@ public Subscriber> getSubscriber(Config config) { if (oc.getHealthReadinessTimeout().isPresent()) { log.deprecatedConfig("health-readiness-timeout", "health-topic-verification-timeout"); } - KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, serializationFailureHandlers); + KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, serializationFailureHandlers, producerInterceptors); sinks.add(sink); return sink.getSink(); } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java index ecb31279da..1c70279894 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java @@ -93,7 +93,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, // fire producer event (e.g. bind metrics) ReactiveKafkaProducer producer = new ReactiveKafkaProducer<>(deadQueueProducerConfig, - deadQueueTopic, 10000, false, null, null, + deadQueueTopic, 10000, false, null, null, null, (p, c) -> kafkaCDIEvents.producer().fire(p)); return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, producer, reportFailure); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java index a1182f8f30..3e90db472d 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java @@ -310,4 +310,16 @@ void acknowledgementFromRevokedTopicPartition(long offset, TopicPartition topicP @Message(id = 18273, value = "Checkpoint commit strategy processing state type not found for channel %s : %s") void checkpointStateTypeNotFound(String channel, String fqcn); + @LogMessage(level = Logger.Level.INFO) + @Message(id = 18274, value = "Error caught in producer interceptor `onSend` for channel %s") + void interceptorOnSendError(String channel, @Cause Throwable cause); + + @LogMessage(level = Logger.Level.TRACE) + @Message(id = 18275, value = "Error caught in producer interceptor `onAcknowledge` for channel %s") + void interceptorOnAcknowledgeError(String channel, @Cause Throwable cause); + + @LogMessage(level = Logger.Level.TRACE) + @Message(id = 18276, value = "Error caught in producer interceptor `close` for channel %s") + void interceptorCloseError(String channel, @Cause Throwable cause); + } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ConfigurationCleaner.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ConfigurationCleaner.java index 4f529f2389..f5f106ec30 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ConfigurationCleaner.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ConfigurationCleaner.java @@ -39,6 +39,7 @@ public class ConfigurationCleaner { "key-serialization-failure-handler", "value-serialization-failure-handler", "merge", + "interceptor-bean", // Remove most common attributes, may have been configured from the default config "key.deserializer", diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java index 4dee89d737..d3bd659b6e 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java @@ -17,6 +17,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.InvalidTopicException; @@ -85,7 +86,8 @@ public class KafkaSink { private final Instrumenter instrumenter; public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafkaCDIEvents, - Instance> serializationFailureHandlers) { + Instance> serializationFailureHandlers, + Instance> producerInterceptors) { this.isTracingEnabled = config.getTracingEnabled(); this.partition = config.getPartition(); this.retries = config.getRetries(); @@ -93,7 +95,8 @@ public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafk this.key = config.getKey().orElse(null); this.channel = config.getChannel(); - this.client = new ReactiveKafkaProducer<>(config, serializationFailureHandlers, this::reportFailure, + this.client = new ReactiveKafkaProducer<>(config, serializationFailureHandlers, producerInterceptors, + this::reportFailure, (p, c) -> { log.connectedToKafka(getClientId(c), config.getBootstrapServers(), topic); // fire producer event (e.g. bind metrics) @@ -235,7 +238,8 @@ record = getProducerRecord(message, outgoingMetadata, incomingMetadata, actualTo Uni uni = sendUni.onItem().transformToUni(recordMetadata -> { OutgoingMessageMetadata.setResultOnMessage(message, recordMetadata); - log.successfullyToTopic(message, record.topic(), recordMetadata.partition(), recordMetadata.offset()); + log.successfullyToTopic(message, recordMetadata.topic(), recordMetadata.partition(), + recordMetadata.offset()); return Uni.createFrom().completionStage(message.ack()); }); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.java index e0e5da8600..cdf0182fcb 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.java @@ -24,10 +24,12 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.serialization.Serializer; import io.smallrye.common.annotation.CheckReturnValue; @@ -36,12 +38,14 @@ import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration; import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler; import io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper; +import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; import io.vertx.core.Context; public class ReactiveKafkaProducer implements io.smallrye.reactive.messaging.kafka.KafkaProducer { private final AtomicBoolean closed = new AtomicBoolean(true); private final String clientId; + private final ProducerInterceptor interceptor; private final Uni> producerUni; private final AtomicReference> producerRef = new AtomicReference<>(); @@ -55,10 +59,12 @@ public class ReactiveKafkaProducer implements io.smallrye.reactive.messagi public ReactiveKafkaProducer(KafkaConnectorOutgoingConfiguration config, Instance> serializationFailureHandlers, + Instance> producerInterceptors, Consumer reportFailure, BiConsumer, Map> onProducerCreated) { this(getKafkaProducerConfiguration(config), config.getChannel(), config.getCloseTimeout(), config.getLazyClient(), + getProducerInterceptorBean(config, producerInterceptors), createSerializationFailureHandler(config.getChannel(), config.getKeySerializationFailureHandler().orElse(null), serializationFailureHandlers), @@ -75,6 +81,7 @@ public String getClientId() { public ReactiveKafkaProducer(Map kafkaConfiguration, String channel, int closeTimeout, boolean lazyClient, + ProducerInterceptor interceptor, SerializationFailureHandler keySerializationFailureHandler, SerializationFailureHandler valueSerializationFailureHandler, BiConsumer, Map> onProducerCreated) { @@ -82,6 +89,7 @@ public ReactiveKafkaProducer(Map kafkaConfiguration, String chan this.channel = channel; this.closetimeout = closeTimeout; this.clientId = kafkaConfiguration.get(ProducerConfig.CLIENT_ID_CONFIG).toString(); + this.interceptor = interceptor; String keySerializerCN = (String) kafkaConfiguration.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); String valueSerializerCN = (String) kafkaConfiguration.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); @@ -98,6 +106,10 @@ public ReactiveKafkaProducer(Map kafkaConfiguration, String chan // Configure the underlying serializers keySerializer.configure(kafkaConfiguration, true); valueSerializer.configure(kafkaConfiguration, false); + // Configure interceptor + if (interceptor != null) { + interceptor.configure(kafkaConfiguration); + } kafkaWorker = Executors.newSingleThreadExecutor(KafkaSendingThread::new); producerUni = Uni.createFrom().item(() -> producerRef.updateAndGet(p -> { @@ -147,18 +159,22 @@ public Uni runOnSendingThread(java.util.function.Consumer> @CheckReturnValue public Uni send(ProducerRecord record) { return withProducerOnSendingThread() - .chain(c -> Uni.createFrom().emitter(em -> c.send(record, (metadata, exception) -> { - if (exception != null) { - if (record.topic() != null) { - log.unableToWrite(this.channel, record.topic(), exception); + .chain(c -> { + final ProducerRecord intercepted = interceptOnSend(record); + return Uni.createFrom().emitter(em -> c.send(intercepted, (metadata, exception) -> { + interceptOnAcknowledge(intercepted, metadata, exception); + if (exception != null) { + if (record.topic() != null) { + log.unableToWrite(this.channel, record.topic(), exception); + } else { + log.unableToWrite(this.channel, exception); + } + em.fail(exception); } else { - log.unableToWrite(this.channel, exception); + em.complete(metadata); } - em.fail(exception); - } else { - em.complete(metadata); - } - }))); + })); + }); } @Override @@ -209,6 +225,14 @@ public Uni sendOffsetsToTransaction(Map }); } + @SuppressWarnings({ "unchecked" }) + private static ProducerInterceptor getProducerInterceptorBean(KafkaConnectorOutgoingConfiguration config, + Instance> producerInterceptors) { + return (ProducerInterceptor) config.getInterceptorBean() + .flatMap(identifier -> CDIUtils.getInstanceById(producerInterceptors, identifier).stream().findFirst()) + .orElse(null); + } + @SuppressWarnings({ "unchecked" }) private static SerializationFailureHandler createSerializationFailureHandler(String channelName, String failureHandlerName, Instance> deserializationFailureHandlers) { @@ -290,6 +314,7 @@ public void close() { if (closed.compareAndSet(false, true)) { int timeout = this.closetimeout; Uni uni = runOnSendingThread(p -> { + interceptClose(); if (System.getSecurityManager() == null) { p.close(Duration.ofMillis(timeout)); } else { @@ -309,4 +334,42 @@ public void close() { } } + private ProducerRecord interceptOnSend(ProducerRecord record) { + if (interceptor != null) { + try { + return interceptor.onSend(record); + } catch (Throwable t) { + log.interceptorOnSendError(channel, t); + } + } + return record; + } + + private void interceptOnAcknowledge(ProducerRecord intercepted, RecordMetadata recordMetadata, Exception exception) { + if (interceptor != null) { + try { + RecordMetadata metadata = exception == null ? recordMetadata : getRecordMetadataForFailure(intercepted); + interceptor.onAcknowledgement(metadata, exception); + } catch (Throwable t) { + log.interceptorOnAcknowledgeError(this.channel, t); + } + } + } + + private static RecordMetadata getRecordMetadataForFailure(ProducerRecord record) { + return new RecordMetadata(new TopicPartition(record.topic(), record.partition()), + -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); + } + + private void interceptClose() { + if (interceptor != null) { + try { + interceptor.close(); + } catch (Throwable t) { + log.interceptorCloseError(channel, t); + } + } + } + + } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java index 10f1b1b2d6..a02d8458d5 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java @@ -5,13 +5,18 @@ import static org.awaitility.Awaitility.await; import java.time.Duration; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import javax.enterprise.context.ApplicationScoped; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.microprofile.reactive.messaging.Incoming; @@ -22,6 +27,7 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import io.smallrye.common.annotation.Identifier; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.health.HealthReport; @@ -56,7 +62,8 @@ public void testSinkUsingInteger() { .with("partition", 0) .with("channel-name", "testSinkUsingInteger"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) @@ -76,7 +83,8 @@ public void testSinkUsingIntegerAndChannelName() { .with("value.serializer", IntegerSerializer.class.getName()) .with("partition", 0); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) @@ -97,7 +105,8 @@ public void testSinkUsingString() { .with("partition", 0) .with("channel-name", "testSinkUsingString"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) @@ -228,7 +237,7 @@ public void testInvalidPayloadType() { .with("retries", 0L); // disable retry. KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); CountKafkaCdiEvents testCdiEvents = new CountKafkaCdiEvents(); - sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); await().until(() -> { HealthReport.HealthReportBuilder builder = HealthReport.builder(); @@ -277,7 +286,8 @@ public void testInvalidTypeWithDefaultInflightMessages() { .with("retries", 0L) .with("channel-name", "testInvalidTypeWithDefaultInflightMessages"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 5) @@ -426,6 +436,100 @@ public void testConnectorWithMultipleUpstreams() { .contains(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19); } + @Test + public void testProducerInterceptorBean() { + ConsumerTask consumed = companion.consume(Integer.class, String.class) + .fromTopics(topic, 10); + + addBeans(ProducerInterceptorBean.class); + KafkaMapBasedConfig config = getKafkaSinkConfigForRecordProducingBean(topic) + .with("interceptor-bean", "my-producer-interceptor"); + runApplication(config, BeanProducingKafkaRecord.class); + + await().until(this::isReady); + await().until(this::isAlive); + + assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); + assertThat(consumed.getRecords()) + .extracting(ConsumerRecord::key) + .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + assertThat(consumed.getRecords()) + .extracting(ConsumerRecord::headers) + .extracting(h -> h.lastHeader("intercept")) + .allSatisfy(h -> assertThat(h.value()).asString().isEqualTo("value")); + assertThat(consumed.getRecords()) + .extracting(ConsumerRecord::value) + .containsExactly("value-1", "value-2", "value-3", "value-4", "value-5", "value-6", "value-7", "value-8", + "value-9", "value-10"); + + ProducerInterceptorBean interceptor = getBeanManager().createInstance() + .select(ProducerInterceptorBean.class, Identifier.Literal.of("my-producer-interceptor")).get(); + assertThat(interceptor.getIntercepted()) + .hasSizeGreaterThanOrEqualTo(10) + .extracting(ProducerRecord::headers) + .extracting(h -> h.lastHeader("intercept")) + .allSatisfy(h -> assertThat(h.value()).asString().isEqualTo("value")); + assertThat(interceptor.getAcknowledged()) + .hasSizeGreaterThanOrEqualTo(10) + .extracting(RecordMetadata::offset) + .containsExactly(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); + assertThat(interceptor.getConfig()).isNotEmpty(); + } + + @ApplicationScoped + @Identifier("my-producer-interceptor") + public static class ProducerInterceptorBean implements ProducerInterceptor { + + List> intercepted = new CopyOnWriteArrayList<>(); + List acknowledged = new CopyOnWriteArrayList<>(); + Map config = new HashMap<>(); + volatile boolean closed = false; + + @Override + public ProducerRecord onSend(ProducerRecord producerRecord) { + producerRecord.headers().add("intercept", "value".getBytes()); + intercepted.add(producerRecord); + if (producerRecord.key() == 3) { + throw new IllegalArgumentException("boom on intercepted send"); + } + return producerRecord; + } + + @Override + public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { + acknowledged.add(recordMetadata); + if (recordMetadata.offset() == 6) { + throw new IllegalArgumentException("boom on intercepted acknowledge"); + } + } + + @Override + public void close() { + closed = true; + } + + @Override + public void configure(Map map) { + config.putAll(map); + } + + public List> getIntercepted() { + return intercepted; + } + + public List getAcknowledged() { + return acknowledged; + } + + public Map getConfig() { + return config; + } + + public boolean isClosed() { + return closed; + } + } + @ApplicationScoped public static class BeanProducingKafkaRecord { diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java index d1652018c6..1502e39ecb 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java @@ -61,7 +61,8 @@ public void testSinkUsingInteger() { .with("partition", 0) .with("channel-name", "testSinkUsingInteger"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) @@ -81,7 +82,8 @@ public void testSinkUsingIntegerAndChannelName() { .with("value.serializer", IntegerSerializer.class.getName()) .with("partition", 0); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) @@ -102,7 +104,8 @@ public void testSinkUsingString() { .with("partition", 0) .with("channel-name", "testSinkUsingString"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) @@ -234,7 +237,7 @@ public void testInvalidPayloadType() { .with("retries", 0L); // disable retry. KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); CountKafkaCdiEvents testCdiEvents = new CountKafkaCdiEvents(); - sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); await().until(() -> { HealthReport.HealthReportBuilder builder = HealthReport.builder(); @@ -283,7 +286,8 @@ public void testInvalidTypeWithDefaultInflightMessages() { .with("retries", 0L) .with("channel-name", "testInvalidTypeWithDefaultInflightMessages"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 5) diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java index 80c60aeff1..a58684d01b 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java @@ -58,7 +58,8 @@ public void testSendingStructuredCloudEvents() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -100,7 +101,8 @@ public void testSendingStructuredCloudEventsWithComplexPayload() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -142,7 +144,8 @@ public void testSendingStructuredCloudEventsWithTimestampAndSubject() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -186,7 +189,8 @@ public void testSendingStructuredCloudEventsMissingMandatoryAttribute() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); Message message = Message.of("hello").addMetadata(OutgoingCloudEventMetadata.builder() .withSource(URI.create("test://test")) @@ -219,8 +223,9 @@ public void testSendingStructuredCloudEventsWithWrongSerializer() { config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - assertThatThrownBy(() -> new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance())) - .isInstanceOf(IllegalStateException.class); + assertThatThrownBy(() -> new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance())) + .isInstanceOf(IllegalStateException.class); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -232,7 +237,8 @@ public void testSendingStructuredCloudEventsWithKey() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -274,7 +280,8 @@ public void testSendingStructuredCloudEventsWithConfiguredTypeAndSource() { config.put("cloud-events-type", "my type"); config.put("cloud-events-source", "http://acme.org"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -312,7 +319,8 @@ public void testSendingStructuredCloudEventsWithConfiguredTypeAndSourceAndNoClou config.put("cloud-events-type", "my type"); config.put("cloud-events-source", "http://acme.org"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -346,7 +354,8 @@ public void testSendingStructuredCloudEventsWithExtensions() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -387,7 +396,8 @@ public void testSendingBinaryCloudEvents() { config.put("value.serializer", StringSerializer.class.getName()); config.put("channel-name", topic); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -423,7 +433,8 @@ public void testSendingBinaryCloudEventsWithContentType() { config.put("value.serializer", StringSerializer.class.getName()); config.put("channel-name", topic); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -463,7 +474,8 @@ public void testSendingBinaryCloudEventsWithKey() { config.put("value.serializer", StringSerializer.class.getName()); config.put("channel-name", topic); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -504,7 +516,8 @@ public void testSendingBinaryCloudEventsWithConfiguredTypeAndSource() { config.put("cloud-events-type", "my type"); config.put("cloud-events-source", "http://acme.org"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -541,7 +554,8 @@ public void testSendingBinaryCloudEventsWithConfiguredTypeAndSourceButNoMetadata config.put("cloud-events-type", "my type"); config.put("cloud-events-source", "http://acme.org"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -573,7 +587,8 @@ public void testSendingBinaryCloudEventsMissingMandatoryAttribute() { config.put("value.serializer", StringSerializer.class.getName()); config.put("channel-name", topic); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); Message message = Message.of("hello").addMetadata(OutgoingCloudEventMetadata.builder() .withSource(URI.create("test://test")) @@ -607,7 +622,8 @@ public void testWithCloudEventDisabled() { config.put("key", "my-key"); config.put("cloud-events", false); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -637,7 +653,8 @@ public void testSendingBinaryCloudEventsWithExtensions() { config.put("value.serializer", StringSerializer.class.getName()); config.put("channel-name", topic); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java index 9b7cb4008b..f90ad630ce 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java @@ -51,7 +51,7 @@ void testLazyInitializedProducer() { MapBasedConfig config = new MapBasedConfig(props); KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), - CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); KafkaProducer producer = sink.getProducer(); assertThat(producer).isNotNull(); @@ -89,7 +89,7 @@ void testEagerInitializedProducer() { assertThatThrownBy(() -> { KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), - CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); }).hasCauseInstanceOf(KafkaException.class); } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java index 2d7c0db8c2..653b62fad0 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java @@ -165,14 +165,14 @@ public KafkaSink createSink() { .put("topic", topic); KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), - CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); this.sinks.add(sink); return sink; } public KafkaSink createSink(MapBasedConfig config) { KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), - CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); this.sinks.add(sink); return sink; } @@ -186,7 +186,7 @@ public KafkaSink createTransactionalSink() { .with("topic", topic); KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), - CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); + CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); this.sinks.add(sink); return sink; } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java index fb3cd4a1c5..1cc11a532b 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java @@ -44,7 +44,7 @@ public void cleanup() { public void testThatWhenNotSetKeySerializerIsString() { MapBasedConfig config = commonConsumerConfiguration(); sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance()); + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); ConsumerTask consumed = companion.consumeStrings().fromTopics(topic, 4, Duration.ofSeconds(10)); @@ -76,7 +76,7 @@ public void testKeySerializationFailure() { .with("key.serializer", JsonObjectSerde.JsonObjectSerializer.class.getName()) .with("retries", 0L); sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance()); + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); Subscriber> subscriber = sink.getSink(); AtomicBoolean nacked = new AtomicBoolean(); Multi.createFrom().items( @@ -95,7 +95,7 @@ public void testValueSerializationFailure() { .with("key.serializer", JsonObjectSerde.JsonObjectSerializer.class.getName()) .with("retries", 0L); sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance()); + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); Subscriber> subscriber = sink.getSink(); AtomicBoolean nacked = new AtomicBoolean(); Multi.createFrom().items( @@ -114,7 +114,7 @@ public void testFailureWhenValueSerializerIsNotSet() { assertThatThrownBy(() -> { sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance()); + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("value.serializer"); } @@ -126,7 +126,7 @@ public void testFailureWhenSerializerFailsDuringConfiguration() { assertThatThrownBy(() -> { sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance()); + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); }).isInstanceOf(KafkaException.class) .hasCauseInstanceOf(IllegalStateException.class) .hasStackTraceContaining("boom");