Kafka module: query each broker for all the partitions it is a leader for #16556

Closed
Changes from 5 commits
113 changes: 101 additions & 12 deletions metricbeat/module/kafka/broker.go
@@ -73,6 +73,15 @@ type MemberDescription struct {
Topics map[string][]int32
}

type PartitionOffsets struct {
Err error
Offset int64
}

type brokerTopicPartitionsMap map[int32]map[string]int32

type brokersMap map[int32]*sarama.Broker

const noID = -1

// NewBroker creates a new unconnected kafka Broker connection instance.
@@ -141,7 +150,7 @@ func (b *Broker) Connect() error {
return fmt.Errorf("No advertised broker with address %v found", b.Addr())
}

debugf("found matching broker %v with id %v", other.Addr(), other.ID())
log.Debugf("found matching broker %v with id %v", other.Addr(), other.ID())
b.id = other.ID()
b.advertisedAddr = other.Addr()

@@ -281,7 +290,7 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32)
return b.broker.FetchOffset(requ)
}

// FetchPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader.
// FetchPartitionOffsetFromTheLeader fetches the Newest from the leader.
func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
offset, err := b.client.GetOffset(topic, partitionID, sarama.OffsetNewest)
if err != nil {
@@ -290,6 +299,86 @@ func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int
return offset, nil
}

// FetchPartitionOffsetsForTopics fetches offsets of all partitions from its leaders.
func (b *Broker) FetchPartitionOffsetsForTopics(topics []*sarama.TopicMetadata, time int64) map[string]map[int32]*PartitionOffsets {
leaderTopicPartition, leaderBrokers := b.groupBrokersPerTopicPartitions(topics)
defer b.closeLeaders(leaderBrokers)

topicPartitionPartitionOffsets := b.fetchGroupedPartitionOffsetsPerBroker(leaderTopicPartition, leaderBrokers, time)
return topicPartitionPartitionOffsets
}

func (b *Broker) groupBrokersPerTopicPartitions(topics []*sarama.TopicMetadata) (brokerTopicPartitionsMap, brokersMap) {
leaderTopicPartition := brokerTopicPartitionsMap{}
leaderBrokers := brokersMap{}

for _, topic := range topics {
for _, partition := range topic.Partitions {
broker, err := b.client.Leader(topic.Name, partition.ID)
Member:
The leader ID is available in partition.Leader. We could use this ID for grouping partitions per broker.

If we continue using b.client.Leader() we have to remember to Close() the returned broker. Maybe we can use b.client.Brokers() to look up the broker by ID.

Contributor Author:
Using the method b.client.Leader(topic, partition) will always return the most current leader (there might be a metadata update in the background, right?).

The method b.client.Brokers() returns brokers without opening connections to them. To establish a connection, I would need a configuration structure: broker.Open(conf *Config). Leader() handles it on its own.

Regarding Close(), I think you're right.

Member:
There can still be a problem with this approach: here we are calling Leader for each topic and partition, which may open too many connections to brokers, and we may be leaking connections because we only keep track of one connection per broker in leaderBrokers.

> Using the method b.client.Leader(topic, partition) will always return the most current leader (there might be a metadata update in the background, right?).

This is right, but between this moment and the moment we make the offsets request there can still be some metadata change. If we want to solve this for good (not sure if it is worth it) we would need to handle leadership errors (second option in #13380).

Contributor Author:
Answered below.
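To make the reviewer's suggestion concrete, here is a minimal sketch (not part of this PR; the helper name is hypothetical) that groups partitions by the leader ID already present in partition.Leader and looks each broker up once via client.Brokers(), assuming sarama's metadata types:

```go
package kafka

import "github.com/Shopify/sarama"

// groupByLeader is a hypothetical helper (not part of this PR): it groups
// partitions by the leader ID already present in the topic metadata and looks
// each broker up once in the client's cached broker list, so no per-partition
// connections are opened here and nothing needs to be Close()d by this helper.
func groupByLeader(client sarama.Client, topics []*sarama.TopicMetadata) (map[int32]map[string][]int32, map[int32]*sarama.Broker) {
	brokersByID := map[int32]*sarama.Broker{}
	for _, b := range client.Brokers() {
		brokersByID[b.ID()] = b
	}

	perLeader := map[int32]map[string][]int32{}
	for _, topic := range topics {
		for _, p := range topic.Partitions {
			if _, known := brokersByID[p.Leader]; !known {
				continue // leader missing from cached metadata; skip the partition
			}
			if perLeader[p.Leader] == nil {
				perLeader[p.Leader] = map[string][]int32{}
			}
			// keep a slice per topic so one broker can lead several of its partitions
			perLeader[p.Leader][topic.Name] = append(perLeader[p.Leader][topic.Name], p.ID)
		}
	}
	return perLeader, brokersByID
}
```

Note that brokers obtained from client.Brokers() are not guaranteed to be connected, so a caller would still have to open them (broker.Open(conf) or equivalent) before sending requests, which is the trade-off the author points out above.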

if err != nil {
log.Errorf("connecting to leader broker failed (topicName: %s, partitionID: %d): %v",
topic.Name, partition.ID, err)
continue
}
Member:
Nit. This initialization is not needed, append will initialize it if needed.
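For reference, the Go behavior this nit alludes to (illustrative only, not code from this PR): append on a nil slice allocates the slice, so a map entry whose value type is a slice needs no explicit initialization before appending, whereas a nested map does.

```go
package kafka

// Illustrative only: m[1] is nil before the append, and append allocates it.
func appendExample() map[int32][]int32 {
	m := map[int32][]int32{}
	m[1] = append(m[1], 7)
	return m // map[1:[7]]
}
```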


if _, ok := leaderTopicPartition[broker.ID()]; !ok {
leaderTopicPartition[broker.ID()] = map[string]int32{}
}
leaderTopicPartition[broker.ID()][topic.Name] = partition.ID
Member:
Is it safe to assume that there cannot be two partitions for the same topic in the same broker? (Probably, but I am not sure about that.)

Contributor Author:
There might be a case in which the number of Kafka brokers is lower than the number of Kafka partitions of the same topic, so I would rather keep this map as is.

Member:
As discussed offline, we may need to list multiple partitions for the same topic in the same broker.

Contributor Author:
Fixed.
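A hypothetical illustration of the issue discussed above: with fewer brokers than partitions, one broker leads several partitions of the same topic, and a map that keeps a single partition ID per topic silently overwrites earlier entries.

```go
package kafka

// Illustrative only: a single partition ID per topic drops partitions when one
// broker leads both partition 0 and partition 2 of "logs".
func overwriteExample() map[string]int32 {
	byTopic := map[string]int32{}
	byTopic["logs"] = 0
	byTopic["logs"] = 2 // overwrites 0; partition 0 would never be queried
	return byTopic      // map[logs:2]
}
```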

leaderBrokers[broker.ID()] = broker
}
}
return leaderTopicPartition, leaderBrokers
}

func (b *Broker) fetchGroupedPartitionOffsetsPerBroker(leaderTopicPartition brokerTopicPartitionsMap,
leaderBrokers brokersMap, time int64) map[string]map[int32]*PartitionOffsets {
topicPartitionPartitionOffsets := map[string]map[int32]*PartitionOffsets{}

for leader, topicPartition := range leaderTopicPartition {
req := new(sarama.OffsetRequest)
for topic, partition := range leaderTopicPartition[leader] {
req.AddBlock(topic, partition, time, 1)
}

resp, err := leaderBrokers[leader].GetAvailableOffsets(req)
Member:
Nit and probably an unneeded optimization 😄: we could parallelize requests per leader.

Contributor Author:
Fixed.
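For illustration, a sketch of what the parallelized variant could look like (hypothetical, not necessarily what the later commits do; it reuses the PR's brokerTopicPartitionsMap, brokersMap, and PartitionOffsets types plus sarama's OffsetRequest API): one goroutine per leader broker, results merged under a mutex, and the loop variables passed into the goroutine to avoid capture issues.

```go
package kafka

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

// fetchOffsetsPerBrokerParallel is a hypothetical variant of
// fetchGroupedPartitionOffsetsPerBroker: it issues one offsets request per
// leader broker concurrently and merges the results under a mutex.
func fetchOffsetsPerBrokerParallel(leaderTopicPartition brokerTopicPartitionsMap,
	leaderBrokers brokersMap, time int64) map[string]map[int32]*PartitionOffsets {

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = map[string]map[int32]*PartitionOffsets{}
	)

	for leader, topicPartition := range leaderTopicPartition {
		wg.Add(1)
		// pass loop variables explicitly so each goroutine sees its own copy
		go func(leader int32, topicPartition map[string]int32) {
			defer wg.Done()

			req := new(sarama.OffsetRequest)
			for topic, partition := range topicPartition {
				req.AddBlock(topic, partition, time, 1)
			}
			resp, err := leaderBrokers[leader].GetAvailableOffsets(req)

			mu.Lock()
			defer mu.Unlock()
			for topic, partition := range topicPartition {
				if results[topic] == nil {
					results[topic] = map[int32]*PartitionOffsets{}
				}
				if err != nil {
					results[topic][partition] = &PartitionOffsets{Err: err, Offset: -1}
					continue
				}
				block := resp.GetBlock(topic, partition)
				if block == nil || len(block.Offsets) == 0 || block.Err != 0 {
					results[topic][partition] = &PartitionOffsets{
						Err:    fmt.Errorf("invalid block (%v:%v, leader %v)", topic, partition, leader),
						Offset: -1,
					}
					continue
				}
				results[topic][partition] = &PartitionOffsets{Offset: block.Offsets[0]}
			}
		}(leader, topicPartition)
	}

	wg.Wait()
	return results
}
```

Whether this is worth the extra complexity depends on the number of distinct leaders; the reviewer explicitly flags it as an optional optimization.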

if err != nil {
log.Errorf("get available offsets failed by leader (ID: %d): %v", leader, err)
continue
}

for topic, partition := range topicPartition {
if _, ok := topicPartitionPartitionOffsets[topic]; !ok {
topicPartitionPartitionOffsets[topic] = map[int32]*PartitionOffsets{}
}

if err != nil {
topicPartitionPartitionOffsets[topic][partition] = &PartitionOffsets{Err: err, Offset: -1}
continue
}

block := resp.GetBlock(topic, partition)
if len(block.Offsets) == 0 || block.Err != 0 {
err = fmt.Errorf("block offsets is invalid (topicName: %s, partitionID: %d, leaderID: %d): %v",
topic, partition, leader, block.Err.Error())
topicPartitionPartitionOffsets[topic][partition] = &PartitionOffsets{Err: err, Offset: -1}
continue
}
topicPartitionPartitionOffsets[topic][partition] = &PartitionOffsets{Offset: block.Offsets[0]}
}
}
return topicPartitionPartitionOffsets
}

func (b *Broker) closeLeaders(leaderBrokers brokersMap) {
for id, broker := range leaderBrokers {
err := broker.Close()
if err != nil {
log.Debugf("closing broker (ID: %d) failed: ", id, err)
}
}
}

// ID returns the broker ID or -1 if the broker id is unknown.
func (b *Broker) ID() int32 {
if b.id == noID {
@@ -418,7 +507,7 @@ func (m *brokerFinder) findBroker(addr string, brokers []*sarama.Broker) *sarama
}

func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
debugf("Try to match broker to: %v", addr)
log.Debugf("Try to match broker to: %v", addr)

// get connection 'port'
host, port, err := net.SplitHostPort(addr)
@@ -437,14 +526,14 @@ func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
if err != nil || len(localIPs) == 0 {
return -1, false
}
debugf("local machine ips: %v", localIPs)
log.Debugf("local machine ips: %v", localIPs)

// try to find broker by comparing the fqdn for each known ip to list of
// brokers
localHosts := m.lookupHosts(localIPs)
debugf("local machine addresses: %v", localHosts)
log.Debugf("local machine addresses: %v", localHosts)
for _, host := range localHosts {
debugf("try to match with fqdn: %v (%v)", host, port)
log.Debugf("try to match with fqdn: %v (%v)", host, port)
if i, found := indexOf(net.JoinHostPort(host, port), brokers); found {
return i, true
}
@@ -466,7 +555,7 @@ func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
// try to find broker id by comparing the machines local hostname to
// broker hostnames in metadata
if host, err := m.Net.Hostname(); err == nil {
debugf("try to match with hostname only: %v (%v)", host, port)
log.Debugf("try to match with hostname only: %v (%v)", host, port)

tmp := net.JoinHostPort(strings.ToLower(host), port)
if i, found := indexOf(tmp, brokers); found {
Expand All @@ -475,9 +564,9 @@ func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
}

// lookup ips for all brokers
debugf("match by ips")
log.Debugf("match by ips")
for i, b := range brokers {
debugf("test broker address: %v", b)
log.Debugf("test broker address: %v", b)
bh, bp, err := net.SplitHostPort(b)
if err != nil {
continue
Expand All @@ -490,12 +579,12 @@ func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {

// lookup all ips for brokers host:
ips, err := m.Net.LookupIP(bh)
debugf("broker %v ips: %v, %v", bh, ips, err)
log.Debugf("broker %v ips: %v, %v", bh, ips, err)
if err != nil {
continue
}

debugf("broker (%v) ips: %v", bh, ips)
log.Debugf("broker (%v) ips: %v", bh, ips)

// check if ip is known
if anyIPsMatch(ips, localIPs) {
@@ -515,7 +604,7 @@ func (m *brokerFinder) lookupHosts(ips []net.IP) []string {
}

hosts, err := m.Net.LookupAddr(string(txt))
debugf("lookup %v => %v, %v", string(txt), hosts, err)
log.Debugf("lookup %v => %v, %v", string(txt), hosts, err)
if err != nil {
continue
}
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/consumergroup/consumergroup.go
@@ -49,7 +49,7 @@ type groupAssignment struct {
clientHost string
}

var debugf = logp.MakeDebug("kafka")
var log = logp.NewLogger("kafka")

// New creates a new instance of the MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/consumergroup/query.go
@@ -53,7 +53,7 @@ func fetchGroupInfo(
return nil
}

debugf("known consumer groups: ", groups)
log.Debugf("known consumer groups: ", groups)

assignments, err := fetchGroupAssignments(b, groups)
if err != nil {
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/kafka.go
@@ -19,4 +19,4 @@ package kafka

import "github.com/elastic/beats/libbeat/logp"

var debugf = logp.MakeDebug("kafka")
var log = logp.NewLogger("kafka")
83 changes: 35 additions & 48 deletions metricbeat/module/kafka/partition/partition.go
@@ -48,7 +48,7 @@ type MetricSet struct {

var errFailQueryOffset = errors.New("operation failed")

var debugf = logp.MakeDebug("kafka")
var log = logp.NewLogger("kafka")

// New creates a new instance of the partition MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
@@ -87,7 +87,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return errors.Wrap(err, "error getting topic metadata")
}
if len(topics) == 0 {
debugf("no topic could be read, check ACLs")
log.Debugf("no topic could be read, check ACLs")
return nil
}

@@ -96,8 +96,10 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
"address": broker.AdvertisedAddr(),
}

topicPartitionPartitionOldestOffsets := broker.FetchPartitionOffsetsForTopics(topics, sarama.OffsetOldest)
topicPartitionPartitionNewestOffsets := broker.FetchPartitionOffsetsForTopics(topics, sarama.OffsetNewest)

for _, topic := range topics {
debugf("fetch events for topic: ", topic.Name)
evtTopic := common.MapStr{
"name": topic.Name,
}
@@ -109,36 +111,25 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
}

for _, partition := range topic.Partitions {
// partition offsets can be queried from leader only
if broker.ID() != partition.Leader {
debugf("broker is not leader (broker=%v, leader=%v)", broker.ID(), partition.Leader)
continue
}

// collect offsets for all replicas
for _, id := range partition.Replicas {

// Get oldest and newest available offsets
offOldest, offNewest, offOK, err := queryOffsetRange(broker, id, topic.Name, partition.ID)

if !offOK {
if err == nil {
err = errFailQueryOffset
}

msg := fmt.Errorf("Failed to query kafka partition (%v:%v) offsets: %v",
topic.Name, partition.ID, err)
m.Logger().Warn(msg)
r.Error(msg)
for _, replicaID := range partition.Replicas {
oldestPartitionOffsets, err := m.selectPartitionOffsets(topicPartitionPartitionOldestOffsets, topic, partition)
if err != nil {
m.reportPartitionOffsetsError(r, err)
continue
}
newestPartitionOffsets, err := m.selectPartitionOffsets(topicPartitionPartitionNewestOffsets, topic, partition)
if err != nil {
m.reportPartitionOffsetsError(r, err)
continue
}

partitionEvent := common.MapStr{
"id": partition.ID,
"leader": partition.Leader,
"replica": id,
"is_leader": partition.Leader == id,
"insync_replica": hasID(id, partition.Isr),
"replica": replicaID,
"is_leader": partition.Leader == replicaID,
"insync_replica": hasID(replicaID, partition.Isr),
}

if partition.Err != 0 {
@@ -149,7 +140,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {

// Helpful IDs to avoid scripts on queries
partitionTopicID := fmt.Sprintf("%d-%s", partition.ID, topic.Name)
partitionTopicBrokerID := fmt.Sprintf("%s-%d", partitionTopicID, id)
partitionTopicBrokerID := fmt.Sprintf("%s-%d", partitionTopicID, replicaID)

// create event
event := common.MapStr{
Expand All @@ -162,8 +153,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
"broker": evtBroker,
"partition": partitionEvent,
"offset": common.MapStr{
"newest": offNewest,
"oldest": offOldest,
"newest": newestPartitionOffsets.Offset,
"oldest": oldestPartitionOffsets.Offset,
},
}

@@ -184,27 +175,23 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return nil
}

// queryOffsetRange queries the broker for the oldest and the newest offsets in
// a kafka topics partition for a given replica.
func queryOffsetRange(
b *kafka.Broker,
replicaID int32,
topic string,
partition int32,
) (int64, int64, bool, error) {
oldest, err := b.PartitionOffset(replicaID, topic, partition, sarama.OffsetOldest)
if err != nil {
return -1, -1, false, errors.Wrap(err, "failed to get oldest offset")
}

newest, err := b.PartitionOffset(replicaID, topic, partition, sarama.OffsetNewest)
if err != nil {
return -1, -1, false, errors.Wrap(err, "failed to get newest offset")
func (m *MetricSet) selectPartitionOffsets(topicPartitionPartitionOffsets map[string]map[int32]*kafka.PartitionOffsets,
topic *sarama.TopicMetadata, partition *sarama.PartitionMetadata) (*kafka.PartitionOffsets, error) {
offsets := topicPartitionPartitionOffsets[topic.Name][partition.ID]
if offsets == nil {
err := fmt.Errorf("no partition offsets defined (%v:%v)", topic.Name, partition.ID)
return nil, err
} else if offsets.Err != nil {
err := fmt.Errorf("failed to query kafka partition (%v:%v) offsets: %v",
topic.Name, partition.ID, offsets.Err)
return nil, err
}
return offsets, nil
}

okOld := oldest != -1
okNew := newest != -1
return oldest, newest, okOld && okNew, nil
func (m *MetricSet) reportPartitionOffsetsError(r mb.ReporterV2, err error) {
m.Logger().Warn(err)
r.Error(err)
}

func hasID(id int32, lst []int32) bool {