Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metricbeat: Kafka integration tests with 1.1 #7616

Merged
merged 2 commits into from
Jul 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Add Elasticsearch ml_job metricsets. {pull}7196[7196]
- Add support for bearer token files to HTTP helper. {pull}7527[7527]
- Add Elasticsearch index recovery metricset. {pull}7225[7225]
- Run Kafka integration tests on version 1.1.0 {pull}7616[7616]

*Packetbeat*

Expand Down
9 changes: 8 additions & 1 deletion metricbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,14 @@ services:
build: ./module/jolokia/_meta

kafka:
build: ./module/kafka/_meta
build:
context: ./module/kafka/_meta
dockerfile: Dockerfile.1.1.0

kafka_0_10_2:
build:
context: ./module/kafka/_meta
dockerfile: Dockerfile.0.10.2

kibana:
build: ./module/kibana/_meta
Expand Down
2 changes: 2 additions & 0 deletions metricbeat/docs/modules/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ This is the Kafka module.

The default metricsets are `consumergroup` and `partition`.

This module is tested with Kafka 0.10.2 and 1.1.0.


[float]
=== Example configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ FROM debian:stretch

ENV KAFKA_HOME /kafka
# The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it
ENV KAFKA_ADVERTISED_HOST kafka
ENV KAFKA_LOGS_DIR="/kafka-logs"
ENV KAFKA_VERSION 0.10.2.1
ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true"
ENV TERM=linux

RUN apt-get update && apt-get install -y curl openjdk-8-jre-headless netcat
RUN apt-get update && apt-get install -y curl openjdk-8-jre-headless netcat dnsutils

RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && curl -s -o $INSTALL_DIR/kafka.tgz \
"http://ftp.wayne.edu/apache/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz" && \
Expand Down
25 changes: 25 additions & 0 deletions metricbeat/module/kafka/_meta/Dockerfile.1.1.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
FROM debian:stretch

ENV KAFKA_HOME /kafka
# The advertised host is kafka. This means it will not work if the container is
# started locally and connected to from localhost.
ENV KAFKA_LOGS_DIR="/kafka-logs"
ENV KAFKA_VERSION 1.1.0
# Scratch directory for the downloaded tarball. The original referenced
# $INSTALL_DIR without ever defining it, so it silently expanded to the
# filesystem root; define it explicitly so the download location is intentional.
ENV INSTALL_DIR /tmp
ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true"
ENV TERM=linux

# dnsutils provides `dig`, which run.sh uses to resolve the advertised host.
RUN apt-get update && apt-get install -y curl openjdk-8-jre-headless netcat dnsutils

# Fetch and unpack the Kafka distribution, then remove the tarball so it does
# not bloat the image layer.
RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && curl -s -o ${INSTALL_DIR}/kafka.tgz \
    "http://ftp.wayne.edu/apache/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz" && \
    tar xzf ${INSTALL_DIR}/kafka.tgz -C ${KAFKA_HOME} --strip-components 1 && \
    rm ${INSTALL_DIR}/kafka.tgz

ADD run.sh /run.sh
ADD healthcheck.sh /healthcheck.sh

EXPOSE 9092
EXPOSE 2181

# Healthcheck creates an empty topic foo. As soon as the topic is created, the
# broker is assumed to be available.
HEALTHCHECK --interval=1s --retries=90 CMD /healthcheck.sh

ENTRYPOINT ["/run.sh"]
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
This is the Kafka module.

The default metricsets are `consumergroup` and `partition`.

This module is tested with Kafka 0.10.2 and 1.1.0.
1 change: 1 addition & 0 deletions metricbeat/module/kafka/_meta/env
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Hostname of the Kafka 0.10.2 broker container (compose service kafka_0_10_2).
KAFKA_0_10_2_HOST=kafka_0_10_2
# Hostname of the default Kafka broker container (compose service kafka, 1.1.0).
KAFKA_HOST=kafka
# Client port exposed by both broker containers.
KAFKA_PORT=9092
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/_meta/run.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/bin/bash

KAFKA_ADVERTISED_HOST=$(dig +short $HOSTNAME)

wait_for_port() {
count=20
port=$1
Expand Down
18 changes: 15 additions & 3 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka

import (
"bytes"
"crypto/tls"
"fmt"
"io"
Expand Down Expand Up @@ -370,7 +369,7 @@ func findMatchingAddress(
}

// get connection 'port'
_, port, err := net.SplitHostPort(addr)
host, port, err := net.SplitHostPort(addr)
if err != nil || port == "" {
port = "9092"
}
Expand All @@ -393,6 +392,19 @@ func findMatchingAddress(
}
}

// try matching ip of configured host with broker list, this would
// match if hosts of advertised addresses are IPs, but configured host
// is a hostname
ips, err := net.LookupIP(host)
if err == nil {
for _, ip := range ips {
addr := net.JoinHostPort(ip.String(), port)
if i, found := indexOf(addr, brokers); found {
return i, true
}
}
}

// try to find broker id by comparing the machines local hostname to
// broker hostnames in metadata
if host, err := os.Hostname(); err == nil {
Expand Down Expand Up @@ -466,7 +478,7 @@ func lookupHosts(ips []net.IP) []string {
func anyIPsMatch(as, bs []net.IP) bool {
for _, a := range as {
for _, b := range bs {
if bytes.Equal(a, b) {
if a.Equal(b) {
return true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func generateKafkaData(t *testing.T, topic string) {
client, err := sarama.NewClient([]string{getTestKafkaHost()}, config)
if err != nil {
t.Errorf("%s", err)
t.FailNow()
}

producer, err := sarama.NewSyncProducerFromClient(client)
Expand All @@ -146,10 +147,13 @@ func generateKafkaData(t *testing.T, topic string) {

_, _, err = producer.SendMessage(msg)
if err != nil {
t.Errorf("FAILED to send message: %s\n", err)
t.Errorf("failed to send message: %s\n", err)
}

client.RefreshMetadata(topic)
err = client.RefreshMetadata(topic)
if err != nil {
t.Errorf("failed to refresh metadata for topic '%s': %s\n", topic, err)
}
}

func getConfig(topic string) map[string]interface{} {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/tests/system/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
kafka-python==1.4.2
kafka-python==1.4.3
elasticsearch==6.2.0
14 changes: 8 additions & 6 deletions metricbeat/tests/system/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from nose.plugins.skip import SkipTest


class Test(metricbeat.BaseTest):

class KafkaTest(metricbeat.BaseTest):
COMPOSE_SERVICES = ['kafka']

@unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
Expand Down Expand Up @@ -35,13 +34,16 @@ def test_partition(self):
self.assert_fields_are_documented(evt)

def create_topic(self):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=self.get_hosts()[
0], retries=20, retry_backoff_ms=500, api_version=("0.10"))
producer = KafkaProducer(bootstrap_servers=self.get_hosts()[0],
retries=20, retry_backoff_ms=500)
producer.send('foobar', b'some_message_bytes')

def get_hosts(self):
return [os.getenv('KAFKA_HOST', 'localhost') + ':' +
return [self.compose_hosts()[0] + ':' +
os.getenv('KAFKA_PORT', '9092')]


class Kafka_0_10_2_Test(KafkaTest):
    # Re-run every inherited KafkaTest case against the 0.10.2 broker
    # container instead of the default (1.1.0) one; only the compose
    # service name differs.
    COMPOSE_SERVICES = ['kafka_0_10_2']