Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #14677 to 7.x: Add integration/system tests for Kafka JMX metricsets #14784

Merged
merged 1 commit into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metricbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ services:
beat:
build: ${PWD}/.
environment:
- BEAT_STRICT_PERMS=false
- TEST_ENVIRONMENT=false
working_dir: /go/src/github.com/elastic/beats/metricbeat
volumes:
Expand Down
111 changes: 105 additions & 6 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15007,6 +15007,24 @@ type: float

--

*`kafka.broker.request.produce.failed`*::
+
--
The number of failed produce requests

type: float

--

*`kafka.broker.request.fetch.failed`*::
+
--
The number of client fetch request failures

type: float

--

*`kafka.broker.replication.leader_elections`*::
+
--
Expand Down Expand Up @@ -15070,7 +15088,43 @@ type: float

--

*`kafka.broker.topic.net.bytes_in`*::
*`kafka.broker.topic.net.in.bytes_per_sec`*::
+
--
The incoming byte rate per topic

type: float

--

*`kafka.broker.topic.net.out.bytes_per_sec`*::
+
--
The outgoing byte rate per topic

type: float

--

*`kafka.broker.topic.net.rejected.bytes_per_sec`*::
+
--
The rejected byte rate per topic

type: float

--

*`kafka.broker.topic.messages_in`*::
+
--
The incoming message rate per topic

type: float

--

*`kafka.broker.net.in.bytes_per_sec`*::
+
--
The incoming byte rate
Expand All @@ -15079,7 +15133,7 @@ type: float

--

*`kafka.broker.topic.net.bytes_out`*::
*`kafka.broker.net.out.bytes_per_sec`*::
+
--
The outgoing byte rate
Expand All @@ -15088,7 +15142,7 @@ type: float

--

*`kafka.broker.topic.net.bytes_rejected`*::
*`kafka.broker.net.rejected.bytes_per_sec`*::
+
--
The rejected byte rate
Expand All @@ -15097,7 +15151,7 @@ type: float

--

*`kafka.broker.topic.messages_in`*::
*`kafka.broker.messages_in`*::
+
--
The incoming message rate
Expand Down Expand Up @@ -15148,7 +15202,7 @@ type: float

--

*`kafka.consumer.bytes_in`*::
*`kafka.consumer.in.bytes_per_sec`*::
+
--
The rate of bytes coming in to the consumer
Expand All @@ -15157,6 +15211,42 @@ type: float

--

*`kafka.consumer.max_lag`*::
+
--
The maximum consumer lag

type: float

--

*`kafka.consumer.zookeeper_commits`*::
+
--
The rate of offset commits to ZooKeeper

type: float

--

*`kafka.consumer.kafka_commits`*::
+
--
The rate of offset commits to Kafka

type: float

--

*`kafka.consumer.messages_in`*::
+
--
The rate of consumer message consumption

type: float

--

[float]
=== consumergroup

Expand Down Expand Up @@ -15575,7 +15665,7 @@ type: float

--

*`kafka.producer.bytes_out`*::
*`kafka.producer.out.bytes_per_sec`*::
+
--
The rate of bytes going out for the producer
Expand All @@ -15584,6 +15674,15 @@ type: float

--

*`kafka.producer.message_rate`*::
+
--
The producer message rate

type: float

--

[[exported-fields-kibana]]
== Kibana fields

Expand Down
26 changes: 22 additions & 4 deletions metricbeat/module/kafka/broker/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
- name: request.fetch.failed_per_second
description: The rate of client fetch request failures per second
type: float
- name: request.produce.failed
description: The number of failed produce requests
type: float
- name: request.fetch.failed
description: The number of client fetch request failures
type: float
- name: replication.leader_elections
description: The leader election rate
type: float
Expand All @@ -36,15 +42,27 @@
- name: log.flush_rate
description: The log flush rate
type: float
- name: topic.net.bytes_in
- name: topic.net.in.bytes_per_sec
description: The incoming byte rate per topic
type: float
- name: topic.net.out.bytes_per_sec
description: The outgoing byte rate per topic
type: float
- name: topic.net.rejected.bytes_per_sec
description: The rejected byte rate per topic
type: float
- name: topic.messages_in
description: The incoming message rate per topic
type: float
- name: net.in.bytes_per_sec
description: The incoming byte rate
type: float
- name: topic.net.bytes_out
- name: net.out.bytes_per_sec
description: The outgoing byte rate
type: float
- name: topic.net.bytes_rejected
- name: net.rejected.bytes_per_sec
description: The rejected byte rate
type: float
- name: topic.messages_in
- name: messages_in
description: The incoming message rate
type: float
66 changes: 66 additions & 0 deletions metricbeat/module/kafka/broker/broker_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build integration

