Don't require all consumers drained #485
Conversation
Force-pushed from 6216a3f to cdb278c
child.messages <- msg
}
child.broker.input <- child
continue feederLoop
Can you explain this section a bit more? Can the second child.messages <- msg block as well, or am I understanding this incorrectly?
This is happening in its own goroutine. If the timeout triggers, we send a message back to the brokerConsumer telling it to take us out of the pool, and then we feed the rest of the messages to the user at our leisure (so yes, the second message send probably blocks until the user catches up, but it doesn't block any other partitions).
Once we've flushed all our messages, we put ourselves back in the pool.
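To make the flow concrete, here is a minimal sketch of the pattern being described. Every name in it (partitionFeeder, rejoin, unsubscribe, run) is an illustrative assumption, not the actual Sarama code:

package sketch

import "time"

type message struct{ offset int64 }

type partitionFeeder struct {
	messages    chan *message         // consumed by the user
	rejoin      chan *partitionFeeder // stand-in for child.broker.input
	unsubscribe func()                // asks the brokerConsumer to drop us from its pool
	timeout     time.Duration
}

func (f *partitionFeeder) run(batches <-chan []*message) {
feederLoop:
	for batch := range batches {
		for i, msg := range batch {
			select {
			case f.messages <- msg:
			case <-time.After(f.timeout):
				// The user is slow: leave the pool so other partitions keep
				// flowing, then deliver the rest of this batch at our leisure.
				// These sends may block until the user catches up, but they
				// no longer hold up the shared brokerConsumer loop.
				f.unsubscribe()
				for _, m := range batch[i:] {
					f.messages <- m
				}
				// All flushed: put ourselves back in the pool.
				f.rejoin <- f
				continue feederLoop
			}
		}
	}
}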
So, if the messages never get drained by the consumer (because their consumer goroutine shut down for some reason), we will end up with a stuck goroutine?
I think that's OK in that scenario, just want to understand the code.
Yup, it will still shut down if/when Close is called though, since that drains any outstanding messages.
I think it's doable to add a test for this: consume 2 partitions, drain only one of them.
@eapache @wvanbergen There is such a test case. It is about to be introduced in #492, where it is skipped.
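For anyone following along, here is the rough shape such a test could take. newTestConsumer is a hypothetical helper (the real test, with its actual broker setup, is the one landing in #492):

import (
	"testing"

	"github.com/Shopify/sarama"
)

func TestUndrainedPartitionDoesNotBlockOthers(t *testing.T) {
	c := newTestConsumer(t, "my_topic", []int32{0, 1}) // hypothetical mock-broker setup
	defer c.Close()

	pc0, err := c.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}
	pc1, err := c.ConsumePartition("my_topic", 1, sarama.OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}
	defer pc1.Close()

	// Drain partition 0 only; partition 1's channel is left to fill up.
	for i := 0; i < 100; i++ {
		<-pc0.Messages()
	}

	// With this change, partition 0 keeps making progress even though
	// partition 1 is never drained.
	if err := pc0.Close(); err != nil {
		t.Fatal(err)
	}
}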
Force-pushed from cdb278c to a53ff72
OK, please re-review this PR as it should now also fix all the race conditions that the previous changes introduced. If it looks good, I'll merge it and then you can rebase #492 without the skipped test.
fetchSize           int32
offset              int64
highWaterMarkOffset int64
}
var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
Capitalize error message?
Hmmm, all the configuration errors (see config.go above) are capitalized.
Good point, I've submitted a PR to fix them :)
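For context, the usual Go convention is that error strings are lowercase, since they often appear mid-sentence when logged or wrapped. A tiny illustration (not code from this PR; assumes the standard errors and log packages):

err := errors.New("timed out feeding messages to the user")
log.Printf("consumer stopped: %v", err) // a lowercase message reads naturally here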
I think this looks OK. @horkhe any final comments?
Take the previous refactor to its logical conclusion by handling *all* the error logic in the brokerConsumer, not the responseFeeder. This fixes the race to close the dying channel (since the brokerConsumer can just close the trigger instead, as it has ownership). At the same time, refactor `updateSubscriptionCache` into `handleResponses`, and inline the “new subscriptions” bit into the main loop; otherwise we end up processing the previous iteration's results at the very beginning of the next iteration, rather than at the very end of the current one.
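A shape sketch of that ordering point; only handleResponses and the new-subscriptions handling are named in the commit, and everything else here is an assumption rather than the actual code:

for newSubscriptions := range bc.newSubscriptions {
	// Take on new subscriptions at the top of the iteration...
	bc.updateSubscriptions(newSubscriptions)

	// ...fetch and fan the response out to the subscribed partitions...
	response := bc.fetchNewMessages()
	for child := range bc.subscriptions {
		child.feeder <- response
	}

	// ...and handle this iteration's results at the very end, so they are
	// not carried over into the start of the next iteration.
	bc.handleResponses()
}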
Prep for unblocking consumers that are not being drained
If a partitionConsumer fills up and is not being drained (or is taking a long time), remove its subscription until it can proceed again, in order not to block other partitions which may still be making progress.
Force-pushed from f7da387 to 292f3b0
Don't require all consumers drained
If a partitionConsumer fills up and is not being drained (or is taking a long time), remove its subscription until it can proceed again, in order not to block other partitions which may still be making progress.
@Shopify/kafka @horkhe