feat(consumer): incremental cooperative balance strategy (KIP-429) #2608
Conversation
force-pushed from b7a87f1 to 1c8e617
@napallday - I'm excited to see that cooperative incremental re-balancing is making its way into Sarama. I think we have apps that could benefit from it. I've had a quick play with the consumergroup_cooperative example and here's what I found:
Regarding your "Seeking Community Guidance" question: IMO it would be great to add cooperative re-balancing support as soon as practical, without necessarily waiting for Sarama v2. For example, Sarama v1 could be extended with the …
force-pushed from 1c8e617 to 03e4160
@napallday thanks for putting together this PR! RE: 3. mentioned by @prestona: it looks like a bug in our decoding of the MemberMetadata in the JoinGroups response, where we return early if OwnedPartitions is an empty array, so we haven't consumed the whole buffer. I'll fix that up in a separate PR. I think you also need to cope with UserData being empty in the balance strategy to avoid a decoding failure:

```diff
diff --git a/balance_strategy.go b/balance_strategy.go
index cc7a0a3..eab26b2 100644
--- a/balance_strategy.go
+++ b/balance_strategy.go
@@ -967,6 +967,9 @@ func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata
 	// for each partition we create a sorted map of its consumers by generation
 	sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
 	for memberID, meta := range members {
+		if len(meta.UserData) == 0 {
+			continue
+		}
 		consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
 		if err != nil {
 			return nil, nil, err
```

I'll add this change as well after rebasing your PR on #2618.
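For anyone following along, here is a simplified, self-contained illustration of that early-return decoding bug. It is not Sarama's actual decoder, just a sketch of the failure mode: stopping as soon as an array turns out to be empty leaves trailing bytes (such as a UserData field) unconsumed.

```go
package main

import "fmt"

// decodeMemberMetadata mimics the buggy shape: it returns as soon as the
// owned-partitions array turns out to be empty, so any user-data bytes
// that follow are never consumed.
func decodeMemberMetadata(buf []byte) (unconsumed int) {
	buf = buf[4:] // consume the 4-byte array length (which is 0 here)
	// BUG: early return; the user-data field after the array is skipped.
	return len(buf)
}

func main() {
	// A zero-length owned-partitions array followed by 3 bytes of user data.
	buf := []byte{0, 0, 0, 0, 'u', 'd', '!'}
	if rem := decodeMemberMetadata(buf); rem != 0 {
		fmt.Printf("decode failure: %d unconsumed bytes\n", rem) // prints 3
	}
}
```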
force-pushed from 03e4160 to ada6a47
That seemed to work well for me. I ran Sarama, rdkafka and (Java) kafka-console-consumer in the same cooperative balance group and ran some chaos testing, restarting each of them randomly, and each member was able to join/rejoin/lead without causing issues for the others. In the middle of lots of runs I did trigger one protocol decode failure in …
Big thanks to @prestona and @dnwe for the thorough testing and fix! 😄
@dnwe Regarding this issue, could you help provide information about the combination of Kafka clients and the version of Kafka server used during testing? I personally conducted tests using Sarama, without direct interaction with …
@napallday no worries. So I set KAFKA_VERSION=3.3.2 and started up docker-compose as used in the FV with the toxiproxy routes configured, and created a six-partition topic called … Then in three different terminals I ran:
…and stopped and started each of them at various points so they all had a turn leading and joining the group.
The panic can be reproduced by following these steps: Step 1: run the Sarama consumer (leader). … This is due to the following improvements in the cooperative-sticky strategy implementation within the Java client. Specifically, in … I will address this issue at a later time.
force-pushed from 6b95cec to 5214325
FYI, I've submitted a commit to fix the issue mentioned above. 🛠️ After extensive testing with different Kafka clients, including … However, during testing with …
Interesting. It took me a few attempts, but yes, I also managed to trigger that IllegalStateException with a Java 2.8.1 consumer: …
@napallday OK, I looked over apache/kafka and I see that they skipped this validation code in the kafka-client under KAFKA-13406 and PR #11439, due to issues with the verification; that was the Java client behaviour from 2.8.2 onward.
Cool! 🎉 Huge thanks to @dnwe for the investigation. The explanations provided in the Jira ticket are crystal clear. By the way, just like this fix in apache/kafka, the official Java clients have had several improvements to the sticky and cooperative-sticky assignors since the initial introduction of the incremental cooperative balance protocol. In the future, we can integrate these enhancements gradually.
Add support for the incremental cooperative protocol as outlined in KIP-429 https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol Signed-off-by: napallday <bzx0619@gmail.com>
force-pushed from fe9fdd5 to 4c83bd0
@napallday were you planning to do the follow-up work around metrics, pause/resume and additional FV tests under this PR? I suppose the alternative would be to merge this as-is, but report the feature as experimental in the changelog so people can test it out in their staging environments.
force-pushed from 070a073 to 4c83bd0
Signed-off-by: napallday <bzx0619@gmail.com>
force-pushed from ade333e to bf40615
@dnwe Thanks for the reminder. For the …
Regarding the metrics and FV tests, I may add them at a later time due to some personal matters. Therefore, I concur with releasing this as an experimental feature initially.
Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur.
Hi @napallday @dnwe, it seems there is no major concern about this PR. Do you think it can be merged soon? It is indeed a useful feature I'd like to try out, but it's better to use the main branch (or, better, a release) than a pull-request branch.
@wwwjfy 👋🏻 thanks for getting in touch. The issue with merging to main at the moment is that the PR branch introduces the new ConsumeV2 and ConsumerGroupHandlerV2 interfaces, which we'd then be locked into supporting, and ideally we want to expose the cooperative balance strategy as just a config change for users, rather than requiring them to rewrite their app code to make use of it. Similarly, if we merge to main without more FV coverage, it might not be obvious to users (unless they read the notes on GitHub Releases) that it is considered experimental. Would it help your testing if I pulled the commits into a branch on the main repository rather than them being only on a fork?
I hope the current ConsumerGroupHandler can still use the cooperative balance strategy. If users want to be notified when partitions are assigned or revoked, they can provide their own OnAssignPartition and OnRevokePartition methods, and a default handler will be provided if they use the original ConsumerGroupHandler without changing their current code.
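A minimal sketch of how such an optional extension could work, using Go's type-assertion idiom. PartitionChangeNotifier and the method names below follow the comment above and are assumptions, not Sarama's actual API:

```go
package main

import "fmt"

// PartitionChangeNotifier is a hypothetical optional interface a handler
// may implement to observe incremental assignment changes.
type PartitionChangeNotifier interface {
	OnAssignPartition(topic string, partition int32)
	OnRevokePartition(topic string, partition int32)
}

// notifyAssign uses the user's callback when the handler opts in, and falls
// back to a default, so existing handlers keep working unchanged.
func notifyAssign(handler interface{}, topic string, partition int32) {
	if n, ok := handler.(PartitionChangeNotifier); ok {
		n.OnAssignPartition(topic, partition)
		return
	}
	fmt.Printf("default handler: assigned %s/%d\n", topic, partition)
}

type legacyHandler struct{} // has no notifier methods

func main() {
	notifyAssign(legacyHandler{}, "orders", 3) // takes the default path
}
```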
Thanks for the quick response :) It does make sense that, if possible, we maintain a single interface rather than two. (Sorry, I didn't read the change and the comments in this PR.) I'm not sure what the way forward is. Using a single interface will probably break backward compatibility, I guess, which means we'll wait for a major version upgrade as well as the consolidation work before that. For testing, to me alone, I have my forked version anyway, though for others what you mentioned may be useful :D I'm still evaluating whether to use Sarama or the Confluent SDK, which already has this feature; I will have feedback if I get anything useful.
Yes, I think the ideal scenario would be if we can rework this PR to keep the Sarama consumer interface consistent, but expose an opt-in similarly to the Java client, via an equivalent to the …
I feel it's hard to keep using the … As the new implementation … I can help add FV tests later, but it seems to be a huge amount of work...
If we want to keep …
…so if users don't care about assignment changes in their previous code (or they only do some log printing), enabling cooperative mode in the config alone will be enough.
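A sketch of what that config-only opt-in could look like from the user's side; `BalanceStrategyCooperativeSticky` is a placeholder name here, since the final exported identifier depends on how this PR lands:

```go
package main

import "github.com/IBM/sarama"

func main() {
	cfg := sarama.NewConfig()
	// Placeholder value: whichever cooperative-sticky strategy this PR
	// ends up exporting. Handler code would stay exactly as before.
	cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyCooperativeSticky
	_ = cfg
}
```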
```go
diff := make(map[string][]int32)
for topic, partitions := range map1 {
	for _, partition := range partitions {
		if _, exist := set[topic][partition]; !exist {
```
can be simplified to `if !set[topic][partition]`
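A quick runnable check of that equivalence, assuming `set` has type `map[string]map[int32]bool`; note the two forms differ only if a key could be stored with an explicit false value:

```go
package main

import "fmt"

func main() {
	set := map[string]map[int32]bool{"orders": {1: true}}

	// Original comma-ok form:
	_, exist := set["orders"][2]
	fmt.Println(!exist) // true: partition 2 is absent

	// Suggested simplification: a missing topic or partition key yields
	// the zero value (false), so the lookup alone is enough.
	fmt.Println(!set["orders"][2]) // true
}
```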
```diff
@@ -19,6 +19,8 @@ const (
 	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
 	StickyBalanceStrategyName = "sticky"
+
+	CooperativeStickyBalanceStrategyName = "cooperative-sticky"
```
There should be a comment for the exported variable.
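For example (wording illustrative, mirroring the doc-comment style of the neighbouring constants), the declaration could read:

```go
// CooperativeStickyBalanceStrategyName identifies strategies that use the
// cooperative-sticky partition assignment strategy (KIP-429).
CooperativeStickyBalanceStrategyName = "cooperative-sticky"
```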
```go
om.pomsLock.RLock()
pom := om.poms[topic][partition]
om.pomsLock.RUnlock()
err := pom.Close()
```
I hit a panic during local testing:

```
panic: send on closed channel

goroutine 67 [running]:
github.com/IBM/sarama.(*partitionOffsetManager).handleError(0x1400013a300, {0x104c21ac8, 0x104fb5e30})
	/Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:748 +0xac
github.com/IBM/sarama.(*offsetManager).handleResponse(0x140004961a0, 0x14000494008, 0x140003414a0, 0x1400039bde0)
	/Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:491 +0x338
github.com/IBM/sarama.(*offsetManager).flushToBroker(0x140004961a0)
	/Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:381 +0x80
github.com/IBM/sarama.(*offsetManager).Commit(0x140004961a0)
	/Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:357 +0x20
github.com/IBM/sarama.(*offsetManager).mainLoop(0x140004961a0)
	/Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:341 +0x88
github.com/IBM/sarama.withRecover(0x0?)
	/Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/utils.go:43 +0x40
created by github.com/IBM/sarama.newOffsetManagerFromClient in goroutine 23
	/Users/wwwjfy/Develop/streamsdk/tmp/vendor/github.com/IBM/sarama/offset_manager.go:113 +0x230
exit status 2
```
It happened while a partition was being revoked from this consumer. I tried to track down how this happens, and I think this is the case:

goroutine 1:
1.1. the consumer is informed that a partition is revoked
1.2. it comes here to do cleanup, and this line leads to the channel closure at line 755
1.3. later this pom is removed from om.poms at line 180

goroutine 2:
2.1. in the mainLoop, om.Commit is triggered every second by default
2.2. it constructs requests based on om.poms (line 440)
2.3. it gets a response, and if it's an ErrIllegalGeneration error, sends it to pom.errors (line 490)

In a race condition where 2.3 happens between 1.2 and 1.3, a panic will happen.
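A standalone sketch of that failure mode, separate from Sarama's code, showing how a send on a channel closed by a concurrent cleanup path panics with exactly this message:

```go
package main

import "time"

func main() {
	errs := make(chan error, 1)

	// Goroutine 1: the cleanup path closes the channel (like pom.Close
	// at step 1.2 above).
	go func() { close(errs) }()

	// Goroutine 2: the commit loop still holds a reference and sends
	// (like handleError at step 2.3). If close wins the race, this
	// panics with "send on closed channel".
	go func() { errs <- nil }()

	time.Sleep(100 * time.Millisecond)
}
```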
Proposed change:

```diff
diff --git offset_manager.go offset_manager.go
index 6c40a88..4cfe3f5 100644
--- offset_manager.go
+++ offset_manager.go
@@ -150,10 +150,15 @@ func (om *offsetManager) RemovePartitions(topicPartitions map[string][]int32) error
 		go func(topic string, partition int32) {
 			defer wg.Done()
-			om.pomsLock.RLock()
+			om.pomsLock.Lock()
 			pom := om.poms[topic][partition]
-			om.pomsLock.RUnlock()
 			err := pom.Close()
+			delete(om.poms[topic], partition)
+			if len(om.poms[topic]) == 0 {
+				delete(om.poms, topic)
+			}
+
+			om.pomsLock.Unlock()
 			if err != nil {
 				errsLock.Lock()
 				var consumerErrs ConsumerErrors
@@ -174,17 +179,6 @@ func (om *offsetManager) RemovePartitions(topicPartitions map[string][]int32) error
 		}
 	}
-	om.pomsLock.Lock()
-	for topic, partitions := range topicPartitions {
-		for _, partition := range partitions {
-			delete(om.poms[topic], partition)
-			if len(om.poms[topic]) == 0 {
-				delete(om.poms, topic)
-			}
-		}
-	}
-
-	om.pomsLock.Unlock()
 	if len(errs) > 0 {
 		return errs
 	}
```
Thanks for the troubleshooting. I think this proposed change works, and I've also moved the last-offset-flush logic ahead of it.
```go
// response and send another join request with that id to actually join the
// group
c.memberID = join.MemberId
return c.retryJoinGroup(ctx, topics, retries+1 /*keep retry time*/, join.Err)
```
Similar to the accepted pull request #2759, should this be `return c.joinGroup(ctx, topics, retries)`?
Signed-off-by: napallday <bzx0619@gmail.com>
Any ETA for this feature?
Incremental Cooperative Protocol POC Proposal
I'm excited to present this proof-of-concept (POC) version that supports the incremental cooperative protocol as outlined in KIP-429. The proposed feature addresses a long-standing community demand (#1858). I've conducted several manual tests, and the results have been promising. (You can run the examples to try it out.)
Major Changes
The existing design in Sarama is tightly coupled to the EAGER protocol, particularly evident in ConsumerGroupSession. In order to accommodate the new COOPERATIVE rebalance protocol, I've introduced the ConsumerGroupHandlerV2 interface and a new method called ConsumeV2.
Seeking Community Guidance
I'm eager to align this work with the community's future direction. Two possible paths are under consideration:
Major Version Update Plan: If Sarama intends to increment the major version while introducing support for the COOPERATIVE rebalance protocol, it would allow us to streamline our implementation. We could eliminate ConsumerGroupSession-related code from versions v2.x.x, focusing solely on the new approach.
Maintaining Compatibility: Alternatively, if a major version update isn't planned, I'll take steps to enhance future maintainability. This involves refactoring common logic within the Consume and ConsumeV2 methods, paving the way for smoother maintenance.
Upcoming Work
In the pipeline, I plan to incorporate the following enhancements:
- Pause and Resume functionality within the new protocol.

I'm looking forward to the community's insights and feedback as we work towards a more efficient and feature-rich Sarama.