metricbeat: fix the consumergroup of kafka module incorrectly work #5880
Conversation
Can one of the admins verify this patch?
```go
_partitions := make(map[string][]int32)
for topic, partitions := range topics {
	if topicsFilter == nil || topicsFilter(topic) {
		for partitionID, _ := range partitions {
```
should omit 2nd value from range; this loop is equivalent to `for partitionID := range ...`
metricbeat/module/kafka/broker.go (Outdated)
```diff
@@ -230,11 +230,16 @@ func (b *Broker) DescribeGroups(
 	return groups, nil
 }

-func (b *Broker) FetchGroupOffsets(group string) (*sarama.OffsetFetchResponse, error) {
+func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
```
exported method `Broker.FetchGroupOffsets` should have comment or be unexported
Changed the title: "consumergroup of kafka module do not fetch anything" → "consumergroup of kafka module incorrectly work"

Force-pushed from 8a04335 to 03d7f64
```go
	return err
for waiting > 0 {
	ret := <-results
	waiting -= 1
```
should replace `waiting -= 1` with `waiting--`
```go
wg.Add(1)
go func() {
	waiting += 1
```
should replace `waiting += 1` with `waiting++`
Force-pushed from 03d7f64 to e4d1877
@wangdisdu Just wanted to let you know that we saw the PR. As @urso was already thinking a bit about how to fix Kafka, I'm hoping he can have a look at it.
Thanks for contributing to the kafka module. It's a great way forward. Unfortunately I don't think it fixes all the issues we have: basically, some consumergroup state is held by another kafka broker.
```go
waiting := 0
for group, topics := range assignments {
	_partitions := make(map[string][]int32)
```
No underscore please. In the beats codebase we prefer to use make/new only if there is no other syntax pattern. Use `partitions = map[string][]int32{}`.
How about (reduces key lookups and appends/allocations to the slice):

```go
queryTopics := map[string][]int32{}
for topic, partitions := range topics {
	if topicFilter != nil && !topicFilter(topic) {
		continue
	}

	// copy partition ids
	L := len(partitions)
	if L == 0 {
		continue
	}

	ids, i := make([]int32, L), 0
	for partition := range partitions {
		ids[i], i = partition, i+1
	}
	queryTopics[topic] = ids
}

if len(queryTopics) == 0 {
	continue
}
// ...
```
fixed: changed the underscored variable name
```go
for partitionID := range partitions {
	if _, ok := _partitions[topic]; !ok {
		_partitions[topic] = make([]int32, 0)
	}
```
This if-clause is not required. `_partitions[topic]` returns `nil` if the topic is not known, and `append(([]int32)(nil), value)` returns a new `[]int32{value}`.
fixed: optimized the code that generates the partition array
```go
results := make(chan result, len(groups))
for _, group := range groups {
	group := group
results := make(chan result, len(assignments))
```
As the loop collecting data does not wait on wg.Wait anymore, but loops through all results, the wg is not really required anymore. We can even introduce an unbuffered channel `make(chan result)` now.
fixed: removed unnecessary waitgroup
```go
ret := <-results
waiting--
if ret.err != nil {
	continue
```
Originally we did return an error -> we still want to return the first error we encounter:

```go
for waiting > 0 {
	ret := <-results
	waiting--
	if ret.err != nil {
		if err == nil {
			err = ret.err
		}
		continue
	}
	// ...
}
if err != nil {
	return err
}
```
fixed: return the error if one occurred
Force-pushed from e4d1877 to 664fb09
Hi @ruflin @urso, thanks for your review. I have adjusted this PR.
And for the issue:
```go
}
	return err
	continue
```
We can stop processing data after having seen an error by doing this:

```go
for waiting > 0 {
	ret := <-results
	waiting--
	if ret.err != nil && err == nil {
		err = ret.err
	}
	if err != nil {
		continue
	}
	// ...
}
```

This will keep the waiting loop alive and assign the first `ret.err` only, but also stop processing any further results (results are discarded anyway).
@urso I fixed it.

This might still be a problem. The consumergroup needs to correlate data which are potentially distributed between different kafka nodes. The problem is, the metricsets in metricbeat are fully isolated. That is, even if you configure all brokers, the metricsets run in isolation per broker -> they cannot correlate meta-data distributed on different brokers. See related ticket: #4285
Force-pushed from 664fb09 to f0c00d1
@urso Hi, I found out that it is not a problem.
The metadata of a consumer group is stored in one specific broker called the coordinator. I have tested and verified this on kafka 0.10.2.
@wangdisdu Cool. How many kafka brokers have you used? Did you use one central metricbeat, or one per host? Normally we recommend installing metricbeat on each host and configuring the metricbeat kafka module to connect to it. The PR looks good to me. Can you add a changelog entry about fixing the OffsetFetch request?
@urso To test the kafka metricset, I built a kafka cluster with 3 brokers. And I added a changelog entry.
…ot fetch anything — The `consumergroup` metricset fetches offset info by sending an `OffsetFetch` request to Kafka. But the `topic` and `partition` parameters were missing from the sent `OffsetFetch` request. These parameters are required, and Kafka will not respond with anything if they are missing.
Force-pushed from f0c00d1 to f07636a
@ruflin Will this PR be merged? Then I can do more things based on it, like batching requests together for efficiency.
@wangdisdu How many partitions and consumer groups did you use? The ticket I mentioned uses 10 partitions and 3 consumer groups, as sometimes with fewer consumer groups we could see all data. Only adding more groups/partitions to the picture showed us some missing consumer group data.
Merged the PR. Thank you for taking the time to work on this long outstanding issue. Your help is highly appreciated ;)
@urso It is working well even after adding more groups. I tested and verified it.
@wangdisdu I can only double down on what @urso said. We really appreciate your work here and your being patient with us to get the changes in. ❤️
Which version of metricbeat is this available in, and will it work with Elasticsearch 5.3? Thanks.
This is part of the 6.3 release.
The `consumergroup` metricset fetches offset info by sending an `OffsetFetch` request to Kafka. But the `topic` and `partition` parameters were missing from the sent `OffsetFetch` request. These parameters are required, and Kafka will not respond with anything if they are missing.
This is the Kafka protocol guide: http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch