diff --git a/CHANGELOG.md b/CHANGELOG.md index cd3b840cf9b5f..7bc5e7f8d70e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,7 @@ continue sending logs to /var/log/telegraf/telegraf.log. - [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin. - [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements. - [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin. -- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin. +- [#1407](https://github.com/influxdata/telegraf/pull/1407) & [#1915](https://github.com/influxdata/telegraf/pull/1915): HTTP service listener input plugin. - [#1699](https://github.com/influxdata/telegraf/pull/1699): Add database blacklist option for Postgresql - [#1791](https://github.com/influxdata/telegraf/pull/1791): Add Docker container state metrics to Docker input plugin output - [#1755](https://github.com/influxdata/telegraf/issues/1755): Add support to SNMP for IP & MAC address conversion. diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go index 75899f0f89ac5..58cd1c3764d7e 100644 --- a/internal/buffer/buffer.go +++ b/internal/buffer/buffer.go @@ -14,7 +14,7 @@ type Buffer struct { // total metrics added total int - sync.Mutex + mu sync.Mutex } // NewBuffer returns a Buffer @@ -65,13 +65,13 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) { // the batch will be of maximum length batchSize. It can be less than batchSize, // if the length of Buffer is less than batchSize. func (b *Buffer) Batch(batchSize int) []telegraf.Metric { - b.Lock() + b.mu.Lock() n := min(len(b.buf), batchSize) out := make([]telegraf.Metric, n) for i := 0; i < n; i++ { out[i] = <-b.buf } - b.Unlock() + b.mu.Unlock() return out } diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index b63f622951877..ddc9ac7bf8cec 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -28,7 +28,7 @@ const ( DEFAULT_MAX_LINE_SIZE = 64 * 1024 ) -type HttpListener struct { +type HTTPListener struct { ServiceAddress string ReadTimeout internal.Duration WriteTimeout internal.Duration @@ -63,24 +63,23 @@ const sampleConfig = ` max_line_size = 0 ` -func (h *HttpListener) SampleConfig() string { +func (h *HTTPListener) SampleConfig() string { return sampleConfig } -func (h *HttpListener) Description() string { +func (h *HTTPListener) Description() string { return "Influx HTTP write listener" } -func (h *HttpListener) Gather(_ telegraf.Accumulator) error { +func (h *HTTPListener) Gather(_ telegraf.Accumulator) error { log.Printf("D! The http_listener has created %d buffers", h.pool.ncreated()) return nil } // Start starts the http listener service. -func (h *HttpListener) Start(acc telegraf.Accumulator) error { +func (h *HTTPListener) Start(acc telegraf.Accumulator) error { h.mu.Lock() defer h.mu.Unlock() - h.parser = influx.InfluxParser{} if h.MaxBodySize == 0 { h.MaxBodySize = DEFAULT_MAX_BODY_SIZE @@ -110,7 +109,7 @@ func (h *HttpListener) Start(acc telegraf.Accumulator) error { } // Stop cleans up all resources -func (h *HttpListener) Stop() { +func (h *HTTPListener) Stop() { h.mu.Lock() defer h.mu.Unlock() @@ -124,7 +123,7 @@ func (h *HttpListener) Stop() { // like server.Serve, httpListen will always return a non-nil error, for this // reason, the error returned should probably be ignored. // see https://golang.org/pkg/net/http/#Server.Serve -func (h *HttpListener) httpListen() error { +func (h *HTTPListener) httpListen() error { if h.ReadTimeout.Duration < time.Second { h.ReadTimeout.Duration = time.Second * 10 } @@ -141,7 +140,7 @@ func (h *HttpListener) httpListen() error { return server.Serve(h.listener) } -func (h *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { +func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { switch req.URL.Path { case "/write": h.serveWrite(res, req) @@ -161,7 +160,7 @@ func (h *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { } } -func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) { +func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { // Check that the content length is not too large for us to handle. if req.ContentLength > h.MaxBodySize { tooLarge(res) @@ -171,8 +170,9 @@ func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) { // Handle gzip request bodies body := req.Body + var err error if req.Header.Get("Content-Encoding") == "gzip" { - body, err := gzip.NewReader(req.Body) + body, err = gzip.NewReader(req.Body) defer body.Close() if err != nil { log.Println("E! " + err.Error()) @@ -185,7 +185,7 @@ func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) { var return400 bool var hangingBytes bool buf := h.pool.get() - defer func() { h.pool.put(buf) }() + defer h.pool.put(buf) bufStart := 0 for { n, err := io.ReadFull(body, buf[bufStart:]) @@ -261,7 +261,7 @@ func (h *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) { } } -func (h *HttpListener) parse(b []byte, t time.Time) error { +func (h *HTTPListener) parse(b []byte, t time.Time) error { metrics, err := h.parser.ParseWithDefaultTime(b, t) for _, m := range metrics { @@ -287,7 +287,7 @@ func badRequest(res http.ResponseWriter) { func init() { inputs.Add("http_listener", func() telegraf.Input { - return &HttpListener{ + return &HTTPListener{ ServiceAddress: ":8186", } }) diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go index 9f021885e46f0..84cf209ff5f2a 100644 --- a/plugins/inputs/http_listener/http_listener_test.go +++ b/plugins/inputs/http_listener/http_listener_test.go @@ -2,6 +2,7 @@ package http_listener import ( "bytes" + "io/ioutil" "net/http" "sync" "testing" @@ -26,15 +27,15 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257 emptyMsg = "" ) -func newTestHttpListener() *HttpListener { - listener := &HttpListener{ +func newTestHTTPListener() *HTTPListener { + listener := &HTTPListener{ ServiceAddress: ":8186", } return listener } func TestWriteHTTP(t *testing.T) { - listener := newTestHttpListener() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -81,7 +82,7 @@ func TestWriteHTTP(t *testing.T) { } func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { - listener := &HttpListener{ + listener := &HTTPListener{ ServiceAddress: ":8296", MaxLineSize: 128 * 1000, } @@ -92,15 +93,121 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { time.Sleep(time.Millisecond * 25) - // Post a gigantic metric to the listener and verify that an error is returned: + // Post a gigantic metric to the listener and verify that it writes OK this time: resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) } +func TestWriteHTTPVerySmallMaxBody(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8297", + MaxBodySize: 4096, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric))) + require.NoError(t, err) + require.EqualValues(t, 413, resp.StatusCode) +} + +func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8298", + MaxLineSize: 70, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs))) + require.NoError(t, err) + require.EqualValues(t, 204, resp.StatusCode) + + time.Sleep(time.Millisecond * 15) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + +func TestWriteHTTPLargeLinesSkipped(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8300", + MaxLineSize: 100, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs))) + require.NoError(t, err) + require.EqualValues(t, 400, resp.StatusCode) + + time.Sleep(time.Millisecond * 15) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + +// test that writing gzipped data works +func TestWriteHTTPGzippedData(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8299", + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + data, err := ioutil.ReadFile("./testdata/testmsgs.gz") + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://localhost:8299/write", bytes.NewBuffer(data)) + require.NoError(t, err) + req.Header.Set("Content-Encoding", "gzip") + + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) + require.EqualValues(t, 204, resp.StatusCode) + + time.Sleep(time.Millisecond * 50) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + // writes 25,000 metrics to the listener with 10 different writers func TestWriteHTTPHighTraffic(t *testing.T) { - listener := &HttpListener{ServiceAddress: ":8286"} + listener := &HTTPListener{ServiceAddress: ":8286"} acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -123,14 +230,14 @@ func TestWriteHTTPHighTraffic(t *testing.T) { } wg.Wait() - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Millisecond * 250) listener.Gather(acc) require.Equal(t, int64(25000), int64(acc.NMetrics())) } func TestReceive404ForInvalidEndpoint(t *testing.T) { - listener := newTestHttpListener() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -147,7 +254,7 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) { func TestWriteHTTPInvalid(t *testing.T) { time.Sleep(time.Millisecond * 250) - listener := newTestHttpListener() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -164,7 +271,7 @@ func TestWriteHTTPInvalid(t *testing.T) { func TestWriteHTTPEmpty(t *testing.T) { time.Sleep(time.Millisecond * 250) - listener := newTestHttpListener() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -181,7 +288,7 @@ func TestWriteHTTPEmpty(t *testing.T) { func TestQueryAndPingHTTP(t *testing.T) { time.Sleep(time.Millisecond * 250) - listener := newTestHttpListener() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) diff --git a/plugins/inputs/http_listener/testdata/testmsgs.gz b/plugins/inputs/http_listener/testdata/testmsgs.gz new file mode 100644 index 0000000000000..f524dc07128b9 Binary files /dev/null and b/plugins/inputs/http_listener/testdata/testmsgs.gz differ