Skip to content

Commit

Permalink
Kafka ProducerInterceptor Bean
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Jan 25, 2023
1 parent b0f1fb5 commit aff3c11
Show file tree
Hide file tree
Showing 15 changed files with 351 additions and 53 deletions.
21 changes: 21 additions & 0 deletions documentation/src/main/docs/kafka/writing-kafka-records.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,27 @@ can also be used for producing messages:
This is an advanced feature. The `ProducerRecord` is sent to Kafka as
is. Any possible metadata attached through `Message<ProducerRecord<K, V>>` are ignored and lost.

## Producer Interceptors

Producer interceptors can be configured for Kafka producer clients using the standard `interceptor.classes` property.
Configured classes will be instantiated by the Kafka producer on client creation.

Alternatively, producer clients can be configured with a CDI managed-bean implementing {{ javadoc('org.apache.kafka.clients.producer.ProducerInterceptor', True, 'org.apache.kafka/kafka-clients') }} interface:

To achieve this, the bean must be exposed with the `@Identifier` qualifier specifying the name of the bean:


``` java
{{ insert('kafka/outbound/ProducerInterceptorBeanExample.java') }}
```

Then, in the channel configuration, specify the following attribute:
`mp.messaging.incoming.$channel.interceptor-bean=my-producer-interceptor`.

!!!warning
The `onSend` method will be called on the producer *sending thread* and `onAcknowledgement` will be called on the *Kafka producer I/O thread*.
In both cases if implementations are not fast, sending of messages could be delayed.

## Configuration Reference

{{ insert('../../../target/connectors/smallrye-kafka-outgoing.md') }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kafka.outbound;

import java.util.Map;

import javax.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
@Identifier("my-producer-interceptor")
public class ProducerInterceptorBeanExample implements ProducerInterceptor<Integer, String> {

@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> producerRecord) {
// called before send
return producerRecord;
}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
// called on send acknowledgement callback
}

@Override
public void close() {
// called on client close
}

