Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

High water mark offset support to the consumer #339

Merged
merged 1 commit into from
Apr 27, 2015

Conversation

wvanbergen
Copy link
Contributor

Expose high water marker offset in consumer for lag monitoring purposes.

@Shopify/kafka

@eapache
Copy link
Contributor

eapache commented Mar 12, 2015

need rebase

@wvanbergen wvanbergen force-pushed the consumer_highwatermark branch from 3a200f5 to 3f853c6 Compare March 12, 2015 20:09
@@ -238,6 +239,11 @@ type PartitionConsumer interface {
// errors are logged and not returned over this channel. If you want to implement any custom errpr
// handling, set your config's Consumer.ReturnErrors setting to true, and read from this channel.
Errors() <-chan *ConsumerError

// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will
// be used for the next message that will be produced. You can use this to determine high far behind
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/high/how

@wvanbergen wvanbergen force-pushed the consumer_highwatermark branch from 3f853c6 to 8e13cd0 Compare March 12, 2015 20:23
@wvanbergen
Copy link
Contributor Author

I think we either need to expose the high water mark on the ConsumerMessage, or expose the latest consumed offset on the PartitionConsumer. With just this, you need to have both a message and the related PartitionConsumer in scope in order to do monitoring. I rather change that to be a single object.

@eapache
Copy link
Contributor

eapache commented Mar 12, 2015

With the buffer on the return I don't think there's a reliable non-confusing way to expose that information though. I think in the common case you will have both in scope anyways:

for msg := range pc.Messages() {
    // do whatever, including msg.Offset and pc.HighWaterMark()
}

@wvanbergen
Copy link
Contributor Author

As a counter example, in my consumergroup library, you will get messages from all partitions on the channel, and you don't have access to the partition consumers. I guess I could somehow build some code around that to make it work, but it feels painful.

@eapache
Copy link
Contributor

eapache commented Mar 12, 2015

What about exposing a GetChild(topic, partition) method (or some other name) on the master consumer which returns the PartitionConsumer but doesn't try to create it. Then your library's HighWaterMark(topic, partition) implementation can be

consumer.GetChild(topic, partition).HighWaterMark()

I think this is potentially a useful method to have anyways, and is very easy to write.

@eapache
Copy link
Contributor

eapache commented Apr 24, 2015

I still like this PR as it stands (needs a rebase, but). Maybe we need to expose this information in other places as well, but the current place on the PartitionConsumer is the simplest place to start.

@wvanbergen wvanbergen force-pushed the consumer_highwatermark branch from 7fd499b to 5f78d90 Compare April 25, 2015 12:27
@wvanbergen wvanbergen changed the title Add high water mark offset support to the consumer. High water mark offset support to the consumer Apr 25, 2015
@wvanbergen
Copy link
Contributor Author

Rebased.

// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will
// be used for the next message that will be produced. You can use this to determine how far behind
// the processing is.
HighWaterMarkOffset() int64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, the kafka docs have "watermark" as one word, so I guess HighWatermarkOffset would be more consistent, but YMMV

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I guess that's my fault for misnaming the variable in FetchResponseBlock :(

Add it to the list of breaking changes I guess...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, grammatically I feel my capitalization closer to high-water mark instead of the incorrect high watermark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I have a problem with it being high-water mark offset; I'd have preferred high-water mark or high-water offset instead of this Frankensteinian composition. But that would be deviating from the protocol spec too much. Oh well :P

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And for a different opinion, wikipedia doesn't even hyphenate, it just uses three distinct words ("high water mark") everywhere. So I guess this issue is contentious :)

This is fine as-is (and is consistent with FetchResponseBlock). We can revisit if anybody complains.

@eapache
Copy link
Contributor

eapache commented Apr 27, 2015

This LGTM. :shipit: if you're comfortable with the API.

@wvanbergen
Copy link
Contributor Author

I am ok with this API as a low level building block, but for the high-level consumer we should definitely give this some more thought.

wvanbergen added a commit that referenced this pull request Apr 27, 2015
High water mark offset support to the consumer
@wvanbergen wvanbergen merged commit 197f620 into master Apr 27, 2015
@wvanbergen wvanbergen deleted the consumer_highwatermark branch April 27, 2015 13:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants