Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mimir query engine: track active queries #8277

Merged
merged 8 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* [FEATURE] Continuous-test: now runnable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8277
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
Expand Down
14 changes: 8 additions & 6 deletions pkg/streamingpromql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ func NewEngine(opts promql.EngineOpts, limitsProvider QueryLimitsProvider) (prom
}

return &Engine{
lookbackDelta: lookbackDelta,
timeout: opts.Timeout,
limitsProvider: limitsProvider,
lookbackDelta: lookbackDelta,
timeout: opts.Timeout,
limitsProvider: limitsProvider,
activeQueryTracker: opts.ActiveQueryTracker,
}, nil
}

type Engine struct {
lookbackDelta time.Duration
timeout time.Duration
limitsProvider QueryLimitsProvider
lookbackDelta time.Duration
timeout time.Duration
limitsProvider QueryLimitsProvider
activeQueryTracker promql.QueryTracker
}

func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
Expand Down
229 changes: 199 additions & 30 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package streamingpromql
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
Expand Down Expand Up @@ -519,43 +520,211 @@ func TestMemoryConsumptionLimit(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
t.Run("range query", func(t *testing.T) {
opts := NewTestEngineOpts()
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(testCase.rangeQueryLimit))
require.NoError(t, err)
queryTypes := map[string]func() (promql.Query, error){
"range query": func() (promql.Query, error) {
opts := NewTestEngineOpts()
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(testCase.rangeQueryLimit))
if err != nil {
return nil, err
}

start := timestamp.Time(0)
return engine.NewRangeQuery(ctx, storage, nil, testCase.expr, start, start.Add(4*time.Minute), time.Minute)
},
"instant query": func() (promql.Query, error) {
opts := NewTestEngineOpts()
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(testCase.instantQueryLimit))
if err != nil {
return nil, err
}

start := timestamp.Time(0)
return engine.NewInstantQuery(ctx, storage, nil, testCase.expr, start)
},
}

start := timestamp.Time(0)
q, err := engine.NewRangeQuery(ctx, storage, nil, testCase.expr, start, start.Add(4*time.Minute), time.Minute)
require.NoError(t, err)
defer q.Close()
for queryType, createQuery := range queryTypes {
t.Run(queryType, func(t *testing.T) {
q, err := createQuery()
require.NoError(t, err)
t.Cleanup(q.Close)

res := q.Exec(ctx)
res := q.Exec(ctx)

if testCase.shouldSucceed {
require.NoError(t, res.Err)
} else {
require.ErrorContains(t, res.Err, globalerror.MaxEstimatedMemoryConsumptionPerQuery.Error())
}
})
if testCase.shouldSucceed {
require.NoError(t, res.Err)
} else {
require.ErrorContains(t, res.Err, globalerror.MaxEstimatedMemoryConsumptionPerQuery.Error())
}
})
}
})
}
}

t.Run("instant query", func(t *testing.T) {
opts := NewTestEngineOpts()
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(testCase.instantQueryLimit))
require.NoError(t, err)
func TestActiveQueryTracker(t *testing.T) {
for _, shouldSucceed := range []bool{true, false} {
t.Run(fmt.Sprintf("successful query = %v", shouldSucceed), func(t *testing.T) {
opts := NewTestEngineOpts()
tracker := &testQueryTracker{}
opts.ActiveQueryTracker = tracker
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0))
require.NoError(t, err)

start := timestamp.Time(0)
q, err := engine.NewInstantQuery(ctx, storage, nil, testCase.expr, start)
require.NoError(t, err)
defer q.Close()
innerStorage := promqltest.LoadedStorage(t, "")
t.Cleanup(func() { require.NoError(t, innerStorage.Close()) })

res := q.Exec(ctx)
// Use a fake queryable as a way to check that the query is recorded as active while the query is in progress.
queryTrackingTestingQueryable := &activeQueryTrackerQueryable{
innerStorage: innerStorage,
tracker: tracker,
}

if testCase.shouldSucceed {
require.NoError(t, res.Err)
} else {
require.ErrorContains(t, res.Err, globalerror.MaxEstimatedMemoryConsumptionPerQuery.Error())
}
})
if !shouldSucceed {
queryTrackingTestingQueryable.err = errors.New("something went wrong inside the query")
}

queryTypes := map[string]func(expr string) (promql.Query, error){
"range": func(expr string) (promql.Query, error) {
return engine.NewRangeQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0), timestamp.Time(0).Add(time.Hour), time.Minute)
},
"instant": func(expr string) (promql.Query, error) {
return engine.NewInstantQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0))
},
}

