Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka module: query each broker all the partitions it is a leader for #16556

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 133 additions & 12 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net"
"os"
"strings"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -73,6 +74,18 @@ type MemberDescription struct {
Topics map[string][]int32
}

// PartitionOffsets holds the result of an offset query for a single
// topic partition: the fetched offset, or the error that prevented
// fetching it (Offset is -1 when Err is set).
type PartitionOffsets struct {
	Err    error
	Offset int64
}

// topicPartition identifies one partition of one topic.
type topicPartition struct {
	topic     string
	partition int32
}

// brokerTopicPartitionsMap groups topic partitions by the ID of the
// broker that is their leader.
type brokerTopicPartitionsMap map[int32][]topicPartition

const noID = -1

// NewBroker creates a new unconnected kafka Broker connection instance.
Expand Down Expand Up @@ -141,7 +154,7 @@ func (b *Broker) Connect() error {
return fmt.Errorf("No advertised broker with address %v found", b.Addr())
}

debugf("found matching broker %v with id %v", other.Addr(), other.ID())
log.Debugf("found matching broker %v with id %v", other.Addr(), other.ID())
b.id = other.ID()
b.advertisedAddr = other.Addr()

Expand Down Expand Up @@ -281,7 +294,7 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32)
return b.broker.FetchOffset(requ)
}

// FetchPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader.
// FetchPartitionOffsetFromTheLeader fetches the newest offset (sarama.OffsetNewest) from the leader.
func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
offset, err := b.client.GetOffset(topic, partitionID, sarama.OffsetNewest)
if err != nil {
Expand All @@ -290,6 +303,114 @@ func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int
return offset, nil
}

// FetchPartitionOffsetsForTopics fetches, for every partition of the given
// topics, the offset at the requested time from that partition's leader.
// The result is keyed by topic name and then by partition ID.
func (b *Broker) FetchPartitionOffsetsForTopics(topics []*sarama.TopicMetadata, time int64) map[string]map[int32]PartitionOffsets {
	grouped := b.groupBrokersPerTopicPartitions(topics)
	return b.fetchGroupedPartitionOffsetsPerBroker(grouped, time)
}

