Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to auto scale based on indexing pressure #904

Merged
merged 2 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions collector/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ var (
defaultRoleLabels = []string{"cluster", "host", "name"}
defaultThreadPoolLabels = append(defaultNodeLabels, "type")
defaultBreakerLabels = append(defaultNodeLabels, "breaker")
defaultIndexingPressureLabels = []string{"cluster", "host", "name", "indexing_pressure"}
defaultFilesystemDataLabels = append(defaultNodeLabels, "mount", "path")
defaultFilesystemIODeviceLabels = append(defaultNodeLabels, "device")
defaultCacheLabels = append(defaultNodeLabels, "cache")
Expand Down Expand Up @@ -150,6 +151,13 @@ type breakerMetric struct {
Labels func(cluster string, node NodeStatsNodeResponse, breaker string) []string
}

type indexingPressureMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(indexingPressureStats NodeStatsIndexingPressureResponse) float64
Labels func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string
}

type threadPoolMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Expand Down Expand Up @@ -185,6 +193,7 @@ type Nodes struct {
nodeMetrics []*nodeMetric
gcCollectionMetrics []*gcCollectionMetric
breakerMetrics []*breakerMetric
indexingPressureMetrics []*indexingPressureMetric
threadPoolMetrics []*threadPoolMetric
filesystemDataMetrics []*filesystemDataMetric
filesystemIODeviceMetrics []*filesystemIODeviceMetric
Expand Down Expand Up @@ -1607,6 +1616,46 @@ func NewNodes(logger log.Logger, client *http.Client, url *url.URL, all bool, no
},
},
},
indexingPressureMetrics: []*indexingPressureMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "current_all_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the coordinating, primary, or replica stage.",
defaultIndexingPressureLabels, nil,
),
Value: func(indexingPressureMem NodeStatsIndexingPressureResponse) float64 {
return float64(indexingPressureMem.Current.AllInBytes)
},
Labels: func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string {
return []string{
cluster,
node.Host,
node.Name,
indexingPressure,
}
},
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "limit_in_bytes"),
"Configured memory limit, in bytes, for the indexing requests",
defaultIndexingPressureLabels, nil,
),
Value: func(indexingPressureStats NodeStatsIndexingPressureResponse) float64 {
return float64(indexingPressureStats.LimitInBytes)
},
Labels: func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string {
return []string{
cluster,
node.Host,
node.Name,
indexingPressure,
}
},
},
},
threadPoolMetrics: []*threadPoolMetric{
{
Type: prometheus.CounterValue,
Expand Down Expand Up @@ -1919,6 +1968,18 @@ func (c *Nodes) Collect(ch chan<- prometheus.Metric) {
}
}

// Indexing Pressure stats
for indexingPressure, ipstats := range node.IndexingPressure {
for _, metric := range c.indexingPressureMetrics {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(ipstats),
metric.Labels(nodeStatsResp.ClusterName, node, indexingPressure)...,
)
}
}

// Thread Pool stats
for pool, pstats := range node.ThreadPool {
for _, metric := range c.threadPoolMetrics {
Expand Down
46 changes: 29 additions & 17 deletions collector/nodes_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,24 @@ type nodeStatsResponse struct {

// NodeStatsNodeResponse defines node stats information structure for nodes
type NodeStatsNodeResponse struct {
Name string `json:"name"`
Host string `json:"host"`
Timestamp int64 `json:"timestamp"`
TransportAddress string `json:"transport_address"`
Hostname string `json:"hostname"`
Roles []string `json:"roles"`
Attributes map[string]string `json:"attributes"`
Indices NodeStatsIndicesResponse `json:"indices"`
OS NodeStatsOSResponse `json:"os"`
Network NodeStatsNetworkResponse `json:"network"`
FS NodeStatsFSResponse `json:"fs"`
ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"`
JVM NodeStatsJVMResponse `json:"jvm"`
Breakers map[string]NodeStatsBreakersResponse `json:"breakers"`
HTTP map[string]interface{} `json:"http"`
Transport NodeStatsTransportResponse `json:"transport"`
Process NodeStatsProcessResponse `json:"process"`
Name string `json:"name"`
Host string `json:"host"`
Timestamp int64 `json:"timestamp"`
TransportAddress string `json:"transport_address"`
Hostname string `json:"hostname"`
Roles []string `json:"roles"`
Attributes map[string]string `json:"attributes"`
Indices NodeStatsIndicesResponse `json:"indices"`
OS NodeStatsOSResponse `json:"os"`
Network NodeStatsNetworkResponse `json:"network"`
FS NodeStatsFSResponse `json:"fs"`
ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"`
JVM NodeStatsJVMResponse `json:"jvm"`
Breakers map[string]NodeStatsBreakersResponse `json:"breakers"`
HTTP map[string]interface{} `json:"http"`
Transport NodeStatsTransportResponse `json:"transport"`
Process NodeStatsProcessResponse `json:"process"`
IndexingPressure map[string]NodeStatsIndexingPressureResponse `json:"indexing_pressure"`
}

// NodeStatsBreakersResponse is a representation of a statistics about the field data circuit breaker
Expand All @@ -50,6 +51,17 @@ type NodeStatsBreakersResponse struct {
Tripped int64 `json:"tripped"`
}

// NodeStatsIndexingPressureResponse is a representation of a elasticsearch indexing pressure
type NodeStatsIndexingPressureResponse struct {
Current NodeStatsIndexingPressureCurrentResponse `json:"current"`
LimitInBytes int64 `json:"limit_in_bytes"`
}

// NodeStatsIndexingPressureMemoryCurrentResponse is a representation of a elasticsearch indexing pressure current memory usage
type NodeStatsIndexingPressureCurrentResponse struct {
AllInBytes int64 `json:"all_in_bytes"`
}

// NodeStatsJVMResponse is a representation of a JVM stats, memory pool information, garbage collection, buffer pools, number of loaded/unloaded classes
type NodeStatsJVMResponse struct {
BufferPools map[string]NodeStatsJVMBufferPoolResponse `json:"buffer_pools"`
Expand Down
Loading