From 0b1598a05ae0e1940cd786108f454ee7e8d03c70 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 23 Aug 2023 15:28:37 +1000 Subject: [PATCH 1/5] Add events to help diagnose queuing request mystery. --- pkg/frontend/v2/frontend.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 7f10726ca9b..759148d483c 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/util/httpgrpcutil" + "github.com/grafana/mimir/pkg/util/spanlogger" ) // Config for a Frontend. @@ -199,6 +200,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) } } + spanLogger := spanlogger.FromContext(ctx, f.log) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -222,9 +224,12 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers. enqueueAgain: + level.Debug(spanLogger).Log("msg", "enqueuing request") + var cancelCh chan<- uint64 select { case <-ctx.Done(): + level.Debug(spanLogger).Log("msg", "request context cancelled while enqueuing request, aborting", "err", ctx.Err()) return nil, ctx.Err() case f.requestsCh <- freq: @@ -236,27 +241,36 @@ enqueueAgain: } else if enqRes.status == failed { retries-- if retries > 0 { + level.Debug(spanLogger).Log("msg", "enqueuing request failed, will retry") goto enqueueAgain } } + level.Debug(spanLogger).Log("msg", "enqueuing request failed, retries are exhausted, aborting") + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request") } + level.Debug(spanLogger).Log("msg", "request enqueued successfully, waiting for response") + select { case <-ctx.Done(): + level.Debug(spanLogger).Log("msg", "request context cancelled after enqueuing request, aborting", "err", ctx.Err()) + if cancelCh != nil { select { case cancelCh <- freq.queryID: // cancellation sent. default: // failed to cancel, ignore. - level.Warn(f.log).Log("msg", "failed to send cancellation request to scheduler, queue full") + level.Warn(spanLogger).Log("msg", "failed to send cancellation request to scheduler, queue full") } } return nil, ctx.Err() case resp := <-freq.response: + level.Debug(spanLogger).Log("msg", "received response") + if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { stats := stats.FromContext(ctx) stats.Merge(resp.Stats) // Safe if stats is nil. From 74dea1f22a630a34dc3f052e208bd9fcc02b113c Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 23 Aug 2023 15:34:20 +1000 Subject: [PATCH 2/5] Extract method. 
--- pkg/frontend/v2/frontend_scheduler_worker.go | 111 ++++++++++--------- 1 file changed, 60 insertions(+), 51 deletions(-) diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index 932f1be6ab1..f322fc9158e 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -332,60 +332,10 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro return nil case req := <-w.requestsCh: - err := loop.Send(&schedulerpb.FrontendToScheduler{ - Type: schedulerpb.ENQUEUE, - QueryID: req.queryID, - UserID: req.userID, - HttpRequest: req.request, - FrontendAddress: w.frontendAddr, - StatsEnabled: req.statsEnabled, - }) - w.enqueuedRequests.Inc() - - if err != nil { - req.enqueue <- enqueueResult{status: failed} + if err := w.enqueueRequest(loop, req); err != nil { return err } - resp, err := loop.Recv() - if err != nil { - req.enqueue <- enqueueResult{status: failed} - return err - } - - switch resp.Status { - case schedulerpb.OK: - req.enqueue <- enqueueResult{status: waitForResponse, cancelCh: w.cancelCh} - // Response will come from querier. - - case schedulerpb.SHUTTING_DOWN: - // Scheduler is shutting down, report failure to enqueue and stop this loop. - req.enqueue <- enqueueResult{status: failed} - return errors.New("scheduler is shutting down") - - case schedulerpb.ERROR: - req.enqueue <- enqueueResult{status: waitForResponse} - req.response <- &frontendv2pb.QueryResultRequest{ - HttpResponse: &httpgrpc.HTTPResponse{ - Code: http.StatusInternalServerError, - Body: []byte(err.Error()), - }, - } - - case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT: - req.enqueue <- enqueueResult{status: waitForResponse} - req.response <- &frontendv2pb.QueryResultRequest{ - HttpResponse: &httpgrpc.HTTPResponse{ - Code: http.StatusTooManyRequests, - Body: []byte("too many outstanding requests"), - }, - } - - default: - level.Error(w.log).Log("msg", "unknown response status from the scheduler", "resp", resp, "queryID", req.queryID) - req.enqueue <- enqueueResult{status: failed} - } - case reqID := <-w.cancelCh: err := loop.Send(&schedulerpb.FrontendToScheduler{ Type: schedulerpb.CANCEL, @@ -408,3 +358,62 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro } } } + +// enqueueRequest sends a request to this worker's scheduler, and returns an error if no further requests should be sent to the scheduler. +func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient, req *frontendRequest) error { + err := loop.Send(&schedulerpb.FrontendToScheduler{ + Type: schedulerpb.ENQUEUE, + QueryID: req.queryID, + UserID: req.userID, + HttpRequest: req.request, + FrontendAddress: w.frontendAddr, + StatsEnabled: req.statsEnabled, + }) + w.enqueuedRequests.Inc() + + if err != nil { + req.enqueue <- enqueueResult{status: failed} + return err + } + + resp, err := loop.Recv() + if err != nil { + req.enqueue <- enqueueResult{status: failed} + return err + } + + switch resp.Status { + case schedulerpb.OK: + req.enqueue <- enqueueResult{status: waitForResponse, cancelCh: w.cancelCh} + // Response will come from querier. + + case schedulerpb.SHUTTING_DOWN: + // Scheduler is shutting down, report failure to enqueue and stop this loop. 
+ req.enqueue <- enqueueResult{status: failed} + return errors.New("scheduler is shutting down") + + case schedulerpb.ERROR: + req.enqueue <- enqueueResult{status: waitForResponse} + req.response <- &frontendv2pb.QueryResultRequest{ + HttpResponse: &httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Body: []byte(err.Error()), + }, + } + + case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT: + req.enqueue <- enqueueResult{status: waitForResponse} + req.response <- &frontendv2pb.QueryResultRequest{ + HttpResponse: &httpgrpc.HTTPResponse{ + Code: http.StatusTooManyRequests, + Body: []byte("too many outstanding requests"), + }, + } + + default: + level.Error(w.log).Log("msg", "unknown response status from the scheduler", "resp", resp, "queryID", req.queryID) + req.enqueue <- enqueueResult{status: failed} + } + + return nil +} From e354aa417970b2577c3d5bb5ca9d12218e16e03f Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 23 Aug 2023 15:46:03 +1000 Subject: [PATCH 3/5] Create span while enqueuing request in scheduler, and fix incorrect error reported when the scheduler reports an error. --- pkg/frontend/v2/frontend.go | 2 ++ pkg/frontend/v2/frontend_scheduler_worker.go | 14 ++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 759148d483c..c8c067f4d6c 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -98,6 +98,7 @@ type frontendRequest struct { userID string statsEnabled bool + ctx context.Context cancel context.CancelFunc enqueue chan enqueueResult @@ -210,6 +211,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) userID: userID, statsEnabled: stats.IsEnabled(ctx), + ctx: ctx, cancel: cancel, // Buffer of 1 to ensure response or error can be written to the channel diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index f322fc9158e..adddc7ef0e2 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -25,6 +25,7 @@ import ( "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/scheduler/schedulerpb" "github.com/grafana/mimir/pkg/util/servicediscovery" + "github.com/grafana/mimir/pkg/util/spanlogger" ) const ( @@ -361,6 +362,10 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro // enqueueRequest sends a request to this worker's scheduler, and returns an error if no further requests should be sent to the scheduler. 
func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient, req *frontendRequest) error { + spanLogger, _ := spanlogger.NewWithLogger(req.ctx, w.log, "frontendSchedulerWorker.enqueueRequest") + spanLogger.Span.SetTag("scheduler_address", w.conn.Target()) + defer spanLogger.Span.Finish() + err := loop.Send(&schedulerpb.FrontendToScheduler{ Type: schedulerpb.ENQUEUE, QueryID: req.queryID, @@ -372,12 +377,14 @@ func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFr w.enqueuedRequests.Inc() if err != nil { + level.Warn(spanLogger).Log("msg", "received error while enqueuing request", "err", err) req.enqueue <- enqueueResult{status: failed} return err } resp, err := loop.Recv() if err != nil { + level.Warn(spanLogger).Log("msg", "received error while receiving response", "err", err) req.enqueue <- enqueueResult{status: failed} return err } @@ -389,19 +396,22 @@ func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFr case schedulerpb.SHUTTING_DOWN: // Scheduler is shutting down, report failure to enqueue and stop this loop. + level.Warn(spanLogger).Log("msg", "scheduler reported that it is shutting down") req.enqueue <- enqueueResult{status: failed} return errors.New("scheduler is shutting down") case schedulerpb.ERROR: + level.Warn(spanLogger).Log("msg", "scheduler returned error", "err", resp.Error) req.enqueue <- enqueueResult{status: waitForResponse} req.response <- &frontendv2pb.QueryResultRequest{ HttpResponse: &httpgrpc.HTTPResponse{ Code: http.StatusInternalServerError, - Body: []byte(err.Error()), + Body: []byte(resp.Error), }, } case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT: + level.Warn(spanLogger).Log("msg", "scheduler reported it has too many outstanding requests") req.enqueue <- enqueueResult{status: waitForResponse} req.response <- &frontendv2pb.QueryResultRequest{ HttpResponse: &httpgrpc.HTTPResponse{ @@ -411,7 +421,7 @@ func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFr } default: - level.Error(w.log).Log("msg", "unknown response status from the scheduler", "resp", resp, "queryID", req.queryID) + level.Error(spanLogger).Log("msg", "unknown response status from the scheduler", "resp", resp, "queryID", req.queryID) req.enqueue <- enqueueResult{status: failed} } From 04cdb91694d4564ec14dcd5095936a94669c966e Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 23 Aug 2023 16:05:19 +1000 Subject: [PATCH 4/5] Add changelog entry. --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 018b678c087..3314048c3ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,6 +85,7 @@ * [ENHANCEMENT] Improve tracing of internal HTTP requests sent over httpgrpc #5782 * [ENHANCEMENT] Querier: add experimental per-query chunks limit based on an estimate of the number of chunks that will be sent from ingesters and store-gateways that is enforced earlier during query evaluation. This limit is disabled by default and can be configured with `-querier.max-estimated-fetched-chunks-per-query-multiplier`. #5765 * [ENHANCEMENT] Ingester: add UI for listing tenants with TSDB on given ingester and viewing details of tenants's TSDB on given ingester. #5803 +* [ENHANCEMENT] Query-frontend: improve tracing of interactions with query-scheduler. #5818 * [BUGFIX] Ingester: Handle when previous ring state is leaving and the number of tokens has changed. 
#5204 * [BUGFIX] Querier: fix issue where queries that use the `timestamp()` function fail with `execution: attempted to read series at index 0 from stream, but the stream has already been exhausted` if streaming chunks from ingesters to queriers is enabled. #5370 * [BUGFIX] memberlist: bring back `memberlist_client_kv_store_count` metric that used to exist in Cortex, but got lost during dskit updates before Mimir 2.0. #5377 From ca81afdd3995f811e0a6d9634e9cee80acd662eb Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Thu, 24 Aug 2023 11:11:44 +1000 Subject: [PATCH 5/5] Update pkg/frontend/v2/frontend_scheduler_worker.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Peter Štibraný --- pkg/frontend/v2/frontend_scheduler_worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index adddc7ef0e2..410141d412b 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -377,7 +377,7 @@ func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFr w.enqueuedRequests.Inc() if err != nil { - level.Warn(spanLogger).Log("msg", "received error while enqueuing request", "err", err) + level.Warn(spanLogger).Log("msg", "received error while sending request to scheduler", "err", err) req.enqueue <- enqueueResult{status: failed} return err }
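
For context on the spanlogger pattern this series leans on: spanlogger wraps the tracing span carried in a request context together with a go-kit logger, so a single Log call both annotates the trace and emits a log line. Below is a minimal sketch of the two usage patterns visible in patches 1 and 3, assuming only the spanlogger API shown in the diffs above; doWork, its arguments, and the tag name are illustrative, not part of this change, and the sketch assumes it is compiled inside the Mimir module.

package example

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/grafana/mimir/pkg/util/spanlogger"
)

// doWork is a hypothetical operation; only the spanlogger calls mirror the series.
func doWork(ctx context.Context, logger log.Logger) {
	// Pattern from patch 1: reuse the span already attached to the request
	// context, falling back to the plain logger if there is none.
	requestLogger := spanlogger.FromContext(ctx, logger)
	level.Debug(requestLogger).Log("msg", "starting work")

	// Pattern from patch 3: open a child span for a sub-operation, tag it,
	// and finish it when the sub-operation returns.
	spanLogger, _ := spanlogger.NewWithLogger(ctx, logger, "example.doWork")
	defer spanLogger.Span.Finish()
	spanLogger.Span.SetTag("example_tag", "value")
	level.Debug(spanLogger).Log("msg", "sub-operation started")
}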
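
The error-body fix in patch 3 is worth spelling out: in the schedulerpb.ERROR branch, execution only reaches the switch statement when both loop.Send and loop.Recv returned a nil error, so the old err.Error() call dereferenced a nil interface and would panic; the scheduler's actual error text lives in resp.Error. The following standalone sketch illustrates the pitfall, with the scheduler's reply message reduced to a two-field struct for illustration:

package main

import "fmt"

// response stands in for the scheduler's reply message: a successful Recv
// can still carry an application-level error in its Error field.
type response struct {
	Status string
	Error  string
}

func recv() (*response, error) {
	return &response{Status: "ERROR", Error: "failed to enqueue request"}, nil
}

func main() {
	resp, err := recv()
	if err != nil {
		return // transport error; not the case in question
	}
	if resp.Status == "ERROR" {
		// Buggy (pre-patch 3): err is guaranteed nil here, so this panics:
		//   body := []byte(err.Error())
		// Fixed: use the error text the scheduler reported.
		body := []byte(resp.Error)
		fmt.Println(string(body))
	}
}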