From 2c6f0df35bdddd9d78d4b66443f03e417c5f3a50 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek <58699848+sumo-drosiek@users.noreply.github.com> Date: Tue, 27 Jul 2021 23:10:50 +0200 Subject: [PATCH] feat(http_listener_v2): allows multiple paths and add path_tag (#9529) --- plugins/inputs/http_listener_v2/README.md | 11 +++- .../http_listener_v2/http_listener_v2.go | 32 +++++++++-- .../http_listener_v2/http_listener_v2_test.go | 56 +++++++++++++++++++ .../parsers/prometheusremotewrite/README.md | 4 +- 4 files changed, 93 insertions(+), 10 deletions(-) diff --git a/plugins/inputs/http_listener_v2/README.md b/plugins/inputs/http_listener_v2/README.md index 108a8d50a9a2a..a87ec3f833890 100644 --- a/plugins/inputs/http_listener_v2/README.md +++ b/plugins/inputs/http_listener_v2/README.md @@ -19,7 +19,14 @@ This is a sample configuration for the plugin. service_address = ":8080" ## Path to listen to. - # path = "/telegraf" + ## This option is deprecated and only available for backward-compatibility. Please use paths instead. + # path = "" + + ## Paths to listen to. + # paths = ["/telegraf"] + + ## Save path as http_listener_v2_path tag if set to true + # path_tag = false ## HTTP methods to accept. # methods = ["POST", "PUT"] @@ -59,7 +66,7 @@ This is a sample configuration for the plugin. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "json" + data_format = "influx" ``` ### Metrics: diff --git a/plugins/inputs/http_listener_v2/http_listener_v2.go b/plugins/inputs/http_listener_v2/http_listener_v2.go index 89714bb0818b1..5b511de57fb54 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2.go @@ -15,6 +15,7 @@ import ( "github.com/golang/snappy" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/choice" tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -26,8 +27,9 @@ import ( const defaultMaxBodySize = 500 * 1024 * 1024 const ( - body = "body" - query = "query" + body = "body" + query = "query" + pathTag = "http_listener_v2_path" ) // TimeFunc provides a timestamp for the metrics @@ -37,6 +39,8 @@ type TimeFunc func() time.Time type HTTPListenerV2 struct { ServiceAddress string `toml:"service_address"` Path string `toml:"path"` + Paths []string `toml:"paths"` + PathTag bool `toml:"path_tag"` Methods []string `toml:"methods"` DataSource string `toml:"data_source"` ReadTimeout config.Duration `toml:"read_timeout"` @@ -64,7 +68,14 @@ const sampleConfig = ` service_address = ":8080" ## Path to listen to. - # path = "/telegraf" + ## This option is deprecated and only available for backward-compatibility. Please use paths instead. + # path = "" + + ## Paths to listen to. + # paths = ["/telegraf"] + + ## Save path as http_listener_v2_path tag if set to true + # path_tag = false ## HTTP methods to accept. # methods = ["POST", "PUT"] @@ -75,7 +86,7 @@ const sampleConfig = ` # write_timeout = "10s" ## Maximum allowed http request body size in bytes. - ## 0 means to use the default of 524,288,00 bytes (500 mebibytes) + ## 0 means to use the default of 524,288,000 bytes (500 mebibytes) # max_body_size = "500MB" ## Part of the request to consume. Available options are "body" and @@ -136,6 +147,11 @@ func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error { h.WriteTimeout = config.Duration(time.Second * 10) } + // Append h.Path to h.Paths + if h.Path != "" && !choice.Contains(h.Path, h.Paths) { + h.Paths = append(h.Paths, h.Path) + } + h.acc = acc tlsConf, err := h.ServerConfig.TLSConfig() @@ -189,7 +205,7 @@ func (h *HTTPListenerV2) Stop() { func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) { handler := h.serveWrite - if req.URL.Path != h.Path { + if !choice.Contains(req.URL.Path, h.Paths) { handler = http.NotFound } @@ -251,6 +267,10 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) } } + if h.PathTag { + m.AddTag(pathTag, req.URL.Path) + } + h.acc.AddMetric(m) } @@ -370,7 +390,7 @@ func init() { return &HTTPListenerV2{ ServiceAddress: ":8080", TimeFunc: time.Now, - Path: "/telegraf", + Paths: []string{"/telegraf"}, Methods: []string{"POST", "PUT"}, DataSource: body, } diff --git a/plugins/inputs/http_listener_v2/http_listener_v2_test.go b/plugins/inputs/http_listener_v2/http_listener_v2_test.go index 6b906f9cec3e3..5daaf2785ffe3 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2_test.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2_test.go @@ -230,6 +230,62 @@ func TestWriteHTTP(t *testing.T) { ) } +// http listener should add request path as configured path_tag +func TestWriteHTTPWithPathTag(t *testing.T) { + listener := newTestHTTPListenerV2() + listener.PathTag = true + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) + + acc.Wait(1) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01", "http_listener_v2_path": "/write"}, + ) +} + +// http listener should add request path as configured path_tag (trimming it before) +func TestWriteHTTPWithMultiplePaths(t *testing.T) { + listener := newTestHTTPListenerV2() + listener.Paths = []string{"/alternative_write"} + listener.PathTag = true + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + // post single message to /write + resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) + + // post single message to /alternative_write + resp, err = http.Post(createURL(listener, "http", "/alternative_write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) + + acc.Wait(1) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01", "http_listener_v2_path": "/write"}, + ) + + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01", "http_listener_v2_path": "/alternative_write"}, + ) +} + // http listener should add a newline at the end of the buffer if it's not there func TestWriteHTTPNoNewline(t *testing.T) { listener := newTestHTTPListenerV2() diff --git a/plugins/parsers/prometheusremotewrite/README.md b/plugins/parsers/prometheusremotewrite/README.md index b409e9e6d5c8f..6d2c17ef898dc 100644 --- a/plugins/parsers/prometheusremotewrite/README.md +++ b/plugins/parsers/prometheusremotewrite/README.md @@ -9,8 +9,8 @@ Converts prometheus remote write samples directly into Telegraf metrics. It can ## Address and port to host HTTP listener on service_address = ":1234" - ## Path to listen to. - path = "/receive" + ## Paths to listen to. + paths = ["/receive"] ## Data format to consume. data_format = "prometheusremotewrite"