diff --git a/CHANGELOG.md b/CHANGELOG.md index 65c97f3d8dee..13136a5b41cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ##### Enhancements +* [11243](https://github.com/grafana/loki/pull/11243) **kavirajk**: Inflight-logging: Add extra metadata to inflight requests logging. * [11110](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Change the default of the `metrics-namespace` flag to 'loki'. * [11086](https://github.com/grafana/loki/pull/11086) **kandrew5**: Helm: Allow topologySpreadConstraints * [11003](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Add the `metrics-namespace` flag to change the namespace of metrics currently using cortex as namespace. diff --git a/Makefile b/Makefile index b1cacb113533..ee022ba2129f 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,8 @@ BUILD_IMAGE_VERSION ?= 0.31.2 # Docker image info IMAGE_PREFIX ?= grafana +BUILD_IMAGE_PREFIX ?= grafana + IMAGE_TAG ?= $(shell ./tools/image-tag) # Version info for binaries @@ -102,7 +104,7 @@ RM := --rm TTY := --tty DOCKER_BUILDKIT ?= 1 -BUILD_IMAGE = BUILD_IMAGE=$(IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION) +BUILD_IMAGE = BUILD_IMAGE=$(BUILD_IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION) PUSH_OCI=docker push TAG_OCI=docker tag ifeq ($(CI), true) diff --git a/pkg/logql/blocker.go b/pkg/logql/blocker.go index d38a640456c3..cbfdc6bf49e3 100644 --- a/pkg/logql/blocker.go +++ b/pkg/logql/blocker.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/regexp" + "github.com/grafana/loki/pkg/util" logutil "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/validation" ) @@ -43,7 +44,7 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool { for _, b := range blocks { if b.Hash > 0 { - if b.Hash == HashedQuery(query) { + if b.Hash == util.HashedQuery(query) { level.Warn(logger).Log("msg", "query blocker matched with hash policy", "hash", b.Hash, "query", query) return qb.block(b, typ, logger) } diff --git a/pkg/logql/blocker_test.go b/pkg/logql/blocker_test.go index 3dc3b72c8159..e0dc00bf622e 100644 --- a/pkg/logql/blocker_test.go +++ b/pkg/logql/blocker_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/validation" ) @@ -124,7 +125,7 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) { "correct FNV32 hash matches", defaultQuery, []*validation.BlockedQuery{ { - Hash: HashedQuery(defaultQuery), + Hash: util.HashedQuery(defaultQuery), }, }, logqlmodel.ErrBlocked, }, @@ -132,7 +133,7 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) { "incorrect FNV32 hash does not match", defaultQuery, []*validation.BlockedQuery{ { - Hash: HashedQuery(defaultQuery) + 1, + Hash: util.HashedQuery(defaultQuery) + 1, }, }, nil, }, diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 1b85ea05ea76..af680a33b9a9 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -219,7 +219,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { ) if q.logExecQuery { - queryHash := HashedQuery(q.params.Query()) + queryHash := util.HashedQuery(q.params.Query()) if GetRangeType(q.params) == InstantType { level.Info(logutil.WithContext(ctx, q.logger)).Log("msg", "executing query", "type", "instant", "query", q.params.Query(), "query_hash", queryHash) } else { diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index ef7d5e0538e3..548400644a31 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -2669,7 +2669,7 @@ func TestHashingStability(t *testing.T) { {`sum (count_over_time({app="myapp",env="myenv"} |= "error" |= "metrics.go" | logfmt [10s])) by(query_hash)`}, } { params.qs = test.qs - expectedQueryHash := HashedQuery(test.qs) + expectedQueryHash := util.HashedQuery(test.qs) // check that both places will end up having the same query hash, even though they're emitting different log lines. require.Regexp(t, diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 3ba3a9c61535..94a4c2f9dd40 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -2,7 +2,6 @@ package logql import ( "context" - "hash/fnv" "strconv" "strings" "time" @@ -19,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/querier/astmapper" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/constants" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" @@ -120,7 +120,7 @@ func RecordRangeAndInstantQueryMetrics( logValues = append(logValues, []interface{}{ "latency", latencyType, // this can be used to filter log lines. "query", p.Query(), - "query_hash", HashedQuery(p.Query()), + "query_hash", util.HashedQuery(p.Query()), "query_type", queryType, "range_type", rt, "length", p.End().Sub(p.Start()), @@ -187,12 +187,6 @@ func RecordRangeAndInstantQueryMetrics( recordUsageStats(queryType, stats) } -func HashedQuery(query string) uint32 { - h := fnv.New32() - _, _ = h.Write([]byte(query)) - return h.Sum32() -} - func RecordLabelQueryMetrics( ctx context.Context, log log.Logger, @@ -225,7 +219,7 @@ func RecordLabelQueryMetrics( "status", status, "label", label, "query", query, - "query_hash", HashedQuery(query), + "query_hash", util.HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned, ) @@ -276,7 +270,7 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "match", PrintMatches(match), - "query_hash", HashedQuery(PrintMatches(match)), + "query_hash", util.HashedQuery(PrintMatches(match)), "total_entries", stats.Summary.TotalEntriesReturned) if shard != nil { @@ -316,7 +310,7 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "query", query, - "query_hash", HashedQuery(query), + "query_hash", util.HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned) level.Info(logger).Log(logValues...) @@ -346,7 +340,7 @@ func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end ti "latency", latencyType, "query_type", queryType, "query", query, - "query_hash", HashedQuery(query), + "query_hash", util.HashedQuery(query), "start", start.Format(time.RFC3339Nano), "end", end.Format(time.RFC3339Nano), "start_delta", time.Since(start), diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index 950a16bb39a7..06d4e2699494 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -191,11 +192,11 @@ func Test_testToKeyValues(t *testing.T) { } func TestQueryHashing(t *testing.T) { - h1 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) - h2 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`) + h1 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) + h2 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`) // check that it capture differences of order. require.NotEqual(t, h1, h2) - h3 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) + h3 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) // check that it evaluate same queries as same hashes, even if evaluated at different timestamps. require.Equal(t, h1, h3) } diff --git a/pkg/querier/queryrange/queryrangebase/retry.go b/pkg/querier/queryrange/queryrangebase/retry.go index 5dbad8d82582..d051363771bb 100644 --- a/pkg/querier/queryrange/queryrangebase/retry.go +++ b/pkg/querier/queryrange/queryrangebase/retry.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -73,6 +74,11 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) { MaxRetries: 0, } bk := backoff.New(ctx, cfg) + + start := req.GetStart() + end := req.GetEnd() + query := req.GetQuery() + for ; tries < r.maxRetries; tries++ { if ctx.Err() != nil { return nil, ctx.Err() @@ -86,7 +92,19 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) { httpResp, ok := httpgrpc.HTTPResponseFromError(err) if !ok || httpResp.Code/100 == 5 { lastErr = err - level.Error(util_log.WithContext(ctx, r.log)).Log("msg", "error processing request", "try", tries, "query", req.GetQuery(), "retry_in", bk.NextDelay(), "err", err) + level.Error(util_log.WithContext(ctx, r.log)).Log( + "msg", "error processing request", + "try", tries, + "query", query, + "query_hash", util.HashedQuery(query), + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), + "length", end.Sub(start), + "retry_in", bk.NextDelay(), + "err", err, + ) bk.Wait() continue } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index e2a2ed002169..2b24ab4a917d 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -24,6 +24,7 @@ import ( base "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/constants" logutil "github.com/grafana/loki/pkg/util/log" ) @@ -247,8 +248,19 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response, return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - queryHash := logql.HashedQuery(op.Query) - level.Info(logger).Log("msg", "executing query", "type", "range", "query", op.Query, "length", op.EndTs.Sub(op.StartTs), "step", op.Step, "query_hash", queryHash) + queryHash := util.HashedQuery(op.Query) + level.Info(logger).Log( + "msg", "executing query", + "type", "range", + "query", op.Query, + "start", op.StartTs.Format(time.RFC3339Nano), + "end", op.EndTs.Format(time.RFC3339Nano), + "start_delta", time.Since(op.StartTs), + "end_delta", time.Since(op.EndTs), + "length", op.EndTs.Sub(op.StartTs), + "step", op.Step, + "query_hash", queryHash, + ) switch e := expr.(type) { case syntax.SampleExpr: @@ -296,7 +308,7 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response, return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - queryHash := logql.HashedQuery(op.Query) + queryHash := util.HashedQuery(op.Query) level.Info(logger).Log("msg", "executing query", "type", "instant", "query", op.Query, "query_hash", queryHash) switch expr.(type) { diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index db6316e9986d..8f70d314da88 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -24,11 +24,11 @@ import ( "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/template" - "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/syntax" ruler "github.com/grafana/loki/pkg/ruler/base" "github.com/grafana/loki/pkg/ruler/rulespb" - "github.com/grafana/loki/pkg/ruler/util" + rulerutil "github.com/grafana/loki/pkg/ruler/util" + "github.com/grafana/loki/pkg/util" ) // RulesLimits is the one function we need from limits.Overrides, and @@ -40,7 +40,7 @@ type RulesLimits interface { RulerRemoteWriteURL(userID string) string RulerRemoteWriteTimeout(userID string) time.Duration RulerRemoteWriteHeaders(userID string) map[string]string - RulerRemoteWriteRelabelConfigs(userID string) []*util.RelabelConfig + RulerRemoteWriteRelabelConfigs(userID string) []*rulerutil.RelabelConfig RulerRemoteWriteConfig(userID string, id string) *config.RemoteWriteConfig RulerRemoteWriteQueueCapacity(userID string) int RulerRemoteWriteQueueMinShards(userID string) int @@ -60,7 +60,7 @@ type RulesLimits interface { // and passing an altered timestamp. func queryFunc(evaluator Evaluator, checker readyChecker, userID string, logger log.Logger) rules.QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { - hash := logql.HashedQuery(qs) + hash := util.HashedQuery(qs) detail := rules.FromOriginContext(ctx) detailLog := log.With(logger, "rule_name", detail.Name, "rule_type", detail.Kind, "query", qs, "query_hash", hash) diff --git a/pkg/ruler/evaluator_jitter.go b/pkg/ruler/evaluator_jitter.go index ef337c73396b..449ca0e18011 100644 --- a/pkg/ruler/evaluator_jitter.go +++ b/pkg/ruler/evaluator_jitter.go @@ -10,8 +10,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logqlmodel" + "github.com/grafana/loki/pkg/util" ) // EvaluatorWithJitter wraps a given Evaluator. It applies a consistent jitter based on a rule's query string by hashing @@ -44,7 +44,7 @@ func NewEvaluatorWithJitter(inner Evaluator, maxJitter time.Duration, hasher has } func (e *EvaluatorWithJitter) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) { - logger := log.With(e.logger, "query", qs, "query_hash", logql.HashedQuery(qs)) + logger := log.With(e.logger, "query", qs, "query_hash", util.HashedQuery(qs)) jitter := e.calculateJitter(qs, logger) if jitter > 0 { diff --git a/pkg/ruler/evaluator_remote.go b/pkg/ruler/evaluator_remote.go index 4f953876d6c0..97a0c1ce7f9d 100644 --- a/pkg/ruler/evaluator_remote.go +++ b/pkg/ruler/evaluator_remote.go @@ -36,8 +36,8 @@ import ( "google.golang.org/grpc/keepalive" "github.com/grafana/loki/pkg/loghttp" - "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logqlmodel" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/build" "github.com/grafana/loki/pkg/util/constants" "github.com/grafana/loki/pkg/util/httpreq" @@ -220,7 +220,7 @@ func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts tim args.Set("time", ts.Format(time.RFC3339Nano)) } body := []byte(args.Encode()) - hash := logql.HashedQuery(query) + hash := util.HashedQuery(query) req := httpgrpc.HTTPRequest{ Method: http.MethodPost, diff --git a/pkg/util/hash_fp.go b/pkg/util/hash_fp.go index 209b8b45c064..e7c0253865b6 100644 --- a/pkg/util/hash_fp.go +++ b/pkg/util/hash_fp.go @@ -1,6 +1,10 @@ package util -import "github.com/prometheus/common/model" +import ( + "hash/fnv" + + "github.com/prometheus/common/model" +) // HashFP simply moves entropy from the most significant 48 bits of the // fingerprint into the least significant 16 bits (by XORing) so that a simple @@ -12,3 +16,10 @@ import "github.com/prometheus/common/model" func HashFP(fp model.Fingerprint) uint32 { return uint32(fp ^ (fp >> 32) ^ (fp >> 16)) } + +// HashedQuery returns a unique hash value for the given `query`. +func HashedQuery(query string) uint32 { + h := fnv.New32() + _, _ = h.Write([]byte(query)) + return h.Sum32() +}