diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index 86d5714d157..7bb990617ba 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -25,8 +25,9 @@ import ( var MetricSizes = []int{1, 100, 2000} type BenchCase struct { - Expr string - Steps int + Expr string + Steps int + InstantQueryOnly bool } func (c BenchCase) Name() string { @@ -75,6 +76,11 @@ func TestCases(metricSizes []int) []BenchCase { { Expr: "a_X", }, + // Range vector selector. + { + Expr: "a_X[1m]", + InstantQueryOnly: true, + }, // Simple rate. { Expr: "rate(a_X[1m])", @@ -198,7 +204,7 @@ func TestCases(metricSizes []int) []BenchCase { tmp = append(tmp, c) } else { for _, count := range metricSizes { - tmp = append(tmp, BenchCase{Expr: strings.ReplaceAll(c.Expr, "X", strconv.Itoa(count)), Steps: c.Steps}) + tmp = append(tmp, BenchCase{Expr: strings.ReplaceAll(c.Expr, "X", strconv.Itoa(count)), Steps: c.Steps, InstantQueryOnly: c.InstantQueryOnly}) } } } @@ -207,7 +213,11 @@ func TestCases(metricSizes []int) []BenchCase { // No step will be replaced by cases with the standard step. tmp = []BenchCase{} for _, c := range cases { - if c.Steps != 0 { + if c.Steps != 0 || c.InstantQueryOnly { + if c.InstantQueryOnly && c.Steps != 0 { + panic(fmt.Sprintf("invalid test case '%v': configured as instant query with non-zero number of steps %v", c.Expr, c.Steps)) + } + if c.Steps >= NumIntervals { // Note that this doesn't check we have enough data to cover any range selectors. 
panic(fmt.Sprintf("invalid test case '%v' with %v steps: test setup only creates %v steps", c.Expr, c.Steps, NumIntervals)) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 05bc841b8cd..13b80b04862 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -10,7 +10,10 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/util/teststorage" "github.com/stretchr/testify/require" ) @@ -23,16 +26,16 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { // The goal of this is not to list every conceivable expression that is unsupported, but to cover all the // different cases and make sure we produce a reasonable error message when these cases are encountered. unsupportedExpressions := map[string]string{ - "a + b": "PromQL expression type *parser.BinaryExpr", - "1 + 2": "PromQL expression type *parser.BinaryExpr", - "metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr", - "1": "PromQL expression type *parser.NumberLiteral", - "metric{} offset 2h": "instant vector selector with 'offset'", - "avg(metric{})": "'avg' aggregation", - "sum without(l) (metric{})": "grouping with 'without'", - "rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'", - "avg_over_time(metric{}[5m])": "'avg_over_time' function", - "-sum(metric{})": "PromQL expression type *parser.UnaryExpr", + "a + b": "PromQL expression type *parser.BinaryExpr", + "1 + 2": "scalar value as top-level expression", + "metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr", + "1": "scalar value as top-level expression", + "metric{} offset 2h": "instant vector selector with 'offset'", + "avg(metric{})": "'avg' aggregation", + "sum without(l) (metric{})": "grouping with 'without'", + "rate(metric{}[5m:1m])": "PromQL expression type 
*parser.SubqueryExpr", + "avg_over_time(metric{}[5m])": "'avg_over_time' function", + "-sum(metric{})": "PromQL expression type *parser.UnaryExpr", } for expression, expectedError := range unsupportedExpressions { @@ -53,12 +56,8 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { // These expressions are also unsupported, but are only valid as instant queries. unsupportedInstantQueryExpressions := map[string]string{ - "'a'": "PromQL expression type *parser.StringLiteral", - "metric{}[5m]": "PromQL expression type *parser.MatrixSelector", - "metric{}[5m] offset 2h": "PromQL expression type *parser.MatrixSelector", - "metric{}[5m] @ 123": "PromQL expression type *parser.MatrixSelector", - "metric{}[5m] @ start()": "PromQL expression type *parser.MatrixSelector", - "metric{}[5m] @ end()": "PromQL expression type *parser.MatrixSelector", + "'a'": "string value as top-level expression", + "metric{}[5m] offset 2h": "range vector selector with 'offset'", "metric{}[5m:1m]": "PromQL expression type *parser.SubqueryExpr", } @@ -159,3 +158,99 @@ func TestOurTestCases(t *testing.T) { }) } } + +// Testing instant queries that return a range vector is not supported by Prometheus' PromQL testing framework, +// and adding support for this would be quite involved. +// +// So we test these few cases here instead. 
+func TestRangeVectorSelectors(t *testing.T) { + opts := NewTestEngineOpts() + streamingEngine, err := NewEngine(opts) + require.NoError(t, err) + + prometheusEngine := promql.NewEngine(opts) + + storage := teststorage.New(t) + defer storage.Close() + + series1 := labels.FromStrings("__name__", "some_metric", "env", "1") + series2 := labels.FromStrings("__name__", "some_metric", "env", "2") + baseT := time.Date(2024, 5, 1, 0, 0, 0, 0, time.UTC) + + a := storage.Appender(context.Background()) + for seriesIdx, series := range []labels.Labels{series1, series2} { + for timeStep := 0; timeStep < 5; timeStep++ { + ts := baseT.Add(time.Duration(timeStep) * time.Minute) + v := float64(timeStep * (seriesIdx + 1)) + _, err := a.Append(0, series, timestamp.FromTime(ts), v) + require.NoError(t, err) + } + } + + require.NoError(t, a.Commit()) + + testCases := map[string]struct { + expr string + expected *promql.Result + ts time.Time + }{ + "matches series with points in range": { + expr: "some_metric[1m]", + ts: baseT.Add(2 * time.Minute), + expected: &promql.Result{ + Value: promql.Matrix{ + { + Metric: series1, + Floats: []promql.FPoint{ + {T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1}, + {T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 2}, + }, + }, + { + Metric: series2, + Floats: []promql.FPoint{ + {T: timestamp.FromTime(baseT.Add(time.Minute)), F: 2}, + {T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 4}, + }, + }, + }, + }, + }, + "matches no series": { + expr: "some_nonexistent_metric[1m]", + ts: baseT, + expected: &promql.Result{ + Value: promql.Matrix{}, + }, + }, + "no samples in range": { + expr: "some_nonexistent_metric[1m]", + ts: baseT.Add(20 * time.Minute), + expected: &promql.Result{ + Value: promql.Matrix{}, + }, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + runTest := func(t *testing.T, eng promql.QueryEngine, expr string, ts time.Time, expected *promql.Result) { + q, err := 
eng.NewInstantQuery(context.Background(), storage, nil, expr, ts) + require.NoError(t, err) + defer q.Close() + + res := q.Exec(context.Background()) + require.Equal(t, expected, res) + } + + t.Run("streaming engine", func(t *testing.T) { + runTest(t, streamingEngine, testCase.expr, testCase.ts, testCase.expected) + }) + + // Run the tests against Prometheus' engine to ensure our test cases are valid. + t.Run("Prometheus' engine", func(t *testing.T) { + runTest(t, prometheusEngine, testCase.expr, testCase.ts, testCase.expected) + }) + }) + } +} diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index fc92243f792..72a536f5826 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -25,7 +25,7 @@ type Query struct { queryable storage.Queryable opts promql.QueryOpts statement *parser.EvalStmt - root operator.InstantVectorOperator + root operator.Operator engine *Engine qs string @@ -64,15 +64,29 @@ func newQuery(queryable storage.Queryable, opts promql.QueryOpts, qs string, sta } } - q.root, err = q.convertToOperator(expr) - if err != nil { - return nil, err + switch expr.Type() { + case parser.ValueTypeMatrix: + q.root, err = q.convertToRangeVectorOperator(expr) + if err != nil { + return nil, err + } + case parser.ValueTypeVector: + q.root, err = q.convertToInstantVectorOperator(expr) + if err != nil { + return nil, err + } + default: + return nil, NewNotSupportedError(fmt.Sprintf("%s value as top-level expression", parser.DocumentedType(expr.Type()))) } return q, nil } -func (q *Query) convertToOperator(expr parser.Expr) (operator.InstantVectorOperator, error) { +func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (operator.InstantVectorOperator, error) { + if expr.Type() != parser.ValueTypeVector { + return nil, fmt.Errorf("cannot create instant vector operator for expression that produces a %s", parser.DocumentedType(expr.Type())) + } + interval := q.statement.Interval if q.IsInstant() { @@ -117,7 +131,7 
@@ func (q *Query) convertToOperator(expr parser.Expr) (operator.InstantVectorOpera slices.Sort(e.Grouping) - inner, err := q.convertToOperator(e.Expr) + inner, err := q.convertToInstantVectorOperator(e.Expr) if err != nil { return nil, err } @@ -139,36 +153,59 @@ func (q *Query) convertToOperator(expr parser.Expr) (operator.InstantVectorOpera return nil, fmt.Errorf("expected exactly one argument for rate, got %v", len(e.Args)) } - matrixSelector, ok := e.Args[0].(*parser.MatrixSelector) - if !ok { - // Should be caught by the PromQL parser, but we check here for safety. - return nil, NewNotSupportedError(fmt.Sprintf("unsupported rate argument type %T", e.Args[0])) + inner, err := q.convertToRangeVectorOperator(e.Args[0]) + if err != nil { + return nil, err } - vectorSelector := matrixSelector.VectorSelector.(*parser.VectorSelector) + return &operator.RangeVectorFunction{ + Inner: inner, + }, nil + case *parser.StepInvariantExpr: + // One day, we'll do something smarter here. + return q.convertToInstantVectorOperator(e.Expr) + case *parser.ParenExpr: + return q.convertToInstantVectorOperator(e.Expr) + default: + return nil, NewNotSupportedError(fmt.Sprintf("PromQL expression type %T", e)) + } +} + +func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (operator.RangeVectorOperator, error) { + if expr.Type() != parser.ValueTypeMatrix { + return nil, fmt.Errorf("cannot create range vector operator for expression that produces a %s", parser.DocumentedType(expr.Type())) + } + + switch e := expr.(type) { + case *parser.MatrixSelector: + vectorSelector := e.VectorSelector.(*parser.VectorSelector) if vectorSelector.OriginalOffset != 0 || vectorSelector.Offset != 0 { return nil, NewNotSupportedError("range vector selector with 'offset'") } - return &operator.RangeVectorFunction{ - Inner: &operator.RangeVectorSelector{ - Selector: &operator.Selector{ - Queryable: q.queryable, - Start: timestamp.FromTime(q.statement.Start), - End: 
timestamp.FromTime(q.statement.End), - Timestamp: vectorSelector.Timestamp, - Interval: interval.Milliseconds(), - Range: matrixSelector.Range, - Matchers: vectorSelector.LabelMatchers, - }, + interval := q.statement.Interval + + if q.IsInstant() { + interval = time.Millisecond + } + + return &operator.RangeVectorSelector{ + Selector: &operator.Selector{ + Queryable: q.queryable, + Start: timestamp.FromTime(q.statement.Start), + End: timestamp.FromTime(q.statement.End), + Timestamp: vectorSelector.Timestamp, + Interval: interval.Milliseconds(), + Range: e.Range, + Matchers: vectorSelector.LabelMatchers, }, }, nil case *parser.StepInvariantExpr: // One day, we'll do something smarter here. - return q.convertToOperator(e.Expr) + return q.convertToRangeVectorOperator(e.Expr) case *parser.ParenExpr: - return q.convertToOperator(e.Expr) + return q.convertToRangeVectorOperator(e.Expr) default: return nil, NewNotSupportedError(fmt.Sprintf("PromQL expression type %T", e)) } @@ -187,31 +224,44 @@ func (q *Query) Exec(ctx context.Context) *promql.Result { } defer operator.PutSeriesMetadataSlice(series) - if q.IsInstant() { - v, err := q.populateVector(ctx, series) + switch q.statement.Expr.Type() { + case parser.ValueTypeMatrix: + v, err := q.populateMatrixFromRangeVectorOperator(ctx, q.root.(operator.RangeVectorOperator), series) if err != nil { return &promql.Result{Err: err} } q.result = &promql.Result{Value: v} - } else { - m, err := q.populateMatrix(ctx, series) - if err != nil { - return &promql.Result{Value: m} - } + case parser.ValueTypeVector: + if q.IsInstant() { + v, err := q.populateVectorFromInstantVectorOperator(ctx, q.root.(operator.InstantVectorOperator), series) + if err != nil { + return &promql.Result{Err: err} + } - q.result = &promql.Result{Value: m} + q.result = &promql.Result{Value: v} + } else { + v, err := q.populateMatrixFromInstantVectorOperator(ctx, q.root.(operator.InstantVectorOperator), series) + if err != nil { + return &promql.Result{Err: 
err} + } + + q.result = &promql.Result{Value: v} + } + default: + // This should be caught in newQuery above. + return &promql.Result{Err: NewNotSupportedError(fmt.Sprintf("unsupported result type %s", parser.DocumentedType(q.statement.Expr.Type())))} } return q.result } -func (q *Query) populateVector(ctx context.Context, series []operator.SeriesMetadata) (promql.Vector, error) { +func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []operator.SeriesMetadata) (promql.Vector, error) { ts := timeMilliseconds(q.statement.Start) v := operator.GetVector(len(series)) for i, s := range series { - d, err := q.root.Next(ctx) + d, err := o.Next(ctx) if err != nil { if errors.Is(err, operator.EOS) { return nil, fmt.Errorf("expected %v series, but only received %v", len(series), i) @@ -245,11 +295,11 @@ func (q *Query) populateVector(ctx context.Context, series []operator.SeriesMeta return v, nil } -func (q *Query) populateMatrix(ctx context.Context, series []operator.SeriesMetadata) (promql.Matrix, error) { +func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []operator.SeriesMetadata) (promql.Matrix, error) { m := operator.GetMatrix(len(series)) for i, s := range series { - d, err := q.root.Next(ctx) + d, err := o.Next(ctx) if err != nil { if errors.Is(err, operator.EOS) { return nil, fmt.Errorf("expected %v series, but only received %v", len(series), i) @@ -275,6 +325,36 @@ func (q *Query) populateMatrix(ctx context.Context, series []operator.SeriesMeta return m, nil } +func (q *Query) populateMatrixFromRangeVectorOperator(ctx context.Context, o operator.RangeVectorOperator, series []operator.SeriesMetadata) (promql.Matrix, error) { + m := operator.GetMatrix(len(series)) + b := &operator.RingBuffer{} + defer b.Close() + + for i, s := range series { + err := o.Next(ctx) + if err != nil { + if errors.Is(err, operator.EOS) { + return nil, 
fmt.Errorf("expected %v series, but only received %v", len(series), i) + } + + return nil, err + } + + b.Reset() + _, err = o.NextStep(b) + if err != nil { + return nil, err + } + + m = append(m, promql.Series{ + Metric: s.Labels, + Floats: b.CopyPoints(), + }) + } + + return m, nil +} + func (q *Query) Close() { if q.result == nil { return