diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 9016e090428..8ca838878b6 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -51,6 +51,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
 - Added new flags to import_dashboards (-cacert, -cert, -key, -insecure). {pull}3139[3139] {pull}3163[3163]
 
 *Metricbeat*
+- Kafka module broker matching enhancements. {pull}3129[3129]
 
 *Packetbeat*
 
diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go
index 59c02e7987c..ae0c0e6f0e0 100644
--- a/metricbeat/module/kafka/partition/partition.go
+++ b/metricbeat/module/kafka/partition/partition.go
@@ -1,9 +1,12 @@
 package partition
 
 import (
 	"errors"
 	"fmt"
 	"io"
+	"net"
+	"os"
+	"strings"
 	"time"
 
 	"github.com/elastic/beats/libbeat/common"
@@ -32,10 +35,12 @@ type MetricSet struct {
 	topics []string
 }
 
-var noID int32 = -1
+const noID int32 = -1
 
 var errFailQueryOffset = errors.New("operation failed")
 
+var debugf = logp.MakeDebug("kafka")
+
 // New create a new instance of the partition MetricSet
 func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
 	config := defaultConfig
@@ -91,20 +96,15 @@ func (m *MetricSet) connect() (*sarama.Broker, error) {
 		return nil, err
 	}
 
-	addr := b.Addr()
-	for _, other := range meta.Brokers {
-		if other.Addr() == addr {
-			m.id = other.ID()
-			break
-		}
-	}
-
-	if m.id == noID {
+	other := findMatchingBroker(b.Addr(), meta.Brokers)
+	if other == nil { // no broker found
 		closeBroker(b)
-		err = fmt.Errorf("No advertised broker with address %v found", addr)
-		return nil, err
+		return nil, fmt.Errorf("No advertised broker with address %v found", b.Addr())
 	}
 
+	debugf("found matching broker %v with id %v", other.Addr(), other.ID())
+
+	m.id = other.ID()
 	return b, nil
 }
 
@@ -322,3 +322,173 @@ func checkRetryQuery(err error) (retry, reconnect bool) {
 
 	return false, false
 }
+
+// findMatchingBroker returns the entry of brokers that corresponds to the
+// connection address addr, or nil if none is found. An exact address match
+// is tried first. Otherwise the broker is assumed to run on this machine
+// and is matched, using the port of addr (9092 if addr has none), by the
+// reverse-DNS names of the local interface IPs, then by the local hostname,
+// and finally by resolving each broker host and comparing its IPs with the
+// local ones. It returns nil early if the local IPs cannot be determined.
+func findMatchingBroker(
+	addr string,
+	brokers []*sarama.Broker,
+) *sarama.Broker {
+	debugf("Try to match broker to: %v", addr)
+
+	// compare connection address to list of broker addresses
+	for _, b := range brokers {
+		if b.Addr() == addr {
+			return b
+		}
+	}
+
+	// get connection 'port'
+	_, port, err := net.SplitHostPort(addr)
+	if err != nil || port == "" {
+		port = "9092"
+	}
+
+	// lookup local machines ips for comparing with broker addresses
+	localIPs, err := interfaceIPs()
+	if err != nil || len(localIPs) == 0 {
+		return nil
+	}
+	debugf("local machine ips: %v", localIPs)
+
+	// try to find broker by comparing the fqdn for each known ip to list of
+	// brokers
+	localHosts := lookupHosts(localIPs)
+	debugf("local machine addresses: %v", localHosts)
+	for _, host := range localHosts {
+		debugf("try to match with fqdn: %v (%v)", host, port)
+
+		if b := findBroker(host, port, brokers); b != nil {
+			return b
+		}
+	}
+
+	// try to find broker id by comparing the machines local hostname to
+	// broker hostnames in metadata
+	if host, err := os.Hostname(); err == nil {
+		debugf("try to match with hostname only: %v (%v)", host, port)
+
+		if b := findBroker(host, port, brokers); b != nil {
+			return b
+		}
+	}
+
+	// lookup ips for all brokers
+	debugf("match by ips")
+	for _, b := range brokers {
+		debugf("test broker address: %v", b.Addr())
+		bh, bp, err := net.SplitHostPort(b.Addr())
+		if err != nil {
+			continue
+		}
+
+		// port numbers do not match
+		if bp != port {
+			continue
+		}
+
+		// lookup all ips for brokers host:
+		ips, err := net.LookupIP(bh)
+		debugf("broker %v ips: %v, %v", bh, ips, err)
+		if err != nil {
+			continue
+		}
+
+		// check if ip is known
+		if ipsMatch(ips, localIPs) {
+			return b
+		}
+	}
+
+	return nil
+}
+
+// findBroker returns the first broker whose advertised address splits into
+// exactly host and port, or nil. Unparsable broker addresses are skipped.
+func findBroker(host, port string, brokers []*sarama.Broker) *sarama.Broker {
+	for _, b := range brokers {
+		debugf("test broker address: %v", b.Addr())
+
+		bh, bp, err := net.SplitHostPort(b.Addr())
+		if err != nil {
+			debugf("failed to parse broker address: %v", err)
+			continue
+		}
+
+		if bh == host && port == bp {
+			return b
+		}
+	}
+
+	return nil
+}
+
+// interfaceIPs returns the IP addresses reported by net.InterfaceAddrs.
+// Entries that are neither *net.IPNet nor *net.IPAddr are skipped.
+func interfaceIPs() ([]net.IP, error) {
+	var ips []net.IP
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		return nil, err
+	}
+	for _, addr := range addrs {
+		switch v := addr.(type) {
+		case *net.IPNet:
+			ips = append(ips, v.IP)
+		case *net.IPAddr:
+			ips = append(ips, v.IP)
+		default:
+			debugf("non ip address: %v", addr)
+		}
+	}
+	return ips, nil
+}
+
+// lookupHosts reverse-resolves every ip and returns the deduplicated host
+// names with any trailing dot removed. IPs that fail to resolve are ignored
+// and the order of the result is unspecified.
+func lookupHosts(ips []net.IP) []string {
+	set := map[string]struct{}{}
+	for _, ip := range ips {
+		txt, err := ip.MarshalText()
+		if err != nil {
+			continue
+		}
+
+		hosts, err := net.LookupAddr(string(txt))
+		debugf("lookup %v => %v, %v", string(txt), hosts, err)
+		if err != nil {
+			continue
+		}
+
+		for _, host := range hosts {
+			h := strings.TrimSuffix(host, ".")
+			set[h] = struct{}{}
+		}
+	}
+
+	hosts := make([]string, 0, len(set))
+	for host := range set {
+		hosts = append(hosts, host)
+	}
+	return hosts
+}
+
+// ipsMatch reports whether any address in as equals any address in bs.
+// net.IP.Equal is used instead of a raw byte comparison so that an IPv4
+// address in 4-byte form matches the same address in 16-byte form.
+func ipsMatch(as, bs []net.IP) bool {
+	for _, a := range as {
+		for _, b := range bs {
+			if a.Equal(b) {
+				return true
+			}
+		}
+	}
+	return false
+}