This commit adds simple aggregation of the Elasticsearch Tasks API. There are 4 new metrics, though 3 are just bookkeeping. elasticsearch_task_stats_action_total is a gauge reporting the total number of tasks running for a given action. Because no stats endpoint exposes this, this change introduces an aggregation step that groups the number of tasks by action name. This metric is useful for ensuring that long-running actions of a specific kind stay within a specific limit. Of particular use to me is the action 'indices:data/write/delete/byquery': in my use case, our ES access patterns mean we have a predefined limit on how many of these actions may run on the cluster. This change also adds two new CLI flags to manage collection from the Tasks API: --es.tasks (to enable task collection) and --es.tasks.actions (to filter tasks by the actions param). Issue #525 proposed adding collection of these tasks.
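For illustration, a hedged sketch of enabling the new collection and the exposition output one might expect; the binary invocation and the sample value are assumptions, while the flag names, metric name, help text, and gauge type come from this commit:

    ./elasticsearch_exporter --es.tasks --es.tasks.actions='indices:data/write/delete/byquery'

    # HELP elasticsearch_task_stats_action_total Number of tasks of a certain action
    # TYPE elasticsearch_task_stats_action_total gauge
    elasticsearch_task_stats_action_total{action="indices:data/write/delete/byquery"} 2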
Showing 4 changed files with 287 additions and 0 deletions.
@@ -0,0 +1,174 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "path"

    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/prometheus/client_golang/prometheus"
)

type taskByAction struct {
    Type   prometheus.ValueType
    Desc   *prometheus.Desc
    Value  func(action string, count int64) float64
    Labels func(action string, count int64) []string
}

var (
    taskLabels = []string{"cluster", "action"}
)

// Task collects task stats from the Elasticsearch Tasks API.
type Task struct {
    logger  log.Logger
    client  *http.Client
    url     *url.URL
    actions string

    up                              prometheus.Gauge
    totalScrapes, jsonParseFailures prometheus.Counter

    byActionMetrics []*taskByAction
}

// NewTask defines Task Prometheus metrics
func NewTask(logger log.Logger, client *http.Client, url *url.URL, actions string) *Task {
    return &Task{
        logger:  logger,
        client:  client,
        url:     url,
        actions: actions,

        up: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: prometheus.BuildFQName(namespace, "task_stats", "up"),
            Help: "Was the last scrape of the Elasticsearch Task endpoint successful.",
        }),
        totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
            Name: prometheus.BuildFQName(namespace, "task_stats", "total_scrapes"),
            Help: "Current total Elasticsearch task scrapes.",
        }),
        jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
            Name: prometheus.BuildFQName(namespace, "task_stats", "json_parse_failures"),
            Help: "Number of errors while parsing JSON.",
        }),
        byActionMetrics: []*taskByAction{
            {
                Type: prometheus.GaugeValue,
                Desc: prometheus.NewDesc(
                    prometheus.BuildFQName(namespace, "task_stats", "action_total"),
                    "Number of tasks of a certain action",
                    []string{"action"}, nil,
                ),
                Value: func(action string, count int64) float64 {
                    return float64(count)
                },
                Labels: func(action string, count int64) []string {
                    return []string{action}
                },
            },
        },
    }
}

// Describe adds Task metrics descriptions
func (t *Task) Describe(ch chan<- *prometheus.Desc) {
    for _, metric := range t.byActionMetrics {
        ch <- metric.Desc
    }

    ch <- t.up.Desc()
    ch <- t.totalScrapes.Desc()
    ch <- t.jsonParseFailures.Desc()
}

func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, error) {
    u := *t.url
    u.Path = path.Join(u.Path, "/_tasks")
    u.RawQuery = "group_by=none&actions=" + t.actions
    res, err := t.client.Get(u.String())
    if err != nil {
        return nil, fmt.Errorf("failed to get task stats from %s://%s:%s%s: %s",
            u.Scheme, u.Hostname(), u.Port(), u.Path, err)
    }

    defer func() {
        err = res.Body.Close()
        if err != nil {
            level.Warn(t.logger).Log(
                "msg", "failed to close response body",
                "err", err,
            )
        }
    }()

    if res.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("HTTP Request to %v failed with code %d", u.String(), res.StatusCode)
    }

    bts, err := io.ReadAll(res.Body)
    if err != nil {
        t.jsonParseFailures.Inc()
        return nil, err
    }

    var tr TasksResponse
    if err := json.Unmarshal(bts, &tr); err != nil {
        t.jsonParseFailures.Inc()
        return nil, err
    }

    stats := AggregateTasks(tr)
    return stats, nil
}

// Collect gets Task metric values
func (t *Task) Collect(ch chan<- prometheus.Metric) {
    t.totalScrapes.Inc()
    defer func() {
        ch <- t.up
        ch <- t.totalScrapes
        ch <- t.jsonParseFailures
    }()

    stats, err := t.fetchAndDecodeAndAggregateTaskStats()
    if err != nil {
        t.up.Set(0)
        level.Warn(t.logger).Log(
            "msg", "failed to fetch and decode task stats",
            "err", err,
        )
        return
    }

    for action, count := range stats.CountByAction {
        for _, metric := range t.byActionMetrics {
            ch <- prometheus.MustNewConstMetric(
                metric.Desc,
                metric.Type,
                metric.Value(action, count),
                metric.Labels(action, count)...,
            )
        }
    }

    t.up.Set(1)
}
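For context, a minimal sketch of how this collector could be registered and served, assuming the usual client_golang promhttp setup; only NewTask and its signature come from the file above, everything else is illustrative:

    esURL, _ := url.Parse("http://localhost:9200")
    task := NewTask(log.NewNopLogger(), http.DefaultClient, esURL, "indices:*")
    reg := prometheus.NewRegistry()
    reg.MustRegister(task) // Task satisfies prometheus.Collector via Describe/Collect
    http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
    // http.ListenAndServe(":9114", nil) // 9114 assumed; use whatever port the exporter listens on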
@@ -0,0 +1,39 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

// TasksResponse is a representation of the Task management API.
type TasksResponse struct {
    Tasks []TaskResponse `json:"tasks"`
}

// TaskResponse is a representation of the individual task item returned by the task API endpoint.
//
// We only parse a very limited amount of this API for use in aggregation.
type TaskResponse struct {
    Action string `json:"action"`
}

// AggregatedTaskStats groups the number of tasks by action name.
type AggregatedTaskStats struct {
    CountByAction map[string]int64
}

// AggregateTasks counts the tasks in a TasksResponse by action name.
func AggregateTasks(t TasksResponse) *AggregatedTaskStats {
    actions := map[string]int64{}
    for _, task := range t.Tasks {
        actions[task.Action]++
    }
    return &AggregatedTaskStats{CountByAction: actions}
}
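A quick usage sketch of the aggregation helper above; the sample actions are illustrative:

    tr := TasksResponse{Tasks: []TaskResponse{
        {Action: "indices:data/write/index"},
        {Action: "indices:data/write/index"},
        {Action: "indices:data/write/delete/byquery"},
    }}
    stats := AggregateTasks(tr)
    // stats.CountByAction["indices:data/write/index"] == 2
    // stats.CountByAction["indices:data/write/delete/byquery"] == 1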
@@ -0,0 +1,64 @@
package collector

import (
    "fmt"
    "net/http"
    "net/http/httptest"
    "net/url"
    "testing"

    "github.com/go-kit/log"
)

func TestTasks(t *testing.T) {
    // Test data was collected by running the following:
    // docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.17.11
    // sleep 15
    // # start some busy work
    // for i in $(seq 1 1000); do \
    //   curl -o /dev/null -s -X POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' \
    //   -d'{"abc": "'$i'"}'; done &
    // curl -X POST "localhost:9200/a1/_delete_by_query?requests_per_second=1&wait_for_completion=false" \
    //   -H 'Content-Type: application/json' -d'{"query": {"match_all": {}}}'
    // # try and collect a good sample
    // curl -X GET 'localhost:9200/_tasks?group_by=none&actions=indices:*'
    // docker rm -f elasticsearch
    tcs := map[string]string{
        "7.17": `{"tasks":[{"node":"NVe9ksxcSu6AJTKlIfI24A","id":17223,"type":"transport","action":"indices:data/write/delete/byquery","start_time_in_millis":1695214684290,"running_time_in_nanos":8003510219,"cancellable":true,"cancelled":false,"headers":{}},{"node":"NVe9ksxcSu6AJTKlIfI24A","id":20890,"type":"transport","action":"indices:data/write/index","start_time_in_millis":1695214692292,"running_time_in_nanos":1611966,"cancellable":false,"headers":{}},{"node":"NVe9ksxcSu6AJTKlIfI24A","id":20891,"type":"transport","action":"indices:data/write/bulk[s]","start_time_in_millis":1695214692292,"running_time_in_nanos":1467298,"cancellable":false,"parent_task_id":"NVe9ksxcSu6AJTKlIfI24A:20890","headers":{}},{"node":"NVe9ksxcSu6AJTKlIfI24A","id":20892,"type":"direct","action":"indices:data/write/bulk[s][p]","start_time_in_millis":1695214692292,"running_time_in_nanos":1437170,"cancellable":false,"parent_task_id":"NVe9ksxcSu6AJTKlIfI24A:20891","headers":{}}]}`,
    }
    for ver, out := range tcs {
        ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
            fmt.Fprintln(w, out)
        }))
        defer ts.Close()

        u, err := url.Parse(ts.URL)
        if err != nil {
            t.Fatalf("Failed to parse URL: %s", err)
        }

        task := NewTask(log.NewNopLogger(), http.DefaultClient, u, "indices:*")
        stats, err := task.fetchAndDecodeAndAggregateTaskStats()
        if err != nil {
            t.Fatalf("Failed to fetch or decode task stats: %s", err)
        }
        t.Logf("[%s] Task Response: %+v", ver, stats)

        // validate actions aggregations
        if len(stats.CountByAction) != 4 {
            t.Fatal("expected to get 4 tasks")
        }
        if stats.CountByAction["indices:data/write/index"] != 1 {
            t.Fatal("expected action indices:data/write/index to have count 1")
        }
        if stats.CountByAction["indices:data/write/bulk[s]"] != 1 {
            t.Fatal("expected action indices:data/write/bulk[s] to have count 1")
        }
        if stats.CountByAction["indices:data/write/bulk[s][p]"] != 1 {
            t.Fatal("expected action indices:data/write/bulk[s][p] to have count 1")
        }
        if stats.CountByAction["indices:data/write/delete/byquery"] != 1 {
            t.Fatal("expected action indices:data/write/delete/byquery to have count 1")
        }
    }
}
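To run this test on its own (the collector package path is an assumption from the package name):

    go test ./collector/ -run TestTasks -v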