Skip to content

Commit

Permalink
Move write HTTP handler into own package
Browse files Browse the repository at this point in the history
Move the write (ingest) handler to its own package.
  • Loading branch information
mattbostock committed Jul 23, 2017
1 parent 01d34fd commit e047551
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 116 deletions.
3 changes: 2 additions & 1 deletion acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/golang/snappy"
"github.com/mattbostock/athensdb/internal/remote"
"github.com/mattbostock/athensdb/internal/testutil"
"github.com/mattbostock/athensdb/internal/write"
"github.com/prometheus/common/model"
)

Expand Down Expand Up @@ -140,7 +141,7 @@ func postWriteRequest(req *remote.WriteRequest) (*http.Response, error) {
}

compressed := snappy.Encode(nil, data)
u := fmt.Sprintf("%s%s", httpBaseURL, writeRoute)
u := fmt.Sprintf("%s%s", httpBaseURL, write.Route)
resp, err := http.Post(u, "snappy", bytes.NewBuffer(compressed))
if err != nil {
return nil, err
Expand Down
140 changes: 140 additions & 0 deletions internal/write/write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package write

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"sort"
"sync"

"github.com/Sirupsen/logrus"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/mattbostock/athensdb/internal/cluster"
"github.com/mattbostock/athensdb/internal/remote"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"golang.org/x/net/context/ctxhttp"
)

const (
HttpHeaderInternalWrite = "X-AthensDB-Internal-Write-Version"
Route = "/receive"
)

var (
store storage.Storage
log *logrus.Logger
)

func SetLogger(l *logrus.Logger) {
log = l
}
func SetStore(s storage.Storage) {
store = s
}

func Handler(w http.ResponseWriter, r *http.Request) {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

var req remote.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

appender, err := store.Appender()
if err != nil {
// FIXME: Make error more useful
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
defer appender.Commit()

var samples uint32
for _, ts := range req.Timeseries {
m := make(labels.Labels, 0, len(ts.Labels))
for _, l := range ts.Labels {
m = append(m, labels.Label{
Name: l.Name,
Value: l.Value,
})
}
sort.Sort(m)

for _, s := range ts.Samples {
// FIXME: Look at using AddFast
appender.Add(m, s.TimestampMs, s.Value)
samples++
}
}

// This is an internal write, so don't replicate it to other nodes
if r.Header.Get(HttpHeaderInternalWrite) != "" {
log.Debugf("Received %d samples from another node in the cluster", samples)
return
}

var wg sync.WaitGroup
// FIXME: Avoid panic if the cluster is not yet initialised
nodes := cluster.Nodes()
var wgErrChan = make(chan error, len(nodes))
for _, node := range nodes {
if node.Name() == cluster.LocalNode().Name() {
log.Debugf("Skipping local node %s", node)
continue
}

wg.Add(1)
go func(n *cluster.Node) {
defer wg.Done()

log.Debugf("Writing %d samples to %s", samples, n)

httpAddr, err := n.HTTPAddr()
if err != nil {
wgErrChan <- err
return
}
apiURL := fmt.Sprintf("%s%s%s", "http://", httpAddr, Route)
nodeReq, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(compressed))
if err != nil {
wgErrChan <- err
return
}
nodeReq.Header.Add("Content-Encoding", "snappy")
nodeReq.Header.Set("Content-Type", "application/x-protobuf")
nodeReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
nodeReq.Header.Set(HttpHeaderInternalWrite, "0.0.1")

// FIXME set timeout using context
httpResp, err := ctxhttp.Do(context.TODO(), http.DefaultClient, nodeReq)
if err != nil {
wgErrChan <- err
return
}
defer httpResp.Body.Close()
}(node)
}
// FIXME cancel requests if one fails
wg.Wait()

select {
case err := <-wgErrChan:
http.Error(w, err.Error(), http.StatusBadRequest)
return
default:
}
}
120 changes: 5 additions & 115 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,28 @@
package main

import (
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"sort"
"sync"
"time"

log "github.com/Sirupsen/logrus"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
v1API "github.com/mattbostock/athensdb/internal/api/v1"
"github.com/mattbostock/athensdb/internal/cluster"
"github.com/mattbostock/athensdb/internal/remote"
"github.com/mattbostock/athensdb/internal/write"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/tsdb"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"gopkg.in/alecthomas/kingpin.v2"
)

const (
defaultHTTPAddr = "localhost:9080"
defaultPeerAddr = "localhost:7946"

apiRoute = "/api/v1"
writeRoute = "/receive"

httpHeaderInternalWrite = "X-AthensDB-Internal-Write-Version"
apiRoute = "/api/v1"
)

var (
Expand Down Expand Up @@ -121,108 +110,9 @@ func main() {
var api = v1API.NewAPI(queryEngine, localStorage)
api.Register(router.WithPrefix(apiRoute))

router.Post(writeRoute, func(w http.ResponseWriter, r *http.Request) {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

var req remote.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

appender, err := localStorage.Appender()
if err != nil {
// FIXME: Make error more useful
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
defer appender.Commit()

var samples uint32
for _, ts := range req.Timeseries {
m := make(labels.Labels, 0, len(ts.Labels))
for _, l := range ts.Labels {
m = append(m, labels.Label{
Name: l.Name,
Value: l.Value,
})
}
sort.Sort(m)

for _, s := range ts.Samples {
// FIXME: Look at using AddFast
appender.Add(m, s.TimestampMs, s.Value)
samples++
}
}

// This is an internal write, so don't replicate it to other nodes
if r.Header.Get(httpHeaderInternalWrite) != "" {
log.Debugf("Received %d samples from another node in the cluster", samples)
return
}

var wg sync.WaitGroup
// FIXME: Avoid panic if the cluster is not yet initialised
nodes := cluster.Nodes()
var wgErrChan = make(chan error, len(nodes))
for _, node := range nodes {
if node.Name() == cluster.LocalNode().Name() {
log.Debugf("Skipping local node %s", node)
continue
}

wg.Add(1)
go func(n *cluster.Node) {
defer wg.Done()

log.Debugf("Writing %d samples to %s", samples, n)

httpAddr, err := n.HTTPAddr()
if err != nil {
wgErrChan <- err
return
}
apiURL := fmt.Sprintf("%s%s%s", "http://", httpAddr, writeRoute)
nodeReq, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(compressed))
if err != nil {
wgErrChan <- err
return
}
nodeReq.Header.Add("Content-Encoding", "snappy")
nodeReq.Header.Set("Content-Type", "application/x-protobuf")
nodeReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
nodeReq.Header.Set(httpHeaderInternalWrite, "0.0.1")

// FIXME set timeout using context
httpResp, err := ctxhttp.Do(context.TODO(), http.DefaultClient, nodeReq)
if err != nil {
wgErrChan <- err
return
}
defer httpResp.Body.Close()
}(node)
}
// FIXME cancel requests if one fails
wg.Wait()

select {
case err := <-wgErrChan:
http.Error(w, err.Error(), http.StatusBadRequest)
return
default:
}
})
write.SetLogger(log.StandardLogger())
write.SetStore(localStorage)
router.Post(write.Route, write.Handler)

if err := cluster.Join(&cluster.Config{
HTTPAdvertiseAddr: *config.httpAdvertiseAddr,
Expand Down

0 comments on commit e047551

Please sign in to comment.