Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Obey offsets stored in Kafka for existing groups #42

Merged

Conversation

myNameIsPatrick
Copy link
Collaborator

@myNameIsPatrick myNameIsPatrick commented Aug 15, 2020

This PR should address #41. In Consumer.subscribe(), instead of making an assignment based on the start_at position in the configuration, it'll call get_watermark_offsets() to grab the current offset in the consumer to make that assignment. In the case there is no offset stored, it'll pull the value stored via "auto.offset.reset" in the configured consumer.

I have also moved where the warning about using the LATEST offset is raised since this check isn't being done in the same location anymore.

Closes #41.

Copy link
Contributor

@spenczar spenczar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the right method to call. get_watermark_offsets gets the lowest and highest offset values for the topic-partition pair. It doesn't know anything about the stored position for the consumer group. Different consumers could be at different positions in the stream, but this would always set consumers to read from the last message.

I would recommend adding an integration test to make sure the behavior is right - this is pretty subtle, tricky stuff.

@myNameIsPatrick
Copy link
Collaborator Author

I don't think this is the right method to call. get_watermark_offsets gets the lowest and highest offset values for the topic-partition pair. It doesn't know anything about the stored position for the consumer group. Different consumers could be at different positions in the stream, but this would always set consumers to read from the last message.

You're absolutely right. The tests I was doing before didn't capture that correctly and running it again showed that this just points to the latest offset. So looking at this a bit more, I have a question about this. Why are we setting the offset at all? Shouldn't the offset already stored be the correct one for a given consumer group? At least by not setting the offset here when calling Consumer.assign(), it seems on a first glance to do the right thing when I tried out mixing in different consumer groups and which messages were read.

I would recommend adding an integration test to make sure the behavior is right - this is pretty subtle, tricky stuff.

That sounds good. I'll take a crack at this later.

@spenczar
Copy link
Contributor

Why are we setting the offset at all? Shouldn't the offset already stored be the correct one for a given consumer group?

We set them explicitly to handle initial offsets, when a consumer group hasn't committed anything. But this might not be necessary; we set auto.offset.reset when constructing the confluent_kafka Consumer, which should handle that part automatically. I think you might be right that we skip setting the offsets at all.

@myNameIsPatrick
Copy link
Collaborator Author

@spenczar, I added integration tests for testing stored offsets with different consumer groups. I also removed the CONSUMER value in ConsumerStartPosition since you were mentioning that wasn't needed anymore in the issue listed.

One other thing I did, and wanted to check with you on (made a last single commit that would be easy to revert), is that I removed the option to allow an arbitrary start_at being passed into auto.offset.reset since that feature isn't available via librdkafka through the configuration. Now the only options are earliest or latest. I also changed the relevant test to check reading from beginning to complement the integration test reading from the latest offset. I'm not sure supporting an arbitrary offset makes sense anymore given the changes; as in, this may not make sense having it as the default value if the consumer hasn't committed any offsets yet.

Copy link
Contributor

@spenczar spenczar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the test code here. Looks good.

@spenczar spenczar merged commit 4e489f8 into astronomy-commons:master Aug 19, 2020
@spenczar
Copy link
Contributor

This is in the v1.1.0 release, which I've pushed to PyPI and Conda.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ConsumerStartPosition.PRODUCER not handled properly in Consumer
2 participants