Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add consumer pause & resume #41

Merged
merged 9 commits into from
Mar 26, 2018
Merged

Add consumer pause & resume #41

merged 9 commits into from
Mar 26, 2018

Conversation

Nevon
Copy link
Collaborator

@Nevon Nevon commented Mar 9, 2018

This also fixes a bug where if you specify 0 as the partition for a message, the partitioner would fall back to using the hash of the key, as it would consider message.partition falsy.

@Nevon Nevon requested a review from tulios March 9, 2018 14:04
@soroushhakami
Copy link

👏

@tulios
Copy link
Owner

tulios commented Mar 9, 2018

Looks good, I would just mention in the readme that the topic won't be fetched in the next cycle, you might get messages from the paused topic in the current batch.

@tulios
Copy link
Owner

tulios commented Mar 9, 2018

You can also add the signature of the methods in the readme and mention that the library currently doesn't support specific partitions.

@Nevon
Copy link
Collaborator Author

Nevon commented Mar 9, 2018

This works fine as long as you are subscribed to multiple topics, and don't pause all of them. If you do, however, it looks like the fetch loop becomes synchronous, so nothing else ever gets a chance to execute.

@Nevon
Copy link
Collaborator Author

Nevon commented Mar 9, 2018

@tulios: 🏖💻🍹

Copy link

@joaomilho joaomilho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some dumb comments, overall code is quite clear.

this.subscriptionState.resume(topics)
}

paused() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small stuff, but wouldn't a get paused() be a nicer API?

@@ -0,0 +1,26 @@
module.exports = class SubscriptionState {
constructor() {
this.pausedTopics = new Set()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fancy

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't want any of those filthy duplicates in here. 💅

@@ -26,7 +26,7 @@ module.exports = () => {
const availablePartitions = partitionMetadata.filter(p => p.leader >= 0)
const numAvailablePartitions = availablePartitions.length

if (message.partition) {
if (message.partition !== null && message.partition !== undefined) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? Is false a valid partition?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but 0 is.

@Nevon
Copy link
Collaborator Author

Nevon commented Mar 9, 2018

This is as fascinating as it is terrifying. I'm running my consumer and pausing/resuming by calling an HTTP endpoint. If I pause consuming of all topics, KafkaJS goes into a busy loop that won't let me call the endpoint to resume consuming.

The culprit appears to be:

const results = await Promise.all(requests)
in ConsumerGroup#fetch. requests will be an empty array, so my thinking was that maybe await Promise.all([]) resolves synchronously or something (doesn't make any sense, but nothing makes sense in this world). So I tried changing it to:

const results = await Promise.all(requests.length > 0 ? requests : [new Promise(resolve => process.nextTick(() => resolve([])))])

This did not help. At this point I know that it resolves asynchronously, but still, express never got a chance to do anything.

So, let's try with setTimeout:

const results = await Promise.all(requests.length > 0 ? requests : [new Promise(resolve => setTimeout(() => resolve([]), 0))])

This works.

At this point someone needs to explain to me how the event loop works, because clearly it does not work the way I think it does.

This avoids going into an infinite loop when there are no topics
to fetch from.
@Nevon
Copy link
Collaborator Author

Nevon commented Mar 9, 2018

I'm still confused, but at least I now know why process.nextTick didn't work. https://nodejs.org/en/docs/guides/event-loop-timers-and-nexttick/#process-nexttick

Scheduling fetch using setImmediate makes more sense.

@Nevon
Copy link
Collaborator Author

Nevon commented Mar 15, 2018

I've been running this for a while now, and it seems to be working fine. However, if you're running with a loglevel >= info, you'll produce an insane amount of logs, as each fetch loop logs, and without any topics to fetch from, it will basically produce logs as fast as it can.

I think it makes sense to add some throttling to the fetch loop, at least when you're not subscribed to any topic.

@tulios tulios merged commit c04a39b into master Mar 26, 2018
@Nevon Nevon deleted the add-consumer-pause branch March 26, 2018 14:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants