From 40b06f9425eeba5b2443f72147ad99c315d758ea Mon Sep 17 00:00:00 2001 From: emilandresentac Date: Mon, 24 Jun 2024 09:50:14 -0700 Subject: [PATCH 1/2] Add metrics indexing_pressure.memory.limit_in_bytes and indexing_pressure.memory.current.current.all_in_bytes to allow auto-scaling based on how close the cluster nodes are to dropping indexing requests due to the indxing request memory buffer reaching capacity. Signed-off-by: emilandresentac --- collector/nodes.go | 51 +++++++++++++++++++++++++++++++++++++ collector/nodes_response.go | 46 ++++++++++++++++++++------------- 2 files changed, 80 insertions(+), 17 deletions(-) diff --git a/collector/nodes.go b/collector/nodes.go index 364c541d..ff770038 100644 --- a/collector/nodes.go +++ b/collector/nodes.go @@ -96,6 +96,7 @@ var ( defaultRoleLabels = []string{"cluster", "host", "name"} defaultThreadPoolLabels = append(defaultNodeLabels, "type") defaultBreakerLabels = append(defaultNodeLabels, "breaker") + defaultIndexingPressureLabels = append(defaultNodeLabels, "indexing_pressure") defaultFilesystemDataLabels = append(defaultNodeLabels, "mount", "path") defaultFilesystemIODeviceLabels = append(defaultNodeLabels, "device") defaultCacheLabels = append(defaultNodeLabels, "cache") @@ -150,6 +151,13 @@ type breakerMetric struct { Labels func(cluster string, node NodeStatsNodeResponse, breaker string) []string } +type indexingPressureMetric struct { + Type prometheus.ValueType + Desc *prometheus.Desc + Value func(indexingPressureStats NodeStatsIndexingPressureResponse) float64 + Labels func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string +} + type threadPoolMetric struct { Type prometheus.ValueType Desc *prometheus.Desc @@ -185,6 +193,7 @@ type Nodes struct { nodeMetrics []*nodeMetric gcCollectionMetrics []*gcCollectionMetric breakerMetrics []*breakerMetric + indexingPressureMetrics []*indexingPressureMetric threadPoolMetrics []*threadPoolMetric filesystemDataMetrics []*filesystemDataMetric filesystemIODeviceMetrics []*filesystemIODeviceMetric @@ -1607,6 +1616,36 @@ func NewNodes(logger log.Logger, client *http.Client, url *url.URL, all bool, no }, }, }, + indexingPressureMetrics: []*indexingPressureMetric{ + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "current_all_in_bytes"), + "Memory consumed, in bytes, by indexing requests in the coordinating, primary, or replica stage.", + defaultIndexingPressureLabels, nil, + ), + Value: func(indexingPressureMem NodeStatsIndexingPressureResponse) float64 { + return float64(indexingPressureMem.Current.AllInBytes) + }, + Labels: func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string { + return append(defaultNodeLabelValues(cluster, node), indexingPressure) + }, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "indexing_pressure", "limit_in_bytes"), + "Configured memory limit, in bytes, for the indexing requests", + defaultIndexingPressureLabels, nil, + ), + Value: func(indexingPressureStats NodeStatsIndexingPressureResponse) float64 { + return float64(indexingPressureStats.LimitInBytes) + }, + Labels: func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string { + return append(defaultNodeLabelValues(cluster, node), indexingPressure) + }, + }, + }, threadPoolMetrics: []*threadPoolMetric{ { Type: prometheus.CounterValue, @@ -1919,6 +1958,18 @@ func (c *Nodes) Collect(ch chan<- prometheus.Metric) { } } + // Indexing Pressure stats + for indexingPressure, ipstats := range node.IndexingPressure { + for _, metric := range c.indexingPressureMetrics { + ch <- prometheus.MustNewConstMetric( + metric.Desc, + metric.Type, + metric.Value(ipstats), + metric.Labels(nodeStatsResp.ClusterName, node, indexingPressure)..., + ) + } + } + // Thread Pool stats for pool, pstats := range node.ThreadPool { for _, metric := range c.threadPoolMetrics { diff --git a/collector/nodes_response.go b/collector/nodes_response.go index 4985add0..5b756011 100644 --- a/collector/nodes_response.go +++ b/collector/nodes_response.go @@ -23,23 +23,24 @@ type nodeStatsResponse struct { // NodeStatsNodeResponse defines node stats information structure for nodes type NodeStatsNodeResponse struct { - Name string `json:"name"` - Host string `json:"host"` - Timestamp int64 `json:"timestamp"` - TransportAddress string `json:"transport_address"` - Hostname string `json:"hostname"` - Roles []string `json:"roles"` - Attributes map[string]string `json:"attributes"` - Indices NodeStatsIndicesResponse `json:"indices"` - OS NodeStatsOSResponse `json:"os"` - Network NodeStatsNetworkResponse `json:"network"` - FS NodeStatsFSResponse `json:"fs"` - ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"` - JVM NodeStatsJVMResponse `json:"jvm"` - Breakers map[string]NodeStatsBreakersResponse `json:"breakers"` - HTTP map[string]interface{} `json:"http"` - Transport NodeStatsTransportResponse `json:"transport"` - Process NodeStatsProcessResponse `json:"process"` + Name string `json:"name"` + Host string `json:"host"` + Timestamp int64 `json:"timestamp"` + TransportAddress string `json:"transport_address"` + Hostname string `json:"hostname"` + Roles []string `json:"roles"` + Attributes map[string]string `json:"attributes"` + Indices NodeStatsIndicesResponse `json:"indices"` + OS NodeStatsOSResponse `json:"os"` + Network NodeStatsNetworkResponse `json:"network"` + FS NodeStatsFSResponse `json:"fs"` + ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"` + JVM NodeStatsJVMResponse `json:"jvm"` + Breakers map[string]NodeStatsBreakersResponse `json:"breakers"` + HTTP map[string]interface{} `json:"http"` + Transport NodeStatsTransportResponse `json:"transport"` + Process NodeStatsProcessResponse `json:"process"` + IndexingPressure map[string]NodeStatsIndexingPressureResponse `json:"indexing_pressure"` } // NodeStatsBreakersResponse is a representation of a statistics about the field data circuit breaker @@ -50,6 +51,17 @@ type NodeStatsBreakersResponse struct { Tripped int64 `json:"tripped"` } +// NodeStatsIndexingPressureResponse is a representation of a elasticsearch indexing pressure +type NodeStatsIndexingPressureResponse struct { + Current NodeStatsIndexingPressureCurrentResponse `json:"current"` + LimitInBytes int64 `json:"limit_in_bytes"` +} + +// NodeStatsIndexingPressureMemoryCurrentResponse is a representation of a elasticsearch indexing pressure current memory usage +type NodeStatsIndexingPressureCurrentResponse struct { + AllInBytes int64 `json:"all_in_bytes"` +} + // NodeStatsJVMResponse is a representation of a JVM stats, memory pool information, garbage collection, buffer pools, number of loaded/unloaded classes type NodeStatsJVMResponse struct { BufferPools map[string]NodeStatsJVMBufferPoolResponse `json:"buffer_pools"` From d607c9faa90e20a12e413f38e8cc32dda892d6ae Mon Sep 17 00:00:00 2001 From: emilandresentac Date: Sat, 6 Jul 2024 13:30:38 -0700 Subject: [PATCH 2/2] Reduce labels per metric for indexing pressure metrics to cluster, node, and name to save on storage space. Signed-off-by: emilandresentac --- collector/nodes.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/collector/nodes.go b/collector/nodes.go index ff770038..e99e8d5c 100644 --- a/collector/nodes.go +++ b/collector/nodes.go @@ -96,7 +96,7 @@ var ( defaultRoleLabels = []string{"cluster", "host", "name"} defaultThreadPoolLabels = append(defaultNodeLabels, "type") defaultBreakerLabels = append(defaultNodeLabels, "breaker") - defaultIndexingPressureLabels = append(defaultNodeLabels, "indexing_pressure") + defaultIndexingPressureLabels = []string{"cluster", "host", "name", "indexing_pressure"} defaultFilesystemDataLabels = append(defaultNodeLabels, "mount", "path") defaultFilesystemIODeviceLabels = append(defaultNodeLabels, "device") defaultCacheLabels = append(defaultNodeLabels, "cache") @@ -1628,7 +1628,12 @@ func NewNodes(logger log.Logger, client *http.Client, url *url.URL, all bool, no return float64(indexingPressureMem.Current.AllInBytes) }, Labels: func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string { - return append(defaultNodeLabelValues(cluster, node), indexingPressure) + return []string{ + cluster, + node.Host, + node.Name, + indexingPressure, + } }, }, { @@ -1642,7 +1647,12 @@ func NewNodes(logger log.Logger, client *http.Client, url *url.URL, all bool, no return float64(indexingPressureStats.LimitInBytes) }, Labels: func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string { - return append(defaultNodeLabelValues(cluster, node), indexingPressure) + return []string{ + cluster, + node.Host, + node.Name, + indexingPressure, + } }, }, },