-
Notifications
You must be signed in to change notification settings - Fork 162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use offset_fetch api_version when fetching messages #450
Use offset_fetch api_version when fetching messages #450
Conversation
Hello, @GDegrove! This is your first Pull Request that will be reviewed by SourceLevel, an automatic Code Review service. It will leave comments on this diff with potential issues and style violations found in the code as you push new commits. You can also see all the issues found on this Pull Request on its review page. Please check our documentation for more information. |
1cbed9d
to
5a68968
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apart from my comment, this looks good to me. Given that it's a sensitive piece of code and that I worked on this with you, I'd rather we had a second approval.
@dantswain @joshuawscott could either of you take a look ? It's a short one :)
lib/kafka_ex.ex
Outdated
offset_commit_api_version = Keyword.get(opts, :offset_commit_api_version, 0) | ||
|
||
retrieved_offset = | ||
current_offset(supplied_offset, partition, topic, worker_name) | ||
current_offset( | ||
supplied_offset, | ||
partition, | ||
topic, | ||
worker_name, | ||
offset_commit_api_version, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit-pick on this: I'm wondering if we shouldn't offer a choice to the user whether they wish to use offset_commit_api_version
also for offset_fetch_api_version
.
In any case, I'm thinking that maybe some explicit variable name would help the reader follow the logic and make it explicit that we've decided that offset_fetch_api_version = offset_commit_api_version
.
Suggestion:
offset_commit_api_version = Keyword.get(opts, :offset_commit_api_version, 0)
# By default, it makes sense to synchronize the API version of the offset_commit and the offset_fetch
# operations, otherwise we might commit the offsets in zookeeper and read them from Kafka, meaning
# that the value would be incorrect.
offset_fetch_api_version = Keyword.get(opts, :offset_fetch_api_version, offset_commit_api_version)
retrieved_offset =
current_offset(
supplied_offset,
partition,
topic,
worker_name,
offset_fetch_api_version,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes real sense, I pushed the correction :)
5a68968
to
8109298
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK for me. Once we have a second approval, we can merge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Thanks a lot @bjhaid ! |
When switching to the new kayrock client, we noticed that the api_version for the auto_commit of the fetch message is not taken into account.
Therefore, even if using the API version 2 for the offset commit with kayrock, the next fetch will always use the api_version 0
This introduces a divergence in the way the new client can be used compared to the old implementation. As it's up to the user code to provide the correct offset to fetch.