Skip to content

Commit

Permalink
http listener refactor
Browse files Browse the repository at this point in the history
in this commit:

- chunks out the http request body to avoid making very large
  allocations.
- establishes a limit for the maximum http request body size that the
  listener will accept.
- utilizes a pool of byte buffers to reduce GC pressure.
  • Loading branch information
sparrc committed Oct 18, 2016
1 parent 91f48e7 commit 1bb9945
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 34 deletions.
29 changes: 29 additions & 0 deletions plugins/inputs/http_listener/bufferpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package http_listener

// pool is a bounded free list of byte buffers, used to serve HTTP
// request bodies without allocating a fresh buffer for every request.
// Buffers beyond the channel's capacity are simply dropped (see put).
type pool struct {
	// NOTE(review): n is not read or written anywhere in this file —
	// confirm it is used elsewhere before keeping it.
	n int
	// buffers holds idle buffers available for reuse.
	buffers chan []byte
}

// NewPool creates an empty buffer pool that will retain up to 50
// spare buffers for reuse.
func NewPool() *pool {
	p := &pool{}
	p.buffers = make(chan []byte, 50)
	return p
}

// get returns a pooled buffer when one is available, otherwise it
// allocates a fresh MAX_ALLOCATION_SIZE-byte buffer. Never blocks.
func (p *pool) get() []byte {
	var b []byte
	select {
	case b = <-p.buffers:
		// reuse an idle buffer
	default:
		b = make([]byte, MAX_ALLOCATION_SIZE)
	}
	return b
}

// put returns a buffer to the pool. When the pool is already at
// capacity the buffer is discarded and left to the garbage collector.
// Never blocks.
func (p *pool) put(b []byte) {
	select {
	case p.buffers <- b:
		// retained for reuse
	default:
		// pool is full: drop the buffer
	}
}
148 changes: 114 additions & 34 deletions plugins/inputs/http_listener/http_listener.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package http_listener

import (
"bufio"
"bytes"
"fmt"
"compress/gzip"
"io"
"log"
"net"
"net/http"
Expand All @@ -17,18 +17,31 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
)

const (
	// DEFAULT_REQUEST_BODY_MAX is the default maximum request body size, in bytes.
	// If a request advertises a Content-Length larger than this, the listener
	// replies with HTTP 413 (Request Entity Too Large) without reading the body.
	// 512 MB
	DEFAULT_REQUEST_BODY_MAX = 512 * 1000 * 1000

	// MAX_ALLOCATION_SIZE is the maximum size, in bytes, of a single allocation
	// of bytes that will be made handling a single HTTP request. It is also the
	// size of every buffer handed out by the buffer pool.
	// 1 MB
	MAX_ALLOCATION_SIZE = 1 * 1000 * 1000
)

// HttpListener is a service input that accepts InfluxDB-style HTTP writes
// and feeds the parsed metrics into a telegraf accumulator.
type HttpListener struct {
	// ServiceAddress is the TCP address the listener binds to.
	ServiceAddress string
	ReadTimeout    internal.Duration
	WriteTimeout   internal.Duration
	// MaxBodySize is the largest request body accepted, in bytes.
	// A zero value is replaced with DEFAULT_REQUEST_BODY_MAX in Start.
	MaxBodySize int64

	sync.Mutex
	// NOTE(review): this diff removes the wg.Add/Done in ServeHTTP and the
	// wg.Wait in Stop — confirm wg is still needed after this change.
	wg sync.WaitGroup

	listener *stoppableListener.StoppableListener

	// parser turns raw request-body bytes into metrics.
	parser parsers.Parser
	// acc receives every successfully parsed metric.
	acc telegraf.Accumulator
	// pool supplies reusable request-body buffers to reduce GC pressure.
	pool *pool
}

