diff --git a/CHANGELOG.md b/CHANGELOG.md
index 73c55eb6765..7f545a1f84f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -91,6 +91,7 @@
 * [ENHANCEMENT] Querier: add experimental per-query chunks limit based on an estimate of the number of chunks that will be sent from ingesters and store-gateways that is enforced earlier during query evaluation. This limit is disabled by default and can be configured with `-querier.max-estimated-fetched-chunks-per-query-multiplier`. #5765
 * [ENHANCEMENT] Ingester: add UI for listing tenants with TSDB on given ingester and viewing details of tenants's TSDB on given ingester. #5803
 * [ENHANCEMENT] Querier: improve observability of calls to store-gateways during queries. #5809
+* [ENHANCEMENT] Query-frontend: improve tracing of interactions with query-scheduler. #5818
 * [BUGFIX] Ingester: Handle when previous ring state is leaving and the number of tokens has changed. #5204
 * [BUGFIX] Querier: fix issue where queries that use the `timestamp()` function fail with `execution: attempted to read series at index 0 from stream, but the stream has already been exhausted` if streaming chunks from ingesters to queriers is enabled. #5370
 * [BUGFIX] memberlist: bring back `memberlist_client_kv_store_count` metric that used to exist in Cortex, but got lost during dskit updates before Mimir 2.0. #5377
diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go
index 7f10726ca9b..c8c067f4d6c 100644
--- a/pkg/frontend/v2/frontend.go
+++ b/pkg/frontend/v2/frontend.go
@@ -32,6 +32,7 @@ import (
 	"github.com/grafana/mimir/pkg/querier/stats"
 	"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
 	"github.com/grafana/mimir/pkg/util/httpgrpcutil"
+	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
 
 // Config for a Frontend.
@@ -97,6 +98,7 @@ type frontendRequest struct {
 	userID       string
 	statsEnabled bool
 
+	ctx    context.Context
 	cancel context.CancelFunc
 
 	enqueue  chan enqueueResult
@@ -199,6 +201,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
 		}
 	}
 
+	spanLogger := spanlogger.FromContext(ctx, f.log)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
@@ -208,6 +211,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
 		userID:       userID,
 		statsEnabled: stats.IsEnabled(ctx),
 
+		ctx:    ctx,
 		cancel: cancel,
 
 		// Buffer of 1 to ensure response or error can be written to the channel
@@ -222,9 +226,12 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
 	retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers.
 
 enqueueAgain:
+	level.Debug(spanLogger).Log("msg", "enqueuing request")
+
 	var cancelCh chan<- uint64
 	select {
 	case <-ctx.Done():
+		level.Debug(spanLogger).Log("msg", "request context cancelled while enqueuing request, aborting", "err", ctx.Err())
 		return nil, ctx.Err()
 
 	case f.requestsCh <- freq:
@@ -236,27 +243,36 @@ enqueueAgain:
 		} else if enqRes.status == failed {
 			retries--
 			if retries > 0 {
+				level.Debug(spanLogger).Log("msg", "enqueuing request failed, will retry")
 				goto enqueueAgain
 			}
 		}
 
+		level.Debug(spanLogger).Log("msg", "enqueuing request failed, retries are exhausted, aborting")
+
 		return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request")
 	}
 
+	level.Debug(spanLogger).Log("msg", "request enqueued successfully, waiting for response")
+
 	select {
 	case <-ctx.Done():
+		level.Debug(spanLogger).Log("msg", "request context cancelled after enqueuing request, aborting", "err", ctx.Err())
+
 		if cancelCh != nil {
 			select {
 			case cancelCh <- freq.queryID:
 				// cancellation sent.
 			default:
 				// failed to cancel, ignore.
-				level.Warn(f.log).Log("msg", "failed to send cancellation request to scheduler, queue full")
+				level.Warn(spanLogger).Log("msg", "failed to send cancellation request to scheduler, queue full")
 			}
 		}
 		return nil, ctx.Err()
 
 	case resp := <-freq.response:
+		level.Debug(spanLogger).Log("msg", "received response")
+
 		if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
 			stats := stats.FromContext(ctx)
 			stats.Merge(resp.Stats) // Safe if stats is nil.
diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go
index 932f1be6ab1..410141d412b 100644
--- a/pkg/frontend/v2/frontend_scheduler_worker.go
+++ b/pkg/frontend/v2/frontend_scheduler_worker.go
@@ -25,6 +25,7 @@ import (
 	"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
 	"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
 	"github.com/grafana/mimir/pkg/util/servicediscovery"
+	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
 
 const (
@@ -332,60 +333,10 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
 			return nil
 
 		case req := <-w.requestsCh:
-			err := loop.Send(&schedulerpb.FrontendToScheduler{
-				Type:            schedulerpb.ENQUEUE,
-				QueryID:         req.queryID,
-				UserID:          req.userID,
-				HttpRequest:     req.request,
-				FrontendAddress: w.frontendAddr,
-				StatsEnabled:    req.statsEnabled,
-			})
-			w.enqueuedRequests.Inc()
-
-			if err != nil {
-				req.enqueue <- enqueueResult{status: failed}
+			if err := w.enqueueRequest(loop, req); err != nil {
 				return err
 			}
 
-			resp, err := loop.Recv()
-			if err != nil {
-				req.enqueue <- enqueueResult{status: failed}
-				return err
-			}
-
-			switch resp.Status {
-			case schedulerpb.OK:
-				req.enqueue <- enqueueResult{status: waitForResponse, cancelCh: w.cancelCh}
-				// Response will come from querier.
-
-			case schedulerpb.SHUTTING_DOWN:
-				// Scheduler is shutting down, report failure to enqueue and stop this loop.
-				req.enqueue <- enqueueResult{status: failed}
-				return errors.New("scheduler is shutting down")
-
-			case schedulerpb.ERROR:
-				req.enqueue <- enqueueResult{status: waitForResponse}
-				req.response <- &frontendv2pb.QueryResultRequest{
-					HttpResponse: &httpgrpc.HTTPResponse{
-						Code: http.StatusInternalServerError,
-						Body: []byte(err.Error()),
-					},
-				}
-
-			case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT:
-				req.enqueue <- enqueueResult{status: waitForResponse}
-				req.response <- &frontendv2pb.QueryResultRequest{
-					HttpResponse: &httpgrpc.HTTPResponse{
-						Code: http.StatusTooManyRequests,
-						Body: []byte("too many outstanding requests"),
-					},
-				}
-
-			default:
-				level.Error(w.log).Log("msg", "unknown response status from the scheduler", "resp", resp, "queryID", req.queryID)
-				req.enqueue <- enqueueResult{status: failed}
-			}
-
 		case reqID := <-w.cancelCh:
 			err := loop.Send(&schedulerpb.FrontendToScheduler{
 				Type:    schedulerpb.CANCEL,
@@ -408,3 +359,71 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
 		}
 	}
 }
+
+// enqueueRequest sends a request to this worker's scheduler, and returns an error if no further requests should be sent to the scheduler.
+func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient, req *frontendRequest) error {
+	spanLogger, _ := spanlogger.NewWithLogger(req.ctx, w.log, "frontendSchedulerWorker.enqueueRequest")
+	spanLogger.Span.SetTag("scheduler_address", w.conn.Target())
+	defer spanLogger.Span.Finish()
+
+	err := loop.Send(&schedulerpb.FrontendToScheduler{
+		Type:            schedulerpb.ENQUEUE,
+		QueryID:         req.queryID,
+		UserID:          req.userID,
+		HttpRequest:     req.request,
+		FrontendAddress: w.frontendAddr,
+		StatsEnabled:    req.statsEnabled,
+	})
+	w.enqueuedRequests.Inc()
+
+	if err != nil {
+		level.Warn(spanLogger).Log("msg", "received error while sending request to scheduler", "err", err)
+		req.enqueue <- enqueueResult{status: failed}
+		return err
+	}
+
+	resp, err := loop.Recv()
+	if err != nil {
+		level.Warn(spanLogger).Log("msg", "received error while receiving response", "err", err)
+		req.enqueue <- enqueueResult{status: failed}
+		return err
+	}
+
+	switch resp.Status {
+	case schedulerpb.OK:
+		req.enqueue <- enqueueResult{status: waitForResponse, cancelCh: w.cancelCh}
+		// Response will come from querier.
+
+	case schedulerpb.SHUTTING_DOWN:
+		// Scheduler is shutting down, report failure to enqueue and stop this loop.
+		level.Warn(spanLogger).Log("msg", "scheduler reported that it is shutting down")
+		req.enqueue <- enqueueResult{status: failed}
+		return errors.New("scheduler is shutting down")
+
+	case schedulerpb.ERROR:
+		level.Warn(spanLogger).Log("msg", "scheduler returned error", "err", resp.Error)
+		req.enqueue <- enqueueResult{status: waitForResponse}
+		req.response <- &frontendv2pb.QueryResultRequest{
+			HttpResponse: &httpgrpc.HTTPResponse{
+				Code: http.StatusInternalServerError,
+				Body: []byte(resp.Error),
+			},
+		}
+
+	case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT:
+		level.Warn(spanLogger).Log("msg", "scheduler reported it has too many outstanding requests")
+		req.enqueue <- enqueueResult{status: waitForResponse}
+		req.response <- &frontendv2pb.QueryResultRequest{
+			HttpResponse: &httpgrpc.HTTPResponse{
+				Code: http.StatusTooManyRequests,
+				Body: []byte("too many outstanding requests"),
+			},
+		}
+
+	default:
+		level.Error(spanLogger).Log("msg", "unknown response status from the scheduler", "resp", resp, "queryID", req.queryID)
+		req.enqueue <- enqueueResult{status: failed}
+	}
+
+	return nil
+}
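For reference, a minimal sketch (not part of the patch) of the span-logger pattern this change applies: start a span from the request's context, tag it, log through it so messages land both in the log output and on the trace span, and finish it when the operation returns. The function name traceOperation, the tag and log values, and the go-kit/log import paths are assumptions for this example; the spanlogger calls mirror those used in the diff above.

package example

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/grafana/mimir/pkg/util/spanlogger"
)

// traceOperation mirrors the pattern used by frontendSchedulerWorker.enqueueRequest:
// a span logger is created from the incoming context, tagged with searchable metadata,
// and finished on return, so debug lines become visible when inspecting a query trace.
// Illustrative only; names and values here are made up for the example.
func traceOperation(ctx context.Context, logger log.Logger) {
	// Start a child span named after the operation; the second return value
	// (a derived context) is discarded here, as in the patch.
	spanLogger, _ := spanlogger.NewWithLogger(ctx, logger, "example.traceOperation")
	defer spanLogger.Span.Finish()

	// Attach metadata to the span, like the scheduler_address tag above.
	spanLogger.Span.SetTag("example_tag", "example_value")

	// Messages logged through the span logger are recorded on the span as well
	// as in the regular logs.
	level.Debug(spanLogger).Log("msg", "doing work")
}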