Various minor fixes to elasticsearch/node_stats metricset x-pack code (#8474)

* Making load stats optional because they may not have been collected yet

* Making cgroups stats optional as ES may not be in a containerized environment

* Reusing schema

* Fixing license field

* Fixing reset URI
ycombinator authored Oct 1, 2018
1 parent 184793f commit 22e4022
Showing 3 changed files with 32 additions and 44 deletions.
4 changes: 2 additions & 2 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
@@ -34,9 +34,9 @@ import (
 
 func clusterNeedsTLSEnabled(license, stackStats common.MapStr) (bool, error) {
 	// TLS does not need to be enabled if license type is something other than trial
-	value, err := license.GetValue("license.type")
+	value, err := license.GetValue("type")
 	if err != nil {
-		return false, elastic.MakeErrorForMissingField("license.type", elastic.Elasticsearch)
+		return false, elastic.MakeErrorForMissingField("type", elastic.Elasticsearch)
 	}
 
 	licenseType, ok := value.(string)
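
For context: after this commit GetLicense (changed below in elasticsearch.go) returns only the contents of the response's license object rather than the whole response body, so callers drop the "license." prefix from their lookups. A minimal standard-library sketch of the two shapes; the "trial" value is illustrative, not taken from a real response.

package main

import "fmt"

func main() {
	// Shape GetLicense used to return: the whole _xpack/license response,
	// with the interesting fields nested under "license" -- hence the old
	// "license.type" lookup key.
	whole := map[string]interface{}{
		"license": map[string]interface{}{
			"type":   "trial",
			"status": "active",
		},
	}

	// Shape GetLicense returns after this commit: just the inner license
	// object, so clusterNeedsTLSEnabled asks for "type" directly.
	unwrapped := whole["license"].(map[string]interface{})

	fmt.Println(unwrapped["type"]) // trial
}
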
12 changes: 11 additions & 1 deletion metricbeat/module/elasticsearch/elasticsearch.go
@@ -196,11 +196,21 @@ func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {
 			return nil, err
 		}
 
-		err = json.Unmarshal(content, &license)
+		var data common.MapStr
+		err = json.Unmarshal(content, &data)
 		if err != nil {
 			return nil, err
 		}
 
+		l, err := data.GetValue("license")
+		if err != nil {
+			return nil, err
+		}
+		license, ok := l.(map[string]interface{})
+		if !ok {
+			return nil, elastic.MakeErrorForMissingField("license", elastic.Elasticsearch)
+		}
+
 		// Cache license for a minute
 		licenseCache.set(license, time.Minute)
 	}
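
A self-contained sketch of the new unwrap-and-assert step shown above, using only the standard library (the real code works on common.MapStr and reports elastic.MakeErrorForMissingField; the sample body and error messages here are illustrative):

package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// extractLicense mirrors the new GetLicense flow: unmarshal the whole
// response, then pull out and type-assert the nested "license" object.
func extractLicense(content []byte) (map[string]interface{}, error) {
	var data map[string]interface{}
	if err := json.Unmarshal(content, &data); err != nil {
		return nil, err
	}

	l, ok := data["license"]
	if !ok {
		return nil, errors.New("license field missing from response")
	}

	license, ok := l.(map[string]interface{})
	if !ok {
		return nil, errors.New("license field has unexpected type")
	}
	return license, nil
}

func main() {
	body := []byte(`{"license": {"uid": "abc", "type": "basic", "status": "active"}}`)
	license, err := extractLicense(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(license["type"]) // basic
}
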
60 changes: 19 additions & 41 deletions metricbeat/module/elasticsearch/node_stats/data_xpack.go
@@ -89,9 +89,9 @@
 		"os": c.Dict("os", s.Schema{
 			"cpu": c.Dict("cpu", s.Schema{
 				"load_average": c.Dict("load_average", s.Schema{
-					"1m": c.Float("1m"),
-					"5m": c.Float("5m"),
-					"15m": c.Float("15m"),
+					"1m": c.Float("1m", s.Optional),
+					"5m": c.Float("5m", s.Optional),
+					"15m": c.Float("15m", s.Optional),
 				}),
 			}),
 			"cgroup": c.Dict("cgroup", s.Schema{
@@ -115,7 +115,7 @@ var (
 					"limit_in_bytes": c.Str("limit_in_bytes"),
 					"usage_in_bytes": c.Str("usage_in_bytes"),
 				}),
-			}),
+			}, c.DictOptional),
 		}),
 		"process": c.Dict("process", s.Schema{
 			"open_file_descriptors": c.Int("open_file_descriptors"),
@@ -144,41 +144,13 @@ var (
 			}),
 		}),
 		"thread_pool": c.Dict("thread_pool", s.Schema{
-			"bulk": c.Dict("bulk", s.Schema{
-				"threads": c.Int("threads"),
-				"queue": c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"generic": c.Dict("generic", s.Schema{
-				"threads": c.Int("threads"),
-				"queue": c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"get": c.Dict("get", s.Schema{
-				"threads": c.Int("threads"),
-				"queue": c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"index": c.Dict("index", s.Schema{
-				"threads": c.Int("threads"),
-				"queue": c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"management": c.Dict("management", s.Schema{
-				"threads": c.Int("threads"),
-				"queue": c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"search": c.Dict("search", s.Schema{
-				"threads": c.Int("threads"),
-				"queue": c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
-			"watcher": c.Dict("watcher", s.Schema{
-				"threads": c.Int("threads"),
-				"queue": c.Int("queue"),
-				"rejected": c.Int("rejected"),
-			}),
+			"analyze": c.Dict("analyze", threadPoolStatsSchema),
+			"write": c.Dict("write", threadPoolStatsSchema),
+			"generic": c.Dict("generic", threadPoolStatsSchema),
+			"get": c.Dict("get", threadPoolStatsSchema),
+			"management": c.Dict("management", threadPoolStatsSchema),
+			"search": c.Dict("search", threadPoolStatsSchema),
+			"watcher": c.Dict("watcher", threadPoolStatsSchema),
 		}),
 		"fs": c.Dict("fs", s.Schema{
 			"summary": c.Dict("total", s.Schema{
@@ -188,6 +160,12 @@
 			}),
 		}),
 	}
+
+	threadPoolStatsSchema = s.Schema{
+		"threads": c.Int("threads"),
+		"queue": c.Int("queue"),
+		"rejected": c.Int("rejected"),
+	}
 )
 
 func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
@@ -204,13 +182,13 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
 	// master node will not be accurate anymore as often in these cases a proxy is in front
 	// of ES and it's not know if the request will be routed to the same node as before.
 	for nodeID, node := range nodesStruct.Nodes {
-		clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HostData().SanitizedURI, nodeID)
+		clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HTTP.GetURI(), nodeID)
 		if err != nil {
 			logp.Err("could not fetch cluster id: %s", err)
 			continue
 		}
 
-		isMaster, _ := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI)
+		isMaster, _ := elasticsearch.IsMaster(m.HTTP, m.HTTP.GetURI())
 
 		event := mb.Event{}
 		// Build source_node object
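
The node_stats schema changes above mark os.cpu.load_average and os.cgroup as optional (load averages may not have been collected yet, and cgroup stats only exist on cgroup-aware hosts), and factor the repeated threads/queue/rejected sub-schema into the shared threadPoolStatsSchema, dropping the bulk and index pools in favour of analyze and write. Conceptually, an optional field means an absent section is skipped rather than reported as a conversion error. A standard-library-only sketch of that behaviour (not the libbeat schema package itself; the sample payload is hypothetical):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Abbreviated node stats fragment from a node where load averages have
	// not been collected yet and no cgroup stats are reported.
	body := []byte(`{"os": {"cpu": {}}}`)

	var stats struct {
		OS struct {
			CPU struct {
				LoadAverage map[string]float64 `json:"load_average"`
			} `json:"cpu"`
			Cgroup map[string]interface{} `json:"cgroup"`
		} `json:"os"`
	}
	if err := json.Unmarshal(body, &stats); err != nil {
		panic(err)
	}

	event := map[string]interface{}{}
	if la := stats.OS.CPU.LoadAverage; la != nil {
		event["load_average"] = la // only emitted when Elasticsearch reported it
	}
	if cg := stats.OS.Cgroup; cg != nil {
		event["cgroup"] = cg // only emitted on cgroup-aware (e.g. containerized) nodes
	}
	fmt.Println(event) // map[] -- missing sections are skipped, not errors
}
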
