diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 6a59ed76ba7..a533f1fbbf0 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -307,6 +307,7 @@
 https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
 - Add Elasticsearch ml_job metricsets. {pull}7196[7196]
 - Add support for bearer token files to HTTP helper. {pull}7527[7527]
 - Add Elasticsearch index recovery metricset. {pull}7225[7225]
+- Run Kafka integration tests on version 1.1.0. {pull}7616[7616]

 *Packetbeat*
diff --git a/metricbeat/docker-compose.yml b/metricbeat/docker-compose.yml
index 8bb5435df2f..0762a0aac20 100644
--- a/metricbeat/docker-compose.yml
+++ b/metricbeat/docker-compose.yml
@@ -92,7 +92,14 @@ services:
     build: ./module/jolokia/_meta

   kafka:
-    build: ./module/kafka/_meta
+    build:
+      context: ./module/kafka/_meta
+      dockerfile: Dockerfile.1.1.0
+
+  kafka_0_10_2:
+    build:
+      context: ./module/kafka/_meta
+      dockerfile: Dockerfile.0.10.2

   kibana:
     build: ./module/kibana/_meta
diff --git a/metricbeat/docs/modules/kafka.asciidoc b/metricbeat/docs/modules/kafka.asciidoc
index ad1a0de3c70..faacaaea5b7 100644
--- a/metricbeat/docs/modules/kafka.asciidoc
+++ b/metricbeat/docs/modules/kafka.asciidoc
@@ -11,6 +11,8 @@
 This is the Kafka module.

 The default metricsets are `consumergroup` and `partition`.

+This module is tested with Kafka 0.10.2 and 1.1.0.
+
 [float]
 === Example configuration
diff --git a/metricbeat/module/kafka/_meta/Dockerfile b/metricbeat/module/kafka/_meta/Dockerfile.0.10.2
similarity index 95%
rename from metricbeat/module/kafka/_meta/Dockerfile
rename to metricbeat/module/kafka/_meta/Dockerfile.0.10.2
index 4cf9f1b377d..52ae3ff0339 100644
--- a/metricbeat/module/kafka/_meta/Dockerfile
+++ b/metricbeat/module/kafka/_meta/Dockerfile.0.10.2
@@ -2,13 +2,12 @@ FROM debian:stretch

 ENV KAFKA_HOME /kafka
 # The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it
-ENV KAFKA_ADVERTISED_HOST kafka
 ENV KAFKA_LOGS_DIR="/kafka-logs"
 ENV KAFKA_VERSION 0.10.2.1
 ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true"
 ENV TERM=linux

-RUN apt-get update && apt-get install -y curl openjdk-8-jre-headless netcat
+RUN apt-get update && apt-get install -y curl openjdk-8-jre-headless netcat dnsutils

 RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && curl -s -o $INSTALL_DIR/kafka.tgz \
     "http://ftp.wayne.edu/apache/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz" && \
diff --git a/metricbeat/module/kafka/_meta/Dockerfile.1.1.0 b/metricbeat/module/kafka/_meta/Dockerfile.1.1.0
new file mode 100644
index 00000000000..54f0d105f12
--- /dev/null
+++ b/metricbeat/module/kafka/_meta/Dockerfile.1.1.0
@@ -0,0 +1,25 @@
+FROM debian:stretch
+
+ENV KAFKA_HOME /kafka
+# The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it
+ENV KAFKA_LOGS_DIR="/kafka-logs"
+ENV KAFKA_VERSION 1.1.0
+ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true"
+ENV TERM=linux
+
+RUN apt-get update && apt-get install -y curl openjdk-8-jre-headless netcat dnsutils
+
+RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && curl -s -o $INSTALL_DIR/kafka.tgz \
+    "http://ftp.wayne.edu/apache/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz" && \
+    tar xzf ${INSTALL_DIR}/kafka.tgz -C ${KAFKA_HOME} --strip-components 1
+
+ADD run.sh /run.sh
+ADD healthcheck.sh /healthcheck.sh
+
+EXPOSE 9092
+EXPOSE 2181
+
+# Healthcheck creates an empty topic foo. As soon as a topic is created, it assumes the broker is available
+HEALTHCHECK --interval=1s --retries=90 CMD /healthcheck.sh
+
+ENTRYPOINT ["/run.sh"]
diff --git a/metricbeat/module/kafka/_meta/docs.asciidoc b/metricbeat/module/kafka/_meta/docs.asciidoc
index 98a1435c16e..a8682ea194b 100644
--- a/metricbeat/module/kafka/_meta/docs.asciidoc
+++ b/metricbeat/module/kafka/_meta/docs.asciidoc
@@ -1,3 +1,5 @@
 This is the Kafka module.

 The default metricsets are `consumergroup` and `partition`.
+
+This module is tested with Kafka 0.10.2 and 1.1.0.
diff --git a/metricbeat/module/kafka/_meta/env b/metricbeat/module/kafka/_meta/env
index caf678712bf..227657f404e 100644
--- a/metricbeat/module/kafka/_meta/env
+++ b/metricbeat/module/kafka/_meta/env
@@ -1,2 +1,3 @@
+KAFKA_0_10_2_HOST=kafka_0_10_2
 KAFKA_HOST=kafka
 KAFKA_PORT=9092
diff --git a/metricbeat/module/kafka/_meta/run.sh b/metricbeat/module/kafka/_meta/run.sh
index adec90dd18e..36b42a6a0d5 100755
--- a/metricbeat/module/kafka/_meta/run.sh
+++ b/metricbeat/module/kafka/_meta/run.sh
@@ -1,5 +1,7 @@
 #!/bin/bash

+KAFKA_ADVERTISED_HOST=$(dig +short $HOSTNAME)
+
 wait_for_port() {
     count=20
     port=$1
diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go
index 736e858ae28..dd5a15b47a8 100644
--- a/metricbeat/module/kafka/broker.go
+++ b/metricbeat/module/kafka/broker.go
@@ -18,7 +18,6 @@ package kafka

 import (
-	"bytes"
 	"crypto/tls"
 	"fmt"
 	"io"
@@ -370,7 +369,7 @@ func findMatchingAddress(
 	}

 	// get connection 'port'
-	_, port, err := net.SplitHostPort(addr)
+	host, port, err := net.SplitHostPort(addr)
 	if err != nil || port == "" {
 		port = "9092"
 	}
@@ -393,6 +392,19 @@
 		}
 	}

+	// try matching ip of configured host with broker list, this would
+	// match if hosts of advertised addresses are IPs, but configured host
+	// is a hostname
+	ips, err := net.LookupIP(host)
+	if err == nil {
+		for _, ip := range ips {
+			addr := net.JoinHostPort(ip.String(), port)
+			if i, found := indexOf(addr, brokers); found {
+				return i, true
+			}
+		}
+	}
+
 	// try to find broker id by comparing the machines local hostname to
 	// broker hostnames in metadata
 	if host, err := os.Hostname(); err == nil {
@@ -466,7 +478,7 @@ func lookupHosts(ips []net.IP) []string {
 func anyIPsMatch(as, bs []net.IP) bool {
 	for _, a := range as {
 		for _, b := range bs {
-			if bytes.Equal(a, b) {
+			if a.Equal(b) {
 				return true
 			}
 		}
diff --git a/metricbeat/module/kafka/partition/partition_integration_test.go b/metricbeat/module/kafka/partition/partition_integration_test.go
index 973b81e6cb1..f96e35a5396 100644
--- a/metricbeat/module/kafka/partition/partition_integration_test.go
+++ b/metricbeat/module/kafka/partition/partition_integration_test.go
@@ -131,6 +131,7 @@ func generateKafkaData(t *testing.T, topic string) {
 	client, err := sarama.NewClient([]string{getTestKafkaHost()}, config)
 	if err != nil {
 		t.Errorf("%s", err)
+		t.FailNow()
 	}

 	producer, err := sarama.NewSyncProducerFromClient(client)
@@ -146,10 +147,13 @@

 	_, _, err = producer.SendMessage(msg)
 	if err != nil {
-		t.Errorf("FAILED to send message: %s\n", err)
+		t.Errorf("failed to send message: %s\n", err)
 	}

-	client.RefreshMetadata(topic)
+	err = client.RefreshMetadata(topic)
+	if err != nil {
+		t.Errorf("failed to refresh metadata for topic '%s': %s\n", topic, err)
+	}
 }

 func getConfig(topic string) map[string]interface{} {
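
The broker.go change above adds a fallback for matching the configured host against the brokers' advertised addresses: resolve the configured hostname to its IPs and check whether any advertised address uses one of them. The following standalone Go sketch illustrates that matching approach; the broker list and host:port value are illustrative stand-ins, and the indexOf helper is a simple reimplementation of the helper the diff refers to, not the module's actual code.

    package main

    import (
        "fmt"
        "net"
    )

    // indexOf returns the position of addr in brokers, mirroring the helper
    // that findMatchingAddress relies on in broker.go.
    func indexOf(addr string, brokers []string) (int, bool) {
        for i, b := range brokers {
            if addr == b {
                return i, true
            }
        }
        return -1, false
    }

    func main() {
        // Advertised broker addresses as they might appear in Kafka metadata
        // (illustrative values).
        brokers := []string{"10.0.0.5:9092", "10.0.0.6:9092"}

        // Configured host, e.g. "kafka:9092" inside the docker-compose network.
        host, port, err := net.SplitHostPort("kafka:9092")
        if err != nil || port == "" {
            port = "9092"
        }

        // Resolve the hostname and try each resulting IP against the advertised list.
        if ips, err := net.LookupIP(host); err == nil {
            for _, ip := range ips {
                addr := net.JoinHostPort(ip.String(), port)
                if i, found := indexOf(addr, brokers); found {
                    fmt.Printf("configured host matches broker #%d (%s)\n", i, addr)
                    return
                }
            }
        }
        fmt.Println("no advertised broker address matches the configured host")
    }
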
diff --git a/metricbeat/tests/system/requirements.txt b/metricbeat/tests/system/requirements.txt
index 14fc6adf455..d7d946f4b19 100644
--- a/metricbeat/tests/system/requirements.txt
+++ b/metricbeat/tests/system/requirements.txt
@@ -1,2 +1,2 @@
-kafka-python==1.4.2
+kafka-python==1.4.3
 elasticsearch==6.2.0
diff --git a/metricbeat/tests/system/test_kafka.py b/metricbeat/tests/system/test_kafka.py
index 72f388e63be..4a21ee3d060 100644
--- a/metricbeat/tests/system/test_kafka.py
+++ b/metricbeat/tests/system/test_kafka.py
@@ -5,8 +5,7 @@
 from nose.plugins.skip import SkipTest


-class Test(metricbeat.BaseTest):
-
+class KafkaTest(metricbeat.BaseTest):
     COMPOSE_SERVICES = ['kafka']

     @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
@@ -35,13 +34,16 @@ def test_partition(self):
             self.assert_fields_are_documented(evt)

     def create_topic(self):
-        from kafka import KafkaProducer
-        producer = KafkaProducer(bootstrap_servers=self.get_hosts()[
-            0], retries=20, retry_backoff_ms=500, api_version=("0.10"))
+        producer = KafkaProducer(bootstrap_servers=self.get_hosts()[0],
+                                 retries=20, retry_backoff_ms=500)
         producer.send('foobar', b'some_message_bytes')

     def get_hosts(self):
-        return [os.getenv('KAFKA_HOST', 'localhost') + ':' +
+        return [self.compose_hosts()[0] + ':' +
                 os.getenv('KAFKA_PORT', '9092')]
+
+
+class Kafka_0_10_2_Test(KafkaTest):
+    COMPOSE_SERVICES = ['kafka_0_10_2']
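
A side note on the bytes.Equal to net.IP.Equal switch in anyIPsMatch above: a net.IP can carry the same IPv4 address in either a 4-byte or a 16-byte representation, and bytes.Equal treats those as different slices, while IP.Equal compares them as addresses. A minimal standalone illustration:

    package main

    import (
        "bytes"
        "fmt"
        "net"
    )

    func main() {
        // net.ParseIP stores IPv4 text in the 16-byte (IPv4-in-IPv6) form,
        // while To4() returns the 4-byte form of the same address.
        a := net.ParseIP("192.0.2.10")
        b := a.To4()

        fmt.Println(len(a), len(b))    // 16 4
        fmt.Println(bytes.Equal(a, b)) // false: slices of different length
        fmt.Println(a.Equal(b))        // true: same IP address
    }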