diff --git a/plugins/inputs/http_listener/bufferpool.go b/plugins/inputs/http_listener/bufferpool.go
new file mode 100644
index 0000000000000..6ab627cccd74e
--- /dev/null
+++ b/plugins/inputs/http_listener/bufferpool.go
@@ -0,0 +1,40 @@
+package http_listener
+
+// pool is a bounded free list of byte buffers backed by a buffered channel.
+// Every buffer it creates is MAX_LINE_SIZE bytes long.
+type pool struct {
+	buffers chan []byte
+}
+
+// NewPool returns a pool pre-filled with n buffers of MAX_LINE_SIZE bytes.
+func NewPool(n int) *pool {
+	p := &pool{
+		buffers: make(chan []byte, n),
+	}
+	for i := 0; i < n; i++ {
+		p.buffers <- make([]byte, MAX_LINE_SIZE)
+	}
+	return p
+}
+
+// get returns a buffer from the pool without blocking, allocating a new one
+// when the pool is empty.
+func (p *pool) get() []byte {
+	select {
+	case b := <-p.buffers:
+		return b
+	default:
+		// pool is empty, so make a new buffer
+		return make([]byte, MAX_LINE_SIZE)
+	}
+}
+
+// put hands b back to the pool without blocking.
+func (p *pool) put(b []byte) {
+	select {
+	case p.buffers <- b:
+	default:
+		// the pool is full, so drop this buffer and leave it to the
+		// garbage collector
+	}
+}
diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go
index 2eeee8e75657d..4ae78c617f937 100644
--- a/plugins/inputs/http_listener/http_listener.go
+++ b/plugins/inputs/http_listener/http_listener.go
@@ -1,9 +1,9 @@
 package http_listener
 
 import (
-	"bufio"
 	"bytes"
-	"fmt"
+	"compress/gzip"
+	"io"
 	"log"
 	"net"
 	"net/http"
@@ -17,27 +17,45 @@ import (
 	"github.com/influxdata/telegraf/plugins/parsers"
 )
 
+const (
+	// DEFAULT_REQUEST_BODY_MAX is the default maximum request body size, in bytes.
+	// if the request body is over this size, we will return an HTTP 413 error.
+	// 512 MB
+	DEFAULT_REQUEST_BODY_MAX = 512 * 1000 * 1000
+
+	// MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for
+	// a single InfluxDB point.
+	// 1 MB
+	MAX_LINE_SIZE = 1 * 1000 * 1000
+)
+
 type HttpListener struct {
 	ServiceAddress string
 	ReadTimeout    internal.Duration
 	WriteTimeout   internal.Duration
+	MaxBodySize    int64
 
 	sync.Mutex
-	wg sync.WaitGroup
 
 	listener *stoppableListener.StoppableListener
 
 	parser parsers.Parser
 	acc    telegraf.Accumulator
+	pool   *pool
 }
 
 const sampleConfig = `
   ## Address and port to host HTTP listener on
   service_address = ":8186"
 
-  ## timeouts
+  ## maximum duration before timing out read of the request
   read_timeout = "10s"
+  ## maximum duration before timing out write of the response
   write_timeout = "10s"
+
+  ## Maximum allowed http request body size in bytes.
+  ## 0 means to use the default of 512,000,000 bytes (512 MB)
+  max_body_size = 0
 `
 
 func (t *HttpListener) SampleConfig() string {
@@ -61,7 +79,12 @@ func (t *HttpListener) Start(acc telegraf.Accumulator) error {
 	t.Lock()
 	defer t.Unlock()
 
+	if t.MaxBodySize == 0 {
+		t.MaxBodySize = DEFAULT_REQUEST_BODY_MAX
+	}
+
 	t.acc = acc
+	t.pool = NewPool(100)
 
 	var rawListener, err = net.Listen("tcp", t.ServiceAddress)
 	if err != nil {
@@ -87,8 +110,6 @@ func (t *HttpListener) Stop() {
 	t.listener.Stop()
 	t.listener.Close()
 
-	t.wg.Wait()
-
 	log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress)
 }
 
@@ -111,37 +132,9 @@ func (t *HttpListener) httpListen() error {
 }
 
 func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
-	t.wg.Add(1)
-	defer t.wg.Done()
-
 	switch req.URL.Path {
 	case "/write":
-		var http400msg bytes.Buffer
-		var partial string
-		scanner := bufio.NewScanner(req.Body)
-		scanner.Buffer([]byte(""), 128*1024)
-		for scanner.Scan() {
-			metrics, err := t.parser.Parse(scanner.Bytes())
-			if err == nil {
-				for _, m := range metrics {
-					t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
-				}
-				partial = "partial write: "
-			} else {
-				http400msg.WriteString(err.Error() + " ")
-			}
-		}
-
-		if err := scanner.Err(); err != nil {
-			http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError)
-		} else if http400msg.Len() > 0 {
-			res.Header().Set("Content-Type", "application/json")
-			res.Header().Set("X-Influxdb-Version", "1.0")
-			res.WriteHeader(http.StatusBadRequest)
-			res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String())))
-		} else {
-			res.WriteHeader(http.StatusNoContent)
-		}
+		t.serveWrite(res, req)
 	case "/query":
 		// Deliver a dummy response to the query endpoint, as some InfluxDB
 		// clients test endpoint availability with a query
