Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a pause() helper to eachMessage/eachBatch #1364

Merged
merged 8 commits into from
Jun 28, 2022

Conversation

brianphillips
Copy link
Contributor

This function will take care of pausing (and optionally, resuming) message consumption on the current topic/partition when processing messages either within the eachMessage or eachBatch handler functions.

src/consumer/runner.js Outdated Show resolved Hide resolved
docs/Consuming.md Outdated Show resolved Hide resolved
@Nevon
Copy link
Collaborator

Nevon commented May 23, 2022

What's the intended use-case that makes increasing the API surface here worth it? The example from the documentation, just for comparison:

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        try {
            await sendToDependency(message)
        } catch (e) {
            if (e instanceof TooManyRequestsError) {
                consumer.pause([{ topic, partitions: [partition] }])
                setTimeout(() => {
                    consumer.resume([{ topic, partitions: [partition] }])
                }, e.retryAfter * 1000)
            }
            throw e
        }
    },
})

And this proposal:

await consumer.run({ eachMessage: async ({ topic, message, pause }) => {
    try {
        await sendToDependency(message)
    } catch (e) {
        if (e instanceof TooManyRequestsError) {
            pause(e.retryAfter * 1000) // returns control to KafkaJS until timeout has expired
        }
        throw e
    }
}})

To me they look very similar. The new proposal is a bit terser, but I'm not sure it's really eliminating much complexity. You still need, as a user, to understand that it's pausing a specific topic-partition, even if that parameter is now hidden internally.

The one benefit that I find interesting is that it allows a way to communicate back to the calling code that we actually want to exit out of the message loop without having to throw an error and everything this entails. Instead, we have a way for the consumer to know that we should continue operating as usual, just without processing any further fetched messages on that topic for as long as it's paused. The way it's communicated, by throwing a special error, I'm not so keen on, but the same thing can be achieved in other ways.

Then there's the thing about keeping the timer for resuming. Any time I see stateful code that includes timers, I get a bit nervous, because it's so hard to predict whether things are in a valid state to proceed with what you're doing when the timer fires. For example, does the consumer still exist? Is it still running? Is it still subscribed to the same topics? Is it still assigned the relevant partition? What if the user explicitly pauses the topic afterwards? And so on. Some of these might not be relevant in this case - I'm just pointing out that whenever you're dealing with doing something in the future based on nothing but a timer, things tend to get complicated and lead to complex bugs down the line.

This function will take care of pausing (and optionally, resuming)
message consumption on the current topic/partition when processing
messages either within the `eachMessage` or `eachBatch` handler
functions.
@brianphillips
Copy link
Contributor Author

@Nevon thank you for your feedback. I've tweaked the implementation and interface a bit based on your suggestions (timers are left to the user, no special exceptions are used for flow control).

What's the intended use-case that makes increasing the API surface here worth it?

I do think the primary benefit of this means of pausing/resuming is being able to stop processing messages from a batch in the middle without having to keep track of whether the current message is from a topic/partition that was paused. The other thing that this helps with is the ability to pause processing without passing around a reference to the consumer object (a minor annoyance, but is one that I find annoying).

...but the same thing can be achieved in other ways.

Agreed. Would an implementation that simply checks if the current topic/partition are paused after each eachMessage invocation be preferable to a callback that handles calling consumer.pause for you and then sets a boolean to trigger the loop to break? This would allow a user to just use the previously available consumer.pause(...) method (caveat being that you have to make sure the consumer object is passed into whatever your eachMessage handler is implemented as mentioned above).

Then there's the thing about keeping the timer for resuming. Any time I see stateful code that includes timers, I get a bit nervous

💯 This is a very good point and was short-sighted on my part. I've left one vestigial convenience in here for resuming (the pause() callback itself returns a callback for resuming the same topic/partition that could be passed to setTimeout(...), if desired) but could easily be removed as well if you prefer to just make use of consumer.resume() (although, again, this requires that the caller have access to the consumer object in the same way they would if we were relying simply on consumer.pause as described above).

There are a few paths forward, as I see them:

  1. You like what you see here, maybe with some additional tweaks but we proceed
  2. You'd prefer consumers use existing consumer.pause(...) and consumer.resume(...) methods without the convenience methods provided here but you like the idea of being able to stop processing a batch of messages early by checking for paused topics/partitions after the eachMessage handler is called
  3. You don't find any of this worthwhile in which case I'll make do with the existing API and count this as a learning exercise since I understand some of the internals a lot better than when I started, and that will be perfectly fine

Let me know what you think, and thanks so much for your time reviewing and giving quality feedback.

@Nevon
Copy link
Collaborator

Nevon commented Jun 2, 2022

If the provided pause callback is just a simple convenience over consumer.pause so that you don't have to provide the topic and partition, then I think it's a reasonable convenience with a small downside. I like the design of pause returning a corresponding resume function.

However, we need to ensure that this functionality works the same whether you are pausing using pause or consumer.pause. Essentially, the implementation of pause can only be a simple delegation to consumer.pause. With the current design, the consumer will work differently depending on whether you've used the pause convenience function of consumer.pause, because only pause is affecting whether to break out of the consumer loop or not, which seems like a big footgun.

Funnily enough, someone else was doing similar work in #1382. What they found was that if you pause a topic-partition within the eachMessage or eachBatch functions, and then threw an error, the error would bubble up to the retrier and the retrier would retry processing the same batch from the now paused partition. There, the solution is to not invoke onBatch for paused topic-partitions (the list of paused topic-partitions can be gotten via the ConsumerGroup). I think very similarly, the same thing can be done here by simply bailing out of handleEachMessage/handleEachBatch if the current topic-partition is paused, but rather than keeping a local paused variable, we can expose SubscriptionState.isPaused via ConsumerGroup and check whether the current topic-partition is paused through there. That way it will work the exact same way whether you're pausing through consumer.pause or the convenience pause function. Make sense?

@Nevon
Copy link
Collaborator

Nevon commented Jun 2, 2022

Just to set expectations, I will be going on vacation tomorrow and will be back on June 25th. So if you haven't heard from me before then, I'm not ghosting you, I'm just enjoying a coconut drink on a beach. 😄

If a topic/partition is paused within the `eachMessage` or `eachBatch`
callback, we want to stop processing messages and avoid retrying the
batch or processing additional messages in the current batch.
@brianphillips brianphillips changed the title Provide a pause(optionalTimeout) helper to eachMessage/eachBatch Provide a pause() helper to eachMessage/eachBatch Jun 2, 2022
src/consumer/runner.js Outdated Show resolved Hide resolved
docs/Consuming.md Outdated Show resolved Hide resolved
docs/Consuming.md Outdated Show resolved Hide resolved
docs/Consuming.md Outdated Show resolved Hide resolved
docs/Consuming.md Outdated Show resolved Hide resolved
Copy link
Collaborator

@Nevon Nevon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I am back from vacation and have gone through this another round. Looks good to me overall, just have some notes on the comms as well as one possibly important note on handling errors that require rejoining.

@brianphillips brianphillips requested a review from Nevon June 27, 2022 20:01
@Nevon
Copy link
Collaborator

Nevon commented Jun 28, 2022

Merged master and resolved the conflicts. With #1382 the handling of paused partitions during error handling is already taken care of.

@Nevon Nevon merged commit 196105c into tulios:master Jun 28, 2022
@brianphillips brianphillips deleted the pause-resume-helper branch June 28, 2022 14:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants