Pause/Resume Semantics Question #280

Closed
3 of 7 tasks
sachnk opened this issue Feb 7, 2019 · 3 comments

Comments

@sachnk

sachnk commented Feb 7, 2019

Description

This is more of a question than an issue. I'm trying to use the Pause() and Resume() functions of the Go client, but I'd like to make sure I understand the semantic guarantees these functions provide.

Consider:

	for {
		select {
		default:
			// Poll for up to 100 ms; nil means nothing was delivered.
			ev := c.Poll(100)
			if ev == nil {
				continue
			}

			c.Pause(...)
		}
	}

In the above snippet, after the call to Pause(), am I guaranteed that the next invocation of Poll() won't return a message? Or does pause behave more in a best-effort fashion where eventually poll stops returning messages after some queues deplete?

I tested this locally, and it seems like it's a strong guarantee that poll won't return messages while paused -- but it's possible there are edge cases I'm not capturing.

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): 0.11.6
  • Apache Kafka broker version: confluentinc/cp-enterprise-kafka:5.1.0
  • Client configuration: ConfigMap{...}
  • Operating system: Mac OS
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

edenhill commented Feb 7, 2019

In your example you are guaranteed not to receive any more messages until you call Resume(), but you may get other event types (rebalances, errors, etc.).

Do note that you may only Pause() partitions that have been assigned with Assign() (either explicitly, or implicitly if you don't have a rebalance handler).

Also note that Pause() will purge the local fetch queue of pre-fetched messages.
If you do a lot of pausing and resuming, you probably want to minimize the fetch queue to avoid extra network utilization as messages are re-fetched and then thrown away.
See queued.min.messages.
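
Since other event types can still arrive while paused, a poll loop probably should switch on the event type rather than treat every non-nil event as a message. The following is a minimal stdlib-only sketch of that idea. The names `event`, `message`, `rebalance`, `clientError` and `handle` are hypothetical stand-ins invented for illustration; they are not the confluent-kafka-go types, and the real client's event types would take their place in the switch.

```go
package main

import "fmt"

// Hypothetical stand-ins for the kinds of event a poll loop may see.
// They are NOT the confluent-kafka-go types; they only model the shape.
type event interface{}

type message struct{ value string }
type rebalance struct{ revoked bool }
type clientError struct{ reason string }

// handle classifies one polled event. While partitions are paused a
// message is not expected, but rebalances and errors can still arrive.
func handle(ev event, paused bool) string {
	switch e := ev.(type) {
	case nil:
		return "idle" // poll timed out with nothing to deliver
	case message:
		if paused {
			return "unexpected message while paused: " + e.value
		}
		return "process " + e.value
	case rebalance:
		if e.revoked {
			return "partitions revoked"
		}
		return "partitions assigned"
	case clientError:
		return "error: " + e.reason
	default:
		return fmt.Sprintf("ignored %T", e)
	}
}

func main() {
	// Events that may still show up after pausing.
	events := []event{nil, rebalance{revoked: true}, clientError{reason: "broker down"}}
	for _, ev := range events {
		fmt.Println(handle(ev, true))
	}
}
```

The point of the sketch is only that the paused state affects message delivery, so the error and rebalance branches need to keep working after Pause() has been called.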

@kpiyush17

@edenhill
If the consumer is paused and a rebalance happens, is it possible that it gets assigned a different partition after resuming? If so, is there a way to guarantee the same partition assignment after rebalancing, either in the pause/resume scenario or in general?

@edenhill
Contributor

All currently assigned partitions are revoked on rebalance before any new partitions are assigned. The application should call Resume() on the partitions it has paused when they are revoked, so as to reset them to a clean state.
