Skip to content

Commit

Permalink
Basic retrying query frontend.
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
  • Loading branch information
tomwilkie committed Aug 16, 2018
1 parent 4ffec28 commit f807690
Show file tree
Hide file tree
Showing 11 changed files with 1,466 additions and 63 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ cmd/configs/configs
cmd/distributor/distributor
cmd/ingester/ingester
cmd/querier/querier
cmd/query-frontend/query-frontend
cmd/ruler/ruler
cmd/table-manager/table-manager
cmd/lite/lite
.uptodate
.pkg
.cache
pkg/ingester/client/cortex.pb.go
pkg/querier/frontend/frontend.pb.go
pkg/ring/ring.pb.go
images/
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ images:
@echo > /dev/null

# Generating proto code is automated.
PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print)
PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print) vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.proto
PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS))

# Building binaries is now automated. The convention is to build a binary
Expand Down
12 changes: 11 additions & 1 deletion cmd/querier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (
"github.com/prometheus/prometheus/web/api/v1"
"github.com/prometheus/tsdb"

httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/tracing"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/chunk/storage"
"github.com/weaveworks/cortex/pkg/distributor"
"github.com/weaveworks/cortex/pkg/querier"
"github.com/weaveworks/cortex/pkg/querier/frontend"
"github.com/weaveworks/cortex/pkg/ring"
"github.com/weaveworks/cortex/pkg/util"
)
Expand All @@ -39,9 +41,10 @@ func main() {
chunkStoreConfig chunk.StoreConfig
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
workerConfig frontend.WorkerConfig
)
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &querierConfig,
&chunkStoreConfig, &schemaConfig, &storageConfig)
&chunkStoreConfig, &schemaConfig, &storageConfig, &workerConfig)
flag.Parse()

// Setting the environment variable JAEGER_AGENT_HOST enables tracing
Expand Down Expand Up @@ -86,6 +89,13 @@ func main() {
}
defer chunkStore.Stop()

worker, err := frontend.NewWorker(workerConfig, httpgrpc_server.NewServer(server.HTTP), util.Logger)
if err != nil {
level.Error(util.Logger).Log("err", err)
os.Exit(1)
}
defer worker.Stop()

queryable, engine := querier.Make(querierConfig, dist, chunkStore)
api := v1.NewAPI(
engine,
Expand Down
10 changes: 10 additions & 0 deletions cmd/query-frontend/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM alpine:3.8
RUN apk add --no-cache ca-certificates
COPY query-frontend /bin/query-frontend
EXPOSE 80
ENTRYPOINT [ "/bin/query-frontend" ]

ARG revision
LABEL org.opencontainers.image.title="query-frontend" \
org.opencontainers.image.source="https://github.com/weaveworks/cortex/tree/master/cmd/query-frontend" \
org.opencontainers.image.revision="${revision}"
53 changes: 53 additions & 0 deletions cmd/query-frontend/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"flag"
"os"

"github.com/go-kit/kit/log/level"
"google.golang.org/grpc"

"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/tracing"
"github.com/weaveworks/cortex/pkg/querier/frontend"
"github.com/weaveworks/cortex/pkg/util"
)

func main() {
var (
serverConfig = server.Config{
MetricsNamespace: "cortex",
GRPCMiddleware: []grpc.UnaryServerInterceptor{
middleware.ServerUserHeaderInterceptor,
},
}
frontendConfig frontend.Config
)
util.RegisterFlags(&serverConfig, &frontendConfig)
flag.Parse()

// Setting the environment variable JAEGER_AGENT_HOST enables tracing
trace := tracing.NewFromEnv("query-frontend")
defer trace.Close()

util.InitLogger(&serverConfig)

server, err := server.New(serverConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing server", "err", err)
os.Exit(1)
}
defer server.Shutdown()

f, err := frontend.New(frontendConfig, util.Logger)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing frontend", "err", err)
os.Exit(1)
}
defer f.Close()

frontend.RegisterFrontendServer(server.GRPC, f)
server.HTTP.PathPrefix("/api/prom").Handler(middleware.AuthenticateUser.Wrap(f))
server.Run()
}
2 changes: 1 addition & 1 deletion pkg/querier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Config struct {
Iterators bool
}

// RegisterFlags adds the flags required to config this to the given FlagSet
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.")
Expand Down
212 changes: 212 additions & 0 deletions pkg/querier/frontend/frontend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package frontend

import (
"flag"
"math/rand"
"net/http"
"sync"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/user"
)

