Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose KafkaConsumer in ReceiverOptionsCustomizer.addAssignListener() - parity with KafkaBindingRebalanceListener #379

Open
mvazquezrius opened this issue Jan 8, 2024 · 0 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments

@mvazquezrius
Copy link

mvazquezrius commented Jan 8, 2024

To be able to access the KafkaConsumer when using receiverOptions.addAssignListener:

Motivation

In spring-cloud-stream/issues/2727
a similar feature was requested to access the endOffsets. This was then implemented in reactor-kafka/issues/344 by @garyrussell .

In my case, instead of needing the endOffsets, I need to check which partitions have commits. The usecase is to only seekToTimestamp for partitions that have no commits.

Maybe we should expose the consumer instance as originally suggested like in KafkaBindingRebalanceListener, to avoid ending up with a facade for all possible custom needs.

Desired solution

A consumer instance is received in an overload of addAssignListener.

@Bean
public ReceiverOptionsCustomizer customizer() {
   return (bindingName, receiverOptions) -> receiverOptions.addAssignListener((partitions, consumer) -> {
     // ...
});

Additional context

spring-cloud/spring-cloud-stream#2727
#344
https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/rebalance_listener.html

Workarround

Right now, I am forced to go with a very bad workaround suing the consumerListener which is cleary not good enough for production.

@Component
public class ReplayStartReceiverOptionsCustomizer {
    private final ConsumerListener consumerListener;
    public ReplayStartReceiverOptionsCustomizer(){
        this.consumerListener = new ConsumerListener();
    }
    @Bean
    public ReceiverOptionsCustomizer<String, String> customizer(){
        return (bindingName, options) -> options
                .consumerListener(this.consumerListener)
                .addAssignListener(partitions -> {
                System.out.println(">>>>> Assign listener");

                Map<String, ReceiverPartition> partitionsByKey = partitions
                        .stream()
                        .collect(Collectors.toMap(
                                Object::toString,
                                Function.identity())
                        );

                List<TopicPartition> topicPartitions = partitions.stream().map(ReceiverPartition::topicPartition).toList();
                getPartitionsWithoutCommits(this.consumerListener.consumer, topicPartitions)
                        .forEach(partition -> {
                            System.out.printf(
                                    "Partition '%s' has no offset. Seeking to start time: '%d'.", partition,
                                    1704720729);
                            partitionsByKey.get(partition.toString()).seekToTimestamp(1704720729);
                        });
            });
    }

    private static Stream<TopicPartition> getPartitionsWithoutCommits(Consumer<?, ?> consumer, Collection<TopicPartition> partitions){
        return consumer.committed(new HashSet<>(partitions))
                .entrySet()
                .stream()
                .filter(e -> e.getValue() == null)  // no commits (first time or new partition)
                .map(Map.Entry::getKey);
    }

    static class ConsumerListener implements ReceiverOptions.ConsumerListener {
        public Consumer<?,?> consumer;
        @Override
        public void consumerAdded(String id, Consumer<?, ?> consumer) {
            System.out.println(">>>>> Consumer added");
            System.out.println(id);
            System.out.println(consumer);
            this.consumer = consumer;
        }
        @Override
        public void consumerRemoved(String id, Consumer<?, ?> consumer)
        {
            this.consumer = null;
        }
    }
}

Another alternative would be to add a seekToTimestampIfNoCommits or a flag skipIfHasCommits. However, if we start adding so many options, we are basically replicating the full consumer.

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Jan 8, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet
Projects
None yet
Development

No branches or pull requests

2 participants