Skip to content

Commit

Permalink
metricbeat: enhance kafka broker matching
Browse files Browse the repository at this point in the history
- compare broker names to hostname
- try to lookup metricbeat host machine fqdn and compare to broker name
- compare all ips of local machine with resolved broker name ips
  • Loading branch information
urso committed Dec 13, 2016
1 parent 817465f commit 9bf4517
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Added new flags to import_dashboards (-cacert, -cert, -key, -insecure). {pull}3139[3139] {pull}3163[3163]

*Metricbeat*
- Kafka module broker matching enhancements. {pull}3129[3129]


*Packetbeat*
Expand Down
190 changes: 178 additions & 12 deletions metricbeat/module/kafka/partition/partition.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package partition

import (
"bytes"
"errors"
"fmt"
"io"
"net"
"os"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -32,10 +36,12 @@ type MetricSet struct {
topics []string
}

var noID int32 = -1
const noID int32 = -1

var errFailQueryOffset = errors.New("operation failed")

var debugf = logp.MakeDebug("kafka")

// New create a new instance of the partition MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := defaultConfig
Expand Down Expand Up @@ -91,20 +97,15 @@ func (m *MetricSet) connect() (*sarama.Broker, error) {
return nil, err
}

addr := b.Addr()
for _, other := range meta.Brokers {
if other.Addr() == addr {
m.id = other.ID()
break
}
}

if m.id == noID {
other := findMatchingBroker(b.Addr(), meta.Brokers)
if other == nil { // no broker found
closeBroker(b)
err = fmt.Errorf("No advertised broker with address %v found", addr)
return nil, err
return nil, fmt.Errorf("No advertised broker with address %v found", b.Addr())
}

debugf("found matching broker %v with id %v", other.Addr(), other.ID())

m.id = other.ID()
return b, nil
}

Expand Down Expand Up @@ -322,3 +323,168 @@ func checkRetryQuery(err error) (retry, reconnect bool) {

return false, false
}

func findMatchingBroker(
addr string,
brokers []*sarama.Broker,
) *sarama.Broker {
debugf("Try to match broker to: %v", addr)

// compare connection address to list of broker addresses
for _, b := range brokers {
if b.Addr() == addr {
return b
}
}

// get connection 'port'
_, port, err := net.SplitHostPort(addr)
if err != nil || port == "" {
port = "9092"
}

// lookup local machines ips for comparing with broker addresses
localIPs, err := interfaceIPs()
if err != nil || len(localIPs) == 0 {
return nil
}
debugf("local machine ips: %v", localIPs)

// try to find broker by comparing the fqdn for each known ip to list of
// brokers
localHosts := lookupHosts(localIPs)
debugf("local machine addresses: %v", localHosts)
for _, host := range localHosts {
debugf("try to match with fqdn: %v (%v)", host, port)

if b := findBroker(host, port, brokers); b != nil {
return b
}
}

// try to find broker id by comparing the machines local hostname to
// broker hostnames in metadata
if host, err := os.Hostname(); err == nil {
debugf("try to match with hostname only: %v (%v)", host, port)

if b := findBroker(host, port, brokers); b != nil {
return b
}
}

// lookup ips for all brokers
debugf("match by ips")
for _, b := range brokers {
debugf("test broker address: %v", b.Addr())
bh, bp, err := net.SplitHostPort(b.Addr())
if err != nil {
continue
}

// port numbers do not match
if bp != port {
continue
}

// lookup all ips for brokers host:
ips, err := net.LookupIP(bh)
debugf("broker %v ips: %v, %v", bh, ips, err)
if err != nil {
continue
}

debugf("broker (%v) ips: %v", bh, ips)

// check if ip is known
if ipsMatch(ips, localIPs) {
return b
}
}

return nil
}

func findBroker(host, port string, brokers []*sarama.Broker) *sarama.Broker {
for _, b := range brokers {
debugf("test broker address: %v", b.Addr())

bh, bp, err := net.SplitHostPort(b.Addr())
if err != nil {
debugf("failed to parse broker address: %v", err)
continue
}

if bh == host && port == bp {
return b
}
}

return nil
}

func interfaceIPs() ([]net.IP, error) {
var ips []net.IP
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, err
}
for _, addr := range addrs {
var ip net.IP
ok := true

switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
debugf("non ip address: %v", addr)
ok = false
}

if !ok {
continue
}

ips = append(ips, ip)
}
return ips, nil
}

func lookupHosts(ips []net.IP) []string {
set := map[string]struct{}{}
for _, ip := range ips {
txt, err := ip.MarshalText()
if err != nil {
continue
}

hosts, err := net.LookupAddr(string(txt))
debugf("lookup %v => %v, %v", string(txt), hosts, err)
if err != nil {
continue
}

for _, host := range hosts {
h := strings.TrimSuffix(host, ".")
set[h] = struct{}{}
}
}

hosts := make([]string, 0, len(set))
for host := range set {
hosts = append(hosts, host)
}
return hosts
}

func ipsMatch(as, bs []net.IP) bool {
for _, a := range as {
for _, b := range bs {
if bytes.Equal(a, b) {
return true
}
}
}
return false
}

0 comments on commit 9bf4517

Please sign in to comment.