Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix consumer pause resume functionality #1382

Merged

Conversation

arszen123
Copy link
Contributor

This fixes #1376

The problem was that when in the eachMessage or eachBatch the consumption from a topic-partition is paused and an error is thrown (breaks further consumption of messages). The same batch (that was already fetched) is processed again, instead of fetching a new batch of messages.

src/consumer/runner.js Show resolved Hide resolved
@@ -404,39 +440,7 @@ module.exports = class Runner extends EventEmitter {
await this.heartbeat()
}

return this.retrier(async (bail, retryCount, retryTime) => {
try {
await onBatch(batch)
Copy link
Collaborator

@Nevon Nevon Jun 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now means that on every error, we are going to re-fetch the data for no real reason. The data that we received is still valid, it's just that we don't want to process some of it. I would instead suggest that we conditionally invoke onBatch only if the topic-partition is not paused. That way if the topic-partition was paused in the execution of the handler, we just continue processing the other data included in the fetch response without having to do any additional data fetching, and we don't have to do any particularly intrusive changes to the runner. It already has access to the ConsumerGroup which holds the state of which topic-partitions are paused. It does mean that we do filtering in two places, but I would consider that a much smaller price to pay.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I am wrong, but based on the fetch manager implementation, the rest of the batches (from other partitions) will be processed in that cycle. Therefore there will be no unnecessary data fetching.
If the processing of a batch fails, it is most likely that part of that batch will be re-fetched in the next cycle, which I think is okay, because we don't know what the last committed offset was to filter out messages. It is possible to fetch the offsets but I think it's more convenient to drop and re-fetch.
Thus the messages from the paused partition will be fetched in the next fetch cycle, right after the consumption from that partition is resumed.

@Nevon
Copy link
Collaborator

Nevon commented Jun 27, 2022

This also solves the issue of unhandled promise rejections mentioned here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Pausing the consumer doesn't work in 2.x versions as before
2 participants