Skip to content

Commit

Permalink
Revert "Refactor cluster settings collector (prometheus-community#656)"
Browse files Browse the repository at this point in the history
This reverts commit 9bfb1bf.
  • Loading branch information
jaimeyh authored Jun 14, 2024
1 parent 9edfca7 commit 55d9dac
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 186 deletions.
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
## Unreleased
* [BREAKING] Rename --es.cluster_settings to --collector.clustersettings

## 1.5.0 / 2022-07-28

* [FEATURE] Add metrics collection for data stream statistics #592
Expand Down
217 changes: 118 additions & 99 deletions collector/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,151 +14,170 @@
package collector

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"strconv"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/imdario/mergo"
"github.com/prometheus/client_golang/prometheus"
)

func init() {
registerCollector("clustersettings", defaultDisabled, NewClusterSettings)
}

type ClusterSettingsCollector struct {
// ClusterSettings information struct
type ClusterSettings struct {
logger log.Logger
u *url.URL
hc *http.Client
}
client *http.Client
url *url.URL

func NewClusterSettings(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) {
return &ClusterSettingsCollector{
logger: logger,
u: u,
hc: hc,
}, nil
up prometheus.Gauge
shardAllocationEnabled prometheus.Gauge
maxShardsPerNode prometheus.Gauge
totalScrapes, jsonParseFailures prometheus.Counter
}

var clusterSettingsDesc = map[string]*prometheus.Desc{
"shardAllocationEnabled": prometheus.NewDesc(
prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"),
"Current mode of cluster wide shard routing allocation settings.",
nil, nil,
),

"maxShardsPerNode": prometheus.NewDesc(
prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"),
"Current maximum number of shards per node setting.",
nil, nil,
),
// NewClusterSettings defines Cluster Settings Prometheus metrics
func NewClusterSettings(logger log.Logger, client *http.Client, url *url.URL) *ClusterSettings {
return &ClusterSettings{
logger: logger,
client: client,
url: url,

up: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "up"),
Help: "Was the last scrape of the Elasticsearch cluster settings endpoint successful.",
}),
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "total_scrapes"),
Help: "Current total Elasticsearch cluster settings scrapes.",
}),
shardAllocationEnabled: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"),
Help: "Current mode of cluster wide shard routing allocation settings.",
}),
maxShardsPerNode: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"),
Help: "Current maximum number of shards per node setting.",
}),
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "json_parse_failures"),
Help: "Number of errors while parsing JSON.",
}),
}
}