func (b *Broker) groupBrokersPerTopicPartitions(topics []*sarama.TopicMetadata) brokerTopicPartitionsMap {
leaderTopicPartition := brokerTopicPartitionsMap{}
for _, topic := range topics {
for _, partition := range topic.Partitions {
if _, ok := leaderTopicPartition[partition.Leader]; !ok {
leaderTopicPartition[partition.Leader] = []topicPartition{}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. This initialization is not needed, append will initialize it if needed.

leaderTopicPartition[partition.Leader] = append(leaderTopicPartition[partition.Leader],
topicPartition{topic: topic.Name, partition: partition.ID})
}
}
return leaderTopicPartition
}

// fetchGroupedPartitionOffsetsPerBroker queries every leader broker
// concurrently for the offsets of the partitions it leads and merges the
// per-broker answers into one topic -> partition -> PartitionOffsets map.
func (b *Broker) fetchGroupedPartitionOffsetsPerBroker(leaderTopicPartition brokerTopicPartitionsMap,
	time int64) map[string]map[int32]PartitionOffsets {

	var wg sync.WaitGroup
	wg.Add(len(leaderTopicPartition))

	// Buffered so every goroutine can deliver its result without blocking.
	queryResults := make(chan map[string]map[int32]PartitionOffsets, len(leaderTopicPartition))
	defer close(queryResults)

	for leader, topicPartitions := range leaderTopicPartition {
		// Pass loop variables as arguments to avoid capture issues.
		go func(brokerID int32, tps []topicPartition) {
			defer wg.Done()
			queryResults <- b.queryBrokerForPartitionOffsets(brokerID, tps, time)
		}(leader, topicPartitions)
	}
	wg.Wait()

	merged := map[string]map[int32]PartitionOffsets{}
	for range leaderTopicPartition {
		for topic, partitions := range <-queryResults {
			merged[topic] = partitions
		}
	}
	return merged
}

func (b *Broker) queryBrokerForPartitionOffsets(brokerID int32, topicPartitions []topicPartition, time int64) map[string]map[int32]PartitionOffsets {
req := new(sarama.OffsetRequest)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something I am missing here is that in previous implementation we were making a request per replica (the request done in b.PartitionOffset() was also modified with req.SetReplicaID()).
Are we missing the offsets of replicas now? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this may work (replicaID = -1 means a leader), with I believe this is getting too complex.

for _, topicPartition := range topicPartitions {
req.AddBlock(topicPartition.topic, topicPartition.partition, time, 1)
}

broker, err := b.client.Leader(topicPartitions[0].topic, topicPartitions[0].partition)
if err != nil {
return b.handleBrokerQueryError(topicPartitions, err)
}

defer func() {
err := broker.Close()
if err != nil {
log.Debugf("closing broker (ID: %d) failed: ", brokerID, err)
}
}()

resp, err := broker.GetAvailableOffsets(req)
if err != nil {
err = fmt.Errorf("get available offsets failed by leader (ID: %d): %v", brokerID, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. We could use errors.Wrap to add context to these errors and others added in this PR.

return b.handleBrokerQueryError(topicPartitions, err)
}

topicPartitionPartitionOffsets := map[string]map[int32]PartitionOffsets{}
for _, topicPartition := range topicPartitions {
if _, ok := topicPartitionPartitionOffsets[topicPartition.topic]; !ok {
topicPartitionPartitionOffsets[topicPartition.topic] = map[int32]PartitionOffsets{}
}

block := resp.GetBlock(topicPartition.topic, topicPartition.partition)
if len(block.Offsets) == 0 || block.Err != 0 {
err = fmt.Errorf("offsets block is invalid (topicName: %s, partitionID: %d, leaderID: %d): %v",
topicPartition.topic, topicPartition.partition, brokerID, block.Err.Error())
topicPartitionPartitionOffsets[topicPartition.topic][topicPartition.partition] = PartitionOffsets{Err: err, Offset: -1}
continue
}
topicPartitionPartitionOffsets[topicPartition.topic][topicPartition.partition] = PartitionOffsets{Offset: block.Offsets[0]}
}
return topicPartitionPartitionOffsets
}

// handleBrokerQueryError attributes a broker-level query failure to every
// requested topic partition, producing the same result shape as a
// successful query so callers can treat both uniformly.
func (b *Broker) handleBrokerQueryError(topicPartitions []topicPartition, err error) map[string]map[int32]PartitionOffsets {
	topicPartitionPartitionOffsets := map[string]map[int32]PartitionOffsets{}
	if err == nil {
		return topicPartitionPartitionOffsets
	}
	for _, tp := range topicPartitions {
		// Fix: the original condition was inverted (`ok` instead of `!ok`),
		// so the per-topic map was never created on first sight and the
		// assignment below panicked on a nil map.
		if _, ok := topicPartitionPartitionOffsets[tp.topic]; !ok {
			topicPartitionPartitionOffsets[tp.topic] = map[int32]PartitionOffsets{}
		}
		topicPartitionPartitionOffsets[tp.topic][tp.partition] = PartitionOffsets{
			Err:    err,
			Offset: -1,
		}
	}
	return topicPartitionPartitionOffsets
}

// ID returns the broker ID or -1 if the broker id is unknown.
func (b *Broker) ID() int32 {
if b.id == noID {
Expand Down Expand Up @@ -418,7 +539,7 @@ func (m *brokerFinder) findBroker(addr string, brokers []*sarama.Broker) *sarama
}

func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
debugf("Try to match broker to: %v", addr)
log.Debugf("Try to match broker to: %v", addr)

// get connection 'port'
host, port, err := net.SplitHostPort(addr)
Expand All @@ -437,14 +558,14 @@ func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
if err != nil || len(localIPs) == 0 {
return -1, false
}
debugf("local machine ips: %v", localIPs)
log.Debugf("local machine ips: %v", localIPs)

// try to find broker by comparing the fqdn for each known ip to list of
// brokers
localHosts := m.lookupHosts(localIPs)
debugf("local machine addresses: %v", localHosts)
log.Debugf("local machine addresses: %v", localHosts)
for _, host := range localHosts {
debugf("try to match with fqdn: %v (%v)", host, port)
log.Debugf("try to match with fqdn: %v (%v)", host, port)
if i, found := indexOf(net.JoinHostPort(host, port), brokers); found {
return i, true
}
Expand All @@ -466,7 +587,7 @@ func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
// try to find broker id by comparing the machines local hostname to
// broker hostnames in metadata
if host, err := m.Net.Hostname(); err == nil {
debugf("try to match with hostname only: %v (%v)", host, port)
log.Debugf("try to match with hostname only: %v (%v)", host, port)

tmp := net.JoinHostPort(strings.ToLower(host), port)
if i, found := indexOf(tmp, brokers); found {
Expand All @@ -475,9 +596,9 @@ func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
}

// lookup ips for all brokers
debugf("match by ips")
log.Debugf("match by ips")
for i, b := range brokers {
debugf("test broker address: %v", b)
log.Debugf("test broker address: %v", b)
bh, bp, err := net.SplitHostPort(b)
if err != nil {
continue
Expand All @@ -490,12 +611,12 @@ func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {

// lookup all ips for brokers host:
ips, err := m.Net.LookupIP(bh)
debugf("broker %v ips: %v, %v", bh, ips, err)
log.Debugf("broker %v ips: %v, %v", bh, ips, err)
if err != nil {
continue
}

debugf("broker (%v) ips: %v", bh, ips)
log.Debugf("broker (%v) ips: %v", bh, ips)

// check if ip is known
if anyIPsMatch(ips, localIPs) {
Expand All @@ -515,7 +636,7 @@ func (m *brokerFinder) lookupHosts(ips []net.IP) []string {
}

hosts, err := m.Net.LookupAddr(string(txt))
debugf("lookup %v => %v, %v", string(txt), hosts, err)
log.Debugf("lookup %v => %v, %v", string(txt), hosts, err)
if err != nil {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type groupAssignment struct {
clientHost string
}

var debugf = logp.MakeDebug("kafka")
var log = logp.NewLogger("kafka")

// New creates a new instance of the MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/consumergroup/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func fetchGroupInfo(
return nil
}

debugf("known consumer groups: ", groups)
log.Debugf("known consumer groups: ", groups)

assignments, err := fetchGroupAssignments(b, groups)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ package kafka

import "github.com/elastic/beats/libbeat/logp"

var debugf = logp.MakeDebug("kafka")
var log = logp.NewLogger("kafka")
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
80 changes: 32 additions & 48 deletions metricbeat/module/kafka/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type MetricSet struct {

var errFailQueryOffset = errors.New("operation failed")

var debugf = logp.MakeDebug("kafka")
var log = logp.NewLogger("kafka")

// New creates a new instance of the partition MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Expand Down Expand Up @@ -87,7 +87,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return errors.Wrap(err, "error getting topic metadata")
}
if len(topics) == 0 {
debugf("no topic could be read, check ACLs")
log.Debugf("no topic could be read, check ACLs")
return nil
}

Expand All @@ -96,8 +96,10 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
"address": broker.AdvertisedAddr(),
}

topicPartitionPartitionOldestOffsets := broker.FetchPartitionOffsetsForTopics(topics, sarama.OffsetOldest)
topicPartitionPartitionNewestOffsets := broker.FetchPartitionOffsetsForTopics(topics, sarama.OffsetNewest)

for _, topic := range topics {
debugf("fetch events for topic: ", topic.Name)
evtTopic := common.MapStr{
"name": topic.Name,
}
Expand All @@ -109,36 +111,25 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
}

for _, partition := range topic.Partitions {
// partition offsets can be queried from leader only
if broker.ID() != partition.Leader {
debugf("broker is not leader (broker=%v, leader=%v)", broker.ID(), partition.Leader)
continue
}

// collect offsets for all replicas
for _, id := range partition.Replicas {

// Get oldest and newest available offsets
offOldest, offNewest, offOK, err := queryOffsetRange(broker, id, topic.Name, partition.ID)
jsoriano marked this conversation as resolved.
Show resolved Hide resolved

if !offOK {
if err == nil {
err = errFailQueryOffset
}

msg := fmt.Errorf("Failed to query kafka partition (%v:%v) offsets: %v",
topic.Name, partition.ID, err)
m.Logger().Warn(msg)
r.Error(msg)
for _, replicaID := range partition.Replicas {
oldestPartitionOffsets, err := m.selectPartitionOffsets(topicPartitionPartitionOldestOffsets, topic, partition)
if err != nil {
m.reportPartitionOffsetsError(r, err)
continue
}
newestPartitionOffsets, err := m.selectPartitionOffsets(topicPartitionPartitionNewestOffsets, topic, partition)
if err != nil {
m.reportPartitionOffsetsError(r, err)
continue
}

partitionEvent := common.MapStr{
"id": partition.ID,
"leader": partition.Leader,
"replica": id,
"is_leader": partition.Leader == id,
"insync_replica": hasID(id, partition.Isr),
"replica": replicaID,
"is_leader": partition.Leader == replicaID,
"insync_replica": hasID(replicaID, partition.Isr),
}

if partition.Err != 0 {
Expand All @@ -149,7 +140,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {

// Helpful IDs to avoid scripts on queries
partitionTopicID := fmt.Sprintf("%d-%s", partition.ID, topic.Name)
partitionTopicBrokerID := fmt.Sprintf("%s-%d", partitionTopicID, id)
partitionTopicBrokerID := fmt.Sprintf("%s-%d", partitionTopicID, replicaID)

// create event
event := common.MapStr{
Expand All @@ -162,8 +153,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
"broker": evtBroker,
"partition": partitionEvent,
"offset": common.MapStr{
"newest": offNewest,
"oldest": offOldest,
"newest": newestPartitionOffsets.Offset,
"oldest": oldestPartitionOffsets.Offset,
},
}

Expand All @@ -184,27 +175,20 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return nil
}

// queryOffsetRange queries the broker for the oldest and the newest offsets in
// a kafka topics partition for a given replica.
func queryOffsetRange(
b *kafka.Broker,
replicaID int32,
topic string,
partition int32,
) (int64, int64, bool, error) {
oldest, err := b.PartitionOffset(replicaID, topic, partition, sarama.OffsetOldest)
if err != nil {
return -1, -1, false, errors.Wrap(err, "failed to get oldest offset")
}

newest, err := b.PartitionOffset(replicaID, topic, partition, sarama.OffsetNewest)
if err != nil {
return -1, -1, false, errors.Wrap(err, "failed to get newest offset")
// selectPartitionOffsets looks up the previously fetched offsets for the
// given topic partition. A stored per-partition error is converted into a
// descriptive error return; the offsets value is returned either way.
func (m *MetricSet) selectPartitionOffsets(topicPartitionPartitionOffsets map[string]map[int32]kafka.PartitionOffsets,
	topic *sarama.TopicMetadata, partition *sarama.PartitionMetadata) (kafka.PartitionOffsets, error) {
	result := topicPartitionPartitionOffsets[topic.Name][partition.ID]
	if result.Err == nil {
		return result, nil
	}
	return result, fmt.Errorf("failed to query kafka partition (%v:%v) offsets: %v",
		topic.Name, partition.ID, result.Err)
}

okOld := oldest != -1
okNew := newest != -1
return oldest, newest, okOld && okNew, nil
// reportPartitionOffsetsError logs the offsets query error as a warning and
// also reports it through the metricbeat reporter so it surfaces as an event.
func (m *MetricSet) reportPartitionOffsetsError(r mb.ReporterV2, err error) {
	m.Logger().Warn(err)
	r.Error(err)
}

func hasID(id int32, lst []int32) bool {
Expand Down