From e047551a49b27cbb2358b5f857c1218cf214383e Mon Sep 17 00:00:00 2001 From: Matt Bostock Date: Sun, 23 Jul 2017 10:50:43 +0100 Subject: [PATCH] Move write HTTP handler into own package Move the write (ingest) handler to its own package. --- acceptance_test.go | 3 +- internal/write/write.go | 140 ++++++++++++++++++++++++++++++++++++++++ main.go | 120 ++-------------------------------- 3 files changed, 147 insertions(+), 116 deletions(-) create mode 100644 internal/write/write.go diff --git a/acceptance_test.go b/acceptance_test.go index 3c2080f7..3af4d6ca 100644 --- a/acceptance_test.go +++ b/acceptance_test.go @@ -17,6 +17,7 @@ import ( "github.com/golang/snappy" "github.com/mattbostock/athensdb/internal/remote" "github.com/mattbostock/athensdb/internal/testutil" + "github.com/mattbostock/athensdb/internal/write" "github.com/prometheus/common/model" ) @@ -140,7 +141,7 @@ func postWriteRequest(req *remote.WriteRequest) (*http.Response, error) { } compressed := snappy.Encode(nil, data) - u := fmt.Sprintf("%s%s", httpBaseURL, writeRoute) + u := fmt.Sprintf("%s%s", httpBaseURL, write.Route) resp, err := http.Post(u, "snappy", bytes.NewBuffer(compressed)) if err != nil { return nil, err diff --git a/internal/write/write.go b/internal/write/write.go new file mode 100644 index 00000000..bcc48430 --- /dev/null +++ b/internal/write/write.go @@ -0,0 +1,140 @@ +package write + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "net/http" + "sort" + "sync" + + "github.com/Sirupsen/logrus" + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/mattbostock/athensdb/internal/cluster" + "github.com/mattbostock/athensdb/internal/remote" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "golang.org/x/net/context/ctxhttp" +) + +const ( + HttpHeaderInternalWrite = "X-AthensDB-Internal-Write-Version" + Route = "/receive" +) + +var ( + store storage.Storage + log *logrus.Logger +) + +func SetLogger(l *logrus.Logger) { + log = l +} +func SetStore(s storage.Storage) { + store = s +} + +func Handler(w http.ResponseWriter, r *http.Request) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req remote.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + appender, err := store.Appender() + if err != nil { + // FIXME: Make error more useful + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + defer appender.Commit() + + var samples uint32 + for _, ts := range req.Timeseries { + m := make(labels.Labels, 0, len(ts.Labels)) + for _, l := range ts.Labels { + m = append(m, labels.Label{ + Name: l.Name, + Value: l.Value, + }) + } + sort.Sort(m) + + for _, s := range ts.Samples { + // FIXME: Look at using AddFast + appender.Add(m, s.TimestampMs, s.Value) + samples++ + } + } + + // This is an internal write, so don't replicate it to other nodes + if r.Header.Get(HttpHeaderInternalWrite) != "" { + log.Debugf("Received %d samples from another node in the cluster", samples) + return + } + + var wg sync.WaitGroup + // FIXME: Avoid panic if the cluster is not yet initialised + nodes := cluster.Nodes() + var wgErrChan = make(chan error, len(nodes)) + for _, node := range nodes { + if node.Name() == cluster.LocalNode().Name() { + log.Debugf("Skipping local node %s", node) + continue + } + + wg.Add(1) + go func(n *cluster.Node) { + defer wg.Done() + + log.Debugf("Writing %d samples to %s", samples, n) + + httpAddr, err := n.HTTPAddr() + if err != nil { + wgErrChan <- err + return + } + apiURL := fmt.Sprintf("%s%s%s", "http://", httpAddr, Route) + nodeReq, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(compressed)) + if err != nil { + wgErrChan <- err + return + } + nodeReq.Header.Add("Content-Encoding", "snappy") + nodeReq.Header.Set("Content-Type", "application/x-protobuf") + nodeReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + nodeReq.Header.Set(HttpHeaderInternalWrite, "0.0.1") + + // FIXME set timeout using context + httpResp, err := ctxhttp.Do(context.TODO(), http.DefaultClient, nodeReq) + if err != nil { + wgErrChan <- err + return + } + defer httpResp.Body.Close() + }(node) + } + // FIXME cancel requests if one fails + wg.Wait() + + select { + case err := <-wgErrChan: + http.Error(w, err.Error(), http.StatusBadRequest) + return + default: + } +} diff --git a/main.go b/main.go index 218af82c..8d09b031 100644 --- a/main.go +++ b/main.go @@ -1,28 +1,20 @@ package main import ( - "bytes" "fmt" - "io/ioutil" "net" "net/http" "os" - "sort" - "sync" "time" log "github.com/Sirupsen/logrus" - "github.com/golang/protobuf/proto" - "github.com/golang/snappy" v1API "github.com/mattbostock/athensdb/internal/api/v1" "github.com/mattbostock/athensdb/internal/cluster" - "github.com/mattbostock/athensdb/internal/remote" + "github.com/mattbostock/athensdb/internal/write" "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage/tsdb" "golang.org/x/net/context" - "golang.org/x/net/context/ctxhttp" "gopkg.in/alecthomas/kingpin.v2" ) @@ -30,10 +22,7 @@ const ( defaultHTTPAddr = "localhost:9080" defaultPeerAddr = "localhost:7946" - apiRoute = "/api/v1" - writeRoute = "/receive" - - httpHeaderInternalWrite = "X-AthensDB-Internal-Write-Version" + apiRoute = "/api/v1" ) var ( @@ -121,108 +110,9 @@ func main() { var api = v1API.NewAPI(queryEngine, localStorage) api.Register(router.WithPrefix(apiRoute)) - router.Post(writeRoute, func(w http.ResponseWriter, r *http.Request) { - compressed, err := ioutil.ReadAll(r.Body) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - reqBuf, err := snappy.Decode(nil, compressed) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - var req remote.WriteRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - appender, err := localStorage.Appender() - if err != nil { - // FIXME: Make error more useful - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - defer appender.Commit() - - var samples uint32 - for _, ts := range req.Timeseries { - m := make(labels.Labels, 0, len(ts.Labels)) - for _, l := range ts.Labels { - m = append(m, labels.Label{ - Name: l.Name, - Value: l.Value, - }) - } - sort.Sort(m) - - for _, s := range ts.Samples { - // FIXME: Look at using AddFast - appender.Add(m, s.TimestampMs, s.Value) - samples++ - } - } - - // This is an internal write, so don't replicate it to other nodes - if r.Header.Get(httpHeaderInternalWrite) != "" { - log.Debugf("Received %d samples from another node in the cluster", samples) - return - } - - var wg sync.WaitGroup - // FIXME: Avoid panic if the cluster is not yet initialised - nodes := cluster.Nodes() - var wgErrChan = make(chan error, len(nodes)) - for _, node := range nodes { - if node.Name() == cluster.LocalNode().Name() { - log.Debugf("Skipping local node %s", node) - continue - } - - wg.Add(1) - go func(n *cluster.Node) { - defer wg.Done() - - log.Debugf("Writing %d samples to %s", samples, n) - - httpAddr, err := n.HTTPAddr() - if err != nil { - wgErrChan <- err - return - } - apiURL := fmt.Sprintf("%s%s%s", "http://", httpAddr, writeRoute) - nodeReq, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(compressed)) - if err != nil { - wgErrChan <- err - return - } - nodeReq.Header.Add("Content-Encoding", "snappy") - nodeReq.Header.Set("Content-Type", "application/x-protobuf") - nodeReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - nodeReq.Header.Set(httpHeaderInternalWrite, "0.0.1") - - // FIXME set timeout using context - httpResp, err := ctxhttp.Do(context.TODO(), http.DefaultClient, nodeReq) - if err != nil { - wgErrChan <- err - return - } - defer httpResp.Body.Close() - }(node) - } - // FIXME cancel requests if one fails - wg.Wait() - - select { - case err := <-wgErrChan: - http.Error(w, err.Error(), http.StatusBadRequest) - return - default: - } - }) + write.SetLogger(log.StandardLogger()) + write.SetStore(localStorage) + router.Post(write.Route, write.Handler) if err := cluster.Join(&cluster.Config{ HTTPAdvertiseAddr: *config.httpAdvertiseAddr,