Skip to content

Commit

Permalink
Add support for range vector selectors as the top-level expression.
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed May 2, 2024
1 parent 2935a5f commit bd6c902
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 56 deletions.
18 changes: 14 additions & 4 deletions pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
var MetricSizes = []int{1, 100, 2000}

type BenchCase struct {
Expr string
Steps int
Expr string
Steps int
InstantQueryOnly bool
}

func (c BenchCase) Name() string {
Expand Down Expand Up @@ -75,6 +76,11 @@ func TestCases(metricSizes []int) []BenchCase {
{
Expr: "a_X",
},
// Range vector selector.
{
Expr: "a_X[1m]",
InstantQueryOnly: true,
},
// Simple rate.
{
Expr: "rate(a_X[1m])",
Expand Down Expand Up @@ -198,7 +204,7 @@ func TestCases(metricSizes []int) []BenchCase {
tmp = append(tmp, c)
} else {
for _, count := range metricSizes {
tmp = append(tmp, BenchCase{Expr: strings.ReplaceAll(c.Expr, "X", strconv.Itoa(count)), Steps: c.Steps})
tmp = append(tmp, BenchCase{Expr: strings.ReplaceAll(c.Expr, "X", strconv.Itoa(count)), Steps: c.Steps, InstantQueryOnly: c.InstantQueryOnly})
}
}
}
Expand All @@ -207,7 +213,11 @@ func TestCases(metricSizes []int) []BenchCase {
// No step will be replaced by cases with the standard step.
tmp = []BenchCase{}
for _, c := range cases {
if c.Steps != 0 {
if c.Steps != 0 || c.InstantQueryOnly {
if c.InstantQueryOnly && c.Steps != 0 {
panic(fmt.Sprintf("invalid test case '%v': configured as instant query with non-zero number of steps %v", c.Expr, c.Steps))
}

if c.Steps >= NumIntervals {
// Note that this doesn't check we have enough data to cover any range selectors.
panic(fmt.Sprintf("invalid test case '%v' with %v steps: test setup only creates %v steps", c.Expr, c.Steps, NumIntervals))
Expand Down
127 changes: 111 additions & 16 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/stretchr/testify/require"
)

Expand All @@ -23,16 +26,16 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
"a + b": "PromQL expression type *parser.BinaryExpr",
"1 + 2": "PromQL expression type *parser.BinaryExpr",
"metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr",
"1": "PromQL expression type *parser.NumberLiteral",
"metric{} offset 2h": "instant vector selector with 'offset'",
"avg(metric{})": "'avg' aggregation",
"sum without(l) (metric{})": "grouping with 'without'",
"rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'",
"avg_over_time(metric{}[5m])": "'avg_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
"a + b": "PromQL expression type *parser.BinaryExpr",
"1 + 2": "scalar value as top-level expression",
"metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr",
"1": "scalar value as top-level expression",
"metric{} offset 2h": "instant vector selector with 'offset'",
"avg(metric{})": "'avg' aggregation",
"sum without(l) (metric{})": "grouping with 'without'",
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr",
"avg_over_time(metric{}[5m])": "'avg_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
}

for expression, expectedError := range unsupportedExpressions {
Expand All @@ -53,12 +56,8 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {

// These expressions are also unsupported, but are only valid as instant queries.
unsupportedInstantQueryExpressions := map[string]string{
"'a'": "PromQL expression type *parser.StringLiteral",
"metric{}[5m]": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] offset 2h": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] @ 123": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] @ start()": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] @ end()": "PromQL expression type *parser.MatrixSelector",
"'a'": "string value as top-level expression",
"metric{}[5m] offset 2h": "range vector selector with 'offset'",
"metric{}[5m:1m]": "PromQL expression type *parser.SubqueryExpr",
}

Expand Down Expand Up @@ -159,3 +158,99 @@ func TestOurTestCases(t *testing.T) {
})
}
}

// Testing instant queries that return a range vector is not supported by Prometheus' PromQL testing framework,
// and adding support for this would be quite involved.
//
// So instead, we test these few cases here instead.
func TestRangeVectorSelectors(t *testing.T) {
opts := NewTestEngineOpts()
streamingEngine, err := NewEngine(opts)
require.NoError(t, err)

prometheusEngine := promql.NewEngine(opts)

storage := teststorage.New(t)
defer storage.Close()

series1 := labels.FromStrings("__name__", "some_metric", "env", "1")
series2 := labels.FromStrings("__name__", "some_metric", "env", "2")
baseT := time.Date(2024, 5, 1, 0, 0, 0, 0, time.UTC)

a := storage.Appender(context.Background())
for seriesIdx, series := range []labels.Labels{series1, series2} {
for timeStep := 0; timeStep < 5; timeStep++ {
ts := baseT.Add(time.Duration(timeStep) * time.Minute)
v := float64(timeStep * (seriesIdx + 1))
_, err := a.Append(0, series, timestamp.FromTime(ts), v)
require.NoError(t, err)
}
}

require.NoError(t, a.Commit())

testCases := map[string]struct {
expr string
expected *promql.Result
ts time.Time
}{
"matches series with points in range": {
expr: "some_metric[1m]",
ts: baseT.Add(2 * time.Minute),
expected: &promql.Result{
Value: promql.Matrix{
{
Metric: series1,
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1},
{T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 2},
},
},
{
Metric: series2,
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 2},
{T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 4},
},
},
},
},
},
"matches no series": {
expr: "some_nonexistent_metric[1m]",
ts: baseT,
expected: &promql.Result{
Value: promql.Matrix{},
},
},
"no samples in range": {
expr: "some_nonexistent_metric[1m]",
ts: baseT.Add(20 * time.Minute),
expected: &promql.Result{
Value: promql.Matrix{},
},
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
runTest := func(t *testing.T, eng promql.QueryEngine, expr string, ts time.Time, expected *promql.Result) {
q, err := eng.NewInstantQuery(context.Background(), storage, nil, expr, ts)
require.NoError(t, err)
defer q.Close()

res := q.Exec(context.Background())
require.Equal(t, expected, res)
}

t.Run("streaming engine", func(t *testing.T) {
runTest(t, streamingEngine, testCase.expr, testCase.ts, testCase.expected)
})

// Run the tests against Prometheus' engine to ensure our test cases are valid.
t.Run("Prometheus' engine", func(t *testing.T) {
runTest(t, prometheusEngine, testCase.expr, testCase.ts, testCase.expected)
})
})
}
}
Loading

0 comments on commit bd6c902

Please sign in to comment.