Improve observability of enqueuing requests to query-scheduler #5818

Merged 6 commits on Aug 24, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -91,6 +91,7 @@
* [ENHANCEMENT] Querier: add experimental per-query chunks limit based on an estimate of the number of chunks that will be sent from ingesters and store-gateways that is enforced earlier during query evaluation. This limit is disabled by default and can be configured with `-querier.max-estimated-fetched-chunks-per-query-multiplier`. #5765
* [ENHANCEMENT] Ingester: add UI for listing tenants with TSDB on given ingester and viewing details of tenant's TSDB on given ingester. #5803
* [ENHANCEMENT] Querier: improve observability of calls to store-gateways during queries. #5809
* [ENHANCEMENT] Query-frontend: improve tracing of interactions with query-scheduler. #5818
* [BUGFIX] Ingester: Handle when previous ring state is leaving and the number of tokens has changed. #5204
* [BUGFIX] Querier: fix issue where queries that use the `timestamp()` function fail with `execution: attempted to read series at index 0 from stream, but the stream has already been exhausted` if streaming chunks from ingesters to queriers is enabled. #5370
* [BUGFIX] memberlist: bring back `memberlist_client_kv_store_count` metric that used to exist in Cortex, but got lost during dskit updates before Mimir 2.0. #5377
18 changes: 17 additions & 1 deletion pkg/frontend/v2/frontend.go
@@ -32,6 +32,7 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/mimir/pkg/util/httpgrpcutil"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

// Config for a Frontend.
@@ -97,6 +98,7 @@ type frontendRequest struct {
userID string
statsEnabled bool

ctx context.Context
cancel context.CancelFunc

enqueue chan enqueueResult
@@ -199,6 +201,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
}
}

spanLogger := spanlogger.FromContext(ctx, f.log)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -208,6 +211,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
userID: userID,
statsEnabled: stats.IsEnabled(ctx),

ctx: ctx,
cancel: cancel,

// Buffer of 1 to ensure response or error can be written to the channel
@@ -222,9 +226,12 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers.

enqueueAgain:
level.Debug(spanLogger).Log("msg", "enqueuing request")
Note to reviewers: I've deliberately added this request started event and the "request succeeded" event below despite adding the span in enqueueRequest, as it's possible there's a delay between a request being added to f.requestsCh and it being picked up and enqueued by a worker.
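
To make the delay described above concrete, here is a minimal, self-contained Go sketch (illustrative only, not Mimir code; `requestsCh` and the worker goroutine are simplified stand-ins for `f.requestsCh` and the scheduler workers): the frontend logs the event, then blocks on the channel send until a worker is free, and the gap between that event and the worker-side pickup is the queueing delay the event makes visible.

```go
package main

import (
	"fmt"
	"time"
)

// request is a simplified stand-in for Mimir's frontendRequest.
type request struct {
	sentToChannelAt time.Time
}

func main() {
	requestsCh := make(chan request) // stand-in for the shared f.requestsCh

	done := make(chan struct{})
	go func() {
		defer close(done)
		time.Sleep(50 * time.Millisecond) // pretend the worker is busy with another request
		req := <-requestsCh
		// In Mimir this is roughly where the worker-side enqueueRequest span starts;
		// the delta below is the delay that the "enqueuing request" event exposes.
		fmt.Printf("worker picked request up after %v\n", time.Since(req.sentToChannelAt))
	}()

	fmt.Println(`msg="enqueuing request"`) // frontend-side event, logged before the send
	requestsCh <- request{sentToChannelAt: time.Now()}

	<-done
}
```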


var cancelCh chan<- uint64
select {
case <-ctx.Done():
level.Debug(spanLogger).Log("msg", "request context cancelled while enqueuing request, aborting", "err", ctx.Err())
return nil, ctx.Err()

case f.requestsCh <- freq:
@@ -236,27 +243,36 @@ enqueueAgain:
} else if enqRes.status == failed {
retries--
if retries > 0 {
level.Debug(spanLogger).Log("msg", "enqueuing request failed, will retry")
goto enqueueAgain
}
}

level.Debug(spanLogger).Log("msg", "enqueuing request failed, retries are exhausted, aborting")

return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request")
}

level.Debug(spanLogger).Log("msg", "request enqueued successfully, waiting for response")

select {
case <-ctx.Done():
level.Debug(spanLogger).Log("msg", "request context cancelled after enqueuing request, aborting", "err", ctx.Err())

if cancelCh != nil {
select {
case cancelCh <- freq.queryID:
// cancellation sent.
default:
// failed to cancel, ignore.
level.Warn(f.log).Log("msg", "failed to send cancellation request to scheduler, queue full")
level.Warn(spanLogger).Log("msg", "failed to send cancellation request to scheduler, queue full")
}
}
return nil, ctx.Err()

case resp := <-freq.response:
level.Debug(spanLogger).Log("msg", "received response")

if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
stats := stats.FromContext(ctx)
stats.Merge(resp.Stats) // Safe if stats is nil.
121 changes: 70 additions & 51 deletions pkg/frontend/v2/frontend_scheduler_worker.go
@@ -25,6 +25,7 @@ import (
"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
"github.com/grafana/mimir/pkg/util/servicediscovery"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

const (
@@ -332,60 +333,10 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
return nil

case req := <-w.requestsCh:
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.ENQUEUE,
QueryID: req.queryID,
UserID: req.userID,
HttpRequest: req.request,
FrontendAddress: w.frontendAddr,
StatsEnabled: req.statsEnabled,
})
w.enqueuedRequests.Inc()

if err != nil {
req.enqueue <- enqueueResult{status: failed}
if err := w.enqueueRequest(loop, req); err != nil {
return err
}

resp, err := loop.Recv()
if err != nil {
req.enqueue <- enqueueResult{status: failed}
return err
}

switch resp.Status {
case schedulerpb.OK:
req.enqueue <- enqueueResult{status: waitForResponse, cancelCh: w.cancelCh}
// Response will come from querier.

case schedulerpb.SHUTTING_DOWN:
// Scheduler is shutting down, report failure to enqueue and stop this loop.
req.enqueue <- enqueueResult{status: failed}
return errors.New("scheduler is shutting down")

case schedulerpb.ERROR:
req.enqueue <- enqueueResult{status: waitForResponse}
req.response <- &frontendv2pb.QueryResultRequest{
HttpResponse: &httpgrpc.HTTPResponse{
Code: http.StatusInternalServerError,
Body: []byte(err.Error()),
},
}

case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT:
req.enqueue <- enqueueResult{status: waitForResponse}
req.response <- &frontendv2pb.QueryResultRequest{
HttpResponse: &httpgrpc.HTTPResponse{
Code: http.StatusTooManyRequests,
Body: []byte("too many outstanding requests"),
},
}

default:
level.Error(w.log).Log("msg", "unknown response status from the scheduler", "resp", resp, "queryID", req.queryID)
req.enqueue <- enqueueResult{status: failed}
}

case reqID := <-w.cancelCh:
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.CANCEL,
@@ -408,3 +359,71 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
}
}
}

// enqueueRequest sends a request to this worker's scheduler, and returns an error if no further requests should be sent to the scheduler.
func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient, req *frontendRequest) error {
spanLogger, _ := spanlogger.NewWithLogger(req.ctx, w.log, "frontendSchedulerWorker.enqueueRequest")
spanLogger.Span.SetTag("scheduler_address", w.conn.Target())
defer spanLogger.Span.Finish()

err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.ENQUEUE,
QueryID: req.queryID,
UserID: req.userID,
HttpRequest: req.request,
FrontendAddress: w.frontendAddr,
StatsEnabled: req.statsEnabled,
})
w.enqueuedRequests.Inc()

if err != nil {
level.Warn(spanLogger).Log("msg", "received error while sending request to scheduler", "err", err)
req.enqueue <- enqueueResult{status: failed}
return err
}

resp, err := loop.Recv()
if err != nil {
level.Warn(spanLogger).Log("msg", "received error while receiving response", "err", err)
req.enqueue <- enqueueResult{status: failed}
return err
}

switch resp.Status {
case schedulerpb.OK:
req.enqueue <- enqueueResult{status: waitForResponse, cancelCh: w.cancelCh}
// Response will come from querier.

case schedulerpb.SHUTTING_DOWN:
// Scheduler is shutting down, report failure to enqueue and stop this loop.
level.Warn(spanLogger).Log("msg", "scheduler reported that it is shutting down")
req.enqueue <- enqueueResult{status: failed}
return errors.New("scheduler is shutting down")

case schedulerpb.ERROR:
level.Warn(spanLogger).Log("msg", "scheduler returned error", "err", resp.Error)
req.enqueue <- enqueueResult{status: waitForResponse}
req.response <- &frontendv2pb.QueryResultRequest{
HttpResponse: &httpgrpc.HTTPResponse{
Code: http.StatusInternalServerError,
Body: []byte(resp.Error),
Reviewer comment (Member): Nice catch!

},
}

case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT:
level.Warn(spanLogger).Log("msg", "scheduler reported it has too many outstanding requests")
req.enqueue <- enqueueResult{status: waitForResponse}
req.response <- &frontendv2pb.QueryResultRequest{
HttpResponse: &httpgrpc.HTTPResponse{
Code: http.StatusTooManyRequests,
Body: []byte("too many outstanding requests"),
},
}

default:
level.Error(spanLogger).Log("msg", "unknown response status from the scheduler", "resp", resp, "queryID", req.queryID)
req.enqueue <- enqueueResult{status: failed}
}

return nil
}
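
As a closing note, the tracing pattern used by `enqueueRequest` above is, to the best of my understanding of Mimir's `spanlogger` package: `NewWithLogger` opens a child span off the request's context, `Span.SetTag` attaches the scheduler address, the deferred `Span.Finish` closes the span when the function returns, and log lines written through the returned logger are emitted to the regular logger as well. A hedged sketch of that pattern, with a made-up function and operation name (only the `spanlogger` and `level` calls mirror the diff):

```go
// Illustrative sketch only (assumed helper, not part of the PR): the spanlogger
// and level calls mirror the ones in enqueueRequest above; the function itself,
// its arguments, and the operation name are made up.
package sketch

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/grafana/mimir/pkg/util/spanlogger"
)

func tracedStep(ctx context.Context, logger log.Logger, schedulerAddr string) error {
	// Open a child span off the caller's context; logs written through spanLogger
	// go to logger and (as I understand spanlogger) are also recorded on the span.
	spanLogger, _ := spanlogger.NewWithLogger(ctx, logger, "tracedStep")
	spanLogger.Span.SetTag("scheduler_address", schedulerAddr)
	defer spanLogger.Span.Finish()

	level.Debug(spanLogger).Log("msg", "starting step")
	// ... do the traced work here ...
	return nil
}
```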