diff --git a/CHANGELOG.md b/CHANGELOG.md index aa6bd7f1..a8fdfe88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,3 @@ -## Unreleased -* [BREAKING] Rename --es.cluster_settings to --collector.clustersettings - ## 1.5.0 / 2022-07-28 * [FEATURE] Add metrics collection for data stream statistics #592 diff --git a/collector/cluster_settings.go b/collector/cluster_settings.go index b6b0bb97..c9c374e2 100644 --- a/collector/cluster_settings.go +++ b/collector/cluster_settings.go @@ -14,139 +14,157 @@ package collector import ( - "context" "encoding/json" + "fmt" "io/ioutil" "net/http" "net/url" + "path" "strconv" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/imdario/mergo" "github.com/prometheus/client_golang/prometheus" ) -func init() { - registerCollector("clustersettings", defaultDisabled, NewClusterSettings) -} - -type ClusterSettingsCollector struct { +// ClusterSettings information struct +type ClusterSettings struct { logger log.Logger - u *url.URL - hc *http.Client -} + client *http.Client + url *url.URL -func NewClusterSettings(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) { - return &ClusterSettingsCollector{ - logger: logger, - u: u, - hc: hc, - }, nil + up prometheus.Gauge + shardAllocationEnabled prometheus.Gauge + maxShardsPerNode prometheus.Gauge + totalScrapes, jsonParseFailures prometheus.Counter } -var clusterSettingsDesc = map[string]*prometheus.Desc{ - "shardAllocationEnabled": prometheus.NewDesc( - prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"), - "Current mode of cluster wide shard routing allocation settings.", - nil, nil, - ), - - "maxShardsPerNode": prometheus.NewDesc( - prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"), - "Current maximum number of shards per node setting.", - nil, nil, - ), +// NewClusterSettings defines Cluster Settings Prometheus metrics +func NewClusterSettings(logger log.Logger, client *http.Client, url *url.URL) *ClusterSettings { + return &ClusterSettings{ + logger: logger, + client: client, + url: url, + + up: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "up"), + Help: "Was the last scrape of the Elasticsearch cluster settings endpoint successful.", + }), + totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{ + Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "total_scrapes"), + Help: "Current total Elasticsearch cluster settings scrapes.", + }), + shardAllocationEnabled: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"), + Help: "Current mode of cluster wide shard routing allocation settings.", + }), + maxShardsPerNode: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"), + Help: "Current maximum number of shards per node setting.", + }), + jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "json_parse_failures"), + Help: "Number of errors while parsing JSON.", + }), + } } -// clusterSettingsResponse is a representation of a Elasticsearch Cluster Settings -type clusterSettingsResponse struct { - Defaults clusterSettingsSection `json:"defaults"` - Persistent clusterSettingsSection `json:"persistent"` - Transient clusterSettingsSection `json:"transient"` +// Describe add Snapshots metrics descriptions +func (cs *ClusterSettings) Describe(ch chan<- *prometheus.Desc) { + ch <- cs.up.Desc() + ch <- cs.totalScrapes.Desc() + ch <- cs.shardAllocationEnabled.Desc() + ch <- cs.maxShardsPerNode.Desc() + ch <- cs.jsonParseFailures.Desc() } -// clusterSettingsSection is a representation of a Elasticsearch Cluster Settings -type clusterSettingsSection struct { - Cluster clusterSettingsCluster `json:"cluster"` -} +func (cs *ClusterSettings) getAndParseURL(u *url.URL, data interface{}) error { + res, err := cs.client.Get(u.String()) + if err != nil { + return fmt.Errorf("failed to get from %s://%s:%s%s: %s", + u.Scheme, u.Hostname(), u.Port(), u.Path, err) + } -// clusterSettingsCluster is a representation of a Elasticsearch clusterSettingsCluster Settings -type clusterSettingsCluster struct { - Routing clusterSettingsRouting `json:"routing"` - // This can be either a JSON object (which does not contain the value we are interested in) or a string - MaxShardsPerNode interface{} `json:"max_shards_per_node"` -} + defer func() { + err = res.Body.Close() + if err != nil { + _ = level.Warn(cs.logger).Log( + "msg", "failed to close http.Client", + "err", err, + ) + } + }() -// clusterSettingsRouting is a representation of a Elasticsearch Cluster shard routing configuration -type clusterSettingsRouting struct { - Allocation clusterSettingsAllocation `json:"allocation"` -} + if res.StatusCode != http.StatusOK { + return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode) + } -// clusterSettingsAllocation is a representation of a Elasticsearch Cluster shard routing allocation settings -type clusterSettingsAllocation struct { - Enabled string `json:"enable"` -} + bts, err := ioutil.ReadAll(res.Body) + if err != nil { + cs.jsonParseFailures.Inc() + return err + } -// ClusterSettings information struct -type ClusterSettings struct { - logger log.Logger - client *http.Client - url *url.URL + if err := json.Unmarshal(bts, data); err != nil { + cs.jsonParseFailures.Inc() + return err + } - maxShardsPerNode prometheus.Gauge + return nil } -func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { - u := c.u.ResolveReference(&url.URL{Path: "_cluster/settings"}) +func (cs *ClusterSettings) fetchAndDecodeClusterSettingsStats() (ClusterSettingsResponse, error) { + + u := *cs.url + u.Path = path.Join(u.Path, "/_cluster/settings") q := u.Query() q.Set("include_defaults", "true") u.RawQuery = q.Encode() - - req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) - if err != nil { - return err - } - - resp, err := c.hc.Do(req) + u.RawPath = q.Encode() + var csfr ClusterSettingsFullResponse + var csr ClusterSettingsResponse + err := cs.getAndParseURL(&u, &csfr) if err != nil { - return err + return csr, err } - defer resp.Body.Close() - b, err := ioutil.ReadAll(resp.Body) + err = mergo.Merge(&csr, csfr.Defaults, mergo.WithOverride) if err != nil { - return err + return csr, err } - var data clusterSettingsResponse - err = json.Unmarshal(b, &data) + err = mergo.Merge(&csr, csfr.Persistent, mergo.WithOverride) if err != nil { - return err + return csr, err } + err = mergo.Merge(&csr, csfr.Transient, mergo.WithOverride) - // Merge all settings into one struct - merged := data.Defaults + return csr, err +} - err = mergo.Merge(&merged, data.Persistent, mergo.WithOverride) - if err != nil { - return err - } - err = mergo.Merge(&merged, data.Transient, mergo.WithOverride) - if err != nil { - return err - } +// Collect gets cluster settings metric values +func (cs *ClusterSettings) Collect(ch chan<- prometheus.Metric) { - // Max shards per node - if maxShardsPerNodeString, ok := merged.Cluster.MaxShardsPerNode.(string); ok { - maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64) - if err == nil { - ch <- prometheus.MustNewConstMetric( - clusterSettingsDesc["maxShardsPerNode"], - prometheus.GaugeValue, - float64(maxShardsPerNode), - ) - } + cs.totalScrapes.Inc() + defer func() { + ch <- cs.up + ch <- cs.totalScrapes + ch <- cs.jsonParseFailures + ch <- cs.shardAllocationEnabled + ch <- cs.maxShardsPerNode + }() + + csr, err := cs.fetchAndDecodeClusterSettingsStats() + if err != nil { + cs.shardAllocationEnabled.Set(0) + cs.up.Set(0) + _ = level.Warn(cs.logger).Log( + "msg", "failed to fetch and decode cluster settings stats", + "err", err, + ) + return } + cs.up.Set(1) - // Shard allocation enabled shardAllocationMap := map[string]int{ "all": 0, "primaries": 1, @@ -154,11 +172,12 @@ func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometh "none": 3, } - ch <- prometheus.MustNewConstMetric( - clusterSettingsDesc["shardAllocationEnabled"], - prometheus.GaugeValue, - float64(shardAllocationMap[merged.Cluster.Routing.Allocation.Enabled]), - ) + cs.shardAllocationEnabled.Set(float64(shardAllocationMap[csr.Cluster.Routing.Allocation.Enabled])) - return nil + if maxShardsPerNodeString, ok := csr.Cluster.MaxShardsPerNode.(string); ok { + maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64) + if err == nil { + cs.maxShardsPerNode.Set(float64(maxShardsPerNode)) + } + } } diff --git a/collector/cluster_settings_response.go b/collector/cluster_settings_response.go new file mode 100644 index 00000000..ac2fcb8d --- /dev/null +++ b/collector/cluster_settings_response.go @@ -0,0 +1,43 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +// ClusterSettingsFullResponse is a representation of a Elasticsearch Cluster Settings +type ClusterSettingsFullResponse struct { + Defaults ClusterSettingsResponse `json:"defaults"` + Persistent ClusterSettingsResponse `json:"persistent"` + Transient ClusterSettingsResponse `json:"transient"` +} + +// ClusterSettingsResponse is a representation of a Elasticsearch Cluster Settings +type ClusterSettingsResponse struct { + Cluster Cluster `json:"cluster"` +} + +// Cluster is a representation of a Elasticsearch Cluster Settings +type Cluster struct { + Routing Routing `json:"routing"` + // This can be either a JSON object (which does not contain the value we are interested in) or a string + MaxShardsPerNode interface{} `json:"max_shards_per_node"` +} + +// Routing is a representation of a Elasticsearch Cluster shard routing configuration +type Routing struct { + Allocation Allocation `json:"allocation"` +} + +// Allocation is a representation of a Elasticsearch Cluster shard routing allocation settings +type Allocation struct { + Enabled string `json:"enable"` +} diff --git a/collector/cluster_settings_test.go b/collector/cluster_settings_test.go index b2134318..d9f18f3d 100644 --- a/collector/cluster_settings_test.go +++ b/collector/cluster_settings_test.go @@ -14,112 +14,84 @@ package collector import ( - "context" "io" "net/http" "net/http/httptest" "net/url" "os" - "strings" "testing" "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" ) -type wrapCollector struct { - c Collector -} - -func (w wrapCollector) Describe(ch chan<- *prometheus.Desc) { -} - -func (w wrapCollector) Collect(ch chan<- prometheus.Metric) { - w.c.Update(context.Background(), ch) -} - func TestClusterSettingsStats(t *testing.T) { // Testcases created using: // docker run -d -p 9200:9200 elasticsearch:VERSION-alpine // curl http://localhost:9200/_cluster/settings/?include_defaults=true + files := []string{"../fixtures/settings-5.4.2.json", "../fixtures/settings-merge-5.4.2.json"} + for _, filename := range files { + f, _ := os.Open(filename) + defer f.Close() + for hn, handler := range map[string]http.Handler{ + "plain": http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(w, f) + }), + } { + ts := httptest.NewServer(handler) + defer ts.Close() - tests := []struct { - name string - file string - want string - }{ - // MaxShardsPerNode is empty in older versions - { - name: "5.4.2", - file: "../fixtures/settings-5.4.2.json", - want: ` -# HELP elasticsearch_clustersettings_stats_shard_allocation_enabled Current mode of cluster wide shard routing allocation settings. -# TYPE elasticsearch_clustersettings_stats_shard_allocation_enabled gauge -elasticsearch_clustersettings_stats_shard_allocation_enabled 0 -`, - }, - { - name: "5.4.2-merge", - file: "../fixtures/settings-merge-5.4.2.json", - want: ` -# HELP elasticsearch_clustersettings_stats_shard_allocation_enabled Current mode of cluster wide shard routing allocation settings. -# TYPE elasticsearch_clustersettings_stats_shard_allocation_enabled gauge -elasticsearch_clustersettings_stats_shard_allocation_enabled 0 -`, - }, - { - name: "7.3.0", - file: "../fixtures/settings-7.3.0.json", - want: ` -# HELP elasticsearch_clustersettings_stats_max_shards_per_node Current maximum number of shards per node setting. -# TYPE elasticsearch_clustersettings_stats_max_shards_per_node gauge -elasticsearch_clustersettings_stats_max_shards_per_node 1000 -# HELP elasticsearch_clustersettings_stats_shard_allocation_enabled Current mode of cluster wide shard routing allocation settings. -# TYPE elasticsearch_clustersettings_stats_shard_allocation_enabled gauge -elasticsearch_clustersettings_stats_shard_allocation_enabled 0 -`, - }, - { - name: "7.17.5-persistent-clustermaxshardspernode", - file: "../fixtures/settings-persistent-clustermaxshardspernode-7.17.5.json", - want: ` -# HELP elasticsearch_clustersettings_stats_max_shards_per_node Current maximum number of shards per node setting. -# TYPE elasticsearch_clustersettings_stats_max_shards_per_node gauge -elasticsearch_clustersettings_stats_max_shards_per_node 1000 -# HELP elasticsearch_clustersettings_stats_shard_allocation_enabled Current mode of cluster wide shard routing allocation settings. -# TYPE elasticsearch_clustersettings_stats_shard_allocation_enabled gauge -elasticsearch_clustersettings_stats_shard_allocation_enabled 0 -`, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - f, err := os.Open(tt.file) + u, err := url.Parse(ts.URL) if err != nil { - t.Fatal(err) + t.Fatalf("Failed to parse URL: %s", err) } - defer f.Close() + c := NewClusterSettings(log.NewNopLogger(), http.DefaultClient, u) + nsr, err := c.fetchAndDecodeClusterSettingsStats() + if err != nil { + t.Fatalf("Failed to fetch or decode cluster settings stats: %s", err) + } + t.Logf("[%s/%s] Cluster Settings Stats Response: %+v", hn, filename, nsr) + if nsr.Cluster.Routing.Allocation.Enabled != "ALL" { + t.Errorf("Wrong setting for cluster routing allocation enabled") + } + if nsr.Cluster.MaxShardsPerNode != nil { + t.Errorf("MaxShardsPerNode should be empty on older releases") + } + } + } +} - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +func TestClusterMaxShardsPerNode(t *testing.T) { + // settings-7.3.0.json testcase created using: + // docker run -d -p 9200:9200 elasticsearch:VERSION-alpine + // curl http://localhost:9200/_cluster/settings/?include_defaults=true + // settings-persistent-clustermaxshartspernode-7.17.json testcase created using: + // docker run -d -p 9200:9200 elasticsearch:VERSION + // curl -X PUT http://localhost:9200/_cluster/settings -H 'Content-Type: application/json' -d '{"persistent":{"cluster.max_shards_per_node":1000}}' + // curl http://localhost:9200/_cluster/settings/?include_defaults=true + files := []string{"../fixtures/settings-7.3.0.json", "../fixtures/settings-persistent-clustermaxshartspernode-7.17.5.json"} + for _, filename := range files { + f, _ := os.Open(filename) + defer f.Close() + for hn, handler := range map[string]http.Handler{ + "plain": http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { io.Copy(w, f) - })) + }), + } { + ts := httptest.NewServer(handler) defer ts.Close() - u, err := url.Parse(ts.URL) if err != nil { - t.Fatal(err) + t.Fatalf("Failed to parse URL: %s", err) } - - c, err := NewClusterSettings(log.NewNopLogger(), u, http.DefaultClient) + c := NewClusterSettings(log.NewNopLogger(), http.DefaultClient, u) + nsr, err := c.fetchAndDecodeClusterSettingsStats() if err != nil { - t.Fatal(err) + t.Fatalf("Failed to fetch or decode cluster settings stats: %s", err) } - - if err := testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(tt.want)); err != nil { - t.Fatal(err) + t.Logf("[%s/%s] Cluster Settings Stats Response: %+v", hn, filename, nsr) + if nsr.Cluster.MaxShardsPerNode != "1000" { + t.Errorf("Wrong value for MaxShardsPerNode") } - }) + } } } diff --git a/collector/collector.go b/collector/collector.go index c4fb53d7..c08a9994 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -33,8 +33,8 @@ const ( // Namespace defines the common namespace to be used by all metrics. namespace = "elasticsearch" - defaultEnabled = true - defaultDisabled = false + defaultEnabled = true + // defaultDisabled = false ) type factoryFunc func(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) diff --git a/fixtures/settings-persistent-clustermaxshardspernode-7.17.5.json b/fixtures/settings-persistent-clustermaxshartspernode-7.17.5.json similarity index 100% rename from fixtures/settings-persistent-clustermaxshardspernode-7.17.5.json rename to fixtures/settings-persistent-clustermaxshartspernode-7.17.5.json diff --git a/go.mod b/go.mod index d06d330f..4cdc7b0c 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,6 @@ require ( github.com/aws/smithy-go v1.13.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/jpillora/backoff v1.0.0 // indirect diff --git a/main.go b/main.go index 3c7efd7a..5680950f 100644 --- a/main.go +++ b/main.go @@ -80,6 +80,9 @@ func main() { esExportIndexAliases = kingpin.Flag("es.aliases", "Export informational alias metrics."). Default("true").Bool() + esExportClusterSettings = kingpin.Flag("es.cluster_settings", + "Export stats for cluster settings."). + Default("false").Bool() esExportILM = kingpin.Flag("es.ilm", "Export index lifecycle politics for indices in the cluster."). Default("false").Bool() @@ -226,6 +229,10 @@ func main() { prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL)) } + if *esExportClusterSettings { + prometheus.MustRegister(collector.NewClusterSettings(logger, httpClient, esURL)) + } + if *esExportIndicesSettings { prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL)) }