Supports multi-topic in CommittableOffsetBatch #1041

Open: wants to merge 7 commits into base series/2.x

geirolz
Contributor

@geirolz geirolz commented Jul 28, 2022

The purpose of this PR is to resolve issue #347 at its root by allowing CommittableOffsetBatch to process records from different topics.
This is very useful when you consume from multiple topics and merge the streams.

I haven't addressed the tests yet; I'll do so as soon as possible. In the meantime, let me know what you think of the solution.

Open points:

  • I'm not sure about the exception class name and error message to use when we can't retrieve the commit action for a specific topic. Any suggestions?
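
To make the open point above concrete, here is a minimal sketch of a per-topic commit lookup that fails when a topic has no commit action. It is not the code in this PR: `MultiTopicCommit`, `CommitAction`, `MissingCommitAction`, and the local `TopicPartition` are hypothetical names, and the model uses plain Scala collections rather than fs2-kafka types.

```scala
object MultiTopicCommit {
  // Hypothetical stand-ins for illustration; these are not fs2-kafka's real classes.
  final case class TopicPartition(topic: String, partition: Int)

  type Offsets = Map[TopicPartition, Long]
  type CommitAction = Offsets => Unit

  // Illustrative error for a topic that has no registered commit action.
  final case class MissingCommitAction(topic: String)
      extends RuntimeException(s"No commit action registered for topic '$topic'")

  // Checks that every topic in the batch has an action before committing anything,
  // then commits each topic's offsets through that topic's own action.
  def commitAll(
      offsets: Offsets,
      actions: Map[String, CommitAction]
  ): Either[MissingCommitAction, Unit] = {
    val byTopic = offsets.groupBy { case (tp, _) => tp.topic }
    byTopic.keys.toList.sorted.find(topic => !actions.contains(topic)) match {
      case Some(topic) => Left(MissingCommitAction(topic))
      case None =>
        byTopic.foreach { case (topic, topicOffsets) => actions(topic)(topicOffsets) }
        Right(())
    }
  }
}
```

In this sketch the lookup is validated up front, so a missing action never leaves the batch partially committed. Whether that is the right behaviour for the library is part of the open question.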

@geirolz geirolz changed the title Supports multi-topic batch commit Supports multi-topic in CommittableOffsetBatch Jul 28, 2022
@geirolz geirolz marked this pull request as ready for review July 28, 2022 15:53
@geirolz geirolz force-pushed the multi_topic_commit branch from 3601e49 to 67cde6c Compare July 30, 2022 16:36
@geirolz geirolz force-pushed the multi_topic_commit branch from 67cde6c to ea31b9b Compare July 30, 2022 16:47
@geirolz
Contributor Author

geirolz commented Mar 21, 2023

Hi @bplommer, have you had a chance to take a look at this?

@aartigao
Contributor

Hi @geirolz!

I've read this PR and issue #347, and I'm confused about the usage of Topics and Consumer Groups. I'm trying to understand why the current CommittableOffsetBatch implementation is not sufficient and what the use case is.

I'm surely missing something (and I'd assume the issue arises when using multiple Consumer Groups, not multiple Topics; correct me if I'm wrong). If ConsumerSettings only allows a single Group ID, how does a stream end up with multiple Consumer Groups? Is it because you merge multiple KafkaConsumer.records streams?

If that's the case, I'd say this is something to be handled in user land 🤔, because the Scaladoc of CommittableOffsetBatch#consumerGroupIds says:

There might be more than one consumer group ID in the set
if offsets from multiple consumers, with different group
IDs, have accidentally been mixed. The set might also be
empty if no consumer group IDs have been specified.

The key point is "accidentally been mixed". That suggests it's forbidden, and I think that's right, because every KafkaConsumer has its own underlying session/connection with Kafka and manages its offsets in isolation from other groups.
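
The rule described in that Scaladoc can be sketched with a simplified model: a batch remembers which consumer group IDs its offsets came from and only validates for commit when there is exactly one. Everything below (`GroupGuard`, `Offset`, `Batch`, `validate`) is a hypothetical stand-in written for this discussion, not fs2-kafka's implementation.

```scala
object GroupGuard {
  // Hypothetical, simplified model; not fs2-kafka's real types.
  final case class Offset(groupId: Option[String], topic: String, partition: Int, offset: Long)

  final case class Batch(
      offsets: Map[(String, Int), Long],
      consumerGroupIds: Set[String],
      groupIdMissing: Boolean
  ) {
    // Adds one offset, keeping the highest offset seen per (topic, partition).
    def updated(o: Offset): Batch = {
      val key = (o.topic, o.partition)
      val highest = offsets.get(key).fold(o.offset)(current => math.max(current, o.offset))
      Batch(
        offsets.updated(key, highest),
        consumerGroupIds ++ o.groupId.toList,
        groupIdMissing || o.groupId.isEmpty
      )
    }

    // A commit is only well defined when every offset came from the same group.
    def validate: Either[String, Map[(String, Int), Long]] =
      if (groupIdMissing) Left("some offsets have no consumer group ID")
      else if (consumerGroupIds.size != 1)
        Left(s"expected exactly one consumer group ID, found ${consumerGroupIds.size}")
      else Right(offsets)
  }

  val empty: Batch = Batch(Map.empty, Set.empty, groupIdMissing = false)
}
```

Note that in this model a batch spanning several topics still validates as long as all offsets share one group ID; only mixing group IDs is rejected.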

@geirolz
Contributor Author

geirolz commented Sep 30, 2023

Hi @aartigao, I noticed this problem at my previous company. We were consuming from multiple topics and merging all messages into a single stream consumed by an internal processor. Once the messages were processed we had to commit, and that is where the problem came up.

[Image: Untitled (1)]

Moreover, as noted in #347, this behaviour is rather implicit in the semantics.

I don't see any disadvantages in supporting this, but I may have missed something, since it was quite easy to implement. I wonder what the reason behind this design decision was.

@aartigao
Contributor

Thanks for sharing the graph; now what's happening makes sense to me.

Most probably those consumers have different Consumer Groups, right?

I'd need to dig deeper into this when I have time, but I'm afraid that, depending on how the underlying Kafka Client works, it may not be possible to fix this at the library level. That would explain the decision to support only a single Consumer Group.

Also, just to give more context, a Consumer Group can manage offsets for multiple Topics (in fact, for multiple TopicPartitions). Maybe your company's design had a 1:1 relation between Topic and Consumer Group, but that's not the general case. This is what confused me earlier when I looked at the PR implementation, which uses the Topic name as the Map's key.
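
To illustrate the distinction, here is a small sketch that splits a merged sequence of records into one offset map per consumer group, keyed by TopicPartition, so that a group covering several topics still ends up with a single map. The names `PerGroupOffsets`, `Record`, and `perGroup` are hypothetical, and the model uses plain Scala collections rather than the Kafka client or fs2-kafka types.

```scala
object PerGroupOffsets {
  // Hypothetical stand-ins for illustration; not the Kafka client's classes.
  final case class TopicPartition(topic: String, partition: Int)
  final case class Record(groupId: String, tp: TopicPartition, offset: Long)

  // Groups records by consumer group ID (not by topic name) and keeps
  // the highest offset seen for each TopicPartition within a group.
  def perGroup(records: Seq[Record]): Map[String, Map[TopicPartition, Long]] =
    records.groupBy(_.groupId).map { case (groupId, groupRecords) =>
      groupId -> groupRecords.groupBy(_.tp).map { case (tp, rs) =>
        tp -> rs.map(_.offset).max
      }
    }
}
```

With this keying, a 1:1 relation between Topic and Consumer Group is just the special case where every inner map contains a single topic.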

But maybe I'm wrong, ofc 😅 I'll try to clarify the situation.

In the meantime, thanks for giving your time to create and clarify this PR 🙌🏼

2 participants