Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kafka broker matching when port is not set #8613

Merged
merged 2 commits into from
Oct 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Fix dropwizard module parsing of metric names. {issue}8365[8365] {pull}6385[8385]
- Added io disk read and write times to system module {issue}8473[8473] {pull}8508[8508]
- Avoid mapping issues in kubernetes module. {pull}8487[8487]
- Fix issue that would prevent kafka module to find a proper broker when port is not set {pull}8613[8613]

*Packetbeat*

Expand Down
74 changes: 52 additions & 22 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func (b *Broker) Connect() error {
return err
}

other := findMatchingBroker(brokerAddress(b.broker), meta.Brokers)
finder := brokerFinder{Net: &defaultNet{}}
other := finder.findBroker(brokerAddress(b.broker), meta.Brokers)
if other == nil { // no broker found
closeBroker(b.broker)
return fmt.Errorf("No advertised broker with address %v found", b.Addr())
Expand Down Expand Up @@ -352,44 +353,73 @@ func checkRetryQuery(err error) (retry, reconnect bool) {
return false, false
}

func findMatchingBroker(
addr string,
brokers []*sarama.Broker,
) *sarama.Broker {
// NetInfo can be used to obtain network information
type NetInfo interface {
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
LookupIP(string) ([]net.IP, error)
LookupAddr(string) ([]string, error)
LocalIPAddrs() ([]net.IP, error)
Hostname() (string, error)
}

type defaultNet struct{}

// LookupIP looks up a host using the local resolver
func (m *defaultNet) LookupIP(addr string) ([]net.IP, error) {
return net.LookupIP(addr)
}

// LookupAddr returns the list of hosts resolving to an specific address
func (m *defaultNet) LookupAddr(address string) ([]string, error) {
return net.LookupAddr(address)
}

// LocalIPAddrs return the list of IP addresses configured in local network interfaces
func (m *defaultNet) LocalIPAddrs() ([]net.IP, error) {
return common.LocalIPAddrs()
}

// Hostname returns the hostname reported by the OS
func (m *defaultNet) Hostname() (string, error) {
return os.Hostname()
}

type brokerFinder struct {
Net NetInfo
}

func (m *brokerFinder) findBroker(addr string, brokers []*sarama.Broker) *sarama.Broker {
lst := brokerAddresses(brokers)
if idx, found := findMatchingAddress(addr, lst); found {
if idx, found := m.findAddress(addr, lst); found {
return brokers[idx]
}
return nil
}

func findMatchingAddress(
addr string,
brokers []string,
) (int, bool) {
func (m *brokerFinder) findAddress(addr string, brokers []string) (int, bool) {
debugf("Try to match broker to: %v", addr)

// compare connection address to list of broker addresses
if i, found := indexOf(addr, brokers); found {
return i, true
}

// get connection 'port'
host, port, err := net.SplitHostPort(addr)
if err != nil || port == "" {
host = addr
port = "9092"
}

// compare connection address to list of broker addresses
if i, found := indexOf(net.JoinHostPort(host, port), brokers); found {
return i, true
}

// lookup local machines ips for comparing with broker addresses
localIPs, err := common.LocalIPAddrs()
localIPs, err := m.Net.LocalIPAddrs()
if err != nil || len(localIPs) == 0 {
return -1, false
}
debugf("local machine ips: %v", localIPs)

// try to find broker by comparing the fqdn for each known ip to list of
// brokers
localHosts := lookupHosts(localIPs)
localHosts := m.lookupHosts(localIPs)
debugf("local machine addresses: %v", localHosts)
for _, host := range localHosts {
debugf("try to match with fqdn: %v (%v)", host, port)
Expand All @@ -401,7 +431,7 @@ func findMatchingAddress(
// try matching ip of configured host with broker list, this would
// match if hosts of advertised addresses are IPs, but configured host
// is a hostname
ips, err := net.LookupIP(host)
ips, err := m.Net.LookupIP(host)
if err == nil {
for _, ip := range ips {
addr := net.JoinHostPort(ip.String(), port)
Expand All @@ -413,7 +443,7 @@ func findMatchingAddress(

// try to find broker id by comparing the machines local hostname to
// broker hostnames in metadata
if host, err := os.Hostname(); err == nil {
if host, err := m.Net.Hostname(); err == nil {
debugf("try to match with hostname only: %v (%v)", host, port)

tmp := net.JoinHostPort(strings.ToLower(host), port)
Expand All @@ -437,7 +467,7 @@ func findMatchingAddress(
}

// lookup all ips for brokers host:
ips, err := net.LookupIP(bh)
ips, err := m.Net.LookupIP(bh)
debugf("broker %v ips: %v, %v", bh, ips, err)
if err != nil {
continue
Expand All @@ -454,15 +484,15 @@ func findMatchingAddress(
return -1, false
}

func lookupHosts(ips []net.IP) []string {
func (m *brokerFinder) lookupHosts(ips []net.IP) []string {
set := map[string]struct{}{}
for _, ip := range ips {
txt, err := ip.MarshalText()
if err != nil {
continue
}

hosts, err := net.LookupAddr(string(txt))
hosts, err := m.Net.LookupAddr(string(txt))
debugf("lookup %v => %v, %v", string(txt), hosts, err)
if err != nil {
continue
Expand Down
159 changes: 159 additions & 0 deletions metricbeat/module/kafka/broker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
"net"
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

type dummyNet struct{}

func (m *dummyNet) LookupIP(addr string) ([]net.IP, error) {
dns := map[string][]net.IP{
"kafka1": []net.IP{net.IPv4(10, 0, 0, 1)},
"kafka2": []net.IP{net.IPv4(10, 0, 0, 2)},
"kafka3": []net.IP{net.IPv4(10, 0, 0, 3)},
}
ips, found := dns[addr]
if !found {
return nil, errors.New("not found")
}
return ips, nil
}

func (m *dummyNet) LookupAddr(addr string) ([]string, error) {
dns := map[string][]string{
"10.0.0.1": []string{"kafka1"},
"10.0.0.2": []string{"kafka2"},
"10.0.0.3": []string{"kafka3"},
}
names, found := dns[addr]
if !found {
return nil, errors.New("not found")
}
return names, nil
}

func (m *dummyNet) LocalIPAddrs() ([]net.IP, error) {
return []net.IP{
net.IPv4(127, 0, 0, 1),
net.IPv4(10, 0, 0, 2),
net.IPv4(10, 1, 0, 2),
}, nil
}

func (m *dummyNet) Hostname() (string, error) {
return "kafka2", nil
}

func TestFindMatchingAddress(t *testing.T) {
cases := []struct {
title string
address string
brokers []string
index int
exists bool
}{
{
title: "exists",
address: "10.0.0.2:9092",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"},
index: 1,
exists: true,
},
{
title: "doesn't exist",
address: "8.8.8.8:9092",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"},
exists: false,
},
{
title: "exists on default port",
address: "10.0.0.2",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092"},
index: 1,
exists: true,
},
{
title: "multiple brokers on same host",
address: "127.0.0.1:9093",
brokers: []string{"127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094"},
index: 1,
exists: true,
},
{
title: "hostname",
address: "kafka2:9092",
brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"},
index: 1,
exists: true,
},
{
title: "hostname and default port",
address: "kafka2",
brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"},
index: 1,
exists: true,
},
{
title: "hostname and default port doesn't exist",
address: "kafka2",
brokers: []string{"kafka1:9092", "kafka2:9093", "kafka3:9092"},
exists: false,
},
{
title: "hostname with ip brokers",
address: "kafka2:9092",
brokers: []string{"10.0.0.1:9092", "10.0.0.2:9092", "10.0.0.3:9092"},
index: 1,
exists: true,
},
{
title: "ip with named brokers",
address: "10.0.0.2:9092",
brokers: []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"},
index: 1,
exists: true,
},
{
title: "ip with multiple local brokers without name",
address: "10.1.0.2:9094",
brokers: []string{"10.1.0.2:9092", "10.1.0.2:9093", "10.1.0.2:9094"},
index: 2,
exists: true,
},
}

finder := brokerFinder{Net: &dummyNet{}}
for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
i, found := finder.findAddress(c.address, c.brokers)
if c.exists {
if assert.True(t, found, "broker expected to be found") {
assert.Equal(t, c.index, i, "incorrect broker match")
}
} else {
assert.False(t, found, "broker shouldn't be found")
}
})
}
}