-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add methods to pause/resume consumer's consumption #2005
Conversation
I'm with the same error related to the CLA that @faillefer mentioned with another PR |
Looks like CLA check is now green. |
@raulnegreiros 👋🏻 hey, I'm sorry this took so long to review, I hadn't spotted that it was still pending. Overall the changes look reasonable and it's good for us to sync up with the Java client with this capability However, I'm not sure the diff --git a/consumer_test.go b/consumer_test.go
index d0a4ac6..aac3c3f 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -91,7 +91,7 @@ func TestPauseResumeConsumption(t *testing.T) {
t.Fatal(err)
}
- consumer.Pause()
+ // consumer.Pause()
// Then: no message is available
if len(consumer.Messages()) != 0 {
@@ -99,7 +99,7 @@ func TestPauseResumeConsumption(t *testing.T) {
}
// When
- consumer.Resume()
+ // consumer.Resume()
// Then: messages starting from offset 1234 are consumed.
for i := 0; i < 10; i++ { |
Ping @raulnegreiros |
Hello 👋 is this likely to get merged? |
@d-baranowski still waiting for someone to fixup the unittests. As mentioned above, they’re not currently exercising the functionality that was added |
I'll take a look after work |
sorry the huge delay. Click to expand the test code``` golang func TestPauseResumeConsumption(t *testing.T) {
}
|
It aims to allow consumption control, providing some methods to pause and resume consumer consumption. When your data destination is offline it becomes pointless to continue to consume new messages from the broker once it certainly will result in an error. The Java library already provides something thing similar to it. Note that the consumption state is not preserved between the rebalance process, so it's the user responsibility to manage it using the callbacks.
fb8dd6d
to
7c4f89a
Compare
@dnwe are you sure you wanted to push force?
in the forced branch. I think it Is breaking the tests. Do you want I fix it? |
@raulnegreiros thanks! I was trying to spot where I'd messed up the merge conflict resolution |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @raulnegreiros — I'm ready to merge this for the next release, but please could you rebase one last time to include #2078 — I think you might need to adjust the highwatermark offset in the mock consumer in a similar way and update your tests
done @dnwe! |
@raulnegreiros thank you for a great contribution! Merged |
When using the new "pause consumer" support (IBM#2005), Sarama would incorrectly submit empty FetchRequests if all of the assigned partitions were paused. This is because we only used Pause to skip adding the topicPartition blocks to the FetchRequest and still went ahead and sent the Fetch even if it was essentially empty:
I'm submitting this PR in order to solve this issue.
It aims to allow consumption control, providing some methods to pause and resume consumer consumption.
Motivation:
When your data destination is offline it becomes pointless to continue to consume new messages from the broker once it certainly will result in an error. The Java library already provides something thing similar to it.
Note that the consumption state is not preserved between the rebalance process, so it's the user responsibility to manage it using the callbacks.