for queryType, createQuery := range queryTypes {
t.Run(queryType+" query", func(t *testing.T) {
expr := "test_" + queryType + "_query"
queryTrackingTestingQueryable.activeQueryAtQueryTime = trackedQuery{}

q, err := createQuery(expr)
require.NoError(t, err)
defer q.Close()

res := q.Exec(context.Background())

if shouldSucceed {
require.NoError(t, res.Err)
} else {
require.EqualError(t, res.Err, "something went wrong inside the query")
}

// Check that the query was active in the query tracker while the query was executing.
require.Equal(t, expr, queryTrackingTestingQueryable.activeQueryAtQueryTime.expr)
require.False(t, queryTrackingTestingQueryable.activeQueryAtQueryTime.deleted)

// Check that the query has now been marked as deleted in the query tracker.
require.NotEmpty(t, tracker.queries)
trackedQuery := tracker.queries[len(tracker.queries)-1]
require.Equal(t, expr, trackedQuery.expr)
require.Equal(t, true, trackedQuery.deleted)
})
}
})
}
}

// trackedQuery is one entry recorded by testQueryTracker.
type trackedQuery struct {
	expr    string // query text that was passed to Insert
	deleted bool   // true once Delete has been called with this entry's index
}

// testQueryTracker is an in-memory query tracker used by the tests in this
// file as the engine's ActiveQueryTracker. Every inserted query is appended to
// queries and is never removed; Delete only flags the entry, so tests can
// inspect the full history afterwards.
type testQueryTracker struct {
	queries []trackedQuery
}

// GetMaxConcurrent always reports 0.
func (qt *testQueryTracker) GetMaxConcurrent() int { return 0 }

// Insert records query as a new, not-yet-deleted entry and returns the index
// of that entry in queries. The context is ignored and the error is always nil.
func (qt *testQueryTracker) Insert(_ context.Context, query string) (int, error) {
	index := len(qt.queries)
	qt.queries = append(qt.queries, trackedQuery{expr: query})

	return index, nil
}

// Delete flags the entry previously returned by Insert as deleted.
// insertIndex must be an index returned by Insert; any other value panics.
func (qt *testQueryTracker) Delete(insertIndex int) {
	entry := &qt.queries[insertIndex]
	entry.deleted = true
}

// activeQueryTrackerQueryable is a fake storage.Queryable that snapshots the
// most recently tracked query at the moment the engine asks for a querier.
// Tests use the snapshot to verify that the query was registered with the
// tracker (and not yet deleted) while it was executing.
type activeQueryTrackerQueryable struct {
	// tracker is the query tracker whose latest entry is snapshotted.
	tracker *testQueryTracker

	// activeQueryAtQueryTime is a copy of the tracker's latest entry taken on
	// the most recent Querier call. It is the zero value if nothing had been
	// tracked at that point.
	activeQueryAtQueryTime trackedQuery

	// innerStorage supplies the real querier when err is nil.
	innerStorage storage.Queryable
	// err, if non-nil, is returned from Querier instead of a querier.
	err error
}

// Querier records the tracker's latest entry in activeQueryAtQueryTime and
// then either returns the configured error or delegates to innerStorage.
//
// If the tracker has no entries yet (the engine did not call Insert before
// opening a querier), the snapshot is left as the zero trackedQuery instead
// of panicking with an index-out-of-range, so the calling test fails on its
// own assertion with a readable message.
func (a *activeQueryTrackerQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
	a.activeQueryAtQueryTime = trackedQuery{}
	if n := len(a.tracker.queries); n > 0 {
		a.activeQueryAtQueryTime = a.tracker.queries[n-1]
	}

	if a.err != nil {
		return nil, a.err
	}

	return a.innerStorage.Querier(mint, maxt)
}

