diff --git a/metricbeat/docker-compose.yml b/metricbeat/docker-compose.yml index ebbcc352c769..d6b9ecb917e6 100644 --- a/metricbeat/docker-compose.yml +++ b/metricbeat/docker-compose.yml @@ -4,6 +4,7 @@ services: beat: build: ${PWD}/. environment: + - BEAT_STRICT_PERMS=false - TEST_ENVIRONMENT=false working_dir: /go/src/github.com/elastic/beats/metricbeat volumes: diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 2e42b7484e8f..9c554d1cfcaf 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -15436,6 +15436,24 @@ type: float -- +*`kafka.broker.request.produce.failed`*:: ++ +-- +The number of failed produce requests + +type: float + +-- + +*`kafka.broker.request.fetch.failed`*:: ++ +-- +The number of client fetch request failures + +type: float + +-- + *`kafka.broker.replication.leader_elections`*:: + -- @@ -15499,7 +15517,43 @@ type: float -- -*`kafka.broker.topic.net.bytes_in`*:: +*`kafka.broker.topic.net.in.bytes_per_sec`*:: ++ +-- +The incoming byte rate per topic + +type: float + +-- + +*`kafka.broker.topic.net.out.bytes_per_sec`*:: ++ +-- +The outgoing byte rate per topic + +type: float + +-- + +*`kafka.broker.topic.net.rejected.bytes_per_sec`*:: ++ +-- +The rejected byte rate per topic + +type: float + +-- + +*`kafka.broker.topic.messages_in`*:: ++ +-- +The incoming message rate per topic + +type: float + +-- + +*`kafka.broker.net.in.bytes_per_sec`*:: + -- The incoming byte rate @@ -15508,7 +15562,7 @@ type: float -- -*`kafka.broker.topic.net.bytes_out`*:: +*`kafka.broker.net.out.bytes_per_sec`*:: + -- The outgoing byte rate @@ -15517,7 +15571,7 @@ type: float -- -*`kafka.broker.topic.net.bytes_rejected`*:: +*`kafka.broker.net.rejected.bytes_per_sec`*:: + -- The rejected byte rate @@ -15526,7 +15580,7 @@ type: float -- -*`kafka.broker.topic.messages_in`*:: +*`kafka.broker.messages_in`*:: + -- The incoming message rate @@ -15577,7 +15631,7 @@ type: float -- -*`kafka.consumer.bytes_in`*:: +*`kafka.consumer.in.bytes_per_sec`*:: + -- The rate of bytes coming in to the consumer @@ -15586,6 +15640,42 @@ type: float -- +*`kafka.consumer.max_lag`*:: ++ +-- +The maximum consumer lag + +type: float + +-- + +*`kafka.consumer.zookeeper_commits`*:: ++ +-- +The rate of offset commits to ZooKeeper + +type: float + +-- + +*`kafka.consumer.kafka_commits`*:: ++ +-- +The rate of offset commits to Kafka + +type: float + +-- + +*`kafka.consumer.messages_in`*:: ++ +-- +The rate of consumer message consumption + +type: float + +-- + [float] === consumergroup @@ -16004,7 +16094,7 @@ type: float -- -*`kafka.producer.bytes_out`*:: +*`kafka.producer.out.bytes_per_sec`*:: + -- The rate of bytes going out for the producer @@ -16013,6 +16103,15 @@ type: float -- +*`kafka.producer.message_rate`*:: ++ +-- +The producer message rate + +type: float + +-- + [[exported-fields-kibana]] == Kibana fields diff --git a/metricbeat/module/kafka/broker/_meta/fields.yml b/metricbeat/module/kafka/broker/_meta/fields.yml index 4fa195e7b298..9239f8ef4556 100644 --- a/metricbeat/module/kafka/broker/_meta/fields.yml +++ b/metricbeat/module/kafka/broker/_meta/fields.yml @@ -15,6 +15,12 @@ - name: request.fetch.failed_per_second description: The rate of client fetch request failures per second type: float + - name: request.produce.failed + description: The number of failed produce requests + type: float + - name: request.fetch.failed + description: The number of client fetch request failures + type: float - name: replication.leader_elections description: The leader election rate type: float @@ -36,15 +42,27 @@ - name: log.flush_rate description: The log flush rate type: float - - name: topic.net.bytes_in + - name: topic.net.in.bytes_per_sec + description: The incoming byte rate per topic + type: float + - name: topic.net.out.bytes_per_sec + description: The outgoing byte rate per topic + type: float + - name: topic.net.rejected.bytes_per_sec + description: The rejected byte rate per topic + type: float + - name: topic.messages_in + description: The incoming message rate per topic + type: float + - name: net.in.bytes_per_sec description: The incoming byte rate type: float - - name: topic.net.bytes_out + - name: net.out.bytes_per_sec description: The outgoing byte rate type: float - - name: topic.net.bytes_rejected + - name: net.rejected.bytes_per_sec description: The rejected byte rate type: float - - name: topic.messages_in + - name: messages_in description: The incoming message rate type: float diff --git a/metricbeat/module/kafka/broker/broker_integration_test.go b/metricbeat/module/kafka/broker/broker_integration_test.go new file mode 100644 index 000000000000..092bafddb96e --- /dev/null +++ b/metricbeat/module/kafka/broker/broker_integration_test.go @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package broker + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/tests/compose" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + // Register input module and metricset + _ "github.com/elastic/beats/metricbeat/module/jolokia" + _ "github.com/elastic/beats/metricbeat/module/jolokia/jmx" +) + +func TestData(t *testing.T) { + service := compose.EnsureUp(t, "kafka", + compose.UpWithTimeout(600*time.Second), + compose.UpWithAdvertisedHostEnvFileForPort(9092), + ) + + m := mbtest.NewFetcher(t, getConfig(service.HostForPort(8779))) + m.WriteEvents(t, "") +} + +func TestFetch(t *testing.T) { + service := compose.EnsureUp(t, "kafka", + compose.UpWithTimeout(600*time.Second), + compose.UpWithAdvertisedHostEnvFileForPort(9092), + ) + + m := mbtest.NewFetcher(t, getConfig(service.HostForPort(8779))) + events, errs := m.FetchEvents() + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + t.Logf("%s/%s event: %+v", m.Module().Name(), m.Name(), events[0]) +} + +func getConfig(host string) map[string]interface{} { + return map[string]interface{}{ + "module": "kafka", + "metricsets": []string{"broker"}, + "hosts": []string{host}, + } +} diff --git a/metricbeat/module/kafka/broker/broker_test.go b/metricbeat/module/kafka/broker/broker_test.go new file mode 100644 index 000000000000..baa751cafd06 --- /dev/null +++ b/metricbeat/module/kafka/broker/broker_test.go @@ -0,0 +1,33 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package broker + +import ( + "os" + + "github.com/elastic/beats/metricbeat/mb" + // Register input module and metricset + _ "github.com/elastic/beats/metricbeat/module/jolokia" + _ "github.com/elastic/beats/metricbeat/module/jolokia/jmx" +) + +func init() { + // To be moved to some kind of helper + os.Setenv("BEAT_STRICT_PERMS", "false") + mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module")) +} diff --git a/metricbeat/module/kafka/broker/manifest.yml b/metricbeat/module/kafka/broker/manifest.yml index e8c68c4459ed..c2d5b5433182 100644 --- a/metricbeat/module/kafka/broker/manifest.yml +++ b/metricbeat/module/kafka/broker/manifest.yml @@ -19,6 +19,14 @@ input: attributes: - attr: MeanRate field: request.fetch.failed_per_second + - mbean: 'kafka.server:name=FailedProduceRequestsPerSec,type=BrokerTopicMetrics' + attributes: + - attr: MeanRate + field: request.produce.failed + - mbean: 'kafka.server:name=FailedFetchRequestsPerSec,type=BrokerTopicMetrics' + attributes: + - attr: MeanRate + field: request.fetch.failed - mbean: 'kafka.controller:name=LeaderElectionRateAndTimeMs,type=ControllerStats' attributes: - attr: MeanRate @@ -47,19 +55,37 @@ input: attributes: - attr: MeanRate field: log.flush_rate + - mbean: 'kafka.server:name=BytesRejectedPerSec,topic=*,type=BrokerTopicMetrics' + attributes: + - attr: MeanRate + field: topic.net.rejected.bytes_per_sec + - mbean: 'kafka.server:name=BytesInPerSec,topic=*,type=BrokerTopicMetrics,topic=*' + attributes: + - attr: MeanRate + field: topic.net.in.bytes_per_sec + - mbean: 'kafka.server:name=BytesOutPerSec,topic=*,type=BrokerTopicMetrics,topic=*' + attributes: + - attr: MeanRate + field: topic.net.out.bytes_per_sec + - mbean: 'kafka.server:type=BrokerTopicMetrics,topic=*,name=MessagesInPerSec,topic=*' + attributes: + - attr: MeanRate + field: topic.messages_in - mbean: 'kafka.server:name=BytesRejectedPerSec,type=BrokerTopicMetrics' attributes: - attr: MeanRate - field: topic.net.bytes_rejected + field: net.rejected.bytes_per_sec - mbean: 'kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics' attributes: - attr: MeanRate - field: topic.net.bytes_in + field: net.in.bytes_per_sec - mbean: 'kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics' attributes: - attr: MeanRate - field: topic.net.bytes_out + field: net.out.bytes_per_sec - mbean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec' attributes: - attr: MeanRate - field: topic.messages_in + field: messages_in + + diff --git a/metricbeat/module/kafka/consumer/_meta/fields.yml b/metricbeat/module/kafka/consumer/_meta/fields.yml index c9946d6d801c..f04b43026d2e 100644 --- a/metricbeat/module/kafka/consumer/_meta/fields.yml +++ b/metricbeat/module/kafka/consumer/_meta/fields.yml @@ -15,6 +15,18 @@ - name: records_consumed description: The average number of records consumed per second for a specific topic type: float - - name: bytes_in + - name: in.bytes_per_sec description: The rate of bytes coming in to the consumer type: float + - name: max_lag + description: The maximum consumer lag + type: float + - name: zookeeper_commits + description: The rate of offset commits to ZooKeeper + type: float + - name: kafka_commits + description: The rate of offset commits to Kafka + type: float + - name: messages_in + description: The rate of consumer message consumption + type: float diff --git a/metricbeat/module/kafka/consumer/consumer_integration_test.go b/metricbeat/module/kafka/consumer/consumer_integration_test.go new file mode 100644 index 000000000000..a338012741a5 --- /dev/null +++ b/metricbeat/module/kafka/consumer/consumer_integration_test.go @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package consumer + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/tests/compose" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + // Register input module and metricset + _ "github.com/elastic/beats/metricbeat/module/jolokia" + _ "github.com/elastic/beats/metricbeat/module/jolokia/jmx" +) + +func TestData(t *testing.T) { + service := compose.EnsureUp(t, "kafka", + compose.UpWithTimeout(600*time.Second), + compose.UpWithAdvertisedHostEnvFileForPort(9092), + ) + + m := mbtest.NewFetcher(t, getConfig(service.HostForPort(8774))) + m.WriteEvents(t, "") +} + +func TestFetch(t *testing.T) { + service := compose.EnsureUp(t, "kafka", + compose.UpWithTimeout(600*time.Second), + compose.UpWithAdvertisedHostEnvFileForPort(9092), + ) + + m := mbtest.NewFetcher(t, getConfig(service.HostForPort(8774))) + events, errs := m.FetchEvents() + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + t.Logf("%s/%s event: %+v", m.Module().Name(), m.Name(), events[0]) +} + +func getConfig(host string) map[string]interface{} { + return map[string]interface{}{ + "module": "kafka", + "metricsets": []string{"consumer"}, + "hosts": []string{host}, + } +} diff --git a/metricbeat/module/kafka/consumer/consumer_test.go b/metricbeat/module/kafka/consumer/consumer_test.go new file mode 100644 index 000000000000..e3dd80155668 --- /dev/null +++ b/metricbeat/module/kafka/consumer/consumer_test.go @@ -0,0 +1,33 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package consumer + +import ( + "os" + + "github.com/elastic/beats/metricbeat/mb" + // Register input module and metricset + _ "github.com/elastic/beats/metricbeat/module/jolokia" + _ "github.com/elastic/beats/metricbeat/module/jolokia/jmx" +) + +func init() { + // To be moved to some kind of helper + os.Setenv("BEAT_STRICT_PERMS", "false") + mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module")) +} diff --git a/metricbeat/module/kafka/consumer/manifest.yml b/metricbeat/module/kafka/consumer/manifest.yml index 93b8b29ad70a..4d431393f147 100644 --- a/metricbeat/module/kafka/consumer/manifest.yml +++ b/metricbeat/module/kafka/consumer/manifest.yml @@ -18,4 +18,20 @@ input: - mbean: 'kafka.consumer:client-id=*,type=consumer-metrics' attributes: - attr: incoming-byte-total - field: bytes_in + field: in.bytes_per_sec + - mbean: 'kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics' + attributes: + - attr: records-lag-max + field: max_lag + - mbean: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=ZooKeeperCommitsPerSec,clientId=.*' + attributes: + - attr: Count + field: zookeeper_commits + - mbean: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=.*' + attributes: + - attr: Count + field: kafka_commits + - mbean: 'kafka\.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=.*' + attributes: + - attr: Count + field: messages_in diff --git a/metricbeat/module/kafka/fields.go b/metricbeat/module/kafka/fields.go index 4176d06ada76..fcbffca68c4f 100644 --- a/metricbeat/module/kafka/fields.go +++ b/metricbeat/module/kafka/fields.go @@ -32,5 +32,5 @@ func init() { // AssetKafka returns asset data. // This is the base64 encoded gzipped contents of ../metricbeat/module/kafka. func AssetKafka() string { - return "eJzUWk+P27YTve+nGOS0OUQ5/X6HPRRok6LYJmmCNAWKXgSaHFnsUqRCUt44n74gKcmy/lCS5U2bPa1lcd7jcGY4fPQLeMDjHTyQ7IHcAFhuBd7Bszfu87MbAIaGal5aruQd/HADAOC/g0KxSuANgMmVtilVMuP7O8iIMO6pRoHE4B3sndmMo2Dmzg9/AZIUeIJ0f/ZYule1qsr6yQjuuZmuqZ1WD6jbx2P2Jm2Gv5+8BXilpKkK1PCLGwr3MlO6IG4A5OSAsEOUoJEwyLQq4LYelhPJBJf7M5M2R6CNPU/ledJ5oT+X7nw4O3vczEeoHkR0Sp1pcXYzikMY02jMKNgDHh+V7hNZhkfYAbXlBlkLMVgzq0pOE/f/YN2G0BHYT86OtzmFgVornVDFhkg9j87CeFPgTCVDtJJoy93Y5Gz91iJ9aMwAZ1EUP7t0BCvuvzOwPyT/XCFwBirzEVue0KV/EHw4zyPk4LehA0Qy/ymAJlcpCHXsFmg1pyYkeCh19Te/vvuzM7YtcDu0ZGFeFzskMpZR79wLYHNiwebcAB5QWuDGoRGLDKxanKwNqMbPFRqb0JxIiSL5XGGFieFfMcbkU47g3mkWorYCfvSy6tQnUGrFKopJRrhAlpaoU4NUyWiNcTw0sZ5HGAi1ncaugRI1jFoKxDKhiI0yy9DS/HJeVHC3TN5K6yhnrdK4iV0pOPW7TyKQMNQpCqTuc79kD6iF96F531PdAF9JKpDIdC2Netw16Bg0xlH5qtQDYok6YdxQJSVSO0fjL6Xe+DFAhXK7Um1sw+IM6eCXkuvZnDpRCe8/DRfXoigpjsvZNCOehI45SrpmjXw61Wu7jYtQ+yQTlcnTkZAbZo3ag3/7kgCtGxq0ye5o0aQ8WukdHJdUFVzuwQ24BqSqZlNBVXavromp8W+kFueLZf3eRuACjSH7de6txywFbQCb5n1dG9EeIUYaifa777SV8DvcokwquORFVYQtklh4zDnNz49EBiUz55umAauADLu3JeERorG2PhuN5IDahYSsih1qt4n78Q07BpnSQMCUSHnGad12btjKqdJsC73awongicso14u8N59TTcvTeMvnl2vN1dniXpBl/aRafYSfMnSuRDR/sdyKxF/feOBUaqQup+7g/8n/LjoxX1N4gHnxYcoDEBMhICZELJgqnAkSzZNJ/HFxAmYEinU8+hhr1JgFAkl/VV/HgaYTdwprceB1VJIohfa8veyIt4rDSdiYc4TKMoP9VmaZBtYGfbABXFrVURF26AqWS6Q4g+J8J4Y1q04rY1VxYuJsASOWgLG6m6yjyKNq1fL5D8Pd66wtm5cnX3TFrCil0JCvqIdRPj8aw/cSWdPnu8Vwi+IblLpdmgrDDTVrYbl4FUjdv4bb4DiD1jp6gW3C2fP50pUr03fXpUTOTE0CFuj6hHT79Lm0qCURvc2jBugmbqxirN67x4ys37cjZeOSOD0QLshOYG3XNFLYnh9QdvTPlTEq8REj4XH53vqbN9wUvr6A2qfZcZtgT0PovTc8T+iCneiC9TxtP64Wf4tuKLYzLiAMg8uA5ukky6CzPcFSvg0CHmdwG7rjQTvZcZWZZrCpaXvLfTQ5AODMTDOopcsn8MPHYHncEdMekeYoaTpHa6eUGJ7qFzK7l4y7aDPAs8YB7rjPJRUVQ9ZcoHD5wpFp5V10Oxzc3v/+cdFMTDoTY08yCdtK2vMUJxsouML6/9z2TKFR8fKFaw+WlrXIlWSU36ZDph1cXk5T693IwvXOIENeS08l9TXfmmvx7UdyPq6B1VwuuTrfzmnyvDpsy8Il2Urx8kM9aky8bL/7TsVL0vRz6a7KMtSp17Cih2Z/32yJAFKoSvq9J4x1/bDS/euVWZGNWJqnhn/FlByip7iYRGmmzmKLgAvyZQ64IF+8drsYOHLzS5VmqUHJFqnG07Knw94qv6YarT5eTMRqjqw2VYvXWwn5ivwfImT85Xctxv/Li7U2TRo/DH/XsAZwRXrMAc79HmLRup+c2xT0q/zswZRKGrycQRi/gQJX6SPhszHWQt6/fA9uAFg+0Z7M3W0suBs9v9wI16Sqsv5+xXbYRPD/CQAA//8NHQ2M" + return "eJzUWs2O3DYSvs9TFHwaHyyfdg9zWGDXXgQT27HhOECQi8AmS93MSKRMUj3TfvqApKTWL0W1epx4TtOSqr6PVcVischX8ICnO3gg2QO5ATDc5HgHL97Z3y9uABhqqnhpuBR38J8bAAD3DgrJqhxvAPRBKpNSKTK+v4OM5No+VZgj0XgHe6s245gzfefEX4EgBZ4h7Z85lfZTJauyfjKB21fTVbVT8gFV+3hK36xO//c/pwHeSKGrAhX8ZEXhXmRSFcQKwIEcEXaIAhQSBpmSBdzWYgciWM7FvqfSHBBoo89ReZl0PhiOpTseznqPm/HkcgARHFJnWJzdTOIQxhRqPQn2gKdHqYZE4vAIO6IyXCNrIUY+M7LkNLH/j/w2hg7AfrF6nM45DFRKqoRKNkYaWHQRxqkCqyoZo5VEGW5lk57/1iJ9atQAZ0EUN7p0Aitsvx7Yb4J/rRA4A5m5iC3P6MI98DZc5uHn4PehA0Qw98uDJldJCHXsFmgUp9pPcJ/q6jc/f/i9I9smuB0aEjmvix0SEZpRH+wHYA7EgDlwDXhEYYBri0YMMjAyerI2oAq/VqhNQg9ECMyTrxVWmGj+DUNMvhwQ7DeNI2ot4KTjstOQQKkkqygmGeE5srRElWqkUgRzjOWhiHE8vCDUehq9GkpUMKnJE8tySUyQWYaGHi7nRXNu3eS0tIay2iqFV2DXt9sSKVEVO1QBc13BRvEcgqZZzaTMOXWrcZIjYahSzJHa30NVI0b+e2i+d67bAF8JmiMR6Voatdw16GjU2lL5JuUDYokqYVxTKQRSs0TjDynfORmgubSrdK1sQ7CO6eBTydVijjlT8d8/DxdbskmRn+LZNBLPQkefBF3jIzeHat9u45LLfZLllT6kEyE3njVyD+7rSwK0LvDQJFwku5NB3aTWJVguqCy42IOV8lnWDtgpvJiErMw6FrIye3ltFgr/RGqQraPSSF2NSoFakz3qlAfLkZ4zaplt8NcJhwtAr+D+C1Cv5e6V0FvdGwHXQDU73HW1drvPnqi223c/aL3tap2o9FpwwYuq8DOKGHg8cHro9w00Cqb75ZMGI4GMtzgxgeHDsNa+WMaRIyobEudyzsk37BhkUgEBXSLlGaf13mxDvUulYlvo1RrOBM9cJrmuJLg2cTX7g8Zqbp7ZfazsOXnt5CZPaU6CjSAXXOTJBVcbSmOZJaS2YEmpLAo+2jrMDlhmmUZbsTgpO962mllJwTUJt8O/6/Qao+0cn0TbjeA5rflk6h+4Ly/IqcMUurqrOaeo35xt/kKZNJBthso9p1IhtRn0Dv6d/Ctkv9km4jV7sbDcj52zAIT6shDqzUYMFXo92ubJLP50vxYWerbreAwxpjPhdG6P6BkPvfo2DDSfpuewogOv0zgOUmhbkHFdr1Uczr3eJUP4nBZPoQPSBn2dF7kwstNY3aFdluxECjMo+nUXrPE6rbSRRTc/GgKMGALaqO5knUSebODHj38c7m5Vadm8Ptui298PUvJ78hX5MMjnv1rzvUDWbPWtM6xTXDlaF8dzYbghZ0Wmizee1P1buPWG02iMpefZJpy9XE5dB6mH5rqUSE/VLGCBtipMtw+fC4NKkHyweNQA3Ykbyhir1+4pJevX7UDauCROj4TnZJdjrVc3pwN7fkTRORJaGaMCHzEQHpevrb84xU3iG54pDWl2zJaz5yH00SleJnTBSnSBP8/Lj83F36MaCq2MEYRhdD7aPJ1l6Vvtz+DK976Hzxnc+up4VE52TKXnGWwq2t5zF00WADjT8wzq04tnsMNnr3naEPMWEfokaLpEaydlPu7hRDK7F4zbaNPAs8YAwDVwQfOKIWvOlLl4Zcm0JzxoVzi4vf/1c9RIdLoQY88yCNOeai1TnC2g4Ar+/39bM/lCxTWrbHkQm9YCtzSC/DZtMs3oPsc8tcElFbjeHmTMK3ZXUt98WHNTaPuWnE93PGsul9wm2s5pdr86Lsv8QfjKVvWnWmqqVd2++0Fb1aSp59JdlWWoUtepDG6a3RUcQ3IghayEW3u8rK2HpRqesC42pImhh1Tzb5iS42JPc64href2YlHABXlaAm6aqdHAgcswVCqWahQs6oxgvsltsbc221OFRp0uJmIUR1arqo8qthJyGfkfRMg3++ujl7/ZWWunSWOH8VWvNYArpscS4NIVsSi/n43bJPSr3ATTpRQaL2fg5TdQ4DJ9JHwxxlrI+9cfwQqA4TPlyTzW6tPw/lGWPxiXlXGnaqbDaiWP+pwkyurleSGOOqn+KwAA//8jv6eW" } diff --git a/metricbeat/module/kafka/producer/_meta/fields.yml b/metricbeat/module/kafka/producer/_meta/fields.yml index 1edd723e0a4b..d4d97d8d6f31 100644 --- a/metricbeat/module/kafka/producer/_meta/fields.yml +++ b/metricbeat/module/kafka/producer/_meta/fields.yml @@ -42,6 +42,9 @@ - name: io_wait description: The producer I/O wait time type: float - - name: bytes_out + - name: out.bytes_per_sec description: The rate of bytes going out for the producer type: float + - name: message_rate + description: The producer message rate + type: float diff --git a/metricbeat/module/kafka/producer/manifest.yml b/metricbeat/module/kafka/producer/manifest.yml index 9f8af3452f9b..a10e366103b5 100644 --- a/metricbeat/module/kafka/producer/manifest.yml +++ b/metricbeat/module/kafka/producer/manifest.yml @@ -33,7 +33,14 @@ input: field: response_rate - attr: io-wait-time-ns-avg field: io_wait - - mbean: 'kafka.producer:client-id=console-producer,node-id=*,type=producer-node-metrics' + - mbean: 'kafka.producer:client-id=*,node-id=*,type=producer-node-metrics' attributes: - attr: outgoing-byte-total - field: bytes_out + field: out.bytes_per_sec + - mbean: 'kafka\.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=.*' + attributes: + - attr: Count + field: message_rate + + + diff --git a/metricbeat/module/kafka/producer/producer_integration_test.go b/metricbeat/module/kafka/producer/producer_integration_test.go new file mode 100644 index 000000000000..0513f06023e1 --- /dev/null +++ b/metricbeat/module/kafka/producer/producer_integration_test.go @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package producer + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/tests/compose" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + // Register input module and metricset + _ "github.com/elastic/beats/metricbeat/module/jolokia" + _ "github.com/elastic/beats/metricbeat/module/jolokia/jmx" +) + +func TestData(t *testing.T) { + service := compose.EnsureUp(t, "kafka", + compose.UpWithTimeout(600*time.Second), + compose.UpWithAdvertisedHostEnvFileForPort(9092), + ) + + m := mbtest.NewFetcher(t, getConfig(service.HostForPort(8775))) + m.WriteEvents(t, "") +} + +func TestFetch(t *testing.T) { + service := compose.EnsureUp(t, "kafka", + compose.UpWithTimeout(600*time.Second), + compose.UpWithAdvertisedHostEnvFileForPort(9092), + ) + + m := mbtest.NewFetcher(t, getConfig(service.HostForPort(8775))) + events, errs := m.FetchEvents() + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + t.Logf("%s/%s event: %+v", m.Module().Name(), m.Name(), events[0]) +} + +func getConfig(host string) map[string]interface{} { + return map[string]interface{}{ + "module": "kafka", + "metricsets": []string{"producer"}, + "hosts": []string{host}, + } +} diff --git a/metricbeat/module/kafka/producer/producer_test.go b/metricbeat/module/kafka/producer/producer_test.go new file mode 100644 index 000000000000..2cf33e994a56 --- /dev/null +++ b/metricbeat/module/kafka/producer/producer_test.go @@ -0,0 +1,33 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package producer + +import ( + "os" + + "github.com/elastic/beats/metricbeat/mb" + // Register input module and metricset + _ "github.com/elastic/beats/metricbeat/module/jolokia" + _ "github.com/elastic/beats/metricbeat/module/jolokia/jmx" +) + +func init() { + // To be moved to some kind of helper + os.Setenv("BEAT_STRICT_PERMS", "false") + mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module")) +} diff --git a/metricbeat/tests/system/test_kafka.py b/metricbeat/tests/system/test_kafka.py index c0680567dbea..aec402ed38a1 100644 --- a/metricbeat/tests/system/test_kafka.py +++ b/metricbeat/tests/system/test_kafka.py @@ -3,6 +3,7 @@ import unittest from nose.plugins.attrib import attr from nose.plugins.skip import SkipTest +from parameterized import parameterized class KafkaTest(metricbeat.BaseTest): @@ -36,7 +37,36 @@ def test_partition(self): "password": self.PASSWORD, }]) proc = self.start_beat() - self.wait_until(lambda: self.output_lines() > 0, max_timeout=20) + self.wait_until(lambda: self.output_lines() > 0, max_timeout=60) + proc.check_kill_and_wait() + + output = self.read_output_json() + self.assertTrue(len(output) >= 1) + evt = output[0] + print(evt) + + self.assert_fields_are_documented(evt) + + @parameterized.expand([ + ('consumer', '8774/tcp'), + ('producer', '8775/tcp'), + ('broker', '8779/tcp'), + ]) + @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") + def test_kafka_jmx(self, metricset, port): + """ + kafka jmx metricsets tests + """ + host = self.compose_host(port=port) + modules = [{ + "name": "kafka", + "metricsets": [metricset], + "hosts": [host], + "period": "1s" + }] + self.render_config_template(modules=modules) + proc = self.start_beat(home=self.beat_path) + self.wait_until(lambda: self.output_lines() > 0, max_timeout=60) proc.check_kill_and_wait() output = self.read_output_json() diff --git a/x-pack/metricbeat/docker-compose.yml b/x-pack/metricbeat/docker-compose.yml index 9e4e5f3ae8f0..34d8ea5f6404 100644 --- a/x-pack/metricbeat/docker-compose.yml +++ b/x-pack/metricbeat/docker-compose.yml @@ -3,6 +3,7 @@ services: beat: build: ../../metricbeat environment: + - BEAT_STRICT_PERMS=false - TEST_ENVIRONMENT=false working_dir: /go/src/github.com/elastic/beats/x-pack/metricbeat volumes: