In case of offsetOutOfRange, the consumer should be paused #170

Closed
kadishmal opened this issue Mar 5, 2015 · 3 comments


@kadishmal
Contributor

The following is the output from the latest kafka-node release ("kafka-node": "0.2.21").

In our CI environment we test code that has to send X messages to Kafka. Once all the messages are sent, we verify that Kafka has all of them by reading them back. We assert that the number of messages read from Kafka equals the X messages we expect. Pretty simple.

I've added some console.log calls to the kafka-node HighLevelConsumer to show that, when the offsetOutOfRange event is emitted, we need to pause the consumer. Otherwise, while we are adjusting the offset, the consumer keeps trying to fetch messages, and that fetch emits yet another offsetOutOfRange event. The second event is unnecessary.

```
$ node kafka.js
Committed offset for "users-updated-db" topic is 179.
The earliest message available in Kafka starts at 180 offset.
The latest offset in Kafka is 188 meaning there are 8 messages in the queue
8 need to be consumed.
Going to consume messages from 180 offset for "users-updated-db" topic considering the earliest message available in Kafka.
Going to fetch. Logged in `.fetch()`.
offsetOutOfRange for topic 'users-updated-db'. Going to request the earliest offset available.
'done` event is triggered after every fetch response. Received 0 messages.
Going to fetch. `.fetch()` is called again.
The earliest offset available in Kafka is 180. Going to set this offset to a topic.
offsetOutOfRange for topic 'users-updated-db'. Going to request the earliest offset available.
'done` event is triggered after every fetch response. Received 0 messages.
Going to fetch. `.fetch()` is called again.
The earliest offset available in Kafka is 180. Going to set this offset to a topic.
'done` event is triggered after every fetch response. Received 0 messages.
Going to fetch. `.fetch()` is called again.
Message #1.
'done` event is triggered after every fetch response. Received 1 messages.
Going to fetch.
Message #2.
Message #3.
Message #4.
Message #5.
Message #6.
Message #7.
Message #8.
'done` event is triggered after every fetch response. Received 8 messages.
```
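The offsets in the log can be checked with a little arithmetic. The sketch below uses a hypothetical helper, `expectedMessageCount` (not part of kafka-node), and assumes that the "latest offset" is the log-end offset (the offset the next produced message will receive), so the readable range is [start, latest). It also assumes that a committed offset older than the earliest retained message means consumption restarts at the earliest offset, which is what the log shows.

```javascript
// Hypothetical helper, not a kafka-node API.
// Assumption: latestOffset is the log-end offset, so readable offsets are
// start, start + 1, ..., latestOffset - 1.
function expectedMessageCount(committedOffset, earliestOffset, latestOffset) {
  // A committed offset below the earliest retained offset is out of range,
  // so consumption has to start from the earliest offset Kafka still holds.
  const start = Math.max(committedOffset, earliestOffset);
  return { start, count: Math.max(0, latestOffset - start) };
}

const { start, count } = expectedMessageCount(179, 180, 188);
console.log(`start=${start} count=${count}`); // start=180 count=8
```

With the values from the log (committed 179, earliest 180, latest 188), this yields 8 messages starting at offset 180, matching the "8 need to be consumed" line.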

As you can see, offsetOutOfRange is triggered twice because fetch kept being called even after the first offsetOutOfRange event was emitted. This results in two requests to `self.offsetRequest([topic], function (err, offsets) {....`

To avoid this, the consumer must be paused as soon as the offsetOutOfRange event is emitted, and resumed only after the new offset has been set.
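The race described above can be reproduced without a Kafka broker. The following is a minimal, self-contained simulation, not kafka-node's code: `FakeConsumer`, `attachRecovery` and `run` are hypothetical names, and the `paused` flag only stands in for the consumer's pause and resume. The fetch loop is modeled by calling `fetch()` twice before the asynchronous offset response arrives, as in the log.

```javascript
const { EventEmitter } = require("events");

// Hypothetical stand-in for a consumer; NOT the kafka-node API.
class FakeConsumer extends EventEmitter {
  constructor({ offset, earliest, pauseOnOutOfRange }) {
    super();
    this.offset = offset;       // offset the consumer will fetch from
    this.earliest = earliest;   // earliest offset the broker still retains
    this.pauseOnOutOfRange = pauseOnOutOfRange;
    this.paused = false;
    this.offsetRequests = 0;    // how many offset lookups were issued
  }

  // One fetch round: a paused consumer does nothing; a stale offset
  // triggers the offsetOutOfRange event.
  fetch() {
    if (this.paused) return;
    if (this.offset < this.earliest) this.emit("offsetOutOfRange", this.offset);
  }

  // Simulated asynchronous lookup of the earliest offset.
  offsetRequest(cb) {
    this.offsetRequests += 1;
    setImmediate(() => cb(null, this.earliest));
  }
}

function attachRecovery(consumer) {
  consumer.on("offsetOutOfRange", () => {
    // Pause first so further fetches cannot emit the event again.
    if (consumer.pauseOnOutOfRange) consumer.paused = true;
    consumer.offsetRequest((err, earliest) => {
      if (err) throw err;
      consumer.offset = earliest; // adjust the offset...
      consumer.paused = false;    // ...then resume fetching
    });
  });
}

function run(pauseOnOutOfRange, done) {
  const c = new FakeConsumer({ offset: 179, earliest: 180, pauseOnOutOfRange });
  attachRecovery(c);
  c.fetch(); // first fetch hits the stale offset
  c.fetch(); // the fetch loop fires again before the offset response arrives
  // setImmediate callbacks run in FIFO order, so this runs after the lookups.
  setImmediate(() => done(c.offsetRequests, c.offset));
}

run(false, (n) => console.log(`without pause: ${n} offset requests`)); // 2
run(true, (n) => console.log(`with pause: ${n} offset request`));      // 1
```

Without the pause, the second fetch sees the same stale offset and issues a duplicate lookup; with the pause, it returns early and only one lookup is made. Both runs end at offset 180.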

I have fixed this. Let me know if you're ready to accept the PR.

This issue is related to #169.

@haio
Member

haio commented Mar 5, 2015

Can you send the fix along with PR #171? Thanks!

@kadishmal
Contributor Author

Ok, both are done.

@haio
Member

haio commented Mar 6, 2015

ok, thanks.
