From 1bb9945b0f52d392af05d5ae297682727e09ab16 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 18 Oct 2016 12:22:23 +0100 Subject: [PATCH] http listener refactor in this commit: - chunks out the http request body to avoid making very large allocations. - establishes a limit for the maximum http request body size that the listener will accept. - utilizes a pool of byte buffers to reduce GC pressure. --- plugins/inputs/http_listener/bufferpool.go | 29 ++++ plugins/inputs/http_listener/http_listener.go | 148 ++++++++++++++---- 2 files changed, 143 insertions(+), 34 deletions(-) create mode 100644 plugins/inputs/http_listener/bufferpool.go diff --git a/plugins/inputs/http_listener/bufferpool.go b/plugins/inputs/http_listener/bufferpool.go new file mode 100644 index 0000000000000..667fd1c0f3c25 --- /dev/null +++ b/plugins/inputs/http_listener/bufferpool.go @@ -0,0 +1,29 @@ +package http_listener + +type pool struct { + n int + buffers chan []byte +} + +func NewPool() *pool { + return &pool{ + buffers: make(chan []byte, 50), + } +} + +func (p *pool) get() []byte { + select { + case b := <-p.buffers: + return b + default: + return make([]byte, MAX_ALLOCATION_SIZE) + } +} + +func (p *pool) put(b []byte) { + select { + case p.buffers <- b: + default: + // drop it + } +} diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 2eeee8e75657d..4fa8e0a75998a 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -1,9 +1,9 @@ package http_listener import ( - "bufio" "bytes" - "fmt" + "compress/gzip" + "io" "log" "net" "net/http" @@ -17,18 +17,31 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" ) +const ( + // DEFAULT_REQUEST_BODY_MAX is the default maximum request body size, in bytes. + // if the request body is over this size, we will return an HTTP 413 error. + // 512 MB + DEFAULT_REQUEST_BODY_MAX = 512 * 1000 * 1000 + + // MAX_ALLOCATION_SIZE is the maximum size, in bytes, of a single allocation + // of bytes that will be made handling a single HTTP request. + // 1 MB + MAX_ALLOCATION_SIZE = 1 * 1000 * 1000 +) + type HttpListener struct { ServiceAddress string ReadTimeout internal.Duration WriteTimeout internal.Duration + MaxBodySize int64 sync.Mutex - wg sync.WaitGroup listener *stoppableListener.StoppableListener parser parsers.Parser acc telegraf.Accumulator + pool *pool } const sampleConfig = ` @@ -61,7 +74,12 @@ func (t *HttpListener) Start(acc telegraf.Accumulator) error { t.Lock() defer t.Unlock() + if t.MaxBodySize == 0 { + t.MaxBodySize = DEFAULT_REQUEST_BODY_MAX + } + t.acc = acc + t.pool = NewPool() var rawListener, err = net.Listen("tcp", t.ServiceAddress) if err != nil { @@ -87,8 +105,6 @@ func (t *HttpListener) Stop() { t.listener.Stop() t.listener.Close() - t.wg.Wait() - log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress) } @@ -111,37 +127,9 @@ func (t *HttpListener) httpListen() error { } func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { - t.wg.Add(1) - defer t.wg.Done() - switch req.URL.Path { case "/write": - var http400msg bytes.Buffer - var partial string - scanner := bufio.NewScanner(req.Body) - scanner.Buffer([]byte(""), 128*1024) - for scanner.Scan() { - metrics, err := t.parser.Parse(scanner.Bytes()) - if err == nil { - for _, m := range metrics { - t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) - } - partial = "partial write: " - } else { - http400msg.WriteString(err.Error() + " ") - } - } - - if err := scanner.Err(); err != nil { - http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError) - } else if http400msg.Len() > 0 { - res.Header().Set("Content-Type", "application/json") - res.Header().Set("X-Influxdb-Version", "1.0") - res.WriteHeader(http.StatusBadRequest) - res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String()))) - } else { - res.WriteHeader(http.StatusNoContent) - } + t.serveWrite(res, req) case "/query": // Deliver a dummy response to the query endpoint, as some InfluxDB // clients test endpoint availability with a query @@ -158,6 +146,98 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { } } +func (t *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) { + // Check that the content length is not too large for us to handle. + if req.ContentLength > t.MaxBodySize { + toolarge(res) + return + } + + // Handle gzip request bodies + var body io.ReadCloser + if req.Header.Get("Content-Encoding") == "gzip" { + r, err := gzip.NewReader(req.Body) + if err != nil { + log.Println("E! " + err.Error()) + badrequest(res) + return + } + body = http.MaxBytesReader(res, r, t.MaxBodySize) + } else { + body = http.MaxBytesReader(res, req.Body, t.MaxBodySize) + } + + var return400 bool + var buf []byte + var nextbuf []byte + bufi := 0 + for { + if len(nextbuf) == 0 { + buf = t.pool.get() + } else { + buf = nextbuf + } + n, err := io.ReadFull(body, buf[bufi:]) + + if err != nil && err != io.ErrUnexpectedEOF { + // problem reading the request body + badrequest(res) + t.pool.put(buf) + return + } + + if err == io.ErrUnexpectedEOF || n == 0 { + // finished reading the request body + if err := t.parse(buf[:n+bufi]); err != nil { + log.Println("E! " + err.Error()) + return400 = true + } + t.pool.put(buf) + if return400 { + badrequest(res) + } else { + res.WriteHeader(http.StatusNoContent) + } + return + } + + i := bytes.LastIndexByte(buf, '\n') + // TODO handle a line longer than the max buffer size + if err := t.parse(buf[:i]); err != nil { + log.Println("E! " + err.Error()) + return400 = true + } + nextbuf = t.pool.get() + copy(nextbuf[:len(buf)-i], buf[i:]) + bufi = len(buf) - i + t.pool.put(buf) + } +} + +func (t *HttpListener) parse(b []byte) error { + metrics, err := t.parser.Parse(b) + + for _, m := range metrics { + t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + + return err +} + +func toolarge(res http.ResponseWriter) { + res.Header().Set("Content-Type", "application/json") + res.Header().Set("X-Influxdb-Version", "1.0") + res.WriteHeader(http.StatusRequestEntityTooLarge) + res.Write([]byte(`{"error":"http: request body too large"}`)) +} + +func badrequest(res http.ResponseWriter) { + res.Header().Set("Content-Type", "application/json") + res.Header().Set("X-Influxdb-Version", "1.0") + res.WriteHeader(http.StatusBadRequest) + res.Write([]byte(`{"error":"http: bad request"}`)) +} + func init() { inputs.Add("http_listener", func() telegraf.Input { return &HttpListener{}