[WIP] Initial offset management implementation. #379
Conversation
The name of the topic is
I came to realize that we should use
@@ -320,11 +325,171 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
	return block.Offsets[0], nil
}

func (client *client) CommitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string) error {
	return client.tryCommitOffset(consumerGroup, topic, partitionID, offset, metadata, 5)
in hindsight I kind of dislike this pattern, I think I'd prefer a loop. YMMV
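For illustration, the loop-based shape that comment suggests might look roughly like this (a sketch only; `commitOnce` stands in for a single commit attempt and is not a function in this PR):

```go
// Sketch of a loop-based retry, as an alternative to recursing with an
// attemptsRemaining counter. commitOnce is a hypothetical closure that
// performs one commit attempt against the group's coordinator.
func commitWithRetry(maxAttempts int, commitOnce func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = commitOnce(); err == nil {
			return nil
		}
		// A real implementation would refresh coordinator metadata here
		// before retrying a retriable error, and bail out on fatal ones.
	}
	return err
}
```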
Why? They are entirely different fields in the request...
	return client.tryCommitOffset(consumerGroup, topic, partitionID, offset, metadata, 5)
}

func (client *client) tryCommitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string, attemptsRemaining int) error {
re. group vs id etc, I think Consumer.Group should be a config, and these should be methods on PartitionConsumer that use that config (and the topic/partition etc.)
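As a rough illustration of that suggestion (hypothetical sketch; neither the config field nor these methods exist in this PR):

```go
// Hypothetical sketch: the group name lives in the consumer configuration,
// and commit/fetch become methods on the partition consumer, which already
// knows its own topic and partition.
type ConsumerGroupConfig struct {
	Group string // consumer group to commit offsets under
}

type partitionConsumerOffsets interface {
	// CommitOffset commits an offset for this consumer's topic/partition
	// using the configured group.
	CommitOffset(offset int64, metadata string) error

	// FetchOffset returns the last committed offset for this consumer's
	// topic/partition and the configured group.
	FetchOffset() (int64, error)
}
```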
👍 for splitting up the functional tests

I split up the functional tests in a separate PR (#388)
Updated: I am happy with the
I am still struggling to find a nice API for the Fetch and Commit calls. I don't really want to add them to PartitionConsumer initially, because that requires the PartitionConsumer instance to be in scope, which can easily not be the case (see #339). It also wouldn't allow for batching. So I'd like a generic method on Client that lets you commit/fetch multiple offsets at once, handles discovering the coordinator, and deals with retries. We can then build higher-level APIs on top of that.
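For concreteness, a batch-oriented surface along those lines might look roughly like this (a sketch only; none of these names or signatures are from this PR):

```go
// Hypothetical batch-oriented offset API: one call locates the group's
// coordinator, sends a single request for all entries, and handles retries.
type OffsetEntry struct {
	Topic     string
	Partition int32
	Offset    int64
	Metadata  string
}

type batchOffsetAPI interface {
	// CommitOffsets commits every entry in one batch for the given group.
	CommitOffsets(consumerGroup string, entries []OffsetEntry) error

	// FetchOffsets returns the committed offset for each requested
	// topic/partition of the given group.
	FetchOffsets(consumerGroup string, partitions map[string][]int32) (map[string]map[int32]int64, error)
}
```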
I'm not sure I agree - in 99% of cases the consuming code will be

    for msg := range partitionConsumer.Messages() {
        // do whatever
    }

and will trivially have both in scope. I'm wondering if we should perhaps rethink the consumer group API instead. We can discuss this offline at some point - a good first step is to split the
I imagined #366 returning a
Rewrote this based on the latest master. The questions around API design are still outstanding.
W.r.t. API design: I think we either need to move towards the model of having an integrated high-level consumer, or stick to the model of a low-level consumer with utility methods for now. Moving towards a higher-level consumer (using ZK, and/or later using the new APIs that Kafka 0.8.3 or 0.9 will offer) would be in line with the JVM library. It would involve adding a new type. If we are not ready to go down that path yet, I suggest we expose these methods as-is on either the
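To illustrate the higher-level direction (purely hypothetical; no such type exists in this PR):

```go
// Hypothetical integrated high-level consumer, in line with the JVM client:
// it would own group membership, partition assignment, and offset commits,
// exposing only a merged message stream. ConsumerMessage is sarama's
// existing consumer message type.
type consumerGroup interface {
	// Messages returns one stream across all partitions this member owns.
	Messages() <-chan *ConsumerMessage

	// MarkProcessed records a message as handled; the offset is committed
	// in batches behind the scenes.
	MarkProcessed(msg *ConsumerMessage)

	// Close leaves the group and flushes pending offset commits.
	Close() error
}
```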
Reviving this. My preferred approach:
I've been thinking more about where this code should go, and I maintain that the client is the wrong spot for it, but I'm coming to agree that the consumer (or partition-consumer) is also wrong. There's actually going to be a fair bit of code involved in proper offset management: handling flushes per-time and per-offset, and handling and rebatching retries when leadership moves. It will basically be a somewhat-simplified producer (I actually played with just adapting the producer code, but there are enough differences to make that impractical). As such, I'm coming to the conclusion that it should just be its own separate object entirely (an offset manager).

Does this make sense at all or am I on the wrong track?
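To make that shape concrete, a standalone object might expose something roughly like the following (purely illustrative; these names are not taken from this PR or the linked branch):

```go
// Hypothetical standalone offset manager: it batches commits, flushes them
// per-time and per-N-offsets, and rebatches retries when the group
// coordinator moves; essentially a simplified producer.
type offsetManager interface {
	// MarkOffset records an offset to be committed for a partition; the
	// actual commit happens asynchronously in batches.
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// FetchOffset returns the last committed offset and metadata for a
	// partition.
	FetchOffset(topic string, partition int32) (int64, string, error)

	// Close flushes pending commits and shuts the manager down.
	Close() error
}
```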
Having these "raw" API methods exposed would be great. I understand the point of batching, but having a straight-through, low-level method available would be a big help until Sarama matures to proper handling of these, as @eapache suggests. It would also make it possible to implement different strategies depending on the use case. Just out of curiosity - is there a chance of this being merged as-is?
I don't mind adding "raw" building-block methods like these. However, if we are going down that path, they should be able to accept offsets for multiple partitions, not just one, so we can use them as building blocks for higher-level constructs like the offset manager that uses batching.
Functionally, all the low-level building blocks are already available; you can call them directly. It's also worth noting that the more advanced handling I suggested is actually fairly far along (see the branch https://github.com/Shopify/sarama/tree/offset-manager); I just need to find some time to clean it up and test it.
Ah, right, hadn't considered that approach. Thanks! Can't wait for the offset manager to be done :)
Attempt at fixing #2. This adds the following methods to Client:

- CommitOffset(group, topic, partition, offset, metadata)
- FetchOffset(group, topic, partition)

Notes

@Shopify/kafka
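For context, a minimal usage sketch of the two proposed methods might look like this (the parameters are the ones listed above; the return type of FetchOffset and everything else here is assumed for illustration):

```go
package example

import "github.com/Shopify/sarama"

// checkpoint commits the offset a consumer has processed up to and reads it
// back, using the two methods this PR proposes on Client. FetchOffset is
// assumed to return (int64, error); that is not stated in the PR description.
func checkpoint(client sarama.Client, group, topic string, partition int32, offset int64) (int64, error) {
	// Commit the processed offset with empty metadata.
	if err := client.CommitOffset(group, topic, partition, offset, ""); err != nil {
		return 0, err
	}
	// Fetch it back, e.g. on restart, to know where to resume consuming.
	return client.FetchOffset(group, topic, partition)
}
```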