Skip to content

Commit

Permalink
Fix kafka broker matching when port is not set (#8613) (#8636)
Browse files Browse the repository at this point in the history
Fix kafka broker matching when port is not set and add tests
for broker matcher function.

(cherry picked from commit 1603e64)
  • Loading branch information
jsoriano authored Oct 18, 2018
1 parent f7701b2 commit 67adc41
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Avoid mapping issues in kubernetes module. {pull}8487[8487]
- Recover metrics for old apache versions removed by mistake on #6450. {pull}7871[7871]
- Fix dropwizard module parsing of metric names. {issue}8365[8365] {pull}6385[8385]
- Fix issue that would prevent kafka module to find a proper broker when port is not set {pull}8613[8613]

*Packetbeat*

Expand Down
74 changes: 52 additions & 22 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func (b *Broker) Connect() error {
return err
}

other := findMatchingBroker(brokerAddress(b.broker), meta.Brokers)
finder := brokerFinder{Net: &defaultNet{}}
other := finder.findBroker(brokerAddress(b.broker), meta.Brokers)
if other == nil { // no broker found
closeBroker(b.broker)
return fmt.Errorf("No advertised broker with address %v found", b.Addr())
Expand Down Expand Up @@ -352,44 +353,73 @@ func checkRetryQuery(err error) (retry, reconnect bool) {
return false, false
}

func findMatchingBroker(
addr string,
brokers []*sarama.Broker,
) *sarama.Broker {
// NetInfo can be used to obtain network information
type NetInfo interface {
LookupIP(string) ([]net.IP, error)
LookupAddr(string) ([]string, error)
LocalIPAddrs() ([]net.IP, error)
Hostname() (string, error)
}

type defaultNet struct{}

// LookupIP looks up a host using the local resolver
func (m *defaultNet) LookupIP(addr string) ([]net.IP, error) {
return net.LookupIP(addr)
}

// LookupAddr returns the list of hosts resolving to an specific address
func (m *defaultNet) LookupAddr(address string) ([]string, error) {
return net.LookupAddr(address)
}

// LocalIPAddrs return the list of IP addresses configured in local network interfaces
func (m *defaultNet) LocalIPAddrs() ([]net.IP, error) {
return common.LocalIPAddrs()
}

// Hostname returns the hostname reported by the OS
func (m *defaultNet) Hostname() (string, error) {
return os.Hostname()
}

type brokerFinder struct {
Net NetInfo
}

func (m *brokerFinder) findBroker(addr string, brokers []*sarama.Broker) *sarama.Broker {
lst := brokerAddresses(brokers)
if idx, found := findMatchingAddress(addr, lst); found {
if idx, found := m.findAddress(addr, lst); found {
return brokers[idx]
}
return nil
}

func findMatchingAddress(
addr string,
brokers []string,
) (int, bool) {
func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
debugf("Try to match broker to: %v", addr)

// compare connection address to list of broker addresses
if i, found := indexOf(addr, brokers); found {
return i, true
}

// get connection 'port'
host, port, err := net.SplitHostPort(addr)
if err != nil || port == "" {
host = addr
port = "9092"
}

// compare connection address to list of broker addresses
if i, found := indexOf(net.JoinHostPort(host, port), brokers); found {
return i, true
}

// lookup local machines ips for comparing with broker addresses
localIPs, err := common.LocalIPAddrs()
localIPs, err := m.Net.LocalIPAddrs()
if err != nil || len(localIPs) == 0 {
return -1, false
}
debugf("local machine ips: %v", localIPs)

// try to find broker by comparing the fqdn for each known ip to list of
// brokers
localHosts := lookupHosts(localIPs)
localHosts := m.lookupHosts(localIPs)
debugf("local machine addresses: %v", localHosts)
for _, host := range localHosts {
debugf("try to match with fqdn: %v (%v)", host, port)
Expand All @@ -401,7 +431,7 @@ func findMatchingAddress(
// try matching ip of configured host with broker list, this would
// match if hosts of advertised addresses are IPs, but configured host
// is a hostname
ips, err := net.LookupIP(host)
ips, err := m.Net.LookupIP(host)
if err == nil {
for _, ip := range ips {
addr := net.JoinHostPort(ip.String(), port)
Expand All @@ -413,7 +443,7 @@ func findMatchingAddress(

// try to find broker id by comparing the machines local hostname to
// broker hostnames in metadata
if host, err := os.Hostname(); err == nil {
if host, err := m.Net.Hostname(); err == nil {
debugf("try to match with hostname only: %v (%v)", host, port)

tmp := net.JoinHostPort(strings.ToLower(host), port)
Expand All @@ -437,7 +467,7 @@ func findMatchingAddress(
}

// lookup all ips for brokers host:
ips, err := net.LookupIP(bh)
ips, err := m.Net.LookupIP(bh)
debugf("broker %v ips: %v, %v", bh, ips, err)
if err != nil {
continue
Expand All @@ -454,15 +484,15 @@ func findMatchingAddress(
return -1, false
}

func lookupHosts(ips []net.IP) []string {
func (m *brokerFinder) lookupHosts(ips []net.IP) []string {
set := map[string]struct{}{}
for _, ip := range ips {
txt, err := ip.MarshalText()
if err != nil {
continue
}

hosts, err := net.LookupAddr(string(txt))
hosts, err := m.Net.LookupAddr(string(txt))
debugf("lookup %v => %v, %v", string(txt), hosts, err)
if err != nil {
continue
Expand Down
159 changes: 159 additions & 0 deletions metricbeat/module/kafka/broker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
"net"
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

type dummyNet struct{}

func (m *dummyNet) LookupIP(addr string) ([]net.IP, error) {
dns := map[string][]net.IP{
"kafka1": []net.IP{net.IPv4(10, 0, 0, 1)},
"kafka2": []net.IP{net.IPv4(10, 0, 0, 2)},
"kafka3": []net.IP{net.IPv4(10, 0, 0, 3)},
}
ips, found := dns[addr]
if !found {
return nil, errors.New("not found")
}
return ips, nil
}

func (m *dummyNet) LookupAddr(addr string) ([]string, error) {
dns := map[string][]string{
"10.0.0.1": []string{"kafka1"},
"10.0.0.2": []string{"kafka2"},
"10.0.0.3": []string{"kafka3"},
}
names, found := dns[addr]
if !found {
return nil, errors.New("not found")
}
return names, nil
}

func (m *dummyNet) LocalIPAddrs() ([]net.IP, error) {
return []net.IP{
net.IPv4(127, 0, 0, 1),
net.IPv4(10, 0, 0, 2),
net.IPv4(10, 1, 0, 2),
}, nil
}

func (m *dummyNet) Hostname() (string, error) {
return "kafka2", nil
}

func TestFindMatchingAddress(t *testing.T) {
cases := []struct {
title string
address string
brokers []string
index int
exists bool
}{
{
title: "exists",
address: "10.0.0.2:9092",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"},
index: 1,
exists: true,
},
{
title: "doesn't exist",
address: "8.8.8.8:9092",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"},
exists: false,
},
{
title: "exists on default port",
address: "10.0.0.2",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"},
index: 1,
exists: true,
},
{
title: "multiple brokers on same host",
address: "127.0.0.1:9093",
brokers: []string{"127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094"},
index: 1,
exists: true,
},
{
title: "hostname",
address: "kafka2:9092",
brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"},
index: 1,
exists: true,
},
{
title: "hostname and default port",
address: "kafka2",
brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"},
index: 1,
exists: true,
},
{
title: "hostname and default port doesn't exist",
address: "kafka2",
brokers: []string{"kafka1:9092", "kafka2:9093", "kafka3:9092"},
exists: false,
},
{
title: "hostname with ip brokers",
address: "kafka2:9092",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092", "10.0.0.3:9092"},
index: 1,
exists: true,
},
{
title: "ip with named brokers",
address: "10.0.0.2:9092",
brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"},
index: 1,
exists: true,
},
{
title: "ip with multiple local brokers without name",
address: "10.1.0.2:9094",
brokers: []string{"10.1.0.2:9092", "10.1.0.2:9093", "10.1.0.2:9094"},
index: 2,
exists: true,
},
}

finder := brokerFinder{Net: &dummyNet{}}
for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
i, found := finder.findAddress(c.address, c.brokers)
if c.exists {
if assert.True(t, found, "broker expected to be found") {
assert.Equal(t, c.index, i, "incorrect broker match")
}
} else {
assert.False(t, found, "broker shouldn't be found")
}
})
}
}

0 comments on commit 67adc41

Please sign in to comment.