From 7232795e1f5fb1868c83111f5aab72ca0f3d9891 Mon Sep 17 00:00:00 2001
From: Ed Welch
Date: Fri, 19 Jul 2024 14:13:26 -0400
Subject: [PATCH] fix: add a retry middleware to all the stats handlers
 (#13584)

Signed-off-by: Edward Welch
---
 .../queryrange/queryrangebase/retry.go        | 24 +++++++++++++
 pkg/querier/queryrange/querysharding.go       | 23 +++++++-----
 pkg/querier/queryrange/querysharding_test.go  |  6 ++++
 pkg/querier/queryrange/roundtrip.go           | 32 +++++++++++++++++
 pkg/querier/queryrange/shard_resolver.go      | 35 ++++++++++---------
 5 files changed, 95 insertions(+), 25 deletions(-)

diff --git a/pkg/querier/queryrange/queryrangebase/retry.go b/pkg/querier/queryrange/queryrangebase/retry.go
index a3f2693b1ba7..1bd619324ef1 100644
--- a/pkg/querier/queryrange/queryrangebase/retry.go
+++ b/pkg/querier/queryrange/queryrangebase/retry.go
@@ -2,6 +2,7 @@ package queryrangebase
 
 import (
 	"context"
+	"reflect"
 	"time"
 
 	"github.com/go-kit/log"
@@ -81,20 +82,28 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
 	query := req.GetQuery()
 
 	for ; tries < r.maxRetries; tries++ {
+		// Make sure the context isn't done before sending the request
 		if ctx.Err() != nil {
 			return nil, ctx.Err()
 		}
+
 		resp, err := r.next.Do(ctx, req)
 		if err == nil {
 			return resp, nil
 		}
 
+		// Make sure the context isn't done before retrying the request
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+
 		// Retry if we get a HTTP 500 or an unknown error.
 		if code := grpcutil.ErrorToStatusCode(err); code == codes.Unknown || code/100 == 5 {
 			lastErr = err
 			level.Error(util_log.WithContext(ctx, r.log)).Log(
 				"msg", "error processing request",
 				"try", tries,
+				"type", logImplementingType(req),
 				"query", query,
 				"query_hash", util.HashedQuery(query),
 				"start", start.Format(time.RFC3339Nano),
@@ -113,3 +122,18 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
 	}
 	return nil, lastErr
 }
+
+func logImplementingType(i Request) string {
+	if i == nil {
+		return "nil"
+	}
+
+	t := reflect.TypeOf(i)
+
+	// Check if it's a pointer and get the underlying type if so
+	if t.Kind() == reflect.Ptr {
+		t = t.Elem()
+	}
+
+	return t.String()
+}
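Note on the logImplementingType helper added above: it lets the retry log line report the concrete type behind the Request interface, since several request types now flow through this middleware. A minimal standalone sketch of the reflect behavior, runnable as-is; the LokiRequest type below is a hypothetical stand-in for illustration, not the real Loki type:

package main

import (
	"fmt"
	"reflect"
)

// Request stands in for queryrangebase.Request; only the shape matters here.
type Request interface{ GetQuery() string }

// LokiRequest is a hypothetical request type used purely for this demo.
type LokiRequest struct{ Query string }

func (r *LokiRequest) GetQuery() string { return r.Query }

// logImplementingType mirrors the helper added in the patch above.
func logImplementingType(i Request) string {
	if i == nil {
		return "nil"
	}
	t := reflect.TypeOf(i)
	// Dereference pointers so a *LokiRequest is reported as main.LokiRequest.
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.String()
}

func main() {
	fmt.Println(logImplementingType(&LokiRequest{Query: `{app="x"}`})) // main.LokiRequest
	fmt.Println(logImplementingType(nil))                              // nil
}

Logging the element type rather than the pointer type keeps the log label stable whether callers pass values or pointers.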
diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go
index 07da7abfb651..bd5c26079636 100644
--- a/pkg/querier/queryrange/querysharding.go
+++ b/pkg/querier/queryrange/querysharding.go
@@ -42,6 +42,7 @@ func NewQueryShardMiddleware(
 	limits Limits,
 	maxShards int,
 	statsHandler queryrangebase.Handler,
+	retryNextHandler queryrangebase.Handler,
 	shardAggregation []string,
 ) queryrangebase.Middleware {
 	noshards := !hasShards(confs)
@@ -56,7 +57,7 @@ func NewQueryShardMiddleware(
 	}
 
 	mapperware := queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
-		return newASTMapperware(confs, engineOpts, next, statsHandler, logger, shardingMetrics, limits, maxShards, shardAggregation)
+		return newASTMapperware(confs, engineOpts, next, retryNextHandler, statsHandler, logger, shardingMetrics, limits, maxShards, shardAggregation)
 	})
 
 	return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
@@ -76,6 +77,7 @@ func newASTMapperware(
 	confs ShardingConfigs,
 	engineOpts logql.EngineOpts,
 	next queryrangebase.Handler,
+	retryNextHandler queryrangebase.Handler,
 	statsHandler queryrangebase.Handler,
 	logger log.Logger,
 	metrics *logql.MapperMetrics,
@@ -88,6 +90,7 @@ func newASTMapperware(
 		logger:           log.With(logger, "middleware", "QueryShard.astMapperware"),
 		limits:           limits,
 		next:             next,
+		retryNextHandler: retryNextHandler,
 		statsHandler:     next,
 		ng:               logql.NewDownstreamEngine(engineOpts, DownstreamHandler{next: next, limits: limits}, limits, logger),
 		metrics:          metrics,
@@ -103,14 +106,15 @@
 }
 
 type astMapperware struct {
-	confs        ShardingConfigs
-	logger       log.Logger
-	limits       Limits
-	next         queryrangebase.Handler
-	statsHandler queryrangebase.Handler
-	ng           *logql.DownstreamEngine
-	metrics      *logql.MapperMetrics
-	maxShards    int
+	confs            ShardingConfigs
+	logger           log.Logger
+	limits           Limits
+	next             queryrangebase.Handler
+	retryNextHandler queryrangebase.Handler
+	statsHandler     queryrangebase.Handler
+	ng               *logql.DownstreamEngine
+	metrics          *logql.MapperMetrics
+	maxShards        int
 
 	// Feature flag for sharding range and vector aggregations such as
 	// quantile_over_time with probabilistic data structures.
@@ -191,6 +195,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
 		ast.maxShards,
 		r,
 		ast.statsHandler,
+		ast.retryNextHandler,
 		ast.next,
 		ast.limits,
 	)
diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go
index b17b38d6932a..809013ffb002 100644
--- a/pkg/querier/queryrange/querysharding_test.go
+++ b/pkg/querier/queryrange/querysharding_test.go
@@ -168,6 +168,7 @@ func Test_astMapper(t *testing.T) {
 				},
 				testEngineOpts,
 				handler,
+				handler,
 				nil,
 				log.NewNopLogger(),
 				nilShardingMetrics,
@@ -307,6 +308,7 @@ func Test_astMapper_QuerySizeLimits(t *testing.T) {
 				},
 				testEngineOpts,
 				handler,
+				handler,
 				nil,
 				log.NewNopLogger(),
 				nilShardingMetrics,
@@ -352,6 +354,7 @@ func Test_ShardingByPass(t *testing.T) {
 				},
 				testEngineOpts,
 				handler,
+				handler,
 				nil,
 				log.NewNopLogger(),
 				nilShardingMetrics,
@@ -439,6 +442,7 @@ func Test_InstantSharding(t *testing.T) {
 		},
 		0,
 		nil,
+		nil,
 		[]string{},
 	)
 	response, err := sharding.Wrap(queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
@@ -718,6 +722,7 @@ func TestShardingAcrossConfigs_ASTMapper(t *testing.T) {
 			confs,
 			testEngineOpts,
 			handler,
+			handler,
 			nil,
 			log.NewNopLogger(),
 			nilShardingMetrics,
@@ -853,6 +858,7 @@ func Test_ASTMapper_MaxLookBackPeriod(t *testing.T) {
 		testSchemasTSDB,
 		engineOpts,
 		queryHandler,
+		queryHandler,
 		statsHandler,
 		log.NewNopLogger(),
 		nilShardingMetrics,
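Context for the querysharding.go changes above: astMapperware now carries two views of the same downstream chain, next (unwrapped) and retryNextHandler (retry-wrapped), alongside the stats handler, and passes all three down to shardResolverForConf. A sketch of that handler forking with toy Handler/Middleware types standing in for the queryrangebase ones; all names below are illustrative, not the real API:

package main

import (
	"context"
	"fmt"
)

// Handler and Middleware are stand-ins for queryrangebase.Handler/Middleware.
type Handler interface {
	Do(ctx context.Context, req string) (string, error)
}

type HandlerFunc func(ctx context.Context, req string) (string, error)

func (f HandlerFunc) Do(ctx context.Context, req string) (string, error) { return f(ctx, req) }

type Middleware func(Handler) Handler

// astMapperware mirrors the three handler fields from the patch.
type astMapperware struct {
	next             Handler // plain downstream chain, used for shard queries
	retryNextHandler Handler // same chain wrapped with retries, used for shards requests
	statsHandler     Handler // index-stats path
}

func main() {
	next := HandlerFunc(func(ctx context.Context, req string) (string, error) {
		return "resp:" + req, nil
	})
	// withRetries is a no-op placeholder for base.NewRetryMiddleware(...).Wrap.
	withRetries := Middleware(func(h Handler) Handler {
		return HandlerFunc(func(ctx context.Context, req string) (string, error) {
			return h.Do(ctx, req) // a real implementation would loop on 5xx errors
		})
	})
	m := astMapperware{
		next:             next,
		retryNextHandler: withRetries(next),
		statsHandler:     withRetries(next),
	}
	out, _ := m.retryNextHandler.Do(context.Background(), "ShardsRequest")
	fmt.Println(out) // resp:ShardsRequest
}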
diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go
index 8ea588d11fd6..dc2b3bace165 100644
--- a/pkg/querier/queryrange/roundtrip.go
+++ b/pkg/querier/queryrange/roundtrip.go
@@ -285,6 +285,10 @@ func NewMiddleware(
 func NewDetectedLabelsTripperware(cfg Config, opts logql.EngineOpts, logger log.Logger, l Limits, schema config.SchemaConfig, metrics *Metrics, mw base.Middleware, namespace string, merger base.Merger, limits Limits, iqo util.IngesterQueryOptions) (base.Middleware, error) {
 	return base.MiddlewareFunc(func(next base.Handler) base.Handler {
 		statsHandler := mw.Wrap(next)
+		if cfg.MaxRetries > 0 {
+			rm := base.NewRetryMiddleware(logger, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, namespace)
+			statsHandler = rm.Wrap(statsHandler)
+		}
 		splitter := newDefaultSplitter(limits, iqo)
 
 		queryRangeMiddleware := []base.Middleware{
@@ -553,6 +557,12 @@ func getOperation(path string) string {
 func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
 	return base.MiddlewareFunc(func(next base.Handler) base.Handler {
 		statsHandler := indexStatsTripperware.Wrap(next)
+		retryNextHandler := next
+		if cfg.MaxRetries > 0 {
+			rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
+			statsHandler = rm.Wrap(statsHandler)
+			retryNextHandler = rm.Wrap(next)
+		}
 
 		queryRangeMiddleware := []base.Middleware{
 			QueryMetricsMiddleware(metrics.QueryMetrics),
@@ -592,6 +602,7 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo
 				limits,
 				0, // 0 is unlimited shards
 				statsHandler,
+				retryNextHandler,
 				cfg.ShardAggregations,
 			),
 		)
@@ -618,6 +629,12 @@
 func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, merger base.Merger, iqo util.IngesterQueryOptions, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
 	return base.MiddlewareFunc(func(next base.Handler) base.Handler {
 		statsHandler := indexStatsTripperware.Wrap(next)
+		retryNextHandler := next
+		if cfg.MaxRetries > 0 {
+			rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
+			statsHandler = rm.Wrap(statsHandler)
+			retryNextHandler = rm.Wrap(next)
+		}
 
 		queryRangeMiddleware := []base.Middleware{
 			StatsCollectorMiddleware(),
@@ -639,6 +656,7 @@ func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logg
 			// and overwhelming the frontend, therefore we fix the number of shards to prevent this.
 			32, // 0 is unlimited shards
 			statsHandler,
+			retryNextHandler,
 			cfg.ShardAggregations,
 		),
 	)
@@ -854,6 +872,12 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge
 
 	return base.MiddlewareFunc(func(next base.Handler) base.Handler {
 		statsHandler := indexStatsTripperware.Wrap(next)
+		retryNextHandler := next
+		if cfg.MaxRetries > 0 {
+			rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
+			statsHandler = rm.Wrap(statsHandler)
+			retryNextHandler = rm.Wrap(next)
+		}
 
 		queryRangeMiddleware := []base.Middleware{
 			QueryMetricsMiddleware(metrics.QueryMetrics),
@@ -895,6 +919,7 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge
 				limits,
 				0, // 0 is unlimited shards
 				statsHandler,
+				retryNextHandler,
 				cfg.ShardAggregations,
 			),
 		)
@@ -976,6 +1001,12 @@ func NewInstantMetricTripperware(
 
 	return base.MiddlewareFunc(func(next base.Handler) base.Handler {
 		statsHandler := indexStatsTripperware.Wrap(next)
+		retryNextHandler := next
+		if cfg.MaxRetries > 0 {
+			rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
+			statsHandler = rm.Wrap(statsHandler)
+			retryNextHandler = rm.Wrap(next)
+		}
 
 		queryRangeMiddleware := []base.Middleware{
 			StatsCollectorMiddleware(),
@@ -1003,6 +1034,7 @@ func NewInstantMetricTripperware(
 			limits,
 			0, // 0 is unlimited shards
 			statsHandler,
+			retryNextHandler,
 			cfg.ShardAggregations,
 		),
 	)
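The roundtrip.go hunks above repeat one wiring pattern across NewDetectedLabelsTripperware, NewLogFilterTripperware, NewLimitedTripperware, NewMetricTripperware, and NewInstantMetricTripperware: when cfg.MaxRetries > 0, wrap the stats handler with the retry middleware and also keep a retry-wrapped copy of next for the shard resolver. A reduced sketch of that pattern; Config, retrier, and buildHandlers are stand-ins invented for this example, not the real queryrangebase API:

package main

import (
	"context"
	"errors"
	"fmt"
)

type Handler interface {
	Do(ctx context.Context, req string) (string, error)
}

type HandlerFunc func(ctx context.Context, req string) (string, error)

func (f HandlerFunc) Do(ctx context.Context, req string) (string, error) { return f(ctx, req) }

// retrier is a toy version of the retry middleware: it re-issues the request
// up to maxRetries times and returns the last error.
type retrier struct {
	next       Handler
	maxRetries int
}

func (r retrier) Do(ctx context.Context, req string) (string, error) {
	var lastErr error
	for i := 0; i < r.maxRetries; i++ {
		resp, err := r.next.Do(ctx, req)
		if err == nil {
			return resp, nil
		}
		lastErr = err
	}
	return "", lastErr
}

type Config struct{ MaxRetries int }

// buildHandlers mirrors the per-tripperware wiring: wrap the stats path with
// retries and keep a retry-wrapped copy of next for shard planning, while the
// plain next stays untouched for the rest of the middleware chain.
func buildHandlers(cfg Config, next, statsHandler Handler) (stats, retryNext Handler) {
	stats, retryNext = statsHandler, next
	if cfg.MaxRetries > 0 {
		stats = retrier{next: statsHandler, maxRetries: cfg.MaxRetries}
		retryNext = retrier{next: next, maxRetries: cfg.MaxRetries}
	}
	return stats, retryNext
}

func main() {
	next := HandlerFunc(func(ctx context.Context, req string) (string, error) {
		return "", errors.New("boom")
	})
	stats, retryNext := buildHandlers(Config{MaxRetries: 2}, next, next)
	_, err := stats.Do(context.Background(), "IndexStatsRequest")
	fmt.Println(err) // boom, after 2 attempts
	_, err = retryNext.Do(context.Background(), "ShardsRequest")
	fmt.Println(err) // boom, after 2 attempts
}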
diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go
index 33438c371781..31366d0a0dd7 100644
--- a/pkg/querier/queryrange/shard_resolver.go
+++ b/pkg/querier/queryrange/shard_resolver.go
@@ -37,21 +37,22 @@ func shardResolverForConf(
 	maxParallelism int,
 	maxShards int,
 	r queryrangebase.Request,
-	statsHandler, next queryrangebase.Handler,
+	statsHandler, next, retryNext queryrangebase.Handler,
 	limits Limits,
 ) (logql.ShardResolver, bool) {
 	if conf.IndexType == types.TSDBType {
 		return &dynamicShardResolver{
-			ctx:             ctx,
-			logger:          logger,
-			statsHandler:    statsHandler,
-			next:            next,
-			limits:          limits,
-			from:            model.Time(r.GetStart().UnixMilli()),
-			through:         model.Time(r.GetEnd().UnixMilli()),
-			maxParallelism:  maxParallelism,
-			maxShards:       maxShards,
-			defaultLookback: defaultLookback,
+			ctx:              ctx,
+			logger:           logger,
+			statsHandler:     statsHandler,
+			retryNextHandler: retryNext,
+			next:             next,
+			limits:           limits,
+			from:             model.Time(r.GetStart().UnixMilli()),
+			through:          model.Time(r.GetEnd().UnixMilli()),
+			maxParallelism:   maxParallelism,
+			maxShards:        maxShards,
+			defaultLookback:  defaultLookback,
 		}, true
 	}
 	if conf.RowShards < 2 {
@@ -64,10 +65,11 @@ type dynamicShardResolver struct {
 	ctx context.Context
 	// TODO(owen-d): shouldn't have to fork handlers here -- one should just transparently handle the right logic
 	// depending on the underlying type?
-	statsHandler queryrangebase.Handler // index stats handler (hooked up to results cache, etc)
-	next         queryrangebase.Handler // next handler in the chain (used for non-stats reqs)
-	logger       log.Logger
-	limits       Limits
+	statsHandler     queryrangebase.Handler // index stats handler (hooked up to results cache, etc)
+	retryNextHandler queryrangebase.Handler // next handler wrapped with retries
+	next             queryrangebase.Handler // next handler in the chain (used for non-stats reqs)
+	logger           log.Logger
+	limits           Limits
 
 	from, through  model.Time
 	maxParallelism int
@@ -251,7 +253,8 @@ func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerSh
 	exprStr := expr.String()
 	// try to get shards for the given expression
 	// if it fails, fallback to linearshards based on stats
-	resp, err := r.next.Do(r.ctx, &logproto.ShardsRequest{
+	// use the retry handler here to retry transient errors
+	resp, err := r.retryNextHandler.Do(r.ctx, &logproto.ShardsRequest{
 		From:    adjustedFrom,
 		Through: r.through,
 		Query:   expr.String(),
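Why ShardingRanges switches to the retry-wrapped handler: the ShardsRequest is a planning call, and before this patch a single transient failure pushed the resolver straight to its stats-based linear-shard fallback. A runnable sketch of the effect, with a hypothetical flaky handler standing in for a briefly unavailable downstream:

package main

import (
	"context"
	"errors"
	"fmt"
)

type Handler func(ctx context.Context, req string) (string, error)

// withRetries wraps a handler the way the retry middleware does, minus the
// status-code and context checks shown in retry.go above.
func withRetries(next Handler, maxRetries int) Handler {
	return func(ctx context.Context, req string) (string, error) {
		var lastErr error
		for i := 0; i < maxRetries; i++ {
			resp, err := next(ctx, req)
			if err == nil {
				return resp, nil
			}
			lastErr = err
		}
		return "", lastErr
	}
}

func main() {
	calls := 0
	// flaky fails twice, then succeeds: a stand-in for a briefly unavailable
	// index gateway answering a ShardsRequest.
	flaky := Handler(func(ctx context.Context, req string) (string, error) {
		calls++
		if calls < 3 {
			return "", errors.New("transient")
		}
		return "plan: 3 shards", nil
	})
	// Without retries, the first error would trigger the stats-based
	// linear-shard fallback; with them, the shard plan comes back.
	resp, err := withRetries(flaky, 5)(context.Background(), "ShardsRequest")
	fmt.Println(resp, err) // plan: 3 shards <nil>
}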