@Override
public void configure(Map<String, ?> map) {
// called on client configuration
}
}
29 changes: 29 additions & 0 deletions smallrye-reactive-messaging-kafka/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,35 @@
"code": "java.class.removed",
"old": "class io.smallrye.reactive.messaging.kafka.tracing.HeaderInjectAdapter",
"justification": "Replaced by otel instrumentation api"
},
{
"code": "java.annotation.attributeValueChanged",
"old": "class io.smallrye.reactive.messaging.kafka.KafkaConnector",
"new": "class io.smallrye.reactive.messaging.kafka.KafkaConnector",
"annotationType": "io.smallrye.reactive.messaging.annotations.ConnectorAttributes",
"attribute": "value",
"justification": "Added interceptor-bean to connector attributes"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.reactive.messaging.kafka.impl.KafkaSink::<init>(io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration, io.smallrye.reactive.messaging.kafka.KafkaCDIEvents, javax.enterprise.inject.Instance<io.smallrye.reactive.messaging.kafka.SerializationFailureHandler<?>>)",
"new": "method void io.smallrye.reactive.messaging.kafka.impl.KafkaSink::<init>(io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration, io.smallrye.reactive.messaging.kafka.KafkaCDIEvents, javax.enterprise.inject.Instance<io.smallrye.reactive.messaging.kafka.SerializationFailureHandler<?>>, javax.enterprise.inject.Instance<org.apache.kafka.clients.producer.ProducerInterceptor<?, ?>>)",
"justification": "Added ProducerInterceptor instances to constructor"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer<K, V>::<init>(io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration, javax.enterprise.inject.Instance<io.smallrye.reactive.messaging.kafka.SerializationFailureHandler<?>>, java.util.function.Consumer<java.lang.Throwable>, java.util.function.BiConsumer<org.apache.kafka.clients.producer.Producer<?, ?>, java.util.Map<java.lang.String, java.lang.Object>>)",
"new": "method void io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer<K, V>::<init>(io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration, javax.enterprise.inject.Instance<io.smallrye.reactive.messaging.kafka.SerializationFailureHandler<?>>, javax.enterprise.inject.Instance<org.apache.kafka.clients.producer.ProducerInterceptor<?, ?>>, java.util.function.Consumer<java.lang.Throwable>, java.util.function.BiConsumer<org.apache.kafka.clients.producer.Producer<?, ?>, java.util.Map<java.lang.String, java.lang.Object>>)",
"justification": "Added ProducerInterceptor instances to constructor"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer<K, V>::<init>(java.util.Map<java.lang.String, java.lang.Object>, java.lang.String, int, boolean, io.smallrye.reactive.messaging.kafka.SerializationFailureHandler<K>, io.smallrye.reactive.messaging.kafka.SerializationFailureHandler<V>, java.util.function.BiConsumer<org.apache.kafka.clients.producer.Producer<?, ?>, java.util.Map<java.lang.String, java.lang.Object>>)",
"new": "method void io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer<K, V>::<init>(java.util.Map<java.lang.String, java.lang.Object>, java.lang.String, int, boolean, org.apache.kafka.clients.producer.ProducerInterceptor<K, V>, io.smallrye.reactive.messaging.kafka.SerializationFailureHandler<K>, io.smallrye.reactive.messaging.kafka.SerializationFailureHandler<V>, java.util.function.BiConsumer<org.apache.kafka.clients.producer.Producer<?, ?>, java.util.Map<java.lang.String, java.lang.Object>>)",
"justification": "Added ProducerInterceptor instances to constructor"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
Expand Down Expand Up @@ -114,6 +115,7 @@
@ConnectorAttribute(name = "propagate-headers", direction = Direction.OUTGOING, description = "A comma-separating list of incoming record headers to be propagated to the outgoing record", type = "string", defaultValue = "")
@ConnectorAttribute(name = "key-serialization-failure-handler", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.SerializationFailureHandler`. If set, serialization failure happening when serializing keys are delegated to this handler which may provide a fallback value.")
@ConnectorAttribute(name = "value-serialization-failure-handler", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.SerializationFailureHandler`. If set, serialization failure happening when serializing values are delegated to this handler which may provide a fallback value.")
@ConnectorAttribute(name = "interceptor-bean", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `org.apache.kafka.clients.producer.ProducerInterceptor`. If set, the identified bean will be used as the producer interceptor.")
public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter {

public static final String CONNECTOR_NAME = "smallrye-kafka";
Expand All @@ -133,6 +135,10 @@ public SpanBuilder spanBuilder(final String spanName) {
@Any
Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners;

@Inject
@Any
Instance<ProducerInterceptor<?, ?>> producerInterceptors;

@Inject
@Any
Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers;
Expand Down Expand Up @@ -256,7 +262,7 @@ public Subscriber<? extends Message<?>> getSubscriber(Config config) {
if (oc.getHealthReadinessTimeout().isPresent()) {
log.deprecatedConfig("health-readiness-timeout", "health-topic-verification-timeout");
}
KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, serializationFailureHandlers);
KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, serializationFailureHandlers, producerInterceptors);
sinks.add(sink);
return sink.getSink();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

// fire producer event (e.g. bind metrics)
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(deadQueueProducerConfig,
deadQueueTopic, 10000, false, null, null,
deadQueueTopic, 10000, false, null, null, null,
(p, c) -> kafkaCDIEvents.producer().fire(p));

return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, producer, reportFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,16 @@ void acknowledgementFromRevokedTopicPartition(long offset, TopicPartition topicP
@Message(id = 18273, value = "Checkpoint commit strategy processing state type not found for channel %s : %s")
void checkpointStateTypeNotFound(String channel, String fqcn);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 18274, value = "Error caught in producer interceptor `onSend` for channel %s")
void interceptorOnSendError(String channel, @Cause Throwable cause);

@LogMessage(level = Logger.Level.TRACE)
@Message(id = 18275, value = "Error caught in producer interceptor `onAcknowledge` for channel %s")
void interceptorOnAcknowledgeError(String channel, @Cause Throwable cause);

@LogMessage(level = Logger.Level.TRACE)
@Message(id = 18276, value = "Error caught in producer interceptor `close` for channel %s")
void interceptorCloseError(String channel, @Cause Throwable cause);

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ConfigurationCleaner {
"key-serialization-failure-handler",
"value-serialization-failure-handler",
"merge",
"interceptor-bean",

// Remove most common attributes, may have been configured from the default config
"key.deserializer",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InvalidTopicException;
Expand Down Expand Up @@ -85,15 +86,17 @@ public class KafkaSink {
private final Instrumenter<KafkaTrace, Void> instrumenter;

public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafkaCDIEvents,
Instance<SerializationFailureHandler<?>> serializationFailureHandlers) {
Instance<SerializationFailureHandler<?>> serializationFailureHandlers,
Instance<ProducerInterceptor<?, ?>> producerInterceptors) {
this.isTracingEnabled = config.getTracingEnabled();
this.partition = config.getPartition();
this.retries = config.getRetries();
this.topic = config.getTopic().orElseGet(config::getChannel);
this.key = config.getKey().orElse(null);
this.channel = config.getChannel();

this.client = new ReactiveKafkaProducer<>(config, serializationFailureHandlers, this::reportFailure,
this.client = new ReactiveKafkaProducer<>(config, serializationFailureHandlers, producerInterceptors,
this::reportFailure,
(p, c) -> {
log.connectedToKafka(getClientId(c), config.getBootstrapServers(), topic);
// fire producer event (e.g. bind metrics)
Expand Down Expand Up @@ -235,7 +238,8 @@ record = getProducerRecord(message, outgoingMetadata, incomingMetadata, actualTo

Uni<Void> uni = sendUni.onItem().transformToUni(recordMetadata -> {
OutgoingMessageMetadata.setResultOnMessage(message, recordMetadata);
log.successfullyToTopic(message, record.topic(), recordMetadata.partition(), recordMetadata.offset());
log.successfullyToTopic(message, recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
return Uni.createFrom().completionStage(message.ack());
});

Expand Down
Loading

0 comments on commit aff3c11

Please sign in to comment.