// clusterSettingsResponse is a representation of a Elasticsearch Cluster Settings
type clusterSettingsResponse struct {
Defaults clusterSettingsSection `json:"defaults"`
Persistent clusterSettingsSection `json:"persistent"`
Transient clusterSettingsSection `json:"transient"`
// Describe add Snapshots metrics descriptions
func (cs *ClusterSettings) Describe(ch chan<- *prometheus.Desc) {
ch <- cs.up.Desc()
ch <- cs.totalScrapes.Desc()
ch <- cs.shardAllocationEnabled.Desc()
ch <- cs.maxShardsPerNode.Desc()
ch <- cs.jsonParseFailures.Desc()
}

// clusterSettingsSection is a representation of a Elasticsearch Cluster Settings
type clusterSettingsSection struct {
Cluster clusterSettingsCluster `json:"cluster"`
}
func (cs *ClusterSettings) getAndParseURL(u *url.URL, data interface{}) error {
res, err := cs.client.Get(u.String())
if err != nil {
return fmt.Errorf("failed to get from %s://%s:%s%s: %s",
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
}

// clusterSettingsCluster is a representation of a Elasticsearch clusterSettingsCluster Settings
type clusterSettingsCluster struct {
Routing clusterSettingsRouting `json:"routing"`
// This can be either a JSON object (which does not contain the value we are interested in) or a string
MaxShardsPerNode interface{} `json:"max_shards_per_node"`
}
defer func() {
err = res.Body.Close()
if err != nil {
_ = level.Warn(cs.logger).Log(
"msg", "failed to close http.Client",
"err", err,
)
}
}()

// clusterSettingsRouting is a representation of a Elasticsearch Cluster shard routing configuration
type clusterSettingsRouting struct {
Allocation clusterSettingsAllocation `json:"allocation"`
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
}

// clusterSettingsAllocation is a representation of a Elasticsearch Cluster shard routing allocation settings
type clusterSettingsAllocation struct {
Enabled string `json:"enable"`
}
bts, err := ioutil.ReadAll(res.Body)
if err != nil {
cs.jsonParseFailures.Inc()
return err
}

// ClusterSettings information struct
type ClusterSettings struct {
logger log.Logger
client *http.Client
url *url.URL
if err := json.Unmarshal(bts, data); err != nil {
cs.jsonParseFailures.Inc()
return err
}

maxShardsPerNode prometheus.Gauge
return nil
}

func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
u := c.u.ResolveReference(&url.URL{Path: "_cluster/settings"})
func (cs *ClusterSettings) fetchAndDecodeClusterSettingsStats() (ClusterSettingsResponse, error) {

u := *cs.url
u.Path = path.Join(u.Path, "/_cluster/settings")
q := u.Query()
q.Set("include_defaults", "true")
u.RawQuery = q.Encode()

req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return err
}

resp, err := c.hc.Do(req)
u.RawPath = q.Encode()
var csfr ClusterSettingsFullResponse
var csr ClusterSettingsResponse
err := cs.getAndParseURL(&u, &csfr)
if err != nil {
return err
return csr, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
err = mergo.Merge(&csr, csfr.Defaults, mergo.WithOverride)
if err != nil {
return err
return csr, err
}
var data clusterSettingsResponse
err = json.Unmarshal(b, &data)
err = mergo.Merge(&csr, csfr.Persistent, mergo.WithOverride)
if err != nil {
return err
return csr, err
}
err = mergo.Merge(&csr, csfr.Transient, mergo.WithOverride)

// Merge all settings into one struct
merged := data.Defaults
return csr, err
}

err = mergo.Merge(&merged, data.Persistent, mergo.WithOverride)
if err != nil {
return err
}
err = mergo.Merge(&merged, data.Transient, mergo.WithOverride)
if err != nil {
return err
}
// Collect gets cluster settings metric values
func (cs *ClusterSettings) Collect(ch chan<- prometheus.Metric) {

// Max shards per node
if maxShardsPerNodeString, ok := merged.Cluster.MaxShardsPerNode.(string); ok {
maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64)
if err == nil {
ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["maxShardsPerNode"],
prometheus.GaugeValue,
float64(maxShardsPerNode),
)
}
cs.totalScrapes.Inc()
defer func() {
ch <- cs.up
ch <- cs.totalScrapes
ch <- cs.jsonParseFailures
ch <- cs.shardAllocationEnabled
ch <- cs.maxShardsPerNode
}()

csr, err := cs.fetchAndDecodeClusterSettingsStats()
if err != nil {
cs.shardAllocationEnabled.Set(0)
cs.up.Set(0)
_ = level.Warn(cs.logger).Log(
"msg", "failed to fetch and decode cluster settings stats",
"err", err,
)
return
}
cs.up.Set(1)

// Shard allocation enabled
shardAllocationMap := map[string]int{
"all": 0,
"primaries": 1,
"new_primaries": 2,
"none": 3,
}

ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["shardAllocationEnabled"],
prometheus.GaugeValue,
float64(shardAllocationMap[merged.Cluster.Routing.Allocation.Enabled]),
)
cs.shardAllocationEnabled.Set(float64(shardAllocationMap[csr.Cluster.Routing.Allocation.Enabled]))

return nil
if maxShardsPerNodeString, ok := csr.Cluster.MaxShardsPerNode.(string); ok {
maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64)
if err == nil {
cs.maxShardsPerNode.Set(float64(maxShardsPerNode))
}
}
}
43 changes: 43 additions & 0 deletions collector/cluster_settings_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

// ClusterSettingsFullResponse is a representation of a Elasticsearch Cluster Settings
type ClusterSettingsFullResponse struct {
Defaults ClusterSettingsResponse `json:"defaults"`
Persistent ClusterSettingsResponse `json:"persistent"`
Transient ClusterSettingsResponse `json:"transient"`
}

// ClusterSettingsResponse is a representation of a Elasticsearch Cluster Settings
type ClusterSettingsResponse struct {
Cluster Cluster `json:"cluster"`
}

// Cluster is a representation of a Elasticsearch Cluster Settings
type Cluster struct {
Routing Routing `json:"routing"`
// This can be either a JSON object (which does not contain the value we are interested in) or a string
MaxShardsPerNode interface{} `json:"max_shards_per_node"`
}

// Routing is a representation of a Elasticsearch Cluster shard routing configuration
type Routing struct {
Allocation Allocation `json:"allocation"`
}

// Allocation is a representation of a Elasticsearch Cluster shard routing allocation settings
type Allocation struct {
Enabled string `json:"enable"`
}
Loading

0 comments on commit 55d9dac

Please sign in to comment.