package broker

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/tests/compose"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
// Register input module and metricset
_ "github.com/elastic/beats/metricbeat/module/jolokia"
_ "github.com/elastic/beats/metricbeat/module/jolokia/jmx"
)

func TestData(t *testing.T) {
service := compose.EnsureUp(t, "kafka",
compose.UpWithTimeout(600*time.Second),
compose.UpWithAdvertisedHostEnvFileForPort(9092),
)

m := mbtest.NewFetcher(t, getConfig(service.HostForPort(8779)))
m.WriteEvents(t, "")
}

func TestFetch(t *testing.T) {
service := compose.EnsureUp(t, "kafka",
compose.UpWithTimeout(600*time.Second),
compose.UpWithAdvertisedHostEnvFileForPort(9092),
)

m := mbtest.NewFetcher(t, getConfig(service.HostForPort(8779)))
events, errs := m.FetchEvents()
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)
t.Logf("%s/%s event: %+v", m.Module().Name(), m.Name(), events[0])
}

func getConfig(host string) map[string]interface{} {
return map[string]interface{}{
"module": "kafka",
"metricsets": []string{"broker"},
"hosts": []string{host},
}
}
33 changes: 33 additions & 0 deletions metricbeat/module/kafka/broker/broker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package broker

import (
"os"

"github.com/elastic/beats/metricbeat/mb"
// Register input module and metricset
_ "github.com/elastic/beats/metricbeat/module/jolokia"
_ "github.com/elastic/beats/metricbeat/module/jolokia/jmx"
)

func init() {
// To be moved to some kind of helper
os.Setenv("BEAT_STRICT_PERMS", "false")
mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module"))
}
34 changes: 30 additions & 4 deletions metricbeat/module/kafka/broker/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ input:
attributes:
- attr: MeanRate
field: request.fetch.failed_per_second
- mbean: 'kafka.server:name=FailedProduceRequestsPerSec,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: request.produce.failed
- mbean: 'kafka.server:name=FailedFetchRequestsPerSec,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: request.fetch.failed
- mbean: 'kafka.controller:name=LeaderElectionRateAndTimeMs,type=ControllerStats'
attributes:
- attr: MeanRate
Expand Down Expand Up @@ -47,19 +55,37 @@ input:
attributes:
- attr: MeanRate
field: log.flush_rate
- mbean: 'kafka.server:name=BytesRejectedPerSec,topic=*,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: topic.net.rejected.bytes_per_sec
- mbean: 'kafka.server:name=BytesInPerSec,topic=*,type=BrokerTopicMetrics,topic=*'
attributes:
- attr: MeanRate
field: topic.net.in.bytes_per_sec
- mbean: 'kafka.server:name=BytesOutPerSec,topic=*,type=BrokerTopicMetrics,topic=*'
attributes:
- attr: MeanRate
field: topic.net.out.bytes_per_sec
- mbean: 'kafka.server:type=BrokerTopicMetrics,topic=*,name=MessagesInPerSec,topic=*'
attributes:
- attr: MeanRate
field: topic.messages_in
- mbean: 'kafka.server:name=BytesRejectedPerSec,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: topic.net.bytes_rejected
field: net.rejected.bytes_per_sec
- mbean: 'kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: topic.net.bytes_in
field: net.in.bytes_per_sec
- mbean: 'kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: topic.net.bytes_out
field: net.out.bytes_per_sec
- mbean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
attributes:
- attr: MeanRate
field: topic.messages_in
field: messages_in


Loading