From 0f2b29a0de67e9c9abb1ca4a96ec51699ec25345 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 19 Sep 2018 04:50:45 -0700 Subject: [PATCH] Adding elasticsearch/ccr metricset (#8335) This PR adds the `elasticsearch/ccr` metricset which collects cross-cluster replication (CCR) stats from the `GET _ccr/stats` Elasticsearch API and indexes them in `metricbeat-*`. --- CHANGELOG.asciidoc | 1 + metricbeat/docs/fields.asciidoc | 79 ++++++++++++++ .../docs/modules/elasticsearch.asciidoc | 4 + .../docs/modules/elasticsearch/ccr.asciidoc | 23 ++++ metricbeat/docs/modules_list.asciidoc | 3 +- metricbeat/helper/elastic/elastic.go | 16 +++ metricbeat/include/list.go | 1 + .../module/elasticsearch/ccr/_meta/data.json | 39 +++++++ .../elasticsearch/ccr/_meta/docs.asciidoc | 3 + .../module/elasticsearch/ccr/_meta/fields.yml | 40 +++++++ .../ccr/_meta/test/ccr_stats.700.json | 98 +++++++++++++++++ .../ccr/_meta/test/empty.700.json | 1 + metricbeat/module/elasticsearch/ccr/ccr.go | 100 ++++++++++++++++++ metricbeat/module/elasticsearch/ccr/data.go | 97 +++++++++++++++++ .../module/elasticsearch/ccr/data_test.go | 49 +++++++++ .../module/elasticsearch/elasticsearch.go | 9 ++ .../elasticsearch_integration_test.go | 57 +++++++++- metricbeat/module/elasticsearch/fields.go | 2 +- .../elasticsearch/test_elasticsearch.py | 25 +++++ metricbeat/module/kibana/kibana.go | 19 +--- 20 files changed, 645 insertions(+), 21 deletions(-) create mode 100644 metricbeat/docs/modules/elasticsearch/ccr.asciidoc create mode 100644 metricbeat/module/elasticsearch/ccr/_meta/data.json create mode 100644 metricbeat/module/elasticsearch/ccr/_meta/docs.asciidoc create mode 100644 metricbeat/module/elasticsearch/ccr/_meta/fields.yml create mode 100644 metricbeat/module/elasticsearch/ccr/_meta/test/ccr_stats.700.json create mode 100644 metricbeat/module/elasticsearch/ccr/_meta/test/empty.700.json create mode 100644 metricbeat/module/elasticsearch/ccr/ccr.go create mode 100644 metricbeat/module/elasticsearch/ccr/data.go create mode 100644 metricbeat/module/elasticsearch/ccr/data_test.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 248c4a6eb5d..559f3bd06bb 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -130,6 +130,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Add support to renamed fields planned for redis 5.0. {pull}8167[8167] - Allow TCP helper to support delimiters and graphite module to accept multiple metrics in a single payload. {pull}8278[8278] - Added 'died' PID state to process_system metricset on system module{pull}8275[8275] +- Added `ccr` metricset to Elasticsearch module. {pull}8335[8335] *Packetbeat* diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 8920a38f686..1b20c338988 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -3459,6 +3459,85 @@ type: keyword Elasticsearch state id. +-- + +[float] +== ccr fields + +Cross-cluster replication stats + + + + +*`elasticsearch.ccr.leader.index`*:: ++ +-- +type: keyword + +Name of leader index + + +-- + +*`elasticsearch.ccr.leader.max_seq_no`*:: ++ +-- +type: long + +Maximum sequence number of operation on the leader shard + + +-- + + +*`elasticsearch.ccr.follower.index`*:: ++ +-- +type: keyword + +Name of follower index + + +-- + +*`elasticsearch.ccr.follower.shard.number`*:: ++ +-- +type: long + +Number of the shard within the index + + +-- + +*`elasticsearch.ccr.follower.operations_indexed`*:: ++ +-- +type: long + +Number of operations indexed (replicated) into the follower shard from the leader shard + + +-- + +*`elasticsearch.ccr.follower.time_since_last_fetch.ms`*:: ++ +-- +type: long + +Time, in ms, since the follower last fetched from the leader + + +-- + +*`elasticsearch.ccr.follower.global_checkpoint`*:: ++ +-- +type: long + +Global checkpoint value on follower shard + + -- [float] diff --git a/metricbeat/docs/modules/elasticsearch.asciidoc b/metricbeat/docs/modules/elasticsearch.asciidoc index 807ae59f2aa..efc13983d75 100644 --- a/metricbeat/docs/modules/elasticsearch.asciidoc +++ b/metricbeat/docs/modules/elasticsearch.asciidoc @@ -50,6 +50,8 @@ This module supports TLS connection when using `ssl` config field, as described The following metricsets are available: +* <> + * <> * <> @@ -68,6 +70,8 @@ The following metricsets are available: * <> +include::elasticsearch/ccr.asciidoc[] + include::elasticsearch/cluster_stats.asciidoc[] include::elasticsearch/index.asciidoc[] diff --git a/metricbeat/docs/modules/elasticsearch/ccr.asciidoc b/metricbeat/docs/modules/elasticsearch/ccr.asciidoc new file mode 100644 index 00000000000..d362d4091e7 --- /dev/null +++ b/metricbeat/docs/modules/elasticsearch/ccr.asciidoc @@ -0,0 +1,23 @@ +//// +This file is generated! See scripts/docs_collector.py +//// + +[[metricbeat-metricset-elasticsearch-ccr]] +=== Elasticsearch ccr metricset + +beta[] + +include::../../../module/elasticsearch/ccr/_meta/docs.asciidoc[] + + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/elasticsearch/ccr/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index b9273300f5e..4507a5e340e 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -33,7 +33,8 @@ This file is generated! See scripts/docs_collector.py |<> beta[] |image:./images/icon-no.png[No prebuilt dashboards] | .1+| .1+| |<> beta[] |<> beta[] |image:./images/icon-no.png[No prebuilt dashboards] | -.9+| .9+| |<> beta[] +.10+| .10+| |<> beta[] +|<> beta[] |<> beta[] |<> beta[] |<> beta[] diff --git a/metricbeat/helper/elastic/elastic.go b/metricbeat/helper/elastic/elastic.go index 9562f8baf7f..2d37925e89e 100644 --- a/metricbeat/helper/elastic/elastic.go +++ b/metricbeat/helper/elastic/elastic.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" ) @@ -92,3 +93,18 @@ func ReportErrorForMissingField(field string, product Product, r mb.ReporterV2) func MakeErrorForMissingField(field string, product Product) error { return fmt.Errorf("Could not find field '%v' in %v stats API response", field, strings.Title(product.String())) } + +// IsFeatureAvailable returns whether a feature is available in the current product version +func IsFeatureAvailable(currentProductVersion, featureAvailableInProductVersion string) (bool, error) { + currentVersion, err := common.NewVersion(currentProductVersion) + if err != nil { + return false, err + } + + wantVersion, err := common.NewVersion(featureAvailableInProductVersion) + if err != nil { + return false, err + } + + return !currentVersion.LessThan(wantVersion), nil +} diff --git a/metricbeat/include/list.go b/metricbeat/include/list.go index b5032aae0e0..99d153a2fcf 100644 --- a/metricbeat/include/list.go +++ b/metricbeat/include/list.go @@ -54,6 +54,7 @@ import ( _ "github.com/elastic/beats/metricbeat/module/dropwizard" _ "github.com/elastic/beats/metricbeat/module/dropwizard/collector" _ "github.com/elastic/beats/metricbeat/module/elasticsearch" + _ "github.com/elastic/beats/metricbeat/module/elasticsearch/ccr" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/cluster_stats" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/index" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/index_recovery" diff --git a/metricbeat/module/elasticsearch/ccr/_meta/data.json b/metricbeat/module/elasticsearch/ccr/_meta/data.json new file mode 100644 index 00000000000..993aa63a607 --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/_meta/data.json @@ -0,0 +1,39 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "beat": { + "hostname": "host.example.com", + "name": "host.example.com" + }, + "elasticsearch": { + "ccr": { + "follower": { + "global_checkpoint": 1, + "index": "my_follower", + "operations_indexed": 2, + "shard": { + "number": 0 + }, + "time_since_last_fetch": { + "ms": 4926 + } + }, + "leader": { + "index": "my_leader", + "max_seq_no": 1 + } + }, + "cluster": { + "id": "KSGkOjOuSg6whAgtpPyhQw", + "name": "elasticsearch" + } + }, + "metricset": { + "host": "127.0.0.1:9200", + "module": "elasticsearch", + "name": "ccr", + "rtt": 115 + }, + "service": { + "name": "elasticsearch" + } +} \ No newline at end of file diff --git a/metricbeat/module/elasticsearch/ccr/_meta/docs.asciidoc b/metricbeat/module/elasticsearch/ccr/_meta/docs.asciidoc new file mode 100644 index 00000000000..e38b8702e5f --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/_meta/docs.asciidoc @@ -0,0 +1,3 @@ +This is the `ccr` metricset of the Elasticsearch module. It interrogates the +Cross Cluster Replication Stats API endpoint to fetch information about shards +in the Elasticsearch cluster that are participating in cross-cluster replication. diff --git a/metricbeat/module/elasticsearch/ccr/_meta/fields.yml b/metricbeat/module/elasticsearch/ccr/_meta/fields.yml new file mode 100644 index 00000000000..a4119ec30b1 --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/_meta/fields.yml @@ -0,0 +1,40 @@ +- name: ccr + type: group + description: > + Cross-cluster replication stats + release: beta + fields: + - name: leader + type: group + fields: + - name: index + type: keyword + description: > + Name of leader index + - name: max_seq_no + type: long + description: > + Maximum sequence number of operation on the leader shard + - name: follower + type: group + fields: + - name: index + type: keyword + description: > + Name of follower index + - name: shard.number + type: long + description: > + Number of the shard within the index + - name: operations_indexed + type: long + description: > + Number of operations indexed (replicated) into the follower shard from the leader shard + - name: time_since_last_fetch.ms + type: long + description: > + Time, in ms, since the follower last fetched from the leader + - name: global_checkpoint + type: long + description: > + Global checkpoint value on follower shard diff --git a/metricbeat/module/elasticsearch/ccr/_meta/test/ccr_stats.700.json b/metricbeat/module/elasticsearch/ccr/_meta/test/ccr_stats.700.json new file mode 100644 index 00000000000..1905f466d8f --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/_meta/test/ccr_stats.700.json @@ -0,0 +1,98 @@ +{ + "my_follower": [ + { + "leader_index": "my_leader", + "follower_index": "my_follower", + "shard_id": 0, + "leader_global_checkpoint": 1, + "leader_max_seq_no": 1, + "follower_global_checkpoint": 1, + "follower_max_seq_no": 1, + "last_requested_seq_no": 1, + "number_of_concurrent_reads": 1, + "number_of_concurrent_writes": 0, + "number_of_queued_writes": 0, + "mapping_version": 2, + "total_fetch_time_millis": 21, + "number_of_successful_fetches": 142, + "number_of_failed_fetches": 6743, + "operations_received": 2, + "total_transferred_bytes": 166, + "total_index_time_millis": 48, + "number_of_successful_bulk_operations": 2, + "number_of_failed_bulk_operations": 0, + "number_of_operations_indexed": 2, + "fetch_exceptions": [ + { + "from_seq_no": 2, + "retries": 6743, + "exception": { + "type": "exception", + "reason": "NoShardAvailableActionException[No shard available for [Request{fromSeqNo=2, maxOperationCount=1024, shardId=[leader][0], expectedHistoryUUID=ki9Do9c-QQKVydB-6Txkdg, maxOperationSizeInBytes=9223372036854775807}]]; nested: RemoteTransportException[[d0XH9XU][127.0.0.1:9300][indices:data/read/xpack/ccr/shard_changes[s]]]; nested: IndexNotFoundException[no such index];", + "caused_by": { + "type": "no_shard_available_action_exception", + "reason": "No shard available for [Request{fromSeqNo=2, maxOperationCount=1024, shardId=[leader][0], expectedHistoryUUID=ki9Do9c-QQKVydB-6Txkdg, maxOperationSizeInBytes=9223372036854775807}]", + "caused_by": { + "type": "index_not_found_exception", + "reason": "no such index", + "index_uuid": "RH8_j_w0Q0GGmpY0HMVQ_A", + "index": "my_leader" + } + } + } + } + ], + "time_since_last_fetch_millis": 470 + }, + { + "leader_index": "my_leader", + "follower_index": "my_follower", + "shard_id": 1, + "leader_global_checkpoint": -1, + "leader_max_seq_no": -1, + "follower_global_checkpoint": -1, + "follower_max_seq_no": -1, + "last_requested_seq_no": -1, + "number_of_concurrent_reads": 0, + "number_of_concurrent_writes": 0, + "number_of_queued_writes": 0, + "mapping_version": 2, + "total_fetch_time_millis": 0, + "number_of_successful_fetches": 336, + "number_of_failed_fetches": 0, + "operations_received": 0, + "total_transferred_bytes": 0, + "total_index_time_millis": 0, + "number_of_successful_bulk_operations": 0, + "number_of_failed_bulk_operations": 0, + "number_of_operations_indexed": 0, + "fetch_exceptions": [], + "time_since_last_fetch_millis": 4323 + }, + { + "leader_index": "my_leader", + "follower_index": "my_follower", + "shard_id": 2, + "leader_global_checkpoint": 1, + "leader_max_seq_no": 1, + "follower_global_checkpoint": 1, + "follower_max_seq_no": 1, + "last_requested_seq_no": 1, + "number_of_concurrent_reads": 0, + "number_of_concurrent_writes": 0, + "number_of_queued_writes": 0, + "mapping_version": 2, + "total_fetch_time_millis": 0, + "number_of_successful_fetches": 372, + "number_of_failed_fetches": 0, + "operations_received": 2, + "total_transferred_bytes": 166, + "total_index_time_millis": 32, + "number_of_successful_bulk_operations": 2, + "number_of_failed_bulk_operations": 0, + "number_of_operations_indexed": 2, + "fetch_exceptions": [], + "time_since_last_fetch_millis": 4323 + } + ] +} diff --git a/metricbeat/module/elasticsearch/ccr/_meta/test/empty.700.json b/metricbeat/module/elasticsearch/ccr/_meta/test/empty.700.json new file mode 100644 index 00000000000..0967ef424bc --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/_meta/test/empty.700.json @@ -0,0 +1 @@ +{} diff --git a/metricbeat/module/elasticsearch/ccr/ccr.go b/metricbeat/module/elasticsearch/ccr/ccr.go new file mode 100644 index 00000000000..5a551029054 --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/ccr.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ccr + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +func init() { + mb.Registry.MustAddMetricSet("elasticsearch", "ccr", New, + mb.WithHostParser(elasticsearch.HostParser), + ) +} + +const ( + ccrStatsPath = "/_ccr/stats" +) + +// MetricSet type defines all fields of the MetricSet +type MetricSet struct { + *elasticsearch.MetricSet +} + +// New create a new instance of the MetricSet +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Beta("The elasticsearch ccr metricset is beta") + + ms, err := elasticsearch.NewMetricSet(base, ccrStatsPath) + if err != nil { + return nil, err + } + return &MetricSet{MetricSet: ms}, nil +} + +// Fetch gathers stats for each follower shard from the _ccr/stats API +func (m *MetricSet) Fetch(r mb.ReporterV2) { + isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath) + if err != nil { + r.Error(err) + return + } + + // Not master, no event sent + if !isMaster { + logp.Debug("elasticsearch", "Trying to fetch ccr stats from a non master node.") + return + } + + info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI) + if err != nil { + r.Error(err) + return + } + + elasticsearchVersion := info.Version.Number + isCCRStatsAPIAvailable, err := elasticsearch.IsCCRStatsAPIAvailable(elasticsearchVersion) + if err != nil { + r.Error(err) + return + } + + if !isCCRStatsAPIAvailable { + const errorMsg = "the elasticsearch ccr metricset is only supported with Elasticsearch >= %v. " + + "You are currently running Elasticsearch %v" + r.Error(fmt.Errorf(errorMsg, elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion)) + return + } + + content, err := m.HTTP.FetchContent() + if err != nil { + r.Error(err) + return + } + + err = eventsMapping(r, *info, content) + if err != nil { + r.Error(err) + return + } +} diff --git a/metricbeat/module/elasticsearch/ccr/data.go b/metricbeat/module/elasticsearch/ccr/data.go new file mode 100644 index 00000000000..3581e248816 --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/data.go @@ -0,0 +1,97 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ccr + +import ( + "encoding/json" + "fmt" + + "github.com/joeshaw/multierror" + + "github.com/elastic/beats/libbeat/common" + s "github.com/elastic/beats/libbeat/common/schema" + c "github.com/elastic/beats/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +var ( + schema = s.Schema{ + "leader": s.Object{ + "index": c.Str("leader_index"), + "max_seq_no": c.Int("leader_max_seq_no"), + }, + "follower": s.Object{ + "index": c.Str("follower_index"), + "shard": s.Object{ + "number": c.Int("shard_id"), + }, + "operations_indexed": c.Int("number_of_operations_indexed"), + "time_since_last_fetch": s.Object{ + "ms": c.Int("time_since_last_fetch_millis"), + }, + "global_checkpoint": c.Int("follower_global_checkpoint"), + }, + } +) + +func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error { + var data map[string]interface{} + err := json.Unmarshal(content, &data) + if err != nil { + r.Error(err) + return err + } + + var errors multierror.Errors + for _, followerShards := range data { + + shards, ok := followerShards.([]interface{}) + if !ok { + err := fmt.Errorf("shards is not an array") + errors = append(errors, err) + continue + } + + for _, s := range shards { + shard, ok := s.(map[string]interface{}) + if !ok { + err := fmt.Errorf("shard is not an object") + errors = append(errors, err) + continue + } + event := mb.Event{} + event.MetricSetFields, err = schema.Apply(shard) + if err != nil { + errors = append(errors, err) + continue + } + + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", "elasticsearch") + + event.ModuleFields = common.MapStr{} + event.ModuleFields.Put("cluster.name", info.ClusterName) + event.ModuleFields.Put("cluster.id", info.ClusterID) + + r.Event(event) + } + } + + return errors.Err() +} diff --git a/metricbeat/module/elasticsearch/ccr/data_test.go b/metricbeat/module/elasticsearch/ccr/data_test.go new file mode 100644 index 00000000000..297662137e5 --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/data_test.go @@ -0,0 +1,49 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package ccr + +import ( + "io/ioutil" + "testing" + + "github.com/stretchr/testify/assert" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +var info = elasticsearch.Info{ + ClusterID: "1234", + ClusterName: "helloworld", +} + +func TestMapper(t *testing.T) { + elasticsearch.TestMapperWithInfo(t, "./_meta/test/ccr_stats.*.json", eventsMapping) +} + +func TestEmpty(t *testing.T) { + input, err := ioutil.ReadFile("./_meta/test/empty.700.json") + assert.NoError(t, err) + + reporter := &mbtest.CapturingReporterV2{} + eventsMapping(reporter, info, input) + assert.Equal(t, 0, len(reporter.GetErrors())) + assert.Equal(t, 0, len(reporter.GetEvents())) +} diff --git a/metricbeat/module/elasticsearch/elasticsearch.go b/metricbeat/module/elasticsearch/elasticsearch.go index f460418b8ae..aebdf8477e1 100644 --- a/metricbeat/module/elasticsearch/elasticsearch.go +++ b/metricbeat/module/elasticsearch/elasticsearch.go @@ -30,6 +30,9 @@ import ( "github.com/elastic/beats/metricbeat/helper/elastic" ) +// CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available +const CCRStatsAPIAvailableVersion = "6.5.0" + // Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id var clusterIDCache = map[string]string{} @@ -246,6 +249,12 @@ func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error return nil } +// IsCCRStatsAPIAvailable returns whether the CCR stats API is available in the given version +// of Elasticsearch +func IsCCRStatsAPIAvailable(currentElasticsearchVersion string) (bool, error) { + return elastic.IsFeatureAvailable(currentElasticsearchVersion, CCRStatsAPIAvailableVersion) +} + // Global cache for license information. Assumption is that license information changes infrequently var licenseCache = &_licenseCache{} diff --git a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go index 5ceff6d7187..fca4ecd0910 100644 --- a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go +++ b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go @@ -20,6 +20,8 @@ package elasticsearch_test import ( + "bytes" + "encoding/json" "fmt" "io/ioutil" "net" @@ -29,11 +31,11 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/tests/compose" mbtest "github.com/elastic/beats/metricbeat/mb/testing" - - "bytes" - + "github.com/elastic/beats/metricbeat/module/elasticsearch" + _ "github.com/elastic/beats/metricbeat/module/elasticsearch/ccr" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/cluster_stats" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/index" _ "github.com/elastic/beats/metricbeat/module/elasticsearch/index_recovery" @@ -45,6 +47,7 @@ import ( ) var metricSets = []string{ + "ccr", "cluster_stats", "index", "index_recovery", @@ -69,6 +72,7 @@ func TestFetch(t *testing.T) { assert.NoError(t, err) for _, metricSet := range metricSets { + checkSkip(t, metricSet, host) t.Run(metricSet, func(t *testing.T) { f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) events, errs := mbtest.ReportingFetchV2(f) @@ -86,7 +90,9 @@ func TestFetch(t *testing.T) { func TestData(t *testing.T) { compose.EnsureUp(t, "elasticsearch") + host := net.JoinHostPort(getEnvHost(), getEnvPort()) for _, metricSet := range metricSets { + checkSkip(t, metricSet, host) t.Run(metricSet, func(t *testing.T) { f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) err := mbtest.WriteEventsReporterV2(f, t, metricSet) @@ -225,3 +231,48 @@ func checkExists(url string) bool { } return false } + +func checkSkip(t *testing.T, metricset string, host string) { + if metricset != "ccr" { + return + } + + version, err := getElasticsearchVersion(host) + if err != nil { + t.Fatal("getting elasticsearch version", err) + } + + isCCRStatsAPIAvailable, err := elasticsearch.IsCCRStatsAPIAvailable(version) + if err != nil { + t.Fatal("checking if elasticsearch CCR stats API is available", err) + } + + if !isCCRStatsAPIAvailable { + t.Skip("elasticsearch CCR stats API is not available until " + elasticsearch.CCRStatsAPIAvailableVersion) + } +} + +func getElasticsearchVersion(elasticsearchHostPort string) (string, error) { + resp, err := http.Get("http://" + elasticsearchHostPort + "/") + if err != nil { + return "", err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + + var data common.MapStr + err = json.Unmarshal(body, &data) + if err != nil { + return "", err + } + + version, err := data.GetValue("version.number") + if err != nil { + return "", err + } + return version.(string), nil +} diff --git a/metricbeat/module/elasticsearch/fields.go b/metricbeat/module/elasticsearch/fields.go index 97b59175742..8fac5dd05a0 100644 --- a/metricbeat/module/elasticsearch/fields.go +++ b/metricbeat/module/elasticsearch/fields.go @@ -31,5 +31,5 @@ func init() { // Asset returns asset data func Asset() string { - return "eJzsW9tu4zYQffdXDPLUAht9gB/6sui2KbDpopstUBSFlxbHNhNetCTlxP36gpTk6EJdIimxF7XfItnnnBnOkDMkcw0PeFgCcmIsiw0SHe8WAJZZjku4+rn8/GoBQNHEmiWWKbmEnxYAAJXvgFA05bgA0MiRGFzCGi1ZABi0lsmtWcLfV8bwq3/cs53SdhUruWHbJWwIN+6XG4acmqUHvwZJBDYFuo89JLiErVZpkj8JqKvClSFjnhqLOnJ/HV8WqA94eFSalp4HsbNP1QM5rmeJFq20jL4GKaMdlMYSizMTe0xP28VqGpTlYeshfJ/bVgeqR1jxqQ94WZTDSE3lVbsTenTVtaUGfthqRPkODsi5enwHGumPUVCIVBTDOuqeGaDi1oF5DcwPTVT7RsgjpYFSqbQNzEwMV3LbeNWjBuBOWcJBpmKNGtQmMxaYPMZEixJB3NtZpdweRWTg18jZlq05DhZFSSW65pTkoHt0FCqYpCyeL2JuMrgzjpnc4AEDZHZE07pnunwzQM1nh9nhnTb/9Hmo00cDdDX9lFnf4aZnSYlmgmjWiKLXkJVxHfrlFeK8P11CRAKF0odofbABpVNC7KMHhtQghY3SJcrG4sUkxacpi1YdYMxiVatMYNpSdeMU5VVJiM26ERw8v/SFP1Wxid4qB6iKU4HS+jizO8y8354IXhxFjhbrTnwVeRnTi2UaqzRGhv2LLcnQI3WjtCB2CW0/HmyKk+AMOWp2BnjUDvG49ba+WRBkwo60/cI655m3cu2zflGandYH7+tCasDd4Rkr0hirPerDqacuVs+rVm/2eCpbiAuzKo1Gmc/BzzdV1kgdUpg2X+WCzGulOBL5MuY7nSKw2vIZ5jaWbGe0+Y/CWo8bCLQytyV6izZqGeVR/Hce0lfD7aOc0e6UqU8oGTF7YRVc5nSgQCjVaEwn+7yrcllCdW0OD7pKdYyzOv6zh+x2fE47m+PLnP2Oz9nndXxZQtXx1RnVpKKW4ieZUNtq90uFNou8S4V2qdB69b+sQoNLb3XJ3EvmfoeZe9yO5tG9Wk9Z+QV/nT5qVMXzRbJvKYLgcK/W7aWeJXbGMus3tc4gw2yUWLLyUWyiRKsYjUG6cp2XpqtQdI/tIz8V4NmmO+6bgRzSxOSecEZXlFicVc/dDkvxmRls4JHZHSCzO9RAQDBjmNw6QZhFCSj33P9td8RCrFJOQSoLa4SEaIM00EA0AtsVvVPCuvb70+9t3jar+DLZHrVhqt6KT+XLUcOU93sxeL3vS6E/P8KN3KhhJzR9VvdZPkBQISrogLKCfBXYIUkiJpk92XrwK5IEnILKEuBs6F9sy0YI8nRaGwR5Gm+CVPL0Q3Gr5PUMw1HYcsoROZoyfFSe23m/HEWCq/iB8HBvMGoH8WZTgIPDRuqU5E4LLgTTL2g4lNUctzNmPuT2szQrnXTXkc+q0cInZqxf7otW5nybrDNqTb63pqTo93p2DmDGXnVUL+pJe2ax+71wTo0SpfhsWesmz7xfc7ijElfx9rQI3wgZ5KXfOW1Ka45buz6oXHZqX7mgb0hhQEjWjWr5CsBH8tQWgmXBCZKHM1H8CcnDUMmr83G0ly2GedvVE2ci+0tW2nTOUQeVBoVMzrm/HPAl685B8SXrzi3rTKr3bK+aV2ZnSLzPOfYl985B8SX3Tp17rRXwNo5ixTnGVunZquBf3sMRNJx1A2rgQlfXLuDEcviZAbbx2ImhrVeEIYM+QOhxAhodWT0klSrolb2eFUT/b78Hc3FjApd1YEoSfmAcwRyMRQFh6L4k9Af/J9lxOHpF42mOuAsBZE8YJ2v+tirq/4WWoKRMbleWmIdFnfoFe51fQ4BfIVbSEiYNEMhfgHtRRion6bjNUYParpSmjX+RGnsIeeMhoQlZunWmNLPhhBpzCBuAq97yCzJNuOAXwQelAZ+ISLgzKLXXgiQJq0k/5isTuGJy9S3FFKPGvDX6tJcJv5fmYRsx6i/5TglKD5AHz8Qoe7UbzXbHDDDj9xYH3G7ONnjncn/lsN0r6b5YPecNiDu/mUosDuHWyFVMrJtY/NnMzJeNd/mFV7/B64PmkZiCFClstBLR4r8AAAD//+uzAwg=" + return "eJzsW11v2zYXvvevOMhVCyT6Ab54b4q3XQY0K9Z0wDAMKi0d20z4oZCUG+/XD6Qkm5IoS7bl2MXsu1jOeZ5zeD5J6g6ecT0FZEQbmmgkKllOAAw1DKdw83//+5sJQIo6UTQzVIop/G8CAFD7DXCZ5gwnAAoZEo1TmKEhEwCNxlCx0FP460ZrdvO3/W4plYkTKeZ0MYU5Ydr+55wiS/XUCb8DQTi2CdqPWWc4hYWSeVZ+E2BXF+eLTFiuDarI/rV5WEl9xvUPqVLv+6Ds4lO3QCnXoUSTTliangKUpjsgtSEGRwZ2Mh1sCzVRLSB/sXpgPiip9V2ll8KM0YTYHzpI7f226WnVp7nwPjmGJEVVe9TFsUuUL46KFF9bT7vNO0D/6vNAOIKcl4w7kCoenLzGGl9iITvJMCkWhzH5TF4pzzlofMlRJAgi5zNUlpzMUBWLIwWYJVZs9ZI09K6YziVj8sfPtQQV555FcEpHhXFOsAwPG6tbQzsw+EHNkhaW381ts1A6dj/EkEnGY7iFgxIO3lWhjOl7oMJIx3pj2kKfuZJ8tx/5ShnKMdZUJBjb3BTP0STLiOsTqPZIOd4CFcD1LTjIOn2LDw4fW1p08l8wOSMsTpaYPGeSCnMC4p8cBmwxYEVYjjZe67bfWT30URm9zOVj5G8rI28u8K5Q7zGRzy3X8G6hEMUtrNGa5hYUpu+jIBEhUwzzCCWxHhYPVpjjQF2JjQblwM1CyTzgOjscp9dtHqUhzEvzTlnr/ZVPdDDhxD4dlco2pxTC75DRBZ0xHEwqJTXvGpOSFd3DwytTNBnPY+4LcRfsM6XCAxbIpZ920u62zQA2X1056bZOf18RttAIlaRhp0L7HWbaUsoU5UTRlhedglaBte6nt2nsrD1tQEQcuVTraLY2AabHuNhnJxhybWusVB5kq3g1u6G9i1ZTwCHFqjFhwnGl6t4yKqfLEJqxKzhaW53KREdvFQOpTHKOwjg/2zSz3YHgyKXI0Jykj23RK5D2pqmNVBhp+g92BEMP1blUnJgpdP3zYFUshWpwcJytAk7qDvK4cLq+mRMUxDaw/cR25pm3Mu2WP/ey02xdDGkl1YC5wxkrUpjIFar1uVMXbcZVpzV7LFUU4kqt2oaRj2fFj5cqG6BWUhi2rHJB5JmUDInYD/lR5Qi0UT7D2NqQxYg6/15p6+QGHM3HNkQt0EQdq3wQ/qMT6brh7lUuYJdSNxNKAUz37IJ9TCsUSJoq1Hon+rhV2adQr83hRZe5SnBUw391IncbvoQdzfA+Zr/hS/RxDe9TqBu+nlF1zhshfpaE2tW7Xzu0UehdO7Rrh9bLf78ODa6z1TVyr5H7E0buZjuaRU9ydkzl5+w0c9RBHc83QV9yBM7gSc66Wz1DzIht1q9yVogMo6XEkNh5sY4yJRPUGtPYTl4qjUPefegc+aUSXmy646rtyCFOVKwIo2mcEoOj8nlc+ufghcLanccCUrNEBQQ41ZqKhSWEhZeAtN+7v82SGEhkzlIQ0sAMISNKYxoYIFqObZveY9y68f/n39t8aHfxPtgKlaayOYofi1dKDUM+rfjget8XQn98hnsxl8NOaPq07tN8AKGKVNAAPoOyCiyRZBEV1JytHvyCJAPLoFYCrA79xdZXgpPX8+rAyevhKggpzr8UD1LcjbAclS7nXJGNKsNXZTvOu3IUcSaTZ8LCs8FBO4j380o4WNmYulsnhdGCheD4CxpWSjzG7YyRD7ldlqbeSXdT8kUNWvhKtXHlvhplLnfIuqDR5GcbSqp5r2fnAEacVQ+aRR1oTxZ7WnFr1CiTko0WtTZ5lvOalXtQ4ErWHRbhGyGDrPQbS9vU2uvWzQ/qd247lxP6lhQGuGRTqY6fuAu6XS7oE86QPF8I4y9InodSji/H0I42H2Zt209cCO1vRWuzM0etZR4kcnTM/WkFX6PuEhhfo+7Sok7nakVXsvt1gSMC72sp+xp7l8D4Gnvnjr3ODniRRIlkDBMj1Whd8KcPsBEajroBPXDFa9cu4JHt8BYBFsmhiaFrVoQhiz6A6CYBHexZPSC1LujEVi8aov+23YOxONeByzpwTBB+pAxBr7VBDmHRfUHoDv7PsuOwsYrC8xxxVwTIilBGZuxtWTTfQstQpFQsYkP086QJvcde5/eQwO+QSGEIFRoIlA/APvAl+UF62OaoRmViqbpeQN7/EPLeiYS2SO/WmVTUhAPqkEPYgLj6Lb8g0hEX/CL4KBXgK+EZswrl5o6TLKMN6rVXQqmIX3LMsf0q6MGnvZS7vTQntuWjzbdU93bK8uVX5zxHetnJbjSbJdVAtdtbHHC7Ofga9DiH7Y7J7ovVY96AeHSbqcTgEGyFTCbE2MTizmZGvmy8LC+8em+AE12Blm8eR5N/AwAA//9DInx2" } diff --git a/metricbeat/module/elasticsearch/test_elasticsearch.py b/metricbeat/module/elasticsearch/test_elasticsearch.py index 90906ac641d..20cf9d7452f 100644 --- a/metricbeat/module/elasticsearch/test_elasticsearch.py +++ b/metricbeat/module/elasticsearch/test_elasticsearch.py @@ -4,7 +4,10 @@ import unittest from elasticsearch import Elasticsearch, TransportError from parameterized import parameterized +from nose.plugins.skip import SkipTest +import urllib2 import json +import semver sys.path.append(os.path.join(os.path.dirname(__file__), '../../tests/system')) @@ -17,6 +20,7 @@ class Test(metricbeat.BaseTest): FIELDS = ["elasticsearch"] @parameterized.expand([ + "ccr", "index", "index_summary", "ml_job", @@ -30,6 +34,8 @@ def test_metricsets(self, metricset): """ elasticsearch metricset tests """ + self.check_skip(metricset) + if metricset == "ml_job": self.create_ml_job() @@ -65,3 +71,22 @@ def create_ml_job(self): path = "/_xpack/ml/anomaly_detectors/test" es.transport.perform_request('PUT', path, body=body) + + def check_skip(self, metricset): + if metricset != "ccr": + return + + version = self.get_version() + if semver.compare(version, "6.5.0") == -1: + # Skip CCR metricset system test for Elasticsearch versions < 6.5.0 as CCR Stats + # API endpoint is not available + raise SkipTest("elasticsearch/ccr metricset system test only valid with Elasticsearch versions >= 6.5.0") + + def get_version(self): + host = self.get_hosts()[0] + res = urllib2.urlopen("http://" + host + "/").read() + + body = json.loads(res) + version = body["version"]["number"] + + return version diff --git a/metricbeat/module/kibana/kibana.go b/metricbeat/module/kibana/kibana.go index 34075259e3e..8058b78e041 100644 --- a/metricbeat/module/kibana/kibana.go +++ b/metricbeat/module/kibana/kibana.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" ) @@ -72,28 +73,14 @@ func GetVersion(http *helper.HTTP, currentPath string) (string, error) { return versionStr, nil } -func isKibanaAPIAvailable(currentKibanaVersion, apiAvailableInKibanaVersion string) (bool, error) { - currentVersion, err := common.NewVersion(currentKibanaVersion) - if err != nil { - return false, err - } - - wantVersion, err := common.NewVersion(apiAvailableInKibanaVersion) - if err != nil { - return false, err - } - - return !currentVersion.LessThan(wantVersion), nil -} - // IsStatsAPIAvailable returns whether the stats API is available in the given version of Kibana func IsStatsAPIAvailable(currentKibanaVersion string) (bool, error) { - return isKibanaAPIAvailable(currentKibanaVersion, StatsAPIAvailableVersion) + return elastic.IsFeatureAvailable(currentKibanaVersion, StatsAPIAvailableVersion) } // IsSettingsAPIAvailable returns whether the settings API is available in the given version of Kibana func IsSettingsAPIAvailable(currentKibanaVersion string) (bool, error) { - return isKibanaAPIAvailable(currentKibanaVersion, SettingsAPIAvailableVersion) + return elastic.IsFeatureAvailable(currentKibanaVersion, SettingsAPIAvailableVersion) } func fetchPath(http *helper.HTTP, currentPath, newPath string) ([]byte, error) {