From a905f0920cecc53698f74529dce034b9c24ee2a7 Mon Sep 17 00:00:00 2001
From: Ed Welch
Date: Fri, 19 Jul 2024 14:13:26 +0000
Subject: [PATCH] fix retry code to handle grpc status codes. updated newer
 stats retries to be wrapped with spans

Signed-off-by: Edward Welch
---
 .../queryrange/queryrangebase/retry.go | 13 ++++++++---
 pkg/querier/queryrange/roundtrip.go    | 23 +++++++++++--------
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/pkg/querier/queryrange/queryrangebase/retry.go b/pkg/querier/queryrange/queryrangebase/retry.go
index 1bd619324ef1..691d20fa5b5e 100644
--- a/pkg/querier/queryrange/queryrangebase/retry.go
+++ b/pkg/querier/queryrange/queryrangebase/retry.go
@@ -11,7 +11,6 @@ import (
 	"github.com/grafana/dskit/grpcutil"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
-	"google.golang.org/grpc/codes"
 
 	"github.com/grafana/loki/v3/pkg/util"
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
@@ -97,8 +96,13 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
 			return nil, ctx.Err()
 		}
 
-		// Retry if we get a HTTP 500 or an unknown error.
-		if code := grpcutil.ErrorToStatusCode(err); code == codes.Unknown || code/100 == 5 {
+		code := grpcutil.ErrorToStatusCode(err)
+		// Error handling is tricky: in many places we wrap the error and set an HTTP-style status code,
+		// but in other places we return an existing gRPC error, which carries a gRPC status code.
+		// If the code is < 100 it is a gRPC status code. We currently retry all of these, even codes.Canceled,
+		// because our pools close connections with a cancel and we want to retry those requests.
+		// If the code is >= 100 it is an HTTP status code, and we only retry 5xx.
+		if code < 100 || code/100 == 5 {
 			lastErr = err
 			level.Error(util_log.WithContext(ctx, r.log)).Log(
 				"msg", "error processing request",
@@ -112,10 +116,13 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
 				"end_delta", time.Since(end),
 				"length", end.Sub(start),
 				"retry_in", bk.NextDelay(),
+				"code", code,
 				"err", err,
 			)
 			bk.Wait()
 			continue
+		} else {
+			level.Warn(util_log.WithContext(ctx, r.log)).Log("msg", "received an error but not a retryable code, this is possibly a bug.", "code", code, "err", err)
 		}
 
 		return nil, err
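
To make the new retry condition concrete, here is a minimal, self-contained Go sketch of the decision the hunk above adds. It is illustrative only: in the patch the code comes from dskit's grpcutil.ErrorToStatusCode inside retry.Do, and the isRetryable helper and the sample values below are hypothetical.

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
)

// isRetryable mirrors the new check: values below 100 are treated as gRPC
// status codes and are all retried (including codes.Canceled, because pool
// shutdowns surface as cancellations), while values of 100 and above are
// treated as HTTP status codes and only 5xx are retried.
// Hypothetical helper for illustration; not part of the patch.
func isRetryable(code int32) bool {
	return code < 100 || code/100 == 5
}

func main() {
	fmt.Println(isRetryable(int32(codes.Canceled))) // true: gRPC code, retried
	fmt.Println(isRetryable(int32(codes.Unknown)))  // true: gRPC code, retried
	fmt.Println(isRetryable(503))                   // true: HTTP 5xx, retried
	fmt.Println(isRetryable(404))                   // false: HTTP 4xx, not retried
}

Compared with the previous condition (codes.Unknown or HTTP 5xx only), every gRPC status code is now retried up to the configured maximum, which is what allows requests cancelled by pooled-connection shutdowns to be retried instead of failing the query.
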
diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go
index dc2b3bace165..8ed3b2695113 100644
--- a/pkg/querier/queryrange/roundtrip.go
+++ b/pkg/querier/queryrange/roundtrip.go
@@ -286,8 +286,9 @@ func NewDetectedLabelsTripperware(cfg Config, opts logql.EngineOpts, logger log.
 	return base.MiddlewareFunc(func(next base.Handler) base.Handler {
 		statsHandler := mw.Wrap(next)
 		if cfg.MaxRetries > 0 {
+			tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
 			rm := base.NewRetryMiddleware(logger, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, namespace)
-			statsHandler = rm.Wrap(statsHandler)
+			statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
 		}
 
 		splitter := newDefaultSplitter(limits, iqo)
@@ -559,9 +560,10 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo
 		statsHandler := indexStatsTripperware.Wrap(next)
 		retryNextHandler := next
 		if cfg.MaxRetries > 0 {
+			tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
 			rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
-			statsHandler = rm.Wrap(statsHandler)
-			retryNextHandler = rm.Wrap(next)
+			statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
+			retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
 		}
 
 		queryRangeMiddleware := []base.Middleware{
@@ -631,9 +633,10 @@ func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logg
 		statsHandler := indexStatsTripperware.Wrap(next)
 		retryNextHandler := next
 		if cfg.MaxRetries > 0 {
+			tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
 			rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
-			statsHandler = rm.Wrap(statsHandler)
-			retryNextHandler = rm.Wrap(next)
+			statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
+			retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
 		}
 
 		queryRangeMiddleware := []base.Middleware{
@@ -874,9 +877,10 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge
 		statsHandler := indexStatsTripperware.Wrap(next)
 		retryNextHandler := next
 		if cfg.MaxRetries > 0 {
+			tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
 			rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
-			statsHandler = rm.Wrap(statsHandler)
-			retryNextHandler = rm.Wrap(next)
+			statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
+			retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
 		}
 
 		queryRangeMiddleware := []base.Middleware{
@@ -1003,9 +1007,10 @@ func NewInstantMetricTripperware(
 		statsHandler := indexStatsTripperware.Wrap(next)
 		retryNextHandler := next
 		if cfg.MaxRetries > 0 {
+			tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
 			rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
-			statsHandler = rm.Wrap(statsHandler)
-			retryNextHandler = rm.Wrap(next)
+			statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
+			retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
 		}
 
 		queryRangeMiddleware := []base.Middleware{
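
The roundtrip.go hunks all apply the same change: instead of wrapping handlers with the retry middleware directly, they merge an instrument middleware named "retry" in front of it, so the retried stats and next handlers are recorded under a "retry" instrumentation scope (the commit message describes this as wrapping the retries with spans). The sketch below is a simplified model of how such a merge composes middlewares, with the first argument becoming the outermost wrapper; the Handler and Middleware types here are illustrative stand-ins, not the queryrangebase types, which operate on Request/Response values and expose a Wrap method.

package main

import "fmt"

// Simplified stand-ins for the queryrangebase Handler and Middleware types.
type Handler func(req string) string

type Middleware func(next Handler) Handler

// mergeMiddlewares composes middlewares so the first argument is the
// outermost wrapper, modelling how MergeMiddlewares(tr, rm) puts the
// "retry" instrumentation around the retry middleware in the hunks above.
func mergeMiddlewares(mws ...Middleware) Middleware {
	return func(next Handler) Handler {
		for i := len(mws) - 1; i >= 0; i-- {
			next = mws[i](next)
		}
		return next
	}
}

func main() {
	instrument := Middleware(func(next Handler) Handler {
		return func(req string) string { return "instrument(" + next(req) + ")" }
	})
	retry := Middleware(func(next Handler) Handler {
		return func(req string) string { return "retry(" + next(req) + ")" }
	})
	final := Handler(func(req string) string { return req })

	// Prints "instrument(retry(query))": the retries execute inside the
	// instrumentation, so their latency is attributed to the "retry" scope.
	fmt.Println(mergeMiddlewares(instrument, retry)(final)("query"))
}
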