From 222de4fdb73c83fdd77043043b8c94bd9cea5b9c Mon Sep 17 00:00:00 2001 From: Jean-Louis Dupond Date: Tue, 18 Jun 2019 14:58:22 +0200 Subject: [PATCH 1/5] Elasticsearch: save master for each server (Closes: #2650) Like described in #2650, the current implementation of isMaster was incorrect. As calls were done concurrently, the isMaster value was prone to a race condition. Also when multiple elasticsearch clusters were specified, this was broken. To fix this, a map was added which contains the nodeID and masterID. So for each node we know which one is master (if nodeID == masterID). Test data taken from existing pull request. --- plugins/inputs/elasticsearch/elasticsearch.go | 74 +++- .../elasticsearch/elasticsearch_test.go | 52 ++- plugins/inputs/elasticsearch/testdata_test.go | 410 ++++++++++++++++++ 3 files changed, 500 insertions(+), 36 deletions(-) diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 13c567b30605a..65e8233d431df 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -137,9 +137,16 @@ type Elasticsearch struct { NodeStats []string tls.ClientConfig - client *http.Client - catMasterResponseTokens []string - isMaster bool + client *http.Client + serverInfo map[string]serverInfo +} +type serverInfo struct { + nodeID string + masterID string +} + +func (i serverInfo) isMaster() bool { + return i.nodeID == i.masterID } // NewElasticsearch return a new instance of Elasticsearch @@ -186,25 +193,37 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { e.client = client } - var wg sync.WaitGroup - wg.Add(len(e.Servers)) + if e.ClusterStats { + e.serverInfo = make(map[string]serverInfo) + for _, serv := range e.Servers { + go func(s string, acc telegraf.Accumulator) { + info := serverInfo{} - for _, serv := range e.Servers { - go func(s string, acc telegraf.Accumulator) { - defer wg.Done() - url := e.nodeStatsUrl(s) - e.isMaster = false + // Gather node ID + if err := e.gatherNodeID(&info, s+"/_nodes/_local/name"); err != nil { + acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) + return + } - if e.ClusterStats { // get cat/master information here so NodeStats can determine // whether this node is the Master - if err := e.setCatMaster(s + "/_cat/master"); err != nil { + if err := e.setCatMaster(&info, s+"/_cat/master"); err != nil { acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) return } - } + }(serv, acc) + } + } - // Always gather node states + var wg sync.WaitGroup + wg.Add(len(e.Servers)) + + for _, serv := range e.Servers { + go func(s string, acc telegraf.Accumulator) { + defer wg.Done() + url := e.nodeStatsUrl(s) + + // Always gather node stats if err := e.gatherNodeStats(url, acc); err != nil { acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) return @@ -221,7 +240,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { } } - if e.ClusterStats && (e.isMaster || !e.ClusterStatsOnlyFromMaster || !e.Local) { + if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) { if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil { acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) return @@ -267,6 +286,22 @@ func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string { return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ",")) } +func (e *Elasticsearch) gatherNodeID(info *serverInfo, url string) error { + nodeStats := &struct { + ClusterName string `json:"cluster_name"` + Nodes map[string]*nodeStat `json:"nodes"` + }{} + if err := e.gatherJsonData(url, nodeStats); err != nil { + return err + } + + // Only 1 should be returned + for id := range nodeStats.Nodes { + info.nodeID = id + } + return nil +} + func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { nodeStats := &struct { ClusterName string `json:"cluster_name"` @@ -284,11 +319,6 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er "cluster_name": nodeStats.ClusterName, } - if e.ClusterStats { - // check for master - e.isMaster = (id == e.catMasterResponseTokens[0]) - } - for k, v := range n.Attributes { tags["node_attribute_"+k] = v } @@ -405,7 +435,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) return nil } -func (e *Elasticsearch) setCatMaster(url string) error { +func (e *Elasticsearch) setCatMaster(info *serverInfo, url string) error { r, err := e.client.Get(url) if err != nil { return err @@ -423,7 +453,7 @@ func (e *Elasticsearch) setCatMaster(url string) error { return err } - e.catMasterResponseTokens = strings.Split(string(response), " ") + info.masterID = strings.Split(string(response), " ")[0] return nil } diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go index ec6951fbdbcda..07d7b8821019c 100644 --- a/plugins/inputs/elasticsearch/elasticsearch_test.go +++ b/plugins/inputs/elasticsearch/elasticsearch_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf/testutil" "fmt" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -22,6 +23,9 @@ func defaultTags() map[string]string { "node_host": "test", } } +func defaultServerInfo() serverInfo { + return serverInfo{nodeID: "", masterID: "SDFsfSDFsdfFSDSDfSFDSDF"} +} type transportMock struct { statusCode int @@ -49,8 +53,8 @@ func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) { func (t *transportMock) CancelRequest(_ *http.Request) { } -func checkIsMaster(es *Elasticsearch, expected bool, t *testing.T) { - if es.isMaster != expected { +func checkIsMaster(es *Elasticsearch, server string, expected bool, t *testing.T) { + if es.serverInfo[server].isMaster() != expected { msg := fmt.Sprintf("IsMaster set incorrectly") assert.Fail(t, msg) } @@ -73,13 +77,15 @@ func TestGather(t *testing.T) { es := newElasticsearchWithClient() es.Servers = []string{"http://example.com:9200"} es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse) + es.serverInfo = make(map[string]serverInfo) + es.serverInfo["http://example.com:9200"] = defaultServerInfo() var acc testutil.Accumulator if err := acc.GatherError(es.Gather); err != nil { t.Fatal(err) } - checkIsMaster(es, false, t) + checkIsMaster(es, es.Servers[0], false, t) checkNodeStatsResult(t, &acc) } @@ -88,13 +94,15 @@ func TestGatherIndividualStats(t *testing.T) { es.Servers = []string{"http://example.com:9200"} es.NodeStats = []string{"jvm", "process"} es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponseJVMProcess) + es.serverInfo = make(map[string]serverInfo) + es.serverInfo["http://example.com:9200"] = defaultServerInfo() var acc testutil.Accumulator if err := acc.GatherError(es.Gather); err != nil { t.Fatal(err) } - checkIsMaster(es, false, t) + checkIsMaster(es, es.Servers[0], false, t) tags := defaultTags() acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags) @@ -112,13 +120,15 @@ func TestGatherNodeStats(t *testing.T) { es := newElasticsearchWithClient() es.Servers = []string{"http://example.com:9200"} es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse) + es.serverInfo = make(map[string]serverInfo) + es.serverInfo["http://example.com:9200"] = defaultServerInfo() var acc testutil.Accumulator if err := es.gatherNodeStats("junk", &acc); err != nil { t.Fatal(err) } - checkIsMaster(es, false, t) + checkIsMaster(es, es.Servers[0], false, t) checkNodeStatsResult(t, &acc) } @@ -128,11 +138,13 @@ func TestGatherClusterHealthEmptyClusterHealth(t *testing.T) { es.ClusterHealth = true es.ClusterHealthLevel = "" es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse) + es.serverInfo = make(map[string]serverInfo) + es.serverInfo["http://example.com:9200"] = defaultServerInfo() var acc testutil.Accumulator require.NoError(t, es.gatherClusterHealth("junk", &acc)) - checkIsMaster(es, false, t) + checkIsMaster(es, es.Servers[0], false, t) acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", clusterHealthExpected, @@ -153,11 +165,13 @@ func TestGatherClusterHealthSpecificClusterHealth(t *testing.T) { es.ClusterHealth = true es.ClusterHealthLevel = "cluster" es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse) + es.serverInfo = make(map[string]serverInfo) + es.serverInfo["http://example.com:9200"] = defaultServerInfo() var acc testutil.Accumulator require.NoError(t, es.gatherClusterHealth("junk", &acc)) - checkIsMaster(es, false, t) + checkIsMaster(es, es.Servers[0], false, t) acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", clusterHealthExpected, @@ -178,11 +192,13 @@ func TestGatherClusterHealthAlsoIndicesHealth(t *testing.T) { es.ClusterHealth = true es.ClusterHealthLevel = "indices" es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponseWithIndices) + es.serverInfo = make(map[string]serverInfo) + es.serverInfo["http://example.com:9200"] = defaultServerInfo() var acc testutil.Accumulator require.NoError(t, es.gatherClusterHealth("junk", &acc)) - checkIsMaster(es, false, t) + checkIsMaster(es, es.Servers[0], false, t) acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", clusterHealthExpected, @@ -202,13 +218,17 @@ func TestGatherClusterStatsMaster(t *testing.T) { es := newElasticsearchWithClient() es.ClusterStats = true es.Servers = []string{"http://example.com:9200"} + es.serverInfo = make(map[string]serverInfo) + es.serverInfo["http://example.com:9200"] = serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""} // first get catMaster es.client.Transport = newTransportMock(http.StatusOK, IsMasterResult) - require.NoError(t, es.setCatMaster("junk")) + info := es.serverInfo["http://example.com:9200"] + require.NoError(t, es.setCatMaster(&info, "junk")) + es.serverInfo["http://example.com:9200"] = info IsMasterResultTokens := strings.Split(string(IsMasterResult), " ") - if es.catMasterResponseTokens[0] != IsMasterResultTokens[0] { + if es.serverInfo["http://example.com:9200"].masterID != IsMasterResultTokens[0] { msg := fmt.Sprintf("catmaster is incorrect") assert.Fail(t, msg) } @@ -221,7 +241,7 @@ func TestGatherClusterStatsMaster(t *testing.T) { t.Fatal(err) } - checkIsMaster(es, true, t) + checkIsMaster(es, es.Servers[0], true, t) checkNodeStatsResult(t, &acc) // now test the clusterstats method @@ -243,13 +263,17 @@ func TestGatherClusterStatsNonMaster(t *testing.T) { es := newElasticsearchWithClient() es.ClusterStats = true es.Servers = []string{"http://example.com:9200"} + es.serverInfo = make(map[string]serverInfo) + es.serverInfo["http://example.com:9200"] = serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""} // first get catMaster es.client.Transport = newTransportMock(http.StatusOK, IsNotMasterResult) - require.NoError(t, es.setCatMaster("junk")) + info := es.serverInfo["http://example.com:9200"] + require.NoError(t, es.setCatMaster(&info, "junk")) + es.serverInfo["http://example.com:9200"] = info IsNotMasterResultTokens := strings.Split(string(IsNotMasterResult), " ") - if es.catMasterResponseTokens[0] != IsNotMasterResultTokens[0] { + if es.serverInfo["http://example.com:9200"].masterID != IsNotMasterResultTokens[0] { msg := fmt.Sprintf("catmaster is incorrect") assert.Fail(t, msg) } @@ -263,7 +287,7 @@ func TestGatherClusterStatsNonMaster(t *testing.T) { } // ensure flag is clear so Cluster Stats would not be done - checkIsMaster(es, false, t) + checkIsMaster(es, es.Servers[0], false, t) checkNodeStatsResult(t, &acc) } diff --git a/plugins/inputs/elasticsearch/testdata_test.go b/plugins/inputs/elasticsearch/testdata_test.go index df50d0a2b942b..9074f5a3e4831 100644 --- a/plugins/inputs/elasticsearch/testdata_test.go +++ b/plugins/inputs/elasticsearch/testdata_test.go @@ -514,6 +514,416 @@ const nodeStatsResponse = ` "tripped": 0 } } + }, + "SDFsfSDFsdfFSDSDfSPOJUY": { + "timestamp": 1436365550135, + "name": "test.host.com", + "transport_address": "inet[/127.0.0.1:9300]", + "host": "test", + "ip": [ + "inet[/127.0.0.1:9300]", + "NONE" + ], + "attributes": { + "master": "true" + }, + "indices": { + "docs": { + "count": 29652, + "deleted": 5229 + }, + "store": { + "size_in_bytes": 37715234, + "throttle_time_in_millis": 215 + }, + "indexing": { + "index_total": 84790, + "index_time_in_millis": 29680, + "index_current": 0, + "delete_total": 13879, + "delete_time_in_millis": 1139, + "delete_current": 0, + "noop_update_total": 0, + "is_throttled": false, + "throttle_time_in_millis": 0 + }, + "get": { + "total": 1, + "time_in_millis": 2, + "exists_total": 0, + "exists_time_in_millis": 0, + "missing_total": 1, + "missing_time_in_millis": 2, + "current": 0 + }, + "search": { + "open_contexts": 0, + "query_total": 1452, + "query_time_in_millis": 5695, + "query_current": 0, + "fetch_total": 414, + "fetch_time_in_millis": 146, + "fetch_current": 0 + }, + "merges": { + "current": 0, + "current_docs": 0, + "current_size_in_bytes": 0, + "total": 133, + "total_time_in_millis": 21060, + "total_docs": 203672, + "total_size_in_bytes": 142900226 + }, + "refresh": { + "total": 1076, + "total_time_in_millis": 20078 + }, + "flush": { + "total": 115, + "total_time_in_millis": 2401 + }, + "warmer": { + "current": 0, + "total": 2319, + "total_time_in_millis": 448 + }, + "filter_cache": { + "memory_size_in_bytes": 7384, + "evictions": 0 + }, + "id_cache": { + "memory_size_in_bytes": 0 + }, + "fielddata": { + "memory_size_in_bytes": 12996, + "evictions": 0 + }, + "percolate": { + "total": 0, + "time_in_millis": 0, + "current": 0, + "memory_size_in_bytes": -1, + "memory_size": "-1b", + "queries": 0 + }, + "completion": { + "size_in_bytes": 0 + }, + "segments": { + "count": 134, + "memory_in_bytes": 1285212, + "index_writer_memory_in_bytes": 0, + "index_writer_max_memory_in_bytes": 172368955, + "version_map_memory_in_bytes": 611844, + "fixed_bit_set_memory_in_bytes": 0 + }, + "translog": { + "operations": 17702, + "size_in_bytes": 17 + }, + "suggest": { + "total": 0, + "time_in_millis": 0, + "current": 0 + }, + "query_cache": { + "memory_size_in_bytes": 0, + "evictions": 0, + "hit_count": 0, + "miss_count": 0 + }, + "recovery": { + "current_as_source": 0, + "current_as_target": 0, + "throttle_time_in_millis": 0 + } + }, + "os": { + "timestamp": 1436460392944, + "load_average": [ + 0.01, + 0.04, + 0.05 + ], + "mem": { + "free_in_bytes": 477761536, + "used_in_bytes": 1621868544, + "free_percent": 74, + "used_percent": 25, + "actual_free_in_bytes": 1565470720, + "actual_used_in_bytes": 534159360 + }, + "swap": { + "used_in_bytes": 0, + "free_in_bytes": 487997440 + } + }, + "process": { + "timestamp": 1436460392945, + "open_file_descriptors": 160, + "cpu": { + "percent": 2, + "sys_in_millis": 1870, + "user_in_millis": 13610, + "total_in_millis": 15480 + }, + "mem": { + "total_virtual_in_bytes": 4747890688 + } + }, + "jvm": { + "timestamp": 1436460392945, + "uptime_in_millis": 202245, + "mem": { + "heap_used_in_bytes": 52709568, + "heap_used_percent": 5, + "heap_committed_in_bytes": 259522560, + "heap_max_in_bytes": 1038876672, + "non_heap_used_in_bytes": 39634576, + "non_heap_committed_in_bytes": 40841216, + "pools": { + "young": { + "used_in_bytes": 32685760, + "max_in_bytes": 279183360, + "peak_used_in_bytes": 71630848, + "peak_max_in_bytes": 279183360 + }, + "survivor": { + "used_in_bytes": 8912880, + "max_in_bytes": 34865152, + "peak_used_in_bytes": 8912888, + "peak_max_in_bytes": 34865152 + }, + "old": { + "used_in_bytes": 11110928, + "max_in_bytes": 724828160, + "peak_used_in_bytes": 14354608, + "peak_max_in_bytes": 724828160 + } + } + }, + "threads": { + "count": 44, + "peak_count": 45 + }, + "gc": { + "collectors": { + "young": { + "collection_count": 2, + "collection_time_in_millis": 98 + }, + "old": { + "collection_count": 1, + "collection_time_in_millis": 24 + } + } + }, + "buffer_pools": { + "direct": { + "count": 40, + "used_in_bytes": 6304239, + "total_capacity_in_bytes": 6304239 + }, + "mapped": { + "count": 0, + "used_in_bytes": 0, + "total_capacity_in_bytes": 0 + } + } + }, + "thread_pool": { + "percolate": { + "threads": 123, + "queue": 23, + "active": 13, + "rejected": 235, + "largest": 23, + "completed": 33 + }, + "fetch_shard_started": { + "threads": 3, + "queue": 1, + "active": 5, + "rejected": 6, + "largest": 4, + "completed": 54 + }, + "listener": { + "threads": 1, + "queue": 2, + "active": 4, + "rejected": 8, + "largest": 1, + "completed": 1 + }, + "index": { + "threads": 6, + "queue": 8, + "active": 4, + "rejected": 2, + "largest": 3, + "completed": 6 + }, + "refresh": { + "threads": 23, + "queue": 7, + "active": 3, + "rejected": 4, + "largest": 8, + "completed": 3 + }, + "suggest": { + "threads": 2, + "queue": 7, + "active": 2, + "rejected": 1, + "largest": 8, + "completed": 3 + }, + "generic": { + "threads": 1, + "queue": 4, + "active": 6, + "rejected": 3, + "largest": 2, + "completed": 27 + }, + "warmer": { + "threads": 2, + "queue": 7, + "active": 3, + "rejected": 2, + "largest": 3, + "completed": 1 + }, + "search": { + "threads": 5, + "queue": 7, + "active": 2, + "rejected": 7, + "largest": 2, + "completed": 4 + }, + "flush": { + "threads": 3, + "queue": 8, + "active": 0, + "rejected": 1, + "largest": 5, + "completed": 3 + }, + "optimize": { + "threads": 3, + "queue": 4, + "active": 1, + "rejected": 2, + "largest": 7, + "completed": 3 + }, + "fetch_shard_store": { + "threads": 1, + "queue": 7, + "active": 4, + "rejected": 2, + "largest": 4, + "completed": 1 + }, + "management": { + "threads": 2, + "queue": 3, + "active": 1, + "rejected": 6, + "largest": 2, + "completed": 22 + }, + "get": { + "threads": 1, + "queue": 8, + "active": 4, + "rejected": 3, + "largest": 2, + "completed": 1 + }, + "merge": { + "threads": 6, + "queue": 4, + "active": 5, + "rejected": 2, + "largest": 5, + "completed": 1 + }, + "bulk": { + "threads": 4, + "queue": 5, + "active": 7, + "rejected": 3, + "largest": 1, + "completed": 4 + }, + "snapshot": { + "threads": 8, + "queue": 5, + "active": 6, + "rejected": 2, + "largest": 1, + "completed": 0 + } + }, + "fs": { + "timestamp": 1436460392946, + "total": { + "total_in_bytes": 19507089408, + "free_in_bytes": 16909316096, + "available_in_bytes": 15894814720 + }, + "data": [ + { + "path": "/usr/share/elasticsearch/data/elasticsearch/nodes/0", + "mount": "/usr/share/elasticsearch/data", + "type": "ext4", + "total_in_bytes": 19507089408, + "free_in_bytes": 16909316096, + "available_in_bytes": 15894814720 + } + ] + }, + "transport": { + "server_open": 13, + "rx_count": 6, + "rx_size_in_bytes": 1380, + "tx_count": 6, + "tx_size_in_bytes": 1380 + }, + "http": { + "current_open": 3, + "total_opened": 3 + }, + "breakers": { + "fielddata": { + "limit_size_in_bytes": 623326003, + "limit_size": "594.4mb", + "estimated_size_in_bytes": 0, + "estimated_size": "0b", + "overhead": 1.03, + "tripped": 0 + }, + "request": { + "limit_size_in_bytes": 415550668, + "limit_size": "396.2mb", + "estimated_size_in_bytes": 0, + "estimated_size": "0b", + "overhead": 1.0, + "tripped": 0 + }, + "parent": { + "limit_size_in_bytes": 727213670, + "limit_size": "693.5mb", + "estimated_size_in_bytes": 0, + "estimated_size": "0b", + "overhead": 1.0, + "tripped": 0 + } + } } } } From dbe04bd1b11be7ae86c5de22a71bc7f0fee4a89d Mon Sep 17 00:00:00 2001 From: Jean-Louis Dupond Date: Tue, 18 Jun 2019 15:34:00 +0200 Subject: [PATCH 2/5] Elasticsearch: add number_of_in_flight_fetch to cluster health The metric number_of_in_flight_fetch was not used in the cluster health. Add it and sort the fields like they are printed by ElasticSearch. Closes: #2779 --- plugins/inputs/elasticsearch/elasticsearch.go | 56 ++++++++++--------- plugins/inputs/elasticsearch/testdata_test.go | 3 + 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 65e8233d431df..c9973b029aaca 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -40,31 +40,32 @@ type nodeStat struct { } type clusterHealth struct { - ClusterName string `json:"cluster_name"` - Status string `json:"status"` - TimedOut bool `json:"timed_out"` - NumberOfNodes int `json:"number_of_nodes"` - NumberOfDataNodes int `json:"number_of_data_nodes"` ActivePrimaryShards int `json:"active_primary_shards"` ActiveShards int `json:"active_shards"` - RelocatingShards int `json:"relocating_shards"` - InitializingShards int `json:"initializing_shards"` - UnassignedShards int `json:"unassigned_shards"` + ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"` + ClusterName string `json:"cluster_name"` DelayedUnassignedShards int `json:"delayed_unassigned_shards"` + InitializingShards int `json:"initializing_shards"` + NumberOfDataNodes int `json:"number_of_data_nodes"` + NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"` + NumberOfNodes int `json:"number_of_nodes"` NumberOfPendingTasks int `json:"number_of_pending_tasks"` + RelocatingShards int `json:"relocating_shards"` + Status string `json:"status"` TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"` - ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"` + TimedOut bool `json:"timed_out"` + UnassignedShards int `json:"unassigned_shards"` Indices map[string]indexHealth `json:"indices"` } type indexHealth struct { - Status string `json:"status"` - NumberOfShards int `json:"number_of_shards"` - NumberOfReplicas int `json:"number_of_replicas"` ActivePrimaryShards int `json:"active_primary_shards"` ActiveShards int `json:"active_shards"` - RelocatingShards int `json:"relocating_shards"` InitializingShards int `json:"initializing_shards"` + NumberOfReplicas int `json:"number_of_replicas"` + NumberOfShards int `json:"number_of_shards"` + RelocatingShards int `json:"relocating_shards"` + Status string `json:"status"` UnassignedShards int `json:"unassigned_shards"` } @@ -361,20 +362,21 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator } measurementTime := time.Now() clusterFields := map[string]interface{}{ - "status": healthStats.Status, - "status_code": mapHealthStatusToCode(healthStats.Status), - "timed_out": healthStats.TimedOut, - "number_of_nodes": healthStats.NumberOfNodes, - "number_of_data_nodes": healthStats.NumberOfDataNodes, "active_primary_shards": healthStats.ActivePrimaryShards, "active_shards": healthStats.ActiveShards, - "relocating_shards": healthStats.RelocatingShards, - "initializing_shards": healthStats.InitializingShards, - "unassigned_shards": healthStats.UnassignedShards, + "active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber, "delayed_unassigned_shards": healthStats.DelayedUnassignedShards, + "initializing_shards": healthStats.InitializingShards, + "number_of_data_nodes": healthStats.NumberOfDataNodes, + "number_of_in_flight_fetch": healthStats.NumberOfInFlightFetch, + "number_of_nodes": healthStats.NumberOfNodes, "number_of_pending_tasks": healthStats.NumberOfPendingTasks, + "relocating_shards": healthStats.RelocatingShards, + "status": healthStats.Status, + "status_code": mapHealthStatusToCode(healthStats.Status), "task_max_waiting_in_queue_millis": healthStats.TaskMaxWaitingInQueueMillis, - "active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber, + "timed_out": healthStats.TimedOut, + "unassigned_shards": healthStats.UnassignedShards, } acc.AddFields( "elasticsearch_cluster_health", @@ -385,14 +387,14 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator for name, health := range healthStats.Indices { indexFields := map[string]interface{}{ - "status": health.Status, - "status_code": mapHealthStatusToCode(health.Status), - "number_of_shards": health.NumberOfShards, - "number_of_replicas": health.NumberOfReplicas, "active_primary_shards": health.ActivePrimaryShards, "active_shards": health.ActiveShards, - "relocating_shards": health.RelocatingShards, "initializing_shards": health.InitializingShards, + "number_of_replicas": health.NumberOfReplicas, + "number_of_shards": health.NumberOfShards, + "relocating_shards": health.RelocatingShards, + "status": health.Status, + "status_code": mapHealthStatusToCode(health.Status), "unassigned_shards": health.UnassignedShards, } acc.AddFields( diff --git a/plugins/inputs/elasticsearch/testdata_test.go b/plugins/inputs/elasticsearch/testdata_test.go index 9074f5a3e4831..ffdf9559d3bea 100644 --- a/plugins/inputs/elasticsearch/testdata_test.go +++ b/plugins/inputs/elasticsearch/testdata_test.go @@ -7,6 +7,7 @@ const clusterHealthResponse = ` "timed_out": false, "number_of_nodes": 3, "number_of_data_nodes": 3, + "number_of_in_flight_fetch": 0, "active_primary_shards": 5, "active_shards": 15, "relocating_shards": 0, @@ -26,6 +27,7 @@ const clusterHealthResponseWithIndices = ` "timed_out": false, "number_of_nodes": 3, "number_of_data_nodes": 3, + "number_of_in_flight_fetch": 0, "active_primary_shards": 5, "active_shards": 15, "relocating_shards": 0, @@ -66,6 +68,7 @@ var clusterHealthExpected = map[string]interface{}{ "timed_out": false, "number_of_nodes": 3, "number_of_data_nodes": 3, + "number_of_in_flight_fetch": 0, "active_primary_shards": 5, "active_shards": 15, "relocating_shards": 0, From 51cc30b0bc6f23a50a5330b1e92ce241a5c12df4 Mon Sep 17 00:00:00 2001 From: Jean-Louis Dupond Date: Mon, 24 Jun 2019 10:08:29 +0200 Subject: [PATCH 3/5] Elasticsearch: changes like requested --- plugins/inputs/elasticsearch/elasticsearch.go | 36 +++++++++++-------- .../elasticsearch/elasticsearch_test.go | 16 ++++----- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index c9973b029aaca..51bc4356c4b5d 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -138,8 +138,9 @@ type Elasticsearch struct { NodeStats []string tls.ClientConfig - client *http.Client - serverInfo map[string]serverInfo + client *http.Client + serverInfo map[string]serverInfo + serverInfoMutex sync.Mutex } type serverInfo struct { nodeID string @@ -200,18 +201,25 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { go func(s string, acc telegraf.Accumulator) { info := serverInfo{} + var err error + // Gather node ID - if err := e.gatherNodeID(&info, s+"/_nodes/_local/name"); err != nil { + if info.nodeID, err = e.gatherNodeID(s + "/_nodes/_local/name"); err != nil { acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) return } // get cat/master information here so NodeStats can determine // whether this node is the Master - if err := e.setCatMaster(&info, s+"/_cat/master"); err != nil { + if info.masterID, err = e.getCatMaster(s + "/_cat/master"); err != nil { acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) return } + + e.serverInfoMutex.Lock() + e.serverInfo[s] = info + e.serverInfoMutex.Unlock() + }(serv, acc) } } @@ -287,20 +295,20 @@ func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string { return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ",")) } -func (e *Elasticsearch) gatherNodeID(info *serverInfo, url string) error { +func (e *Elasticsearch) gatherNodeID(url string) (string, error) { nodeStats := &struct { ClusterName string `json:"cluster_name"` Nodes map[string]*nodeStat `json:"nodes"` }{} if err := e.gatherJsonData(url, nodeStats); err != nil { - return err + return "", err } // Only 1 should be returned for id := range nodeStats.Nodes { - info.nodeID = id + return id, nil } - return nil + return "", nil } func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { @@ -437,27 +445,27 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) return nil } -func (e *Elasticsearch) setCatMaster(info *serverInfo, url string) error { +func (e *Elasticsearch) getCatMaster(url string) (string, error) { r, err := e.client.Get(url) if err != nil { - return err + return "", err } defer r.Body.Close() if r.StatusCode != http.StatusOK { // NOTE: we are not going to read/discard r.Body under the assumption we'd prefer // to let the underlying transport close the connection and re-establish a new one for // future calls. - return fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) + return "", fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) } response, err := ioutil.ReadAll(r.Body) if err != nil { - return err + return "", err } - info.masterID = strings.Split(string(response), " ")[0] + masterID := strings.Split(string(response), " ")[0] - return nil + return masterID, nil } func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error { diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go index 07d7b8821019c..f536e8e1a5c0d 100644 --- a/plugins/inputs/elasticsearch/elasticsearch_test.go +++ b/plugins/inputs/elasticsearch/elasticsearch_test.go @@ -219,16 +219,17 @@ func TestGatherClusterStatsMaster(t *testing.T) { es.ClusterStats = true es.Servers = []string{"http://example.com:9200"} es.serverInfo = make(map[string]serverInfo) - es.serverInfo["http://example.com:9200"] = serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""} + info := serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""} // first get catMaster es.client.Transport = newTransportMock(http.StatusOK, IsMasterResult) - info := es.serverInfo["http://example.com:9200"] - require.NoError(t, es.setCatMaster(&info, "junk")) + masterID, err := es.getCatMaster("junk") + require.NoError(t, err) + info.masterID = masterID es.serverInfo["http://example.com:9200"] = info IsMasterResultTokens := strings.Split(string(IsMasterResult), " ") - if es.serverInfo["http://example.com:9200"].masterID != IsMasterResultTokens[0] { + if masterID != IsMasterResultTokens[0] { msg := fmt.Sprintf("catmaster is incorrect") assert.Fail(t, msg) } @@ -268,12 +269,11 @@ func TestGatherClusterStatsNonMaster(t *testing.T) { // first get catMaster es.client.Transport = newTransportMock(http.StatusOK, IsNotMasterResult) - info := es.serverInfo["http://example.com:9200"] - require.NoError(t, es.setCatMaster(&info, "junk")) - es.serverInfo["http://example.com:9200"] = info + masterID, err := es.getCatMaster("junk") + require.NoError(t, err) IsNotMasterResultTokens := strings.Split(string(IsNotMasterResult), " ") - if es.serverInfo["http://example.com:9200"].masterID != IsNotMasterResultTokens[0] { + if masterID != IsNotMasterResultTokens[0] { msg := fmt.Sprintf("catmaster is incorrect") assert.Fail(t, msg) } From 1cb5dbecf48509076580d9c2d8672768d51a37df Mon Sep 17 00:00:00 2001 From: Jean-Louis Dupond Date: Mon, 24 Jun 2019 10:28:47 +0200 Subject: [PATCH 4/5] Move cluster indices to elasticsearch_cluster_health_indices (Closes: #2711) --- plugins/inputs/elasticsearch/elasticsearch.go | 2 +- plugins/inputs/elasticsearch/elasticsearch_test.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 51bc4356c4b5d..5faf17296eea9 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -406,7 +406,7 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator "unassigned_shards": health.UnassignedShards, } acc.AddFields( - "elasticsearch_indices", + "elasticsearch_cluster_health_indices", indexFields, map[string]string{"index": name, "name": healthStats.ClusterName}, measurementTime, diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go index f536e8e1a5c0d..2987841325c19 100644 --- a/plugins/inputs/elasticsearch/elasticsearch_test.go +++ b/plugins/inputs/elasticsearch/elasticsearch_test.go @@ -150,11 +150,11 @@ func TestGatherClusterHealthEmptyClusterHealth(t *testing.T) { clusterHealthExpected, map[string]string{"name": "elasticsearch_telegraf"}) - acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices", v1IndexExpected, map[string]string{"index": "v1"}) - acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices", v2IndexExpected, map[string]string{"index": "v2"}) } @@ -177,11 +177,11 @@ func TestGatherClusterHealthSpecificClusterHealth(t *testing.T) { clusterHealthExpected, map[string]string{"name": "elasticsearch_telegraf"}) - acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices", v1IndexExpected, map[string]string{"index": "v1"}) - acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices", v2IndexExpected, map[string]string{"index": "v2"}) } @@ -204,11 +204,11 @@ func TestGatherClusterHealthAlsoIndicesHealth(t *testing.T) { clusterHealthExpected, map[string]string{"name": "elasticsearch_telegraf"}) - acc.AssertContainsTaggedFields(t, "elasticsearch_indices", + acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health_indices", v1IndexExpected, map[string]string{"index": "v1", "name": "elasticsearch_telegraf"}) - acc.AssertContainsTaggedFields(t, "elasticsearch_indices", + acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health_indices", v2IndexExpected, map[string]string{"index": "v2", "name": "elasticsearch_telegraf"}) } From f962d3a0bdb1f572bebf78bf0feb6975f6b7486f Mon Sep 17 00:00:00 2001 From: Jean-Louis Dupond Date: Tue, 25 Jun 2019 08:27:52 +0200 Subject: [PATCH 5/5] Add Waitgroup --- plugins/inputs/elasticsearch/elasticsearch.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 5faf17296eea9..59b21f2cd0626 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -196,9 +196,13 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { } if e.ClusterStats { + var wgC sync.WaitGroup + wgC.Add(len(e.Servers)) + e.serverInfo = make(map[string]serverInfo) for _, serv := range e.Servers { go func(s string, acc telegraf.Accumulator) { + defer wgC.Done() info := serverInfo{} var err error @@ -222,6 +226,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { }(serv, acc) } + wgC.Wait() } var wg sync.WaitGroup