const sampleConfig = `
Expand Down Expand Up @@ -61,7 +74,12 @@ func (t *HttpListener) Start(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()

if t.MaxBodySize == 0 {
t.MaxBodySize = DEFAULT_REQUEST_BODY_MAX
}

t.acc = acc
t.pool = NewPool()

var rawListener, err = net.Listen("tcp", t.ServiceAddress)
if err != nil {
Expand All @@ -87,8 +105,6 @@ func (t *HttpListener) Stop() {
t.listener.Stop()
t.listener.Close()

t.wg.Wait()

log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress)
}

Expand All @@ -111,37 +127,9 @@ func (t *HttpListener) httpListen() error {
}

func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
t.wg.Add(1)
defer t.wg.Done()

switch req.URL.Path {
case "/write":
var http400msg bytes.Buffer
var partial string
scanner := bufio.NewScanner(req.Body)
scanner.Buffer([]byte(""), 128*1024)
for scanner.Scan() {
metrics, err := t.parser.Parse(scanner.Bytes())
if err == nil {
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
partial = "partial write: "
} else {
http400msg.WriteString(err.Error() + " ")
}
}

if err := scanner.Err(); err != nil {
http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError)
} else if http400msg.Len() > 0 {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String())))
} else {
res.WriteHeader(http.StatusNoContent)
}
t.serveWrite(res, req)
case "/query":
// Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query
Expand All @@ -158,6 +146,98 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
}
}

// serveWrite handles a /write request: it streams the (possibly gzipped)
// request body through pooled MAX_ALLOCATION_SIZE buffers, splitting on the
// last newline of each full buffer so that no line is ever handed to the
// parser in two pieces, and replies 204 on success, 400 on any parse or
// read failure, or 413 when the advertised Content-Length is too large.
//
// Fixes relative to the previous revision:
//   - io.ReadFull returning (0, io.EOF) — which happens when the body length
//     is an exact multiple of the buffer size, or the body is empty — was
//     treated as a read error and answered 400; it now ends the body normally.
//   - a buffer containing no newline at all (one line longer than
//     MAX_ALLOCATION_SIZE) caused buf[:-1] to panic; it is now rejected
//     with an explicit 400.
func (t *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
	// Check that the content length is not too large for us to handle.
	if req.ContentLength > t.MaxBodySize {
		toolarge(res)
		return
	}

	// Handle gzip request bodies; either way, cap the number of bytes we
	// will read. NOTE(review): when MaxBytesReader trips, the error path
	// below answers 400 rather than 413 — confirm that is acceptable.
	var body io.ReadCloser
	if req.Header.Get("Content-Encoding") == "gzip" {
		r, err := gzip.NewReader(req.Body)
		if err != nil {
			log.Println("E! " + err.Error())
			badrequest(res)
			return
		}
		body = http.MaxBytesReader(res, r, t.MaxBodySize)
	} else {
		body = http.MaxBytesReader(res, req.Body, t.MaxBodySize)
	}

	var return400 bool
	var buf []byte
	var nextbuf []byte
	bufi := 0 // number of carried-over bytes already at the front of buf
	for {
		if len(nextbuf) == 0 {
			buf = t.pool.get()
		} else {
			buf = nextbuf
			nextbuf = nil
		}
		n, err := io.ReadFull(body, buf[bufi:])

		// io.EOF (nothing read) and io.ErrUnexpectedEOF (partial read)
		// both mean the body ended cleanly; anything else is a real
		// problem reading the request body.
		if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
			badrequest(res)
			t.pool.put(buf)
			return
		}

		if err != nil || n == 0 {
			// finished reading the request body; parse the final chunk.
			if perr := t.parse(buf[:bufi+n]); perr != nil {
				log.Println("E! " + perr.Error())
				return400 = true
			}
			t.pool.put(buf)
			if return400 {
				badrequest(res)
			} else {
				res.WriteHeader(http.StatusNoContent)
			}
			return
		}

		// The buffer is full. Parse up to and including the last newline,
		// then carry the trailing partial line into the next buffer.
		i := bytes.LastIndexByte(buf, '\n')
		if i == -1 {
			// a single line longer than the buffer cannot be split safely
			log.Println("E! http_listener received a line longer than the maximum allocation size")
			badrequest(res)
			t.pool.put(buf)
			return
		}
		if perr := t.parse(buf[:i+1]); perr != nil {
			log.Println("E! " + perr.Error())
			return400 = true
		}
		nextbuf = t.pool.get()
		bufi = copy(nextbuf, buf[i+1:])
		t.pool.put(buf)
	}
}

// parse feeds b to the configured parser and adds every metric that was
// successfully parsed to the accumulator. The parser's error, if any, is
// returned even when some metrics were accepted (a partial write).
func (t *HttpListener) parse(b []byte) error {
	metrics, err := t.parser.Parse(b)
	for _, metric := range metrics {
		t.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
	}
	return err
}

func toolarge(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusRequestEntityTooLarge)
res.Write([]byte(`{"error":"http: request body too large"}`))
}

func badrequest(res http.ResponseWriter) {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(`{"error":"http: bad request"}`))
}

func init() {
inputs.Add("http_listener", func() telegraf.Input {
return &HttpListener{}
Expand Down

0 comments on commit 1bb9945

Please sign in to comment.