var (
errServerClosing = httpgrpc.Errorf(http.StatusTeapot, "server closing down")
errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")
errCanceled = httpgrpc.Errorf(http.StatusInternalServerError, "context cancelled")
)

// Config for a Frontend.
type Config struct {
MaxOutstandingPerTenant int
MaxRetries int
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "")
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "")
}

// Frontend queues HTTP requests, dispatches them to backends, and handles retries
// for requests which failed.
type Frontend struct {
cfg Config
log log.Logger

mtx sync.Mutex
cond *sync.Cond
closed bool
queues map[string]chan *request
}

type request struct {
request *httpgrpc.HTTPRequest
err chan error
response chan *httpgrpc.HTTPResponse
}

// New creates a new frontend.
func New(cfg Config, log log.Logger) (*Frontend, error) {
f := &Frontend{
cfg: cfg,
log: log,
queues: map[string]chan *request{},
}
f.cond = sync.NewCond(&f.mtx)
return f, nil
}

// Close stops new requests and errors out any pending requests.
func (f *Frontend) Close() {
f.mtx.Lock()
defer f.mtx.Unlock()

f.closed = true
f.cond.Broadcast()

for _, queue := range f.queues {
close(queue)
for request := range queue {
request.err <- errServerClosing
}
}
}

// ServeHTTP serves HTTP requests.
func (f *Frontend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := f.serveHTTP(w, r); err != nil {
server.WriteError(w, err)
}
}

func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error {
ctx := r.Context()
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
}

req, err := server.HTTPRequest(r)
if err != nil {
return err
}

request := &request{
request: req,
// Buffer of 1 to ensure response can be written even if client has gone away.
err: make(chan error, 1),
response: make(chan *httpgrpc.HTTPResponse, 1),
}

var lastErr error
for retries := 0; retries < f.cfg.MaxRetries; retries++ {
if err := f.queueRequest(userID, request); err != nil {
return err
}

var resp *httpgrpc.HTTPResponse
select {
case <-ctx.Done():
// TODO propagate cancellation.
//request.Cancel()
return errCanceled

case resp = <-request.response:
case lastErr = <-request.err:
level.Error(f.log).Log("msg", "error processing request", "try", retries, "err", lastErr)
resp, _ = httpgrpc.HTTPResponseFromError(lastErr)
}

// Only fail is we get a valid HTTP non-500; otherwise retry.
if resp != nil && resp.Code/100 != 5 {
server.WriteResponse(w, resp)
return nil
}
}

return lastErr
}

// Process allows backends to pull requests from the frontend.
func (f *Frontend) Process(server Frontend_ProcessServer) error {
for {
request := f.getNextRequest()
if request == nil {
// Occurs when server is shutting down.
return nil
}

if err := server.Send(&ProcessRequest{
HttpRequest: request.request,
}); err != nil {
request.err <- err
return err
}

response, err := server.Recv()
if err != nil {
request.err <- err
return err
}

request.response <- response.HttpResponse
}
}

func (f *Frontend) queueRequest(userID string, req *request) error {
f.mtx.Lock()
defer f.mtx.Unlock()

if f.closed {
return errServerClosing
}

queue, ok := f.queues[userID]
if !ok {
queue = make(chan *request, f.cfg.MaxOutstandingPerTenant)
f.queues[userID] = queue
}

select {
case queue <- req:
f.cond.Signal()
return nil
default:
return errTooManyRequest
}
}

// getQueue picks a random queue and takes the next request off of it, so we
// faily process users queries. Will block if there are no requests.
func (f *Frontend) getNextRequest() *request {
f.mtx.Lock()
defer f.mtx.Unlock()

for len(f.queues) == 0 && !f.closed {
f.cond.Wait()
}

if f.closed {
return nil
}

i, n := 0, rand.Intn(len(f.queues))
for userID, queue := range f.queues {
if i < n {
i++
continue
}

request := <-queue
if len(queue) == 0 {
delete(f.queues, userID)
}
return request
}

panic("should never happen")
}
23 changes: 23 additions & 0 deletions pkg/querier/frontend/frontend.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
syntax = "proto3";

package frontend;

option go_package = "frontend";

import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "github.com/weaveworks/common/httpgrpc/httpgrpc.proto";

option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;

service Frontend {
rpc Process(stream ProcessResponse) returns (stream ProcessRequest) {};
}

message ProcessRequest {
httpgrpc.HTTPRequest httpRequest = 1;
}

message ProcessResponse {
httpgrpc.HTTPResponse httpResponse = 1;
}
Loading

0 comments on commit f807690

Please sign in to comment.