@@ -158,6 +151,135 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
 	}
 }
 
+// serveWrite handles a request to /write. It reads the (optionally gzipped)
+// body through a pooled MAX_LINE_SIZE buffer, parses every complete line, and
+// replies 204 when everything parsed, 400 when the body could not be read or
+// any line was rejected, and 413 when Content-Length exceeds MaxBodySize.
+func (t *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
+	// Check that the content length is not too large for us to handle.
+	if req.ContentLength > t.MaxBodySize {
+		toolarge(res)
+		return
+	}
+
+	// Handle gzip request bodies
+	var body io.ReadCloser
+	if req.Header.Get("Content-Encoding") == "gzip" {
+		r, err := gzip.NewReader(req.Body)
+		if err != nil {
+			log.Println("E! " + err.Error())
+			badrequest(res)
+			return
+		}
+		// r is nil when NewReader fails, so only defer Close on success.
+		defer r.Close()
+		body = http.MaxBytesReader(res, r, t.MaxBodySize)
+	} else {
+		body = http.MaxBytesReader(res, req.Body, t.MaxBodySize)
+	}
+
+	var return400 bool
+	buf := t.pool.get()
+	defer t.pool.put(buf)
+	// bufi is the number of bytes at the front of buf carried over from the
+	// previous read (the start of a line that was cut off by the buffer end).
+	bufi := 0
+	for {
+		n, err := io.ReadFull(body, buf[bufi:])
+		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+			log.Println("E! " + err.Error())
+			// problem reading the request body
+			badrequest(res)
+			return
+		}
+
+		if err != nil {
+			// finished reading the request body: io.EOF means it ended
+			// exactly on a buffer boundary, io.ErrUnexpectedEOF that the last
+			// read was short. Parse whatever is left, if anything.
+			if n+bufi > 0 {
+				if err := t.parse(buf[:n+bufi]); err != nil {
+					log.Println("E! " + err.Error())
+					return400 = true
+				}
+			}
+			if return400 {
+				badrequest(res)
+			} else {
+				res.WriteHeader(http.StatusNoContent)
+			}
+			return
+		}
+
+		i := bytes.LastIndexByte(buf, '\n')
+		if i == -1 {
+			// the buffer holds no complete line: skip the rest of this
+			// oversized line and start over with an empty buffer
+			newlinei := findnewline(body)
+			log.Printf("E! http_listener received a single line of %d bytes, maximum is %d bytes",
+				MAX_LINE_SIZE+newlinei, MAX_LINE_SIZE)
+			return400 = true
+			bufi = 0
+			continue
+		}
+		if err := t.parse(buf[:i]); err != nil {
+			log.Println("E! " + err.Error())
+			return400 = true
+		}
+		// move the partial line after the last newline to the front of buf
+		bufi = copy(buf, buf[i+1:])
+	}
+}
+
+// parse runs b through the configured parser and adds every metric it returns
+// to the accumulator, including those returned alongside an error. The parser
+// error, if any, is passed back to the caller.
+func (t *HttpListener) parse(b []byte) error {
+	metrics, err := t.parser.Parse(b)
+
+	for _, m := range metrics {
+		t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
+	}
+
+	return err
+}
+
+// toolarge replies with HTTP 413 and a JSON error body.
+func toolarge(res http.ResponseWriter) {
+	res.Header().Set("Content-Type", "application/json")
+	res.Header().Set("X-Influxdb-Version", "1.0")
+	res.WriteHeader(http.StatusRequestEntityTooLarge)
+	res.Write([]byte(`{"error":"http: request body too large"}`))
+}
+
+// badrequest replies with HTTP 400 and a JSON error body.
+func badrequest(res http.ResponseWriter) {
+	res.Header().Set("Content-Type", "application/json")
+	res.Header().Set("X-Influxdb-Version", "1.0")
+	res.WriteHeader(http.StatusBadRequest)
+	res.Write([]byte(`{"error":"http: bad request"}`))
+}
+
+// findnewline finds the next newline in the given reader. It returns the number
+// of bytes it had to read to get there.
+func findnewline(r io.Reader) int {
+	// drop any line longer than the max buffer size
+	counter := 0
+	// read until the next newline:
+	var tmp [1]byte
+	for {
+		_, err := r.Read(tmp[:])
+		if err != nil {
+			break
+		}
+		counter++
+		if tmp[0] == '\n' {
+			break
+		}
+	}
+	return counter
+}
+
 func init() {
 	inputs.Add("http_listener", func() telegraf.Input {
 		return &HttpListener{}