// TestActiveQueryTracker_WaitingForTrackerIncludesQueryTimeout verifies that
// the time spent waiting in the query tracker's Insert counts towards the
// engine's query timeout: the tracker used here blocks in Insert until its
// context is done, so with a 10ms engine timeout both query types must fail
// with a deadline-exceeded error and produce no value.
func TestActiveQueryTracker_WaitingForTrackerIncludesQueryTimeout(t *testing.T) {
	queryTracker := &timeoutTestingQueryTracker{}

	engineOpts := NewTestEngineOpts()
	engineOpts.ActiveQueryTracker = queryTracker
	engineOpts.Timeout = 10 * time.Millisecond

	engine, err := NewEngine(engineOpts, NewStaticQueryLimitsProvider(0))
	require.NoError(t, err)

	const expr = "some_test_query"
	epoch := timestamp.Time(0)

	// A nil Queryable is passed for both query types.
	newQueryByType := map[string]func() (promql.Query, error){
		"range": func() (promql.Query, error) {
			return engine.NewRangeQuery(context.Background(), nil, nil, expr, epoch, epoch.Add(time.Hour), time.Minute)
		},
		"instant": func() (promql.Query, error) {
			return engine.NewInstantQuery(context.Background(), nil, nil, expr, epoch)
		},
	}

	for name, newQuery := range newQueryByType {
		t.Run(name+" query", func(t *testing.T) {
			// The tracker is shared between subtests, so clear the flag first.
			queryTracker.sawTimeout = false

			query, err := newQuery()
			require.NoError(t, err)
			defer query.Close()

			result := query.Exec(context.Background())

			require.True(t, queryTracker.sawTimeout, "query tracker was not called with a context that timed out")

			require.Error(t, result.Err)
			require.ErrorIs(t, result.Err, context.DeadlineExceeded)
			require.EqualError(t, result.Err, "context deadline exceeded: query timed out")
			require.Nil(t, result.Value)
		})
	}
}

// timeoutTestingQueryTracker is a query tracker whose Insert never succeeds:
// it waits for the supplied context to finish and records whether that
// happened. Tests use it to check that the context handed to the tracker
// carries the query timeout.
type timeoutTestingQueryTracker struct {
	// sawTimeout is set when Insert observed its context being done.
	sawTimeout bool
}

// GetMaxConcurrent always reports 0.
func (qt *timeoutTestingQueryTracker) GetMaxConcurrent() int { return 0 }

// Insert blocks until ctx is done or one second has elapsed, whichever comes
// first. If ctx finishes first, sawTimeout is set and the context's cause is
// returned; otherwise a "gave up" error is returned. The returned ID is
// always 0 and the error is never nil.
func (qt *timeoutTestingQueryTracker) Insert(ctx context.Context, _ string) (int, error) {
	// Upper bound on the wait so a missing timeout fails the test instead of
	// hanging it.
	giveUp := time.NewTimer(time.Second)
	defer giveUp.Stop()

	select {
	case <-giveUp.C:
		return 0, errors.New("gave up waiting for query to time out")
	case <-ctx.Done():
		qt.sawTimeout = true
		return 0, context.Cause(ctx)
	}
}

// Delete must never be reached because Insert always fails; it panics to make
// any such call obvious.
func (qt *timeoutTestingQueryTracker) Delete(_ int) {
	panic("should not be called")
}
9 changes: 9 additions & 0 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,15 @@ func (q *Query) Exec(ctx context.Context) *promql.Result {
// (so that it runs before the cancellation of the context with timeout created above).
defer cancel(errQueryFinished)

if q.engine.activeQueryTracker != nil {
queryID, err := q.engine.activeQueryTracker.Insert(ctx, q.qs)
if err != nil {
return &promql.Result{Err: err}
}

defer q.engine.activeQueryTracker.Delete(queryID)
}

series, err := q.root.SeriesMetadata(ctx)
if err != nil {
return &promql.Result{Err: err}
Expand Down
Loading