Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor subscriber #121

Merged
merged 10 commits into from
Oct 25, 2017
Merged

Refactor subscriber #121

merged 10 commits into from
Oct 25, 2017

Conversation

horkhe
Copy link
Member

@horkhe horkhe commented Oct 23, 2017

Subscriber used to set watches only on the change in the number of consumer group members. To detect changes in member subscriptions (list of topics they consume) a trick had been used to remove registration and then add it again with an updated topic list. To avoid unnecessary rebalancing on such temporary deregistrations an internal RebalancingDelay timeout was used to postpone rebalancing in expectation that registration may follow.

In this PR we vendored a mailgun branch of kazoo library that allows setting watches on member subscriptions so that deregistration to update a list of topics became unnecessary. As a result the logic became simpler.

Besides a several unnecessary optimizations were removed and one of them was a cause of #120.

config/config.go Outdated
@@ -419,7 +417,7 @@ func defaultProxyWithClientID(clientID string) *Proxy {
c.Producer.RetryMax = 6
c.Producer.ShutdownTimeout = 30 * time.Second

c.Consumer.AckTimeout = 15 * time.Second
c.Consumer.AckTimeout = 45 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the Ack timeouts so aggressive, If scout moves to a model of only ack after processing an event successfully we will hit this timeout in the case of a network outage or temporary disruption. In the case of a temporary outage, this will result in the event being processed twice by scout. It is better to have a longer timeout to mitigate such instances. 300 seconds is relatively standard for network communication. In scouts case gocql has a network "connectivity" timeout of 30 seconds, but will retry each node in the cluster with a 3 seconds connection timeout. Which means by the time gocql has returned an error, scout will log the event as un-processed and will not ack the event. scout will then ask pixy for the same event again and try processing the event again, all within the 300 seconds ack timeout window.

Also, shouldn't c.Consumer.SubscriptionTimeout be larger than the AckTimeout ?

Your thoughts please,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make it 300 seconds, no problem. But note that we already override this default in Radar setting it to 90 seconds.

There used to be a requirement to make SubscriptionTimeout larger than AckTimeout for otherwise AckTimeout was not honored on rebalancing, but there is no such requirement anymore. A relation between the timeouts is as follows: if consume requests for a group-topic stop coming for SubscritionTimeout, then rebalancing will happen as soon as the last pending request is acked, or AckTimeout elapses. Until rebalancing is triggered any incoming requests will reset SubscritionTimeout and consumption of the group-topic will be resumed.

Note that Scout consumes events in auto-ack mode so AckTimeout is not used.

@thrawn01
Copy link
Contributor

Alot of this is over my head, because I don't have a full running theory of how kafka-pixy works. At some point in the future I would love to corner you and ask a bunch of questions. =)

@horkhe horkhe merged commit 6354fd1 into master Oct 25, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants