Skip to content

Commit

Permalink
Support inferring deserializers correctly for batch listeners receiving Co…
Browse files Browse the repository at this point in the history
…nsumerRecords (#598)

When using a batch listener with a parameter of List&lt;ConsumerRecord&lt;K,V&gt;&gt;, the framework previously picked a List deserializer instead of inspecting the ConsumerRecord type arguments. With this change, it looks at the type arguments of the ConsumerRecord to determine the Kafka message deserializer.
  • Loading branch information
dhofftgt authored and sdelamo committed Mar 22, 2024
1 parent f0145e6 commit 3750b6b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,11 @@ private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
private void configureDeserializers(final ExecutableMethod<?, ?> method, final DefaultKafkaConsumerConfiguration consumerConfiguration) {
final Properties properties = consumerConfiguration.getConfig();
// figure out the Key deserializer
final Argument<?> bodyArgument = findBodyArgument(method);
boolean batch = method.isTrue(KafkaListener.class, "batch");

Argument<?> tempBodyArg = findBodyArgument(method);

final Argument<?> bodyArgument = batch && tempBodyArg != null ? getComponentType(tempBodyArg) : tempBodyArg;

if (!properties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) && !consumerConfiguration.getKeyDeserializer().isPresent()) {
final Optional<Argument<?>> keyArgument = Arrays.stream(method.getArguments())
Expand Down Expand Up @@ -986,8 +990,7 @@ private void configureDeserializers(final ExecutableMethod<?, ?> method, final D
consumerConfiguration.setValueDeserializer(new StringDeserializer());
}
} else {
final boolean batch = method.isTrue(KafkaListener.class, "batch");
consumerConfiguration.setValueDeserializer(serdeRegistry.pickDeserializer(batch ? getComponentType(bodyArgument) : bodyArgument));
consumerConfiguration.setValueDeserializer(serdeRegistry.pickDeserializer(bodyArgument));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.messaging.annotation.SendTo
import io.reactivex.Flowable
import org.apache.kafka.clients.consumer.ConsumerRecord
import reactor.core.publisher.Flux
import spock.lang.Retry

Expand All @@ -18,6 +20,7 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {

public static final String BOOKS_TOPIC = 'KafkaBatchListenerSpec-books'
public static final String BOOKS_LIST_TOPIC = 'KafkaBatchListenerSpec-books-list'
public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-records'
public static final String BOOKS_HEADERS_TOPIC = 'KafkaBatchListenerSpec-books-headers'
public static final String BOOKS_FLUX_TOPIC = 'KafkaBatchListenerSpec-books-flux'
public static final String BOOKS_FLOWABLE_TOPIC = 'KafkaBatchListenerSpec-books-flowable'
Expand Down Expand Up @@ -203,6 +206,24 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
}
}

void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecord"() {
    given: 'a batch producer client and a listener with cleared state'
    MyBatchClient client = context.getBean(MyBatchClient)
    BookListener listener = context.getBean(BookListener)
    listener.books?.clear()
    listener.keys?.clear()

    when: 'two keyed books are published to the consumer-record topic'
    client.sendToReceiveAsConsumerRecord("book-1", new Book(title: "The Flowable"))
    client.sendToReceiveAsConsumerRecord("book-2", new Book(title: "The Shining"))

    then: 'the listener eventually sees both values and both keys, correctly typed'
    conditions.eventually {
        listener.books == [new Book(title: "The Flowable"), new Book(title: "The Shining")]
        listener.keys == ["book-1", "book-2"]
    }
}

@Requires(property = 'spec.name', value = 'KafkaBatchListenerSpec')
@KafkaClient(batch = true)
@Topic(KafkaBatchListenerSpec.BOOKS_TOPIC)
Expand Down Expand Up @@ -235,13 +256,18 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {

@Topic(KafkaBatchListenerSpec.BOOKS_FLOWABLE_TOPIC)
void sendBooksFlowable(Flowable<Book> books)

// Publishes a single keyed Book to the consumer-record topic so the listener
// can receive it inside a List<ConsumerRecord<String, Book>> batch.
@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void sendToReceiveAsConsumerRecord(@KafkaKey String key, @MessageBody Book book)

}

@Requires(property = 'spec.name', value = 'KafkaBatchListenerSpec')
@KafkaListener(batch = true, offsetReset = EARLIEST)
@Topic(KafkaBatchListenerSpec.BOOKS_TOPIC)
static class BookListener {
List<Book> books = []
List<String> keys = []
List<String> headers = []

@Topic(KafkaBatchListenerSpec.BOOKS_LIST_TOPIC)
Expand Down Expand Up @@ -297,6 +323,12 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
void receiveFlowable(Flowable<Book> books) {
this.books.addAll books.toList().blockingGet()
}

// Receives the batch as raw ConsumerRecord entries and accumulates each
// record's key and value so the spec can assert both were deserialized
// to the expected types.
@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void receiveConsumerRecords(List<ConsumerRecord<String, Book>> books) {
    for (ConsumerRecord<String, Book> record : books) {
        this.keys << record.key()
        this.books << record.value()
    }
}
}

@Requires(property = 'spec.name', value = 'KafkaBatchListenerSpec')
Expand Down

0 comments on commit 3750b6b

Please sign in to comment.