fixed ConsumerGroup flooding logs with client/metadata update req #1544 #1578

Merged

3 commits merged on Jan 22, 2020

Conversation

antsbean
Contributor

Reduce the metadata refresh frequency to avoid flooding the logs.

@dnwe
Collaborator

dnwe commented Jan 16, 2020

@antsbean I think this PR looks good for fixing the problem described in #1544, particularly as you no longer call RefreshMetadata in topicToPartitionNumbers(...) so that is being handled by the client's existing backgroundMetadataUpdater(...), so I think one of the maintainers should be fine to review and merge this.

Ideally, rather than having this polling background thread on a Ticker, I think it would make sense for the client to expose a slice of channels that we could AddMetadataSubscriber/RemoveMetadataSubscriber on, to receive a broadcast notification whenever the client's updateMetadata(...) func has been called with new metadata information.
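
Purely as a sketch of that idea (none of this exists in sarama today; the AddMetadataSubscriber/RemoveMetadataSubscriber names and the broadcast hook are assumptions), the client could own something like:

import "sync"

// Hypothetical subscriber registry owned by the client; broadcast() would be
// invoked at the end of the client's updateMetadata(...) once new metadata
// has been applied.
type metadataSubscribers struct {
	lock sync.Mutex
	subs []chan struct{}
}

func (m *metadataSubscribers) AddMetadataSubscriber() <-chan struct{} {
	m.lock.Lock()
	defer m.lock.Unlock()
	ch := make(chan struct{}, 1)
	m.subs = append(m.subs, ch)
	return ch
}

func (m *metadataSubscribers) RemoveMetadataSubscriber(ch <-chan struct{}) {
	m.lock.Lock()
	defer m.lock.Unlock()
	for i, sub := range m.subs {
		if sub == ch {
			m.subs = append(m.subs[:i], m.subs[i+1:]...)
			close(sub)
			return
		}
	}
}

func (m *metadataSubscribers) broadcast() {
	m.lock.Lock()
	defer m.lock.Unlock()
	for _, sub := range m.subs {
		select {
		case sub <- struct{}{}: // non-blocking: skip subscribers that haven't drained yet
		default:
		}
	}
}

A consumer group could then block on the returned channel instead of polling on its own Ticker.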

@antsbean
Contributor Author

@dnwe your idea is good, but

after reviewing the relevant code of the Kafka consumer, I think the consumer group will not call RefreshMetadata while it is receiving messages normally from the Kafka server; it only calls it when the brokerConsumer receives an error from the Kafka server.

Please see this code:


func (bc *brokerConsumer) handleResponses() {
	for child := range bc.subscriptions {
		....
		switch result {
		case nil:
			...
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
			....
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		default:
			....
			child.trigger <- none{}
		}
	}
}

Writing to child.trigger will notify the dispatcher:

func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		.....
		if err := child.dispatch(); err != nil {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
	....
}

then child.dispatch() will call RefreshMetadata().
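
For context, the call chain ends up roughly here (a paraphrased sketch, not the exact sarama source; field names may differ slightly):

func (child *partitionConsumer) dispatch() error {
	// refresh metadata for just this child's topic before re-resolving the leader
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}
	// ... then look up the (possibly new) leader broker and re-subscribe
	return nil
}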

@dnwe
Collaborator

dnwe commented Jan 16, 2020

I'm not sure I understand what you mean? The code you link to is fine, that's just the consumer forcing a refresh of metadata for specific topics when a partitionConsumer received an error (e.g., if leadership has changed or whatever) — it doesn't need to refresh the metadata whilst it is receiving messages successfully.

The backgroundMetadataUpdater(...) I referred to is the goroutine kicked off in NewClient(...) in client.go which periodically refreshes the metadata on a fixed cadence (client.conf.Metadata.RefreshFrequency), whatever is happening with the cluster.
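
In outline, that background updater looks roughly like this (a simplified sketch; the real client.go has a bit more to it, such as honouring Metadata.Full):

func (client *client) backgroundMetadataUpdater() {
	// RefreshFrequency == 0 disables the periodic refresh entirely
	if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
		return
	}

	ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// refresh the cached metadata on a fixed cadence, regardless of
			// what consumers and producers are doing
			if err := client.RefreshMetadata(); err != nil {
				Logger.Println("Client background metadata update:", err)
			}
		case <-client.closer:
			return
		}
	}
}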

@antsbean
Contributor Author

@dnwe what I mean is:
even if we implement AddMetadataSubscriber/RemoveMetadataSubscriber,
the Kafka consumer will not normally call RefreshMetadata, so AddMetadataSubscriber/RemoveMetadataSubscriber would be ineffective.

@dnwe
Collaborator

dnwe commented Jan 16, 2020

@antsbean I still don't understand, but that's ok 😀

FYI your branch needs rebasing

branch is 2 commits ahead, 27 commits behind Shopify:master.

@dnwe
Collaborator

dnwe commented Jan 22, 2020

@d1egoaz can you review and merge this one? Whilst it may not 100% be the final solution we'd want, it does fix #1544 which is in the current release-tagged version so it'd be good to push it through

We should probably merge your zstd PR, bundle up any other fixes from the PR queue, and then cut another release version really

@d1egoaz
Contributor

d1egoaz commented Jan 22, 2020

if this is fixing #1544 we should merge it now.

But we should loop back to what @dnwe suggested here:
https://github.com/Shopify/sarama/pull/1578/files#issuecomment-575085696

@d1egoaz d1egoaz merged commit 2ead77f into IBM:master Jan 22, 2020
@d1egoaz
Contributor

d1egoaz commented Jan 22, 2020

thanks for the contribution @antsbean

@d1egoaz
Contributor

d1egoaz commented Jun 29, 2020

friends, @antsbean @dnwe

if topicToPartitionNumbers is not refreshing the metadata anymore, what is this function doing?

it is only called in if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {,
which doesn't change any state of any object.

I wonder if the original intention of #1525 was lost here, and this doesn't make any sense now.

@dnwe
Collaborator

dnwe commented Jun 29, 2020

@d1egoaz the client is already refreshing its metadata in the background goroutine, so topicToPartitionNumbers will get the last known state from that refresh when called by the ticker. If the number of partitions for any topic differs from the one stored in the ticker loop, then that returns out of the for loop, calling session.cancel(), which will force the consumer to leave and rejoin the group.
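
To make that concrete, the check loop added by this PR works roughly like this (a simplified sketch, not the exact consumer_group.go code; the tick interval below is a placeholder):

const partitionCheckInterval = 10 * time.Second // placeholder; the PR defines its own interval

func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
	pause := time.NewTicker(partitionCheckInterval)
	defer session.cancel() // exiting the loop forces the member to leave and rejoin the group
	defer pause.Stop()

	// snapshot the partition counts known when the session started
	oldTopicToPartitionNum, err := c.topicToPartitionNumbers(topics)
	if err != nil {
		return
	}
	for {
		// re-read the counts from the client's cached metadata, which the
		// background updater keeps refreshing
		newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics)
		if err != nil {
			return
		}
		for topic, num := range oldTopicToPartitionNum {
			if newTopicToPartitionNum[topic] != num {
				return // partition count changed: cancel the session and rebalance
			}
		}
		select {
		case <-pause.C:
		case <-session.ctx.Done():
			return
		case <-c.closed:
			return
		}
	}
}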

@d1egoaz
Contributor

d1egoaz commented Jun 29, 2020

so, https://github.com/Shopify/sarama/blob/221ed1a25150fa98d750f2e2f33f58ff018238b4/client.go#L901
will eventually be called by the backgroundMetadataUpdater goroutine

Thanks @dnwe that makes sense now!

@dnwe
Collaborator

dnwe commented Jun 29, 2020

Yep I think so. So we should detect new partitions (if a topic is scaled) in less than client.conf.Metadata.RefreshFrequency time

@cheaster062211

cheaster062211 commented Apr 12, 2023

(quoting the earlier exchange between @d1egoaz and @dnwe from Jun 29, 2020 above)

I have a question about this snapshot:

if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
	return
}

If oldTopicToPartitionNum has already changed by the time it is captured, the subsequent checks compare against the changed number and cannot trigger a rebalance. In extreme cases oldTopicToPartitionNum may not match the partition counts that existed when the session was created, making the subsequent checks invalid.
The root cause is that the timer is not reset in the backgroundMetadataUpdater function: refreshMetadata executes before loopCheckPartitionNumbers, so the partition numbers have already been refreshed @dnwe
