From f807690176ec98508a03c5ba29cbd8740c309ebd Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sun, 29 Jul 2018 12:47:55 +0100 Subject: [PATCH 01/10] Basic retrying query frontend. Signed-off-by: Tom Wilkie --- .gitignore | 2 + Makefile | 2 +- cmd/querier/main.go | 12 +- cmd/query-frontend/Dockerfile | 10 + cmd/query-frontend/main.go | 53 + pkg/querier/config.go | 2 +- pkg/querier/frontend/frontend.go | 212 ++++ pkg/querier/frontend/frontend.proto | 23 + pkg/querier/frontend/worker.go | 148 +++ .../weaveworks/common/httpgrpc/httpgrpc.pb.go | 1018 ++++++++++++++++- .../common/httpgrpc/server/server.go | 47 +- 11 files changed, 1466 insertions(+), 63 deletions(-) create mode 100644 cmd/query-frontend/Dockerfile create mode 100644 cmd/query-frontend/main.go create mode 100644 pkg/querier/frontend/frontend.go create mode 100644 pkg/querier/frontend/frontend.proto create mode 100644 pkg/querier/frontend/worker.go diff --git a/.gitignore b/.gitignore index e0c01dc65c..12df738aa7 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ cmd/configs/configs cmd/distributor/distributor cmd/ingester/ingester cmd/querier/querier +cmd/query-frontend/query-frontend cmd/ruler/ruler cmd/table-manager/table-manager cmd/lite/lite @@ -10,5 +11,6 @@ cmd/lite/lite .pkg .cache pkg/ingester/client/cortex.pb.go +pkg/querier/frontend/frontend.pb.go pkg/ring/ring.pb.go images/ diff --git a/Makefile b/Makefile index 432747c352..3daf7e49cd 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ images: @echo > /dev/null # Generating proto code is automated. -PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print) +PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print) vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.proto PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS)) # Building binaries is now automated. 
The convention is to build a binary diff --git a/cmd/querier/main.go b/cmd/querier/main.go index ac2054708c..e1f2835d50 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/prometheus/web/api/v1" "github.com/prometheus/tsdb" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" "github.com/weaveworks/common/tracing" @@ -21,6 +22,7 @@ import ( "github.com/weaveworks/cortex/pkg/chunk/storage" "github.com/weaveworks/cortex/pkg/distributor" "github.com/weaveworks/cortex/pkg/querier" + "github.com/weaveworks/cortex/pkg/querier/frontend" "github.com/weaveworks/cortex/pkg/ring" "github.com/weaveworks/cortex/pkg/util" ) @@ -39,9 +41,10 @@ func main() { chunkStoreConfig chunk.StoreConfig schemaConfig chunk.SchemaConfig storageConfig storage.Config + workerConfig frontend.WorkerConfig ) util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &querierConfig, - &chunkStoreConfig, &schemaConfig, &storageConfig) + &chunkStoreConfig, &schemaConfig, &storageConfig, &workerConfig) flag.Parse() // Setting the environment variable JAEGER_AGENT_HOST enables tracing @@ -86,6 +89,13 @@ func main() { } defer chunkStore.Stop() + worker, err := frontend.NewWorker(workerConfig, httpgrpc_server.NewServer(server.HTTP), util.Logger) + if err != nil { + level.Error(util.Logger).Log("err", err) + os.Exit(1) + } + defer worker.Stop() + queryable, engine := querier.Make(querierConfig, dist, chunkStore) api := v1.NewAPI( engine, diff --git a/cmd/query-frontend/Dockerfile b/cmd/query-frontend/Dockerfile new file mode 100644 index 0000000000..526837146c --- /dev/null +++ b/cmd/query-frontend/Dockerfile @@ -0,0 +1,10 @@ +FROM alpine:3.8 +RUN apk add --no-cache ca-certificates +COPY query-frontend /bin/query-frontend +EXPOSE 80 +ENTRYPOINT [ "/bin/query-frontend" ] + +ARG revision +LABEL org.opencontainers.image.title="query-frontend" \ + org.opencontainers.image.source="https://github.com/weaveworks/cortex/tree/master/cmd/query-frontend" \ + org.opencontainers.image.revision="${revision}" diff --git a/cmd/query-frontend/main.go b/cmd/query-frontend/main.go new file mode 100644 index 0000000000..550c84b0a3 --- /dev/null +++ b/cmd/query-frontend/main.go @@ -0,0 +1,53 @@ +package main + +import ( + "flag" + "os" + + "github.com/go-kit/kit/log/level" + "google.golang.org/grpc" + + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/server" + "github.com/weaveworks/common/tracing" + "github.com/weaveworks/cortex/pkg/querier/frontend" + "github.com/weaveworks/cortex/pkg/util" +) + +func main() { + var ( + serverConfig = server.Config{ + MetricsNamespace: "cortex", + GRPCMiddleware: []grpc.UnaryServerInterceptor{ + middleware.ServerUserHeaderInterceptor, + }, + } + frontendConfig frontend.Config + ) + util.RegisterFlags(&serverConfig, &frontendConfig) + flag.Parse() + + // Setting the environment variable JAEGER_AGENT_HOST enables tracing + trace := tracing.NewFromEnv("query-frontend") + defer trace.Close() + + util.InitLogger(&serverConfig) + + server, err := server.New(serverConfig) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing server", "err", err) + os.Exit(1) + } + defer server.Shutdown() + + f, err := frontend.New(frontendConfig, util.Logger) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing frontend", "err", err) + os.Exit(1) + } + defer f.Close() + + frontend.RegisterFrontendServer(server.GRPC, 
f) + server.HTTP.PathPrefix("/api/prom").Handler(middleware.AuthenticateUser.Wrap(f)) + server.Run() +} diff --git a/pkg/querier/config.go b/pkg/querier/config.go index f19f784a97..a72c8f0d17 100644 --- a/pkg/querier/config.go +++ b/pkg/querier/config.go @@ -18,7 +18,7 @@ type Config struct { Iterators bool } -// RegisterFlags adds the flags required to config this to the given FlagSet +// RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.") f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.") diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go new file mode 100644 index 0000000000..0f8990f9ba --- /dev/null +++ b/pkg/querier/frontend/frontend.go @@ -0,0 +1,212 @@ +package frontend + +import ( + "flag" + "math/rand" + "net/http" + "sync" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" + "github.com/weaveworks/common/user" +) + +var ( + errServerClosing = httpgrpc.Errorf(http.StatusTeapot, "server closing down") + errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests") + errCanceled = httpgrpc.Errorf(http.StatusInternalServerError, "context cancelled") +) + +// Config for a Frontend. +type Config struct { + MaxOutstandingPerTenant int + MaxRetries int +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "") + f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "") +} + +// Frontend queues HTTP requests, dispatches them to backends, and handles retries +// for requests which failed. +type Frontend struct { + cfg Config + log log.Logger + + mtx sync.Mutex + cond *sync.Cond + closed bool + queues map[string]chan *request +} + +type request struct { + request *httpgrpc.HTTPRequest + err chan error + response chan *httpgrpc.HTTPResponse +} + +// New creates a new frontend. +func New(cfg Config, log log.Logger) (*Frontend, error) { + f := &Frontend{ + cfg: cfg, + log: log, + queues: map[string]chan *request{}, + } + f.cond = sync.NewCond(&f.mtx) + return f, nil +} + +// Close stops new requests and errors out any pending requests. +func (f *Frontend) Close() { + f.mtx.Lock() + defer f.mtx.Unlock() + + f.closed = true + f.cond.Broadcast() + + for _, queue := range f.queues { + close(queue) + for request := range queue { + request.err <- errServerClosing + } + } +} + +// ServeHTTP serves HTTP requests. +func (f *Frontend) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if err := f.serveHTTP(w, r); err != nil { + server.WriteError(w, err) + } +} + +func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error { + ctx := r.Context() + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return err + } + + req, err := server.HTTPRequest(r) + if err != nil { + return err + } + + request := &request{ + request: req, + // Buffer of 1 to ensure response can be written even if client has gone away. 
+ err: make(chan error, 1), + response: make(chan *httpgrpc.HTTPResponse, 1), + } + + var lastErr error + for retries := 0; retries < f.cfg.MaxRetries; retries++ { + if err := f.queueRequest(userID, request); err != nil { + return err + } + + var resp *httpgrpc.HTTPResponse + select { + case <-ctx.Done(): + // TODO propagate cancellation. + //request.Cancel() + return errCanceled + + case resp = <-request.response: + case lastErr = <-request.err: + level.Error(f.log).Log("msg", "error processing request", "try", retries, "err", lastErr) + resp, _ = httpgrpc.HTTPResponseFromError(lastErr) + } + + // Only return if we get a valid HTTP non-500 response; otherwise retry. + if resp != nil && resp.Code/100 != 5 { + server.WriteResponse(w, resp) + return nil + } + } + + return lastErr +} + +// Process allows backends to pull requests from the frontend. +func (f *Frontend) Process(server Frontend_ProcessServer) error { + for { + request := f.getNextRequest() + if request == nil { + // Occurs when server is shutting down. + return nil + } + + if err := server.Send(&ProcessRequest{ + HttpRequest: request.request, + }); err != nil { + request.err <- err + return err + } + + response, err := server.Recv() + if err != nil { + request.err <- err + return err + } + + request.response <- response.HttpResponse + } +} + +func (f *Frontend) queueRequest(userID string, req *request) error { + f.mtx.Lock() + defer f.mtx.Unlock() + + if f.closed { + return errServerClosing + } + + queue, ok := f.queues[userID] + if !ok { + queue = make(chan *request, f.cfg.MaxOutstandingPerTenant) + f.queues[userID] = queue + } + + select { + case queue <- req: + f.cond.Signal() + return nil + default: + return errTooManyRequest + } +}
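The retry loop above leans on an httpgrpc convention that also shows up in the server.go changes further down: when a backend sees a 5xx it wraps the whole response in a gRPC error, and the frontend unpacks that error with HTTPResponseFromError before deciding whether to retry. A minimal sketch of the round trip, using only the two httpgrpc helpers that appear in this patch (the example program is illustrative, not part of the change):

package main

import (
	"fmt"
	"net/http"

	"github.com/weaveworks/common/httpgrpc"
)

func main() {
	// What server.Handle does for a 5xx: wrap the response in an error
	// instead of returning it as a response.
	resp := &httpgrpc.HTTPResponse{Code: http.StatusBadGateway, Body: []byte("upstream down")}
	err := httpgrpc.ErrorFromHTTPResponse(resp)

	// What the frontend does on the other end: recover the response.
	// A nil response or a 5xx code means another try; anything else is
	// written straight back to the client.
	recovered, ok := httpgrpc.HTTPResponseFromError(err)
	fmt.Println(ok, recovered.Code) // true 502
}

+// getNextRequest picks a random queue and takes the next request off of it, so we +// fairly process users' queries. Will block if there are no requests.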
+func (f *Frontend) getNextRequest() *request { + f.mtx.Lock() + defer f.mtx.Unlock() + + for len(f.queues) == 0 && !f.closed { + f.cond.Wait() + } + + if f.closed { + return nil + } + + i, n := 0, rand.Intn(len(f.queues)) + for userID, queue := range f.queues { + if i < n { + i++ + continue + } + + request := <-queue + if len(queue) == 0 { + delete(f.queues, userID) + } + return request + } + + panic("should never happen") +} diff --git a/pkg/querier/frontend/frontend.proto b/pkg/querier/frontend/frontend.proto new file mode 100644 index 0000000000..9b2703ef1a --- /dev/null +++ b/pkg/querier/frontend/frontend.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package frontend; + +option go_package = "frontend"; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "github.com/weaveworks/common/httpgrpc/httpgrpc.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +service Frontend { + rpc Process(stream ProcessResponse) returns (stream ProcessRequest) {}; +} + +message ProcessRequest { + httpgrpc.HTTPRequest httpRequest = 1; +} + +message ProcessResponse { + httpgrpc.HTTPResponse httpResponse = 1; +} diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go new file mode 100644 index 0000000000..201864d930 --- /dev/null +++ b/pkg/querier/frontend/worker.go @@ -0,0 +1,148 @@ +package frontend + +import ( + "context" + "flag" + "net/http" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + "github.com/mwitkow/go-grpc-middleware" + "github.com/opentracing/opentracing-go" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" + "github.com/weaveworks/common/middleware" + "google.golang.org/grpc" + + "github.com/weaveworks/cortex/pkg/util" +) + +var ( + backoffConfig = util.BackoffConfig{ + MinBackoff: 50 * time.Millisecond, + MaxBackoff: 1 * time.Second, + } +) + +// WorkerConfig is config for a worker. +type WorkerConfig struct { + Address string + Parallelism int +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.Address, "querier.frontend-address", "", "") + f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 1, "") +} + +// Worker is the counter-part to the frontend, actually processing requests. +type Worker struct { + cfg WorkerConfig + log log.Logger + server *server.Server + + client FrontendClient + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +// NewWorker creates a new Worker. +func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (*Worker, error) { + client, err := connect(cfg.Address) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + worker := &Worker{ + cfg: cfg, + log: log, + server: server, + + client: client, + ctx: ctx, + cancel: cancel, + } + worker.wg.Add(cfg.Parallelism) + for i := 0; i < cfg.Parallelism; i++ { + go worker.run() + } + return worker, nil +} + +// Stop the worker. +func (w *Worker) Stop() { + w.cancel() + w.wg.Wait() +} + +// Run infinitely loops, trying to establish a connection to the frontend to +// begin request processing. 
+func (w *Worker) run() { + defer w.wg.Done() + + backoff := util.NewBackoff(w.ctx, backoffConfig) + for backoff.Ongoing() { + c, err := w.client.Process(w.ctx) + if err != nil { + level.Error(w.log).Log("msg", "error contacting frontend", "err", err) + backoff.Wait() + continue + } + + if err := w.loop(w.ctx, c); err != nil { + level.Error(w.log).Log("msg", "error processing requests", "err", err) + backoff.Wait() + continue + } + + backoff.Reset() + } +} + +func connect(address string) (FrontendClient, error) { + conn, err := grpc.Dial( + address, + grpc.WithInsecure(), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + )), + ) + if err != nil { + return nil, err + } + return NewFrontendClient(conn), nil +} + +func (w *Worker) loop(ctx context.Context, c Frontend_ProcessClient) error { + for { + request, err := c.Recv() + if err != nil { + return err + } + + response, err := w.server.Handle(ctx, request.HttpRequest) + if err != nil { + var ok bool + response, ok = httpgrpc.HTTPResponseFromError(err) + if !ok { + response = &httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Body: []byte(err.Error()), + } + } + } + + if err := c.Send(&ProcessResponse{ + HttpResponse: response, + }); err != nil { + return err + } + } +} diff --git a/vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.pb.go b/vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.pb.go index a617fc8578..52c2cea0d3 100644 --- a/vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.pb.go +++ b/vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.pb.go @@ -1,28 +1,32 @@ -// Code generated by protoc-gen-go. +// Code generated by protoc-gen-gogo. DO NOT EDIT. // source: github.com/weaveworks/common/httpgrpc/httpgrpc.proto -// DO NOT EDIT! /* -Package httpgrpc is a generated protocol buffer package. + Package httpgrpc is a generated protocol buffer package. -It is generated from these files: - github.com/weaveworks/common/httpgrpc/httpgrpc.proto + It is generated from these files: + github.com/weaveworks/common/httpgrpc/httpgrpc.proto -It has these top-level messages: - HTTPRequest - HTTPResponse - Header + It has these top-level messages: + HTTPRequest + HTTPResponse + Header */ package httpgrpc -import proto "github.com/golang/protobuf/proto" +import proto "github.com/gogo/protobuf/proto" import fmt "fmt" import math "math" -import ( - context "golang.org/x/net/context" - grpc "google.golang.org/grpc" -) +import bytes "bytes" + +import strings "strings" +import reflect "reflect" + +import context "golang.org/x/net/context" +import grpc "google.golang.org/grpc" + +import io "io" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -33,19 +37,18 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. 
-const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type HTTPRequest struct { - Method string `protobuf:"bytes,1,opt,name=method" json:"method,omitempty"` - Url string `protobuf:"bytes,2,opt,name=url" json:"url,omitempty"` + Method string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"` + Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"` Headers []*Header `protobuf:"bytes,3,rep,name=headers" json:"headers,omitempty"` Body []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"` } func (m *HTTPRequest) Reset() { *m = HTTPRequest{} } -func (m *HTTPRequest) String() string { return proto.CompactTextString(m) } func (*HTTPRequest) ProtoMessage() {} -func (*HTTPRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (*HTTPRequest) Descriptor() ([]byte, []int) { return fileDescriptorHttpgrpc, []int{0} } func (m *HTTPRequest) GetMethod() string { if m != nil { @@ -76,15 +79,14 @@ func (m *HTTPRequest) GetBody() []byte { } type HTTPResponse struct { - Code int32 `protobuf:"varint,1,opt,name=Code" json:"Code,omitempty"` + Code int32 `protobuf:"varint,1,opt,name=Code,json=code,proto3" json:"Code,omitempty"` Headers []*Header `protobuf:"bytes,2,rep,name=headers" json:"headers,omitempty"` Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"` } func (m *HTTPResponse) Reset() { *m = HTTPResponse{} } -func (m *HTTPResponse) String() string { return proto.CompactTextString(m) } func (*HTTPResponse) ProtoMessage() {} -func (*HTTPResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (*HTTPResponse) Descriptor() ([]byte, []int) { return fileDescriptorHttpgrpc, []int{1} } func (m *HTTPResponse) GetCode() int32 { if m != nil { @@ -108,14 +110,13 @@ func (m *HTTPResponse) GetBody() []byte { } type Header struct { - Key string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"` + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Values []string `protobuf:"bytes,2,rep,name=values" json:"values,omitempty"` } func (m *Header) Reset() { *m = Header{} } -func (m *Header) String() string { return proto.CompactTextString(m) } func (*Header) ProtoMessage() {} -func (*Header) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (*Header) Descriptor() ([]byte, []int) { return fileDescriptorHttpgrpc, []int{2} } func (m *Header) GetKey() string { if m != nil { @@ -136,6 +137,159 @@ func init() { proto.RegisterType((*HTTPResponse)(nil), "httpgrpc.HTTPResponse") proto.RegisterType((*Header)(nil), "httpgrpc.Header") } +func (this *HTTPRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*HTTPRequest) + if !ok { + that2, ok := that.(HTTPRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Method != that1.Method { + return false + } + if this.Url != that1.Url { + return false + } + if len(this.Headers) != len(that1.Headers) { + return false + } + for i := range this.Headers { + if !this.Headers[i].Equal(that1.Headers[i]) { + return false + } + } + if !bytes.Equal(this.Body, that1.Body) { + return false + } + return true +} +func (this *HTTPResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*HTTPResponse) + if !ok { + that2, ok := 
that.(HTTPResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Code != that1.Code { + return false + } + if len(this.Headers) != len(that1.Headers) { + return false + } + for i := range this.Headers { + if !this.Headers[i].Equal(that1.Headers[i]) { + return false + } + } + if !bytes.Equal(this.Body, that1.Body) { + return false + } + return true +} +func (this *Header) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Header) + if !ok { + that2, ok := that.(Header) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Key != that1.Key { + return false + } + if len(this.Values) != len(that1.Values) { + return false + } + for i := range this.Values { + if this.Values[i] != that1.Values[i] { + return false + } + } + return true +} +func (this *HTTPRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&httpgrpc.HTTPRequest{") + s = append(s, "Method: "+fmt.Sprintf("%#v", this.Method)+",\n") + s = append(s, "Url: "+fmt.Sprintf("%#v", this.Url)+",\n") + if this.Headers != nil { + s = append(s, "Headers: "+fmt.Sprintf("%#v", this.Headers)+",\n") + } + s = append(s, "Body: "+fmt.Sprintf("%#v", this.Body)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *HTTPResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&httpgrpc.HTTPResponse{") + s = append(s, "Code: "+fmt.Sprintf("%#v", this.Code)+",\n") + if this.Headers != nil { + s = append(s, "Headers: "+fmt.Sprintf("%#v", this.Headers)+",\n") + } + s = append(s, "Body: "+fmt.Sprintf("%#v", this.Body)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Header) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&httpgrpc.Header{") + s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") + s = append(s, "Values: "+fmt.Sprintf("%#v", this.Values)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringHttpgrpc(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} // Reference imports to suppress errors if they are not otherwise used. 
var _ context.Context @@ -209,27 +363,797 @@ var _HTTP_serviceDesc = grpc.ServiceDesc{ Metadata: "github.com/weaveworks/common/httpgrpc/httpgrpc.proto", } +func (m *HTTPRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *HTTPRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Method) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Method))) + i += copy(dAtA[i:], m.Method) + } + if len(m.Url) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Url))) + i += copy(dAtA[i:], m.Url) + } + if len(m.Headers) > 0 { + for _, msg := range m.Headers { + dAtA[i] = 0x1a + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Body) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Body))) + i += copy(dAtA[i:], m.Body) + } + return i, nil +} + +func (m *HTTPResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *HTTPResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Code != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(m.Code)) + } + if len(m.Headers) > 0 { + for _, msg := range m.Headers { + dAtA[i] = 0x12 + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Body) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Body))) + i += copy(dAtA[i:], m.Body) + } + return i, nil +} + +func (m *Header) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Header) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Key) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Key))) + i += copy(dAtA[i:], m.Key) + } + if len(m.Values) > 0 { + for _, s := range m.Values { + dAtA[i] = 0x12 + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + return i, nil +} + +func encodeVarintHttpgrpc(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *HTTPRequest) Size() (n int) { + var l int + _ = l + l = len(m.Method) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + l = len(m.Url) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + if len(m.Headers) > 0 { + for _, e := range m.Headers { + l = e.Size() + n += 1 + l + sovHttpgrpc(uint64(l)) + } + } + l = len(m.Body) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + return n +} + +func (m *HTTPResponse) Size() (n int) { + var l int + _ = l + if m.Code != 0 { + n += 1 + sovHttpgrpc(uint64(m.Code)) + } + if len(m.Headers) > 0 { + for _, e := range m.Headers { + l = e.Size() + n += 1 + l + sovHttpgrpc(uint64(l)) + } + } + l = len(m.Body) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + return n +} + +func (m 
*Header) Size() (n int) { + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + if len(m.Values) > 0 { + for _, s := range m.Values { + l = len(s) + n += 1 + l + sovHttpgrpc(uint64(l)) + } + } + return n +} + +func sovHttpgrpc(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozHttpgrpc(x uint64) (n int) { + return sovHttpgrpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *HTTPRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&HTTPRequest{`, + `Method:` + fmt.Sprintf("%v", this.Method) + `,`, + `Url:` + fmt.Sprintf("%v", this.Url) + `,`, + `Headers:` + strings.Replace(fmt.Sprintf("%v", this.Headers), "Header", "Header", 1) + `,`, + `Body:` + fmt.Sprintf("%v", this.Body) + `,`, + `}`, + }, "") + return s +} +func (this *HTTPResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&HTTPResponse{`, + `Code:` + fmt.Sprintf("%v", this.Code) + `,`, + `Headers:` + strings.Replace(fmt.Sprintf("%v", this.Headers), "Header", "Header", 1) + `,`, + `Body:` + fmt.Sprintf("%v", this.Body) + `,`, + `}`, + }, "") + return s +} +func (this *Header) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Header{`, + `Key:` + fmt.Sprintf("%v", this.Key) + `,`, + `Values:` + fmt.Sprintf("%v", this.Values) + `,`, + `}`, + }, "") + return s +} +func valueToStringHttpgrpc(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *HTTPRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HTTPRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HTTPRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Method", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Method = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Url", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Url = 
string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Headers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Headers = append(m.Headers, &Header{}) + if err := m.Headers[len(m.Headers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Body", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Body = append(m.Body[:0], dAtA[iNdEx:postIndex]...) + if m.Body == nil { + m.Body = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHttpgrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthHttpgrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *HTTPResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HTTPResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HTTPResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Code", wireType) + } + m.Code = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Code |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Headers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Headers = append(m.Headers, &Header{}) + if err := m.Headers[len(m.Headers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Body", wireType) + } + var byteLen int + for 
shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Body = append(m.Body[:0], dAtA[iNdEx:postIndex]...) + if m.Body == nil { + m.Body = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHttpgrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthHttpgrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Header) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Header: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Header: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Values", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Values = append(m.Values, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHttpgrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthHttpgrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipHttpgrpc(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if 
shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthHttpgrpc + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipHttpgrpc(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthHttpgrpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowHttpgrpc = fmt.Errorf("proto: integer overflow") +) + func init() { - proto.RegisterFile("github.com/weaveworks/common/httpgrpc/httpgrpc.proto", fileDescriptor0) -} - -var fileDescriptor0 = []byte{ - // 264 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x91, 0x4f, 0x4f, 0x83, 0x40, - 0x10, 0xc5, 0x6d, 0x41, 0xb4, 0xd3, 0x1e, 0x9a, 0x4d, 0x6c, 0x88, 0x27, 0x42, 0x62, 0x24, 0x1e, - 0x20, 0x41, 0x2f, 0x1e, 0xd5, 0x0b, 0x47, 0xb3, 0xe9, 0xc9, 0x1b, 0x7f, 0x26, 0xc5, 0x14, 0x18, - 0xdc, 0x5d, 0x4a, 0xfa, 0xed, 0xcd, 0x2e, 0xb4, 0x12, 0x4f, 0xbd, 0xbd, 0x37, 0xbc, 0xf0, 0x9b, - 0xb7, 0x03, 0x2f, 0xbb, 0x6f, 0x55, 0x76, 0x59, 0x98, 0x53, 0x1d, 0xf5, 0x98, 0x1e, 0xb0, 0x27, - 0xb1, 0x97, 0x51, 0x4e, 0x75, 0x4d, 0x4d, 0x54, 0x2a, 0xd5, 0xee, 0x44, 0x9b, 0x9f, 0x45, 0xd8, - 0x0a, 0x52, 0xc4, 0x6e, 0x4f, 0xde, 0xef, 0x61, 0x99, 0x6c, 0xb7, 0x9f, 0x1c, 0x7f, 0x3a, 0x94, - 0x8a, 0x6d, 0xc0, 0xa9, 0x51, 0x95, 0x54, 0xb8, 0x33, 0x6f, 0x16, 0x2c, 0xf8, 0xe8, 0xd8, 0x1a, - 0xac, 0x4e, 0x54, 0xee, 0xdc, 0x0c, 0xb5, 0x64, 0x4f, 0x70, 0x53, 0x62, 0x5a, 0xa0, 0x90, 0xae, - 0xe5, 0x59, 0xc1, 0x32, 0x5e, 0x87, 0x67, 0x48, 0x62, 0x3e, 0xf0, 0x53, 0x80, 0x31, 0xb0, 0x33, - 0x2a, 0x8e, 0xae, 0xed, 0xcd, 0x82, 0x15, 0x37, 0xda, 0xcf, 0x60, 0x35, 0x80, 0x65, 0x4b, 0x8d, - 0x44, 0x9d, 0xf9, 0xa0, 0x02, 0x0d, 0xf7, 0x9a, 0x1b, 0x3d, 0x65, 0xcc, 0x2f, 0x65, 0x58, 0x13, - 0x46, 0x0c, 0xce, 0x10, 0xd3, 0xfb, 0xef, 0xf1, 0x38, 0x96, 0xd2, 0x52, 0x37, 0x3d, 0xa4, 0x55, - 0x87, 0xc3, 0xaf, 0x17, 0x7c, 0x74, 0xf1, 0x1b, 0xd8, 0x7a, 0x2f, 0xf6, 0x0a, 0x4e, 0x92, 0x36, - 0x45, 0x85, 0xec, 0x6e, 0x02, 0xfd, 0x7b, 0xaa, 0xfb, 0xcd, 0xff, 0xf1, 0x50, 0xc4, 0xbf, 0x7a, - 0x7f, 0xfc, 0x7a, 0xb8, 0xe8, 0x2a, 0x99, 0x63, 0xae, 0xf1, 0xfc, 0x1b, 0x00, 0x00, 0xff, 0xff, - 0x1c, 0x0f, 0x07, 0xc6, 0xc5, 0x01, 0x00, 0x00, + proto.RegisterFile("github.com/weaveworks/common/httpgrpc/httpgrpc.proto", fileDescriptorHttpgrpc) +} + +var fileDescriptorHttpgrpc = []byte{ + // 319 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 
0x49, 0xcf, 0x2c, 0xc9, + 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f, 0x4f, 0x4d, 0x2c, 0x4b, 0x2d, 0xcf, 0x2f, 0xca, + 0x2e, 0xd6, 0x4f, 0xce, 0xcf, 0xcd, 0xcd, 0xcf, 0xd3, 0xcf, 0x28, 0x29, 0x29, 0x48, 0x2f, 0x2a, + 0x48, 0x86, 0x33, 0xf4, 0x0a, 0x8a, 0xf2, 0x4b, 0xf2, 0x85, 0x38, 0x60, 0x7c, 0xa5, 0x72, 0x2e, + 0x6e, 0x8f, 0x90, 0x90, 0x80, 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0x21, 0x31, 0x2e, 0xb6, + 0xdc, 0xd4, 0x92, 0x8c, 0xfc, 0x14, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xce, 0x20, 0x28, 0x4f, 0x48, + 0x80, 0x8b, 0xb9, 0xb4, 0x28, 0x47, 0x82, 0x09, 0x2c, 0x08, 0x62, 0x0a, 0x69, 0x71, 0xb1, 0x67, + 0xa4, 0x26, 0xa6, 0xa4, 0x16, 0x15, 0x4b, 0x30, 0x2b, 0x30, 0x6b, 0x70, 0x1b, 0x09, 0xe8, 0xc1, + 0x2d, 0xf1, 0x00, 0x4b, 0x04, 0xc1, 0x14, 0x08, 0x09, 0x71, 0xb1, 0x24, 0xe5, 0xa7, 0x54, 0x4a, + 0xb0, 0x28, 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x4a, 0x49, 0x5c, 0x3c, 0x10, 0x8b, 0x8b, 0x0b, + 0xf2, 0xf3, 0x8a, 0x53, 0x41, 0x6a, 0x9c, 0xf3, 0x53, 0x52, 0xc1, 0xf6, 0xb2, 0x06, 0xb1, 0x24, + 0xe7, 0xa7, 0xa4, 0x22, 0xdb, 0xc1, 0x44, 0xac, 0x1d, 0xcc, 0x48, 0x76, 0x18, 0x71, 0xb1, 0x41, + 0x94, 0x81, 0xdc, 0x9f, 0x9d, 0x5a, 0x09, 0xf5, 0x14, 0x88, 0x09, 0xf2, 0x69, 0x59, 0x62, 0x4e, + 0x69, 0x2a, 0xc4, 0x68, 0xce, 0x20, 0x28, 0xcf, 0xc8, 0x91, 0x8b, 0x05, 0xe4, 0x2e, 0x21, 0x4b, + 0x2e, 0x36, 0x8f, 0xc4, 0xbc, 0x94, 0x9c, 0x54, 0x21, 0x51, 0x24, 0x4b, 0x11, 0x41, 0x25, 0x25, + 0x86, 0x2e, 0x0c, 0xf1, 0x88, 0x12, 0x83, 0x53, 0xf0, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, + 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, + 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, + 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0x88, 0x52, 0x25, 0x2a, 0x06, 0x93, 0xd8, 0xc0, 0x31, + 0x67, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x49, 0xec, 0x86, 0xd6, 0xf1, 0x01, 0x00, 0x00, } diff --git a/vendor/github.com/weaveworks/common/httpgrpc/server/server.go b/vendor/github.com/weaveworks/common/httpgrpc/server/server.go index 7acebb8b93..1fdb607e40 100644 --- a/vendor/github.com/weaveworks/common/httpgrpc/server/server.go +++ b/vendor/github.com/weaveworks/common/httpgrpc/server/server.go @@ -66,7 +66,7 @@ func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc. if recorder.Code/100 == 5 { return nil, httpgrpc.ErrorFromHTTPResponse(resp) } - return resp, err + return resp, nil } // Client is a http.Handler that forwards the request over gRPC. 
@@ -143,13 +143,37 @@ func NewClient(address string) (*Client, error) { }, nil } -// ServeHTTP implements http.Handler -func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func HTTPRequest(r *http.Request) (*httpgrpc.HTTPRequest, error) { body, err := ioutil.ReadAll(r.Body) if err != nil { + return nil, err + } + return &httpgrpc.HTTPRequest{ + Method: r.Method, + Url: r.RequestURI, + Body: body, + Headers: fromHeader(r.Header), + }, nil +} + +func WriteResponse(w http.ResponseWriter, resp *httpgrpc.HTTPResponse) error { + toHeader(resp.Headers, w.Header()) + w.WriteHeader(int(resp.Code)) + _, err := w.Write(resp.Body) + return err +} + +func WriteError(w http.ResponseWriter, err error) { + resp, ok := httpgrpc.HTTPResponseFromError(err) + if ok { + WriteResponse(w, resp) + } else { http.Error(w, err.Error(), http.StatusInternalServerError) - return } +} + +// ServeHTTP implements http.Handler +func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { if tracer := opentracing.GlobalTracer(); tracer != nil { if span := opentracing.SpanFromContext(r.Context()); span != nil { if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header)); err != nil { @@ -157,13 +181,12 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } } - req := &httpgrpc.HTTPRequest{ - Method: r.Method, - Url: r.RequestURI, - Body: body, - Headers: fromHeader(r.Header), - } + req, err := HTTPRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } resp, err := c.client.Handle(r.Context(), req) if err != nil { // Some errors will actually contain a valid resp, just need to unpack it @@ -176,9 +199,7 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { } - toHeader(resp.Headers, w.Header()) - w.WriteHeader(int(resp.Code)) - if _, err := w.Write(resp.Body); err != nil { + if err := WriteResponse(w, resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } From 37db514b7d2bbd0d04914aeb3dbd7bd8a820ec51 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 30 Jul 2018 22:12:46 +0100 Subject: [PATCH 02/10] Add bunch of metrics. Signed-off-by: Tom Wilkie --- cmd/querier/main.go | 1 + pkg/querier/frontend/frontend.go | 39 ++++++++++++++++++++++++++++---- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/cmd/querier/main.go b/cmd/querier/main.go index e1f2835d50..86091978f0 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -89,6 +89,7 @@ func main() { } defer chunkStore.Stop() + // TODO this avoids our middleware for logging and latency collection.
worker, err := frontend.NewWorker(workerConfig, httpgrpc_server.NewServer(server.HTTP), util.Logger) if err != nil { level.Error(util.Logger).Log("err", err) diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go index 0f8990f9ba..cc2acc8c57 100644 --- a/pkg/querier/frontend/frontend.go +++ b/pkg/querier/frontend/frontend.go @@ -5,15 +5,36 @@ import ( "math/rand" "net/http" "sync" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/user" ) var ( + queueDutation = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "query_frontend_queue_duration_seconds", + Help: "Time spent by requests queued.", + Buckets: prometheus.DefBuckets, + }) + retries = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "query_frontend_retries", + Help: "Number of times a request is retried.", + Buckets: []float64{0, 1, 2, 3, 4, 5}, + }) + queueLength = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "query_frontend_queue_length", + Help: "Number of queries in the queue.", + }) + errServerClosing = httpgrpc.Errorf(http.StatusTeapot, "server closing down") errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests") errCanceled = httpgrpc.Errorf(http.StatusInternalServerError, "context cancelled") @@ -44,9 +65,10 @@ type Frontend struct { } type request struct { - request *httpgrpc.HTTPRequest - err chan error - response chan *httpgrpc.HTTPResponse + enqueueTime time.Time + request *httpgrpc.HTTPRequest + err chan error + response chan *httpgrpc.HTTPResponse } // New creates a new frontend. @@ -103,7 +125,7 @@ func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error { } var lastErr error - for retries := 0; retries < f.cfg.MaxRetries; retries++ { + for tries := 0; tries < f.cfg.MaxRetries; tries++ { if err := f.queueRequest(userID, request); err != nil { return err } @@ -117,12 +139,13 @@ func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error { case resp = <-request.response: case lastErr = <-request.err: - level.Error(f.log).Log("msg", "error processing request", "try", retries, "err", lastErr) + level.Error(f.log).Log("msg", "error processing request", "try", tries, "err", lastErr) resp, _ = httpgrpc.HTTPResponseFromError(lastErr) }
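A note on the metrics block above: the promauto constructors register each collector with prometheus.DefaultRegisterer as a side effect of creating it, so these series appear on the process's existing /metrics endpoint with no extra wiring. A sketch of the equivalent explicit registration, assuming the default registry:

package frontend

import "github.com/prometheus/client_golang/prometheus"

var queueDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "cortex",
	Name:      "query_frontend_queue_duration_seconds",
	Help:      "Time spent by requests queued.",
	Buckets:   prometheus.DefBuckets,
})

func init() {
	// promauto.NewHistogram performs this registration implicitly and,
	// like MustRegister, panics on failure (e.g. a duplicate name).
	prometheus.MustRegister(queueDuration)
}

// Only return if we get a valid HTTP non-500 response; otherwise retry.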
if resp != nil && resp.Code/100 != 5 { + retries.Observe(float64(tries)) server.WriteResponse(w, resp) return nil } @@ -158,6 +181,8 @@ func (f *Frontend) Process(server Frontend_ProcessServer) error { } func (f *Frontend) queueRequest(userID string, req *request) error { + req.enqueueTime = time.Now() + f.mtx.Lock() defer f.mtx.Unlock() @@ -173,6 +198,7 @@ func (f *Frontend) queueRequest(userID string, req *request) error { select { case queue <- req: + queueLength.Add(1) f.cond.Signal() return nil default: @@ -205,6 +231,9 @@ func (f *Frontend) getNextRequest() *request { if len(queue) == 0 { delete(f.queues, userID) } + + queueDutation.Observe(time.Now().Sub(request.enqueueTime).Seconds()) + queueLength.Add(-1) return request } From a200a9ac3ec9a0c8350cf0dad2bc80a6b04c87d6 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 31 Jul 2018 10:03:22 +0100 Subject: [PATCH 03/10] Resolve and connect to multiple frontends. Signed-off-by: Tom Wilkie --- pkg/querier/frontend/worker.go | 153 ++++++++++++++++++++++++--------- 1 file changed, 110 insertions(+), 43 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 201864d930..c6cf72ab1e 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -16,6 +16,7 @@ import ( "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "google.golang.org/grpc" + "google.golang.org/grpc/naming" "github.com/weaveworks/cortex/pkg/util" ) @@ -29,73 +30,138 @@ var ( // WorkerConfig is config for a worker. type WorkerConfig struct { - Address string - Parallelism int + Address string + Parallelism int + DNSLookupDuration time.Duration } // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { - f.StringVar(&cfg.Address, "querier.frontend-address", "", "") - f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 1, "") + f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service.") + f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 1, "Number of simultaneous queries to process.") + f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.") } // Worker is the counter-part to the frontend, actually processing requests. -type Worker struct { +type Worker interface { + Stop() +} + +type worker struct { cfg WorkerConfig log log.Logger server *server.Server - client FrontendClient - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + watcher naming.Watcher + wg sync.WaitGroup +} + +type noopWorker struct { } +func (noopWorker) Stop() {} + // NewWorker creates a new Worker. 
-func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (*Worker, error) { - client, err := connect(cfg.Address) +func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (Worker, error) { + if cfg.Address == "" { + return noopWorker{}, nil + } + + resolver, err := naming.NewDNSResolverWithFreq(cfg.DNSLookupDuration) + if err != nil { + return nil, err + } + + watcher, err := resolver.Resolve(cfg.Address) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) - worker := &Worker{ + + w := &worker{ cfg: cfg, log: log, server: server, - client: client, - ctx: ctx, - cancel: cancel, + ctx: ctx, + cancel: cancel, + watcher: watcher, } - worker.wg.Add(cfg.Parallelism) - for i := 0; i < cfg.Parallelism; i++ { - go worker.run() - } - return worker, nil + w.wg.Add(1) + go w.watchDNSLoop() + return w, nil } // Stop the worker. -func (w *Worker) Stop() { +func (w *worker) Stop() { + w.watcher.Close() w.cancel() w.wg.Wait() } -// Run infinitely loops, trying to establish a connection to the frontend to -// begin request processing. -func (w *Worker) run() { +// watchDNSLoop watches for changes in DNS and starts or stops workers. +func (w *worker) watchDNSLoop() { + defer w.wg.Done() + + cancels := map[string]context.CancelFunc{} + defer func() { + for _, cancel := range cancels { + cancel() + } + }() + + for updates, err := w.watcher.Next(); err != nil; { + for _, update := range updates { + switch update.Op { + case naming.Add: + ctx, cancel := context.WithCancel(w.ctx) + cancels[update.Addr] = cancel + w.runMany(ctx, update.Addr) + + case naming.Delete: + if cancel, ok := cancels[update.Addr]; ok { + cancel() + } + + default: + panic("unknown op") + } + } + } +} + +// runMany starts N runOne loops for a given address. +func (w *worker) runMany(ctx context.Context, address string) error { + client, err := connect(address) + if err != nil { + return err + } + + w.wg.Add(w.cfg.Parallelism) + for i := 0; i < w.cfg.Parallelism; i++ { + go w.runOne(ctx, client) + } + return nil +} + +// runOne loops, trying to establish a stream to the frontend to begin +// request processing. +func (w *worker) runOne(ctx context.Context, client FrontendClient) { defer w.wg.Done() - backoff := util.NewBackoff(w.ctx, backoffConfig) + backoff := util.NewBackoff(ctx, backoffConfig) for backoff.Ongoing() { - c, err := w.client.Process(w.ctx) + c, err := client.Process(ctx) if err != nil { level.Error(w.log).Log("msg", "error contacting frontend", "err", err) backoff.Wait() continue } - if err := w.loop(w.ctx, c); err != nil { + if err := w.process(ctx, c); err != nil { level.Error(w.log).Log("msg", "error processing requests", "err", err) backoff.Wait() continue @@ -105,22 +171,8 @@ func (w *Worker) run() { } } -func connect(address string) (FrontendClient, error) { - conn, err := grpc.Dial( - address, - grpc.WithInsecure(), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor, - )), - ) - if err != nil { - return nil, err - } - return NewFrontendClient(conn), nil -} - -func (w *Worker) loop(ctx context.Context, c Frontend_ProcessClient) error { +// process loops processing requests on an established stream. 
+func (w *worker) process(ctx context.Context, c Frontend_ProcessClient) error { for { request, err := c.Recv() if err != nil { return err } @@ -146,3 +198,18 @@ func (w *Worker) loop(ctx context.Context, c Frontend_ProcessClient) error { } } } + +func connect(address string) (FrontendClient, error) { + conn, err := grpc.Dial( + address, + grpc.WithInsecure(), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + )), + ) + if err != nil { + return nil, err + } + return NewFrontendClient(conn), nil +} From d4e032c29d62954d1cca84280ccf87d28732e5f8 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 31 Jul 2018 11:41:42 +0100 Subject: [PATCH 04/10] Add test, more logging, better cancellation. Signed-off-by: Tom Wilkie --- pkg/querier/frontend/frontend.go | 43 ++++++---- pkg/querier/frontend/frontend_test.go | 110 ++++++++++++++++++++++++++ pkg/querier/frontend/worker.go | 25 ++++-- 3 files changed, 156 insertions(+), 22 deletions(-) create mode 100644 pkg/querier/frontend/frontend_test.go diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go index cc2acc8c57..d619f2294f 100644 --- a/pkg/querier/frontend/frontend.go +++ b/pkg/querier/frontend/frontend.go @@ -1,6 +1,7 @@ package frontend import ( + "context" "flag" "math/rand" "net/http" @@ -139,16 +140,18 @@ func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error { case resp = <-request.response: case lastErr = <-request.err: - level.Error(f.log).Log("msg", "error processing request", "try", tries, "err", lastErr) resp, _ = httpgrpc.HTTPResponseFromError(lastErr) } - // Only return if we get a valid HTTP non-500 response; otherwise retry. - if resp != nil && resp.Code/100 != 5 { - retries.Observe(float64(tries)) - server.WriteResponse(w, resp) - return nil + // Only retry if we get an HTTP 500 or non-HTTP error. + if resp == nil || resp.Code/100 == 5 { + level.Error(f.log).Log("msg", "error processing request", "try", tries, "err", lastErr, "resp", resp) + continue } + + retries.Observe(float64(tries)) + server.WriteResponse(w, resp) + return nil } return lastErr @@ -156,11 +159,19 @@ func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error { // Process allows backends to pull requests from the frontend. func (f *Frontend) Process(server Frontend_ProcessServer) error { + + // If this request is canceled, ping the condition to unblock. This is done + // once, here (instead of in getNextRequest) as we expect calls to Process to + // process many requests. + go func() { + <-server.Context().Done() + f.cond.Broadcast() + }() + for { - request := f.getNextRequest() - if request == nil { - // Occurs when server is shutting down. - return nil + request, err := f.getNextRequest(server.Context()) + if err != nil { + return err } if err := server.Send(&ProcessRequest{
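The Broadcast-on-Done goroutine above exists because sync.Cond cannot select on a channel: the only way to wake a goroutine parked in cond.Wait when the stream's context ends is to Broadcast, and have the waiter re-check ctx.Err() on every wakeup, which is what the reworked getNextRequest below does. A self-contained sketch of that pattern (all names illustrative):

package main

import (
	"context"
	"sync"
	"time"
)

func main() {
	var (
		mtx     sync.Mutex
		pending int // guarded by mtx
	)
	cond := sync.NewCond(&mtx)

	ctx, cancel := context.WithCancel(context.Background())
	go func() { time.Sleep(10 * time.Millisecond); cancel() }()

	// Bridge the context into the condition variable: cancellation wakes
	// every waiter so it can observe ctx.Err() and bail out.
	go func() {
		<-ctx.Done()
		cond.Broadcast()
	}()

	mtx.Lock()
	for pending == 0 && ctx.Err() == nil {
		cond.Wait() // releases mtx while parked, reacquires on wakeup
	}
	mtx.Unlock()
}

@@ -208,16 +219,20 @@ func (f *Frontend) queueRequest(userID string, req *request) error { // getNextRequest picks a random queue and takes the next request off of it, so we // fairly process users' queries. Will block if there are no requests.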
-func (f *Frontend) getNextRequest() *request { +func (f *Frontend) getNextRequest(ctx context.Context) (*request, error) { f.mtx.Lock() defer f.mtx.Unlock() - for len(f.queues) == 0 && !f.closed { + for len(f.queues) == 0 && !f.closed && ctx.Err() == nil { f.cond.Wait() } + if err := ctx.Err(); err != nil { + return nil, err + } + if f.closed { - return nil + return nil, errServerClosing } i, n := 0, rand.Intn(len(f.queues)) @@ -234,7 +249,7 @@ func (f *Frontend) getNextRequest() *request { queueDutation.Observe(time.Now().Sub(request.enqueueTime).Seconds()) queueLength.Add(-1) - return request + return request, nil } panic("should never happen") diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go new file mode 100644 index 0000000000..01971f79b5 --- /dev/null +++ b/pkg/querier/frontend/frontend_test.go @@ -0,0 +1,110 @@ +package frontend + +import ( + "context" + "fmt" + "io/ioutil" + "net" + "net/http" + "sync/atomic" + "testing" + + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" + "github.com/weaveworks/common/middleware" + "google.golang.org/grpc" + + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/util" +) + +func TestFrontend(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("Hello World")) + }) + test := func(addr string) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil) + require.NoError(t, err) + err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + assert.Equal(t, "Hello World", string(body)) + } + testFrontend(t, handler, test) +} + +func TestFrontendRetries(t *testing.T) { + try := int32(0) + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&try, 1) == 5 { + w.Write([]byte("Hello World")) + return + } + + w.WriteHeader(http.StatusInternalServerError) + }) + test := func(addr string) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil) + require.NoError(t, err) + err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + assert.Equal(t, "Hello World", string(body)) + } + testFrontend(t, handler, test) +} + +func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { + logger := log.NewNopLogger() //log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + + var ( + config Config + workerConfig WorkerConfig + ) + util.DefaultValues(&config, &workerConfig) + + grpcListen, err := net.Listen("tcp", "") + require.NoError(t, err) + workerConfig.Address = grpcListen.Addr().String() + + httpListen, err := net.Listen("tcp", "") + require.NoError(t, err) + + frontend, err := New(config, logger) + require.NoError(t, err) + + grpcServer := grpc.NewServer() + defer grpcServer.GracefulStop() + + RegisterFrontendServer(grpcServer, frontend) + + httpServer := http.Server{ + Handler: middleware.AuthenticateUser.Wrap(frontend), + } + defer 
httpServer.Shutdown(context.Background()) + + go httpServer.Serve(httpListen) + go grpcServer.Serve(grpcListen) + + worker, err := NewWorker(workerConfig, httpgrpc_server.NewServer(handler), logger) + require.NoError(t, err) + defer worker.Stop() + + test(httpListen.Addr().String()) +} diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index c6cf72ab1e..03385aaa1d 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -12,12 +12,12 @@ import ( "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/mwitkow/go-grpc-middleware" "github.com/opentracing/opentracing-go" - "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/httpgrpc/server" - "github.com/weaveworks/common/middleware" "google.golang.org/grpc" "google.golang.org/grpc/naming" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" + "github.com/weaveworks/common/middleware" "github.com/weaveworks/cortex/pkg/util" ) @@ -38,7 +38,7 @@ type WorkerConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service.") - f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 1, "Number of simultaneous queries to process.") + f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process.") f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.") } @@ -66,6 +66,7 @@ func (noopWorker) Stop() {} // NewWorker creates a new Worker. func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (Worker, error) { if cfg.Address == "" { + level.Info(log).Log("msg", "no address specified, not starting worker") return noopWorker{}, nil } @@ -113,15 +114,23 @@ func (w *worker) watchDNSLoop() { } }() - for updates, err := w.watcher.Next(); err != nil; { + for { + updates, err := w.watcher.Next() + if err != nil { + level.Error(w.log).Log("msg", "error from DNS watcher", "err", err) + return + } + for _, update := range updates { switch update.Op { case naming.Add: + level.Debug(w.log).Log("msg", "adding connection", "addr", update.Addr) ctx, cancel := context.WithCancel(w.ctx) cancels[update.Addr] = cancel w.runMany(ctx, update.Addr) case naming.Delete: + level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr) if cancel, ok := cancels[update.Addr]; ok { cancel() } @@ -134,17 +143,17 @@ func (w *worker) watchDNSLoop() { } // runMany starts N runOne loops for a given address. -func (w *worker) runMany(ctx context.Context, address string) error { +func (w *worker) runMany(ctx context.Context, address string) { client, err := connect(address) if err != nil { - return err + level.Error(w.log).Log("msg", "error connecting", "addr", address, "err", err) + return } w.wg.Add(w.cfg.Parallelism) for i := 0; i < w.cfg.Parallelism; i++ { go w.runOne(ctx, client) } - return nil } // runOne loops, trying to establish a stream to the frontend to begin From a1a739961d5d0b68bd4004f7782f7d9de4ff5df0 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 31 Jul 2018 12:00:12 +0100 Subject: [PATCH 05/10] Frontend should wait for queue to be drained before exiting. 
Signed-off-by: Tom Wilkie
---
 pkg/querier/frontend/frontend.go | 27 +++++++--------------------
 1 file changed, 7 insertions(+), 20 deletions(-)

diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go
index d619f2294f..b4ecf3cf6a 100644
--- a/pkg/querier/frontend/frontend.go
+++ b/pkg/querier/frontend/frontend.go
@@ -61,7 +61,6 @@ type Frontend struct {
 
 	mtx    sync.Mutex
 	cond   *sync.Cond
-	closed bool
 	queues map[string]chan *request
 }
@@ -87,15 +86,8 @@ func New(cfg Config, log log.Logger) (*Frontend, error) {
 func (f *Frontend) Close() {
 	f.mtx.Lock()
 	defer f.mtx.Unlock()
-
-	f.closed = true
-	f.cond.Broadcast()
-
-	for _, queue := range f.queues {
-		close(queue)
-		for request := range queue {
-			request.err <- errServerClosing
-		}
+	for len(f.queues) > 0 {
+		f.cond.Wait()
 	}
 }
@@ -197,10 +189,6 @@ func (f *Frontend) queueRequest(userID string, req *request) error {
 	f.mtx.Lock()
 	defer f.mtx.Unlock()
 
-	if f.closed {
-		return errServerClosing
-	}
-
 	queue, ok := f.queues[userID]
 	if !ok {
 		queue = make(chan *request, f.cfg.MaxOutstandingPerTenant)
@@ -210,7 +198,7 @@
 	select {
 	case queue <- req:
 		queueLength.Add(1)
-		f.cond.Signal()
+		f.cond.Broadcast()
 		return nil
 	default:
 		return errTooManyRequest
@@ -223,7 +211,7 @@ func (f *Frontend) getNextRequest(ctx context.Context) (*request, error) {
 	f.mtx.Lock()
 	defer f.mtx.Unlock()
 
-	for len(f.queues) == 0 && !f.closed && ctx.Err() == nil {
+	for len(f.queues) == 0 && ctx.Err() == nil {
 		f.cond.Wait()
 	}
@@ -231,10 +219,6 @@ func (f *Frontend) getNextRequest(ctx context.Context) (*request, error) {
 		return nil, err
 	}
 
-	if f.closed {
-		return nil, errServerClosing
-	}
-
 	i, n := 0, rand.Intn(len(f.queues))
 	for userID, queue := range f.queues {
 		if i < n {
@@ -247,6 +231,9 @@ func (f *Frontend) getNextRequest(ctx context.Context) (*request, error) {
 		delete(f.queues, userID)
 	}
 
+	// Tell close() we've processed a request.
+	f.cond.Broadcast()
+
 	queueDutation.Observe(time.Now().Sub(request.enqueueTime).Seconds())
 	queueLength.Add(-1)
 	return request, nil

From e67ecd6a9d52bc1829006373243349d339f59cd6 Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Tue, 31 Jul 2018 21:07:25 +0100
Subject: [PATCH 06/10] Document some flags.

Signed-off-by: Tom Wilkie
---
 pkg/querier/frontend/frontend.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go
index b4ecf3cf6a..49da4768df 100644
--- a/pkg/querier/frontend/frontend.go
+++ b/pkg/querier/frontend/frontend.go
@@ -49,8 +49,8 @@ type Config struct {
 
 // RegisterFlags adds the flags required to config this to the given FlagSet.
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
-	f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "")
-	f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "")
+	f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
+	f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
 }
 
 // Frontend queues HTTP requests, dispatches them to backends, and handles retries

From 5123bc8650ef97c0e41bd16fca17e986aba37efb Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Mon, 13 Aug 2018 22:10:42 +0100
Subject: [PATCH 07/10] Spell dutation correctly.
Signed-off-by: Tom Wilkie
---
 pkg/querier/frontend/frontend.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go
index 49da4768df..ed0b96a2d0 100644
--- a/pkg/querier/frontend/frontend.go
+++ b/pkg/querier/frontend/frontend.go
@@ -18,7 +18,7 @@ import (
 )
 
 var (
-	queueDutation = promauto.NewHistogram(prometheus.HistogramOpts{
+	queueDuration = promauto.NewHistogram(prometheus.HistogramOpts{
 		Namespace: "cortex",
 		Name:      "query_frontend_queue_duration_seconds",
 		Help:      "Time spend by requests queued.",
@@ -234,7 +234,7 @@ func (f *Frontend) getNextRequest(ctx context.Context) (*request, error) {
 	// Tell close() we've processed a request.
 	f.cond.Broadcast()
 
-	queueDutation.Observe(time.Now().Sub(request.enqueueTime).Seconds())
+	queueDuration.Observe(time.Now().Sub(request.enqueueTime).Seconds())
 	queueLength.Add(-1)
 	return request, nil
 }

From 0bd19b6036dfe54b730549cb73a37cb7460b3cfa Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Mon, 13 Aug 2018 22:17:17 +0100
Subject: [PATCH 08/10] Typo.

Signed-off-by: Tom Wilkie
---
 pkg/querier/frontend/frontend.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/querier/frontend/frontend.go b/pkg/querier/frontend/frontend.go
index ed0b96a2d0..c7332e9d83 100644
--- a/pkg/querier/frontend/frontend.go
+++ b/pkg/querier/frontend/frontend.go
@@ -206,7 +206,7 @@ func (f *Frontend) queueRequest(userID string, req *request) error {
 }
 
 // getQueue picks a random queue and takes the next request off of it, so we
-// faily process users queries. Will block if there are no requests.
+// fairly process users' queries. Will block if there are no requests.
 func (f *Frontend) getNextRequest(ctx context.Context) (*request, error) {
 	f.mtx.Lock()
 	defer f.mtx.Unlock()

From 47abed431438d10d39e767b17cd68949fe3b29c6 Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Thu, 16 Aug 2018 13:41:39 +0100
Subject: [PATCH 09/10] Add some basic documentation for the query frontend service.

Signed-off-by: Tom Wilkie
---
 docs/architecture.md | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)
 create mode 100644 docs/architecture.md

diff --git a/docs/architecture.md b/docs/architecture.md
new file mode 100644
index 0000000000..c790d2224c
--- /dev/null
+++ b/docs/architecture.md
@@ -0,0 +1,28 @@
+# Cortex Architecture
+
+*NB this document is a work-in-progress.*
+
+The Cortex architecture consists of multiple, horizontally scalable microservices. Each microservice uses the most appropriate technique for horizontal scaling; most are stateless and can handle requests for any user, and some (the ingesters) are semi-stateful and depend on consistent hashing.
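To make the consistent-hashing dependence concrete, the sketch below shows the core ring-lookup idea: each ingester owns one or more tokens on a hash ring, and a series is assigned to the ingester owning the first token clockwise from the hash of its key. This is a minimal illustration only — the ingester addresses, the single token per ingester, and the FNV hash are assumptions for the example, not the actual Cortex ring implementation.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// entry is one token on the ring and the ingester that owns it.
type entry struct {
	token uint32
	addr  string
}

// ring keeps entries sorted by token so lookups can binary-search.
type ring struct{ entries []entry }

func (r *ring) add(addr string, token uint32) {
	r.entries = append(r.entries, entry{token, addr})
	sort.Slice(r.entries, func(i, j int) bool { return r.entries[i].token < r.entries[j].token })
}

// owner hashes the series key and walks clockwise to the next token.
func (r *ring) owner(key string) string {
	h := fnv.New32a()
	h.Write([]byte(key))
	i := sort.Search(len(r.entries), func(i int) bool { return r.entries[i].token >= h.Sum32() })
	if i == len(r.entries) {
		i = 0 // wrapped past the last token: the first entry owns the key
	}
	return r.entries[i].addr
}

func main() {
	r := &ring{}
	// Hypothetical ingesters, one token each for brevity.
	r.add("ingester-1:9095", 1<<30)
	r.add("ingester-2:9095", 2<<30)
	r.add("ingester-3:9095", 3<<30)
	fmt.Println(r.owner(`http_requests_total{job="api"}`))
}
```

The point of the ring is that when an ingester joins or leaves, only the keys adjacent to its tokens change owner; everything else stays put, which is what lets the semi-stateful ingesters scale horizontally.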
+
+For more details on the Cortex architecture, you should read / watch:
+- The original design doc "[Project Frankenstein: A multi tenant, scale out Prometheus](https://docs.google.com/document/d/1C7yhMnb1x2sfeoe45f4mnnKConvroWhJ8KQZwIHJOuw/edit#heading=h.nimsq29kl184)"
+- PromCon 2016 Talk: "[Multitenant, Scale-Out Prometheus](https://promcon.io/2016-berlin/talks/multitenant-scale-out-prometheus/)"
+- KubeCon Prometheus Day talk "Weave Cortex: Multi-tenant, horizontally scalable Prometheus as a Service" [slides](http://www.slideshare.net/weaveworks/weave-cortex-multitenant-horizontally-scalable-prometheus-as-a-service) [video](https://www.youtube.com/watch?v=9Uctgnazfwk)
+- PromCon 2017 Talk: "[Cortex: Prometheus as a Service, One Year On](https://promcon.io/2017-munich/talks/cortex-prometheus-as-a-service-one-year-on/)"
+- CNCF TOC Presentation: "Horizontally Scalable, Multi-tenant Prometheus" [slides](https://docs.google.com/presentation/d/190oIFgujktVYxWZLhLYN4q8p9dtQYoe4sxHgn4deBSI/edit#slide=id.g3b8e2d6f7e_0_6)
+
+## Query Path
+
+### Query Frontend
+
+The query frontend is an optional job which accepts HTTP requests and queues them by tenant ID, retrying them on errors. This allows for the occasional large query which would otherwise cause a querier OOM, allowing us to over-provision querier parallelism. Also, it prevents multiple large from being convoyed on a single querier by distributing them FIFO across all queriers. And finally, it prevents a single tenant from DoSing other tenants by fairly scheduling queries between tenants.
+
+The query frontend job accepts gRPC streaming requests from the queriers, which then "pull" requests from the frontend. For HA it is recommended you run multiple frontends - the queriers will connect to (and pull requests from) all of them. To get the benefit of the fair scheduling, it is recommended you run fewer frontends than queriers - two should suffice.
+
+See the document "[Cortex Query Woes](https://docs.google.com/document/d/1lsvSkv0tiAMPQv-V8vI2LZ8f4i9JuTRsuPI_i-XcAqY)" for a more detailed design discussion. In the future, query splitting, query alignment and query results caching will be added to the frontend.
+
+The query frontend is completely optional - you can continue to use the queriers directly. If you want to use the query frontend, direct incoming authenticated traffic at the frontends and set the `-querier.frontend-address` flag on the queriers.
+
+### Queriers
+
+The queriers handle the actual PromQL evaluation. They embed the chunk store client code for fetching data from long-term storage, and communicate with the ingesters for more recent data.

From 1247fe0855c91fe4f9d2436b582dd79f035000cf Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Thu, 16 Aug 2018 16:45:28 +0100
Subject: [PATCH 10/10] Review feedback.

Signed-off-by: Tom Wilkie
---
 docs/architecture.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/architecture.md b/docs/architecture.md
index c790d2224c..e08669ca6d 100644
--- a/docs/architecture.md
+++ b/docs/architecture.md
@@ -15,7 +15,7 @@ For more details on the Cortex architecture, you should read / watch:
 
 ### Query Frontend
 
-The query frontend is an optional job which accepts HTTP requests and queues them by tenant ID, retrying them on errors. This allows for the occasional large query which would otherwise cause a querier OOM, allowing us to over-provision querier parallelism. Also, it prevents multiple large from being convoyed on a single querier by distributing them FIFO across all queriers.
And finally, it prevents a single tenant from DoSing other tenants by fairly scheduling queries between tenants.
+The query frontend is an optional job which accepts HTTP requests and queues them by tenant ID, retrying them on errors. This allows for the occasional large query which would otherwise cause a querier OOM, allowing us to over-provision querier parallelism. Also, it prevents multiple large requests from being convoyed on a single querier by distributing them FIFO across all queriers. And finally, it prevents a single tenant from DoSing other tenants by fairly scheduling queries between tenants.
 
 The query frontend job accepts gRPC streaming requests from the queriers, which then "pull" requests from the frontend. For HA it is recommended you run multiple frontends - the queriers will connect to (and pull requests from) all of them. To get the benefit of the fair scheduling, it is recommended you run fewer frontends than queriers - two should suffice.
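To make the per-tenant queueing and fair scheduling described above concrete, here is a minimal single-goroutine sketch of bounded per-tenant FIFOs with a random-tenant dequeue. It is an illustration of the idea only — the `scheduler` type, the string payloads, and the absence of locking are simplifications for the example; the real frontend guards its queues with a mutex and condition variable, carries full HTTP requests, and enforces the per-tenant limit via `-querier.max-outstanding-requests-per-tenant` in the same fail-fast way.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

var errTooManyRequests = errors.New("too many outstanding requests")

// scheduler keeps one bounded FIFO per tenant.
type scheduler struct {
	maxPerTenant int
	queues       map[string]chan string
}

func newScheduler(maxPerTenant int) *scheduler {
	return &scheduler{maxPerTenant: maxPerTenant, queues: map[string]chan string{}}
}

// enqueue adds a request for a tenant, failing fast when the tenant's
// queue is full (the frontend surfaces this as HTTP 429).
func (s *scheduler) enqueue(tenant, req string) error {
	q, ok := s.queues[tenant]
	if !ok {
		q = make(chan string, s.maxPerTenant)
		s.queues[tenant] = q
	}
	select {
	case q <- req:
		return nil
	default:
		return errTooManyRequests
	}
}

// dequeue pops the oldest request of a randomly chosen tenant, so one
// busy tenant cannot starve the others; drained queues are deleted.
func (s *scheduler) dequeue() (string, bool) {
	if len(s.queues) == 0 {
		return "", false
	}
	n := rand.Intn(len(s.queues))
	for tenant, q := range s.queues {
		if n > 0 {
			n--
			continue
		}
		req := <-q
		if len(q) == 0 {
			delete(s.queues, tenant)
		}
		return req, true
	}
	return "", false
}

func main() {
	s := newScheduler(2)
	_ = s.enqueue("tenant-a", "query 1")
	_ = s.enqueue("tenant-a", "query 2")
	_ = s.enqueue("tenant-b", "query 3")
	for req, ok := s.dequeue(); ok; req, ok = s.dequeue() {
		fmt.Println(req)
	}
}
```

Within a tenant, requests come out in FIFO order; across tenants, the pick is uniform regardless of queue depth — which is the fairness property the convoying and DoS-protection arguments above rely on.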