From 22e4022df0d27bf9361b6fe61c91346595fd7aab Mon Sep 17 00:00:00 2001
From: Shaunak Kashyap
Date: Mon, 1 Oct 2018 03:22:15 -0700
Subject: [PATCH] Various minor fixes to elasticsearch/node_stats metricset
 x-pack code (#8474)

* Making load stats optional because they may not have been collected yet

* Making cgroups stats optional as ES may not be in a containerized environment

* Reusing schema

* Fixing license field

* Fixing reset URI
---
 .../elasticsearch/cluster_stats/data_xpack.go |  4 +-
 .../module/elasticsearch/elasticsearch.go     | 12 +++-
 .../elasticsearch/node_stats/data_xpack.go    | 60 ++++++-------------
 3 files changed, 32 insertions(+), 44 deletions(-)

diff --git a/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go b/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
index 7ab5c1459421..7727f1280602 100644
--- a/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
+++ b/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
@@ -34,9 +34,9 @@ import (
 
 func clusterNeedsTLSEnabled(license, stackStats common.MapStr) (bool, error) {
 	// TLS does not need to be enabled if license type is something other than trial
-	value, err := license.GetValue("license.type")
+	value, err := license.GetValue("type")
 	if err != nil {
-		return false, elastic.MakeErrorForMissingField("license.type", elastic.Elasticsearch)
+		return false, elastic.MakeErrorForMissingField("type", elastic.Elasticsearch)
 	}
 
 	licenseType, ok := value.(string)
diff --git a/metricbeat/module/elasticsearch/elasticsearch.go b/metricbeat/module/elasticsearch/elasticsearch.go
index aebdf8477e14..468cf294a95b 100644
--- a/metricbeat/module/elasticsearch/elasticsearch.go
+++ b/metricbeat/module/elasticsearch/elasticsearch.go
@@ -196,11 +196,21 @@ func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {
 			return nil, err
 		}
 
-		err = json.Unmarshal(content, &license)
+		var data common.MapStr
+		err = json.Unmarshal(content, &data)
 		if err != nil {
 			return nil, err
 		}
 
+		l, err := data.GetValue("license")
+		if err != nil {
+			return nil, err
+		}
+		license, ok := l.(map[string]interface{})
+		if !ok {
+			return nil, elastic.MakeErrorForMissingField("license", elastic.Elasticsearch)
+		}
+
 		// Cache license for a minute
 		licenseCache.set(license, time.Minute)
 	}
diff --git a/metricbeat/module/elasticsearch/node_stats/data_xpack.go b/metricbeat/module/elasticsearch/node_stats/data_xpack.go
index ebfd77c5c45d..d912da1c4de7 100644
--- a/metricbeat/module/elasticsearch/node_stats/data_xpack.go
+++ b/metricbeat/module/elasticsearch/node_stats/data_xpack.go
@@ -89,9 +89,9 @@ var (
 		"os": c.Dict("os", s.Schema{
 			"cpu": c.Dict("cpu", s.Schema{
 				"load_average": c.Dict("load_average", s.Schema{
-					"1m":  c.Float("1m"),
-					"5m":  c.Float("5m"),
-					"15m": c.Float("15m"),
+					"1m":  c.Float("1m", s.Optional),
+					"5m":  c.Float("5m", s.Optional),
+					"15m": c.Float("15m", s.Optional),
 				}),
 			}),
 			"cgroup": c.Dict("cgroup", s.Schema{
@@ -115,7 +115,7 @@ var (
 					"limit_in_bytes": c.Str("limit_in_bytes"),
 					"usage_in_bytes": c.Str("usage_in_bytes"),
 				}),
-			}),
+			}, c.DictOptional),
 		}),
 		"process": c.Dict("process", s.Schema{
 			"open_file_descriptors": c.Int("open_file_descriptors"),
@@ -144,41 +144,13 @@ var (
 			}),
 		}),
 		"thread_pool": c.Dict("thread_pool", s.Schema{
-			"bulk": c.Dict("bulk", s.Schema{
-				"threads":  c.Int("threads"),
-				"queue":    c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"generic": c.Dict("generic", s.Schema{
-				"threads":  c.Int("threads"),
-				"queue":    c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"get": c.Dict("get", s.Schema{
-				"threads":  c.Int("threads"),
-				"queue":    c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"index": c.Dict("index", s.Schema{
-				"threads":  c.Int("threads"),
-				"queue":    c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"management": c.Dict("management", s.Schema{
-				"threads":  c.Int("threads"),
-				"queue":    c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"search": c.Dict("search", s.Schema{
-				"threads":  c.Int("threads"),
-				"queue":    c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"watcher": c.Dict("watcher", s.Schema{
-				"threads":  c.Int("threads"),
-				"queue":    c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
+			"analyze":    c.Dict("analyze", threadPoolStatsSchema),
+			"write":      c.Dict("write", threadPoolStatsSchema),
+			"generic":    c.Dict("generic", threadPoolStatsSchema),
+			"get":        c.Dict("get", threadPoolStatsSchema),
+			"management": c.Dict("management", threadPoolStatsSchema),
+			"search":     c.Dict("search", threadPoolStatsSchema),
+			"watcher":    c.Dict("watcher", threadPoolStatsSchema),
 		}),
 		"fs": c.Dict("fs", s.Schema{
 			"summary": c.Dict("total", s.Schema{
@@ -188,6 +160,12 @@ var (
 			}),
 		}),
 	}
+
+	threadPoolStatsSchema = s.Schema{
+		"threads":  c.Int("threads"),
+		"queue":    c.Int("queue"),
+		"rejected": c.Int("rejected"),
+	}
 )
 
 func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
@@ -204,13 +182,13 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
 	// master node will not be accurate anymore as often in these cases a proxy is in front
 	// of ES and it's not know if the request will be routed to the same node as before.
 	for nodeID, node := range nodesStruct.Nodes {
-		clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HostData().SanitizedURI, nodeID)
+		clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HTTP.GetURI(), nodeID)
 		if err != nil {
 			logp.Err("could not fetch cluster id: %s", err)
 			continue
 		}
 
-		isMaster, _ := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI)
+		isMaster, _ := elasticsearch.IsMaster(m.HTTP, m.HTTP.GetURI())
 
 		event := mb.Event{}
 
 		// Build source_node object