From f5c08d96b9349cc3e75d2e86fa9026bc6c8e47d9 Mon Sep 17 00:00:00 2001 From: Jaime Yera Hidalgo <106755265+jaimeyh@users.noreply.github.com> Date: Fri, 14 Jun 2024 11:47:24 +0200 Subject: [PATCH] Revert "collector: add tasks API collection (#778)" This reverts commit 7a98b17d2147985d28ed655209623bfe56dd71f3. --- collector/cluster_info.go | 2 +- collector/cluster_settings_test.go | 13 +++ collector/collector_test.go | 36 ------- collector/nodes_test.go | 2 +- collector/tasks.go | 143 ---------------------------- collector/tasks_test.go | 78 --------------- pkg/clusterinfo/clusterinfo_test.go | 2 +- 7 files changed, 16 insertions(+), 260 deletions(-) delete mode 100644 collector/collector_test.go delete mode 100644 collector/tasks.go delete mode 100644 collector/tasks_test.go diff --git a/collector/cluster_info.go b/collector/cluster_info.go index 1714986a..d42ca57c 100644 --- a/collector/cluster_info.go +++ b/collector/cluster_info.go @@ -77,7 +77,7 @@ type VersionInfo struct { LuceneVersion semver.Version `json:"lucene_version"` } -func (c *ClusterInfoCollector) Update(_ context.Context, ch chan<- prometheus.Metric) error { +func (c *ClusterInfoCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { resp, err := c.hc.Get(c.u.String()) if err != nil { return err diff --git a/collector/cluster_settings_test.go b/collector/cluster_settings_test.go index b4b98a09..dac21857 100644 --- a/collector/cluster_settings_test.go +++ b/collector/cluster_settings_test.go @@ -14,6 +14,7 @@ package collector import ( + "context" "io" "net/http" "net/http/httptest" @@ -23,9 +24,21 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" ) +type wrapCollector struct { + c Collector +} + +func (w wrapCollector) Describe(ch chan<- *prometheus.Desc) { +} + +func (w wrapCollector) Collect(ch chan<- prometheus.Metric) { + w.c.Update(context.Background(), ch) +} + func TestClusterSettingsStats(t *testing.T) { // Testcases created using: // docker run -d -p 9200:9200 elasticsearch:VERSION-alpine diff --git a/collector/collector_test.go b/collector/collector_test.go deleted file mode 100644 index 80c7fa5d..00000000 --- a/collector/collector_test.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2023 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collector - -import ( - "context" - - "github.com/prometheus/client_golang/prometheus" -) - -// wrapCollector is a util to let you test your Collector implementation. -// -// Use this with prometheus/client_golang/prometheus/testutil to test metric output, for example: -// -// testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(want)) -type wrapCollector struct { - c Collector -} - -func (w wrapCollector) Describe(_ chan<- *prometheus.Desc) { -} - -func (w wrapCollector) Collect(ch chan<- prometheus.Metric) { - w.c.Update(context.Background(), ch) -} diff --git a/collector/nodes_test.go b/collector/nodes_test.go index 8f9468db..3b4e4a5a 100644 --- a/collector/nodes_test.go +++ b/collector/nodes_test.go @@ -138,7 +138,7 @@ type basicAuth struct { Next http.Handler } -func (h *basicAuth) checkAuth(_ http.ResponseWriter, r *http.Request) bool { +func (h *basicAuth) checkAuth(w http.ResponseWriter, r *http.Request) bool { s := strings.SplitN(r.Header.Get("Authorization"), " ", 2) if len(s) != 2 { return false diff --git a/collector/tasks.go b/collector/tasks.go deleted file mode 100644 index e171b67c..00000000 --- a/collector/tasks.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2023 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collector - -import ( - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - - "github.com/alecthomas/kingpin/v2" - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" -) - -// filterByTask global required because collector interface doesn't expose any way to take -// constructor args. -var actionFilter string - -var taskActionDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "task_stats", "action"), - "Number of tasks of a certain action", - []string{"action"}, nil) - -func init() { - kingpin.Flag("tasks.actions", - "Filter on task actions. Used in same way as Task API actions param"). - Default("indices:*").StringVar(&actionFilter) - registerCollector("tasks", defaultDisabled, NewTaskCollector) -} - -// Task Information Struct -type TaskCollector struct { - logger log.Logger - hc *http.Client - u *url.URL -} - -// NewTaskCollector defines Task Prometheus metrics -func NewTaskCollector(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) { - level.Info(logger).Log("msg", "task collector created", - "actionFilter", actionFilter, - ) - - return &TaskCollector{ - logger: logger, - hc: hc, - u: u, - }, nil -} - -func (t *TaskCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { - tasks, err := t.fetchTasks(ctx) - if err != nil { - return fmt.Errorf("failed to fetch and decode task stats: %w", err) - } - - stats := AggregateTasks(tasks) - for action, count := range stats.CountByAction { - ch <- prometheus.MustNewConstMetric( - taskActionDesc, - prometheus.GaugeValue, - float64(count), - action, - ) - } - return nil -} - -func (t *TaskCollector) fetchTasks(_ context.Context) (tasksResponse, error) { - u := t.u.ResolveReference(&url.URL{Path: "_tasks"}) - q := u.Query() - q.Set("group_by", "none") - q.Set("actions", actionFilter) - u.RawQuery = q.Encode() - - var tr tasksResponse - res, err := t.hc.Get(u.String()) - if err != nil { - return tr, fmt.Errorf("failed to get data stream stats health from %s://%s:%s%s: %s", - u.Scheme, u.Hostname(), u.Port(), u.Path, err) - } - - defer func() { - err = res.Body.Close() - if err != nil { - level.Warn(t.logger).Log( - "msg", "failed to close http.Client", - "err", err, - ) - } - }() - - if res.StatusCode != http.StatusOK { - return tr, fmt.Errorf("HTTP Request to %v failed with code %d", u.String(), res.StatusCode) - } - - bts, err := io.ReadAll(res.Body) - if err != nil { - return tr, err - } - - err = json.Unmarshal(bts, &tr) - return tr, err -} - -// tasksResponse is a representation of the Task management API. -type tasksResponse struct { - Tasks []taskResponse `json:"tasks"` -} - -// taskResponse is a representation of the individual task item returned by task API endpoint. -// -// We only parse a very limited amount of this API for use in aggregation. -type taskResponse struct { - Action string `json:"action"` -} - -type aggregatedTaskStats struct { - CountByAction map[string]int64 -} - -func AggregateTasks(t tasksResponse) aggregatedTaskStats { - actions := map[string]int64{} - for _, task := range t.Tasks { - actions[task.Action]++ - } - return aggregatedTaskStats{CountByAction: actions} -} diff --git a/collector/tasks_test.go b/collector/tasks_test.go deleted file mode 100644 index d5da114f..00000000 --- a/collector/tasks_test.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2023 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collector - -import ( - "fmt" - "net/http" - "net/http/httptest" - "net/url" - "strings" - "testing" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus/testutil" -) - -func TestTasks(t *testing.T) { - // Test data was collected by running the following: - // # create container - // docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.17.11 - // sleep 15 - // # start some busy work in background - // for i in $(seq 1 500) - // do - // curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a1": "'"$i"'"}' - // sleep .01 - // curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a2": "'"$i"'"}' - // sleep .01 - // curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a3": "'"$i"'"}' - // sleep .01 - // done & - // # try and collect a good sample - // curl -X GET 'localhost:9200/_tasks?group_by=none&actions=indices:*' - // # cleanup - // docker rm --force elasticsearch - tcs := map[string]string{ - "7.17": `{"tasks":[{"node":"9lWCm1y_QkujaAg75bVx7A","id":70,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464655,"running_time_in_nanos":308640039,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":73,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464683,"running_time_in_nanos":280672000,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":76,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464711,"running_time_in_nanos":253247906,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":93,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464904,"running_time_in_nanos":60230460,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":50,"type":"transport","action":"indices:data/write/index","start_time_in_millis":1695900464229,"running_time_in_nanos":734480468,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":51,"type":"transport","action":"indices:admin/auto_create","start_time_in_millis":1695900464235,"running_time_in_nanos":729223933,"cancellable":false,"headers":{}}]}`, - } - want := `# HELP elasticsearch_task_stats_action Number of tasks of a certain action -# TYPE elasticsearch_task_stats_action gauge -elasticsearch_task_stats_action{action="indices:admin/auto_create"} 1 -elasticsearch_task_stats_action{action="indices:admin/index_template/put"} 4 -elasticsearch_task_stats_action{action="indices:data/write/index"} 1 -` - for ver, out := range tcs { - t.Run(ver, func(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - fmt.Fprintln(w, out) - })) - defer ts.Close() - - u, err := url.Parse(ts.URL) - if err != nil { - t.Fatalf("Failed to parse URL: %s", err) - } - - c, err := NewTaskCollector(log.NewNopLogger(), u, ts.Client()) - if err != nil { - t.Fatalf("Failed to create collector: %v", err) - } - - if err := testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(want)); err != nil { - t.Fatalf("Metrics did not match: %v", err) - } - }) - } -} diff --git a/pkg/clusterinfo/clusterinfo_test.go b/pkg/clusterinfo/clusterinfo_test.go index 91841682..beb3ff6b 100644 --- a/pkg/clusterinfo/clusterinfo_test.go +++ b/pkg/clusterinfo/clusterinfo_test.go @@ -44,7 +44,7 @@ const ( type mockES struct{} -func (mockES) ServeHTTP(w http.ResponseWriter, _ *http.Request) { +func (mockES) ServeHTTP(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, `{ "name" : "%s",