Streaming PromQL engine: add support for range vector selectors as the top-level expression (#8023)

* Introduce RangeVectorOperator interface and split RangeVectorSelectorWithTransformation into two types (a hypothetical sketch of this interface follows the commit message below).

* Add more explanation for RangeVectorStepData.

* Improve performance of rate() calculations by exposing range vector operator range rather than calculating it at each time step.

* Add changelog entry

* Rename RingBuffer.Points() to HeadAndTail(), and add CopyPoints() method.

* Add support for range vector selectors as the top-level expression.

* Remove outdated comment

* Clarify more comments.

* Simplify test setup

* Ensure matrix series are always sorted by labels

* Rename `Next` to `NextSeries`

* Document whether RangeStart and RangeEnd are inclusive or exclusive.

* Document when Selector.Timestamp is set.

* Rename `NextStep` to `NextStepSamples`

* Rename `RingBuffer.HeadAndTail` to `RingBuffer.UnsafePoints`

* Remove unnecessary checks in `Close` methods

* Fix issue where range vector selectors incorrectly return points from outside a step's range, or `rate` uses points from outside a step's range

* Add tests to verify behaviour when a series contains a stale marker.

* Address PR feedback: name return parameters

Co-authored-by: Marco Pracucci <marco@pracucci.com>

* Address PR feedback: query metric that exists but has no samples in range

---------

Co-authored-by: Marco Pracucci <marco@pracucci.com>
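
The bullets above introduce `RangeVectorOperator` and `RangeVectorStepData`, and rename `NextStep` to `NextStepSamples`. The following is a purely hypothetical sketch of how those names might fit together; the method set and field types are assumptions based only on this commit message, not the actual definitions in `pkg/streamingpromql/operator`:

```go
package operator

import "context"

// Hypothetical sketch assembled from names in the commit message; the real
// definitions live in pkg/streamingpromql/operator and may differ.
// SeriesMetadata is the package's existing metadata type and is not defined here.
type RangeVectorOperator interface {
	// SeriesMetadata returns the labels of every series this operator will produce.
	SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error)

	// NextSeries advances to the next series matched by the selector.
	NextSeries(ctx context.Context) error

	// NextStepSamples returns the samples that fall inside the next step's range
	// for the current series.
	NextStepSamples() (RangeVectorStepData, error)

	// Close releases any resources held by this operator.
	Close()
}

// RangeVectorStepData describes one step of the current series. Field names come
// from the commit message; the types and the inclusivity of the bounds are assumptions
// (the real type documents whether RangeStart and RangeEnd are inclusive or exclusive).
type RangeVectorStepData struct {
	StepT      int64 // Timestamp of this step, in milliseconds.
	RangeStart int64 // Start of the range selected for this step.
	RangeEnd   int64 // End of the range selected for this step.
}
```
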
charleskorn and pracucci authored May 9, 2024
1 parent 1126f0c commit b03ecce
Showing 16 changed files with 729 additions and 305 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -14,7 +14,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8058
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [ENHANCEMENT] Reduced memory allocations in functions used to propagate contextual information between gRPC calls. #7529
16 changes: 8 additions & 8 deletions pkg/streamingpromql/README.md
@@ -53,13 +53,13 @@ flowchart TB
```

Each of these operators satisfies the `InstantVectorOperator` interface, defined [here](./operator/operator.go).
-The two key methods of this interface are `SeriesMetadata()` and `Next()`:
+The two key methods of this interface are `SeriesMetadata()` and `NextSeries()`:

`SeriesMetadata()` returns the list of all series' labels that will be returned by the operator[^2].
In our example, the instant vector selector operator would return all the matching `some_metric` series, and the `sum` aggregation operator would return one series for each unique value of `environment`.

-`Next()` is then called by the consuming operator to read each series' data, one series at a time.
-In our example, the `sum` aggregation operator would call `Next()` on the instant vector selector operator to get the first series' data, then again to get the second series' data and so on.
+`NextSeries()` is then called by the consuming operator to read each series' data, one series at a time.
+In our example, the `sum` aggregation operator would call `NextSeries()` on the instant vector selector operator to get the first series' data, then again to get the second series' data and so on.
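
As a rough sketch, the interface looks something like this (reconstructed from the method signatures visible in this commit's diffs, not copied from `operator.go`; `SeriesMetadata` and `InstantVectorSeriesData` are existing types in the operator package):

```go
package operator

import "context"

// Sketch only: reconstructed from signatures appearing in this commit.
type InstantVectorOperator interface {
	// SeriesMetadata returns the labels of every series this operator will produce, in order.
	SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error)

	// NextSeries returns the samples for the next series. It returns EOS once every series
	// announced by SeriesMetadata has been produced.
	NextSeries(ctx context.Context) (InstantVectorSeriesData, error)

	// Close releases any resources held by this operator.
	Close()
}
```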

Elaborating on the example from before, the overall query would proceed like this, assuming the request is received over HTTP (a code sketch of the calling pattern follows this list):

@@ -75,16 +75,16 @@
1. `sum` aggregation operator computes output series (one per unique value of `environment`) based on input series from instant vector selector
1. `max` aggregation operator computes output series based on input series from `sum` aggregation operator
- in this case, there's just one output series, given no grouping is being performed
-1. root of the query calls `Next()` on `max` aggregation operator until all series have been returned
-1. `max` aggregation operator calls `Next()` on `sum` aggregation operator
-1. `sum` aggregation operator calls `Next()` on instant vector selector operator
+1. root of the query calls `NextSeries()` on `max` aggregation operator until all series have been returned
+1. `max` aggregation operator calls `NextSeries()` on `sum` aggregation operator
+1. `sum` aggregation operator calls `NextSeries()` on instant vector selector operator
- instant vector selector returns samples for next series
1. `sum` aggregation operator updates its running totals for the relevant output series
1. if all input series have now been seen for the output series just updated, `sum` aggregation operator returns that output series and removes it from its internal state
-1. otherwise, it calls `Next()` again and repeats
+1. otherwise, it calls `NextSeries()` again and repeats
1. `max` aggregation operator updates its running maximum based on the series returned
1. if all input series have been seen, `max` aggregation operator returns
-1. otherwise, it calls `Next()` again and repeats
+1. otherwise, it calls `NextSeries()` again and repeats
1. query HTTP API handler converts returned result to wire format (either JSON or Protobuf) and sends to caller
1. query HTTP API handler calls `Query.Close()` to release remaining resources
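
A simplified sketch of that calling pattern, written against the interface sketched above (error handling abbreviated; `EOS` is the sentinel error the operators in this commit return once no more series are available):

```go
package operator

import (
	"context"
	"errors"
)

// drainOperator shows how a consumer might drive an operator; it is a sketch, not the engine's real code.
func drainOperator(ctx context.Context, op InstantVectorOperator) error {
	defer op.Close()

	series, err := op.SeriesMetadata(ctx)
	if err != nil {
		return err
	}

	// One NextSeries call per series announced by SeriesMetadata, until EOS is returned.
	for range series {
		data, err := op.NextSeries(ctx)
		if errors.Is(err, EOS) {
			break
		}
		if err != nil {
			return err
		}
		_ = data // A real consumer would aggregate or serialise this series' samples here.
	}

	return nil
}
```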

18 changes: 14 additions & 4 deletions pkg/streamingpromql/benchmarks/benchmarks.go
@@ -25,8 +25,9 @@ import (
var MetricSizes = []int{1, 100, 2000}

type BenchCase struct {
-Expr  string
-Steps int
+Expr             string
+Steps            int
+InstantQueryOnly bool
}

func (c BenchCase) Name() string {
@@ -75,6 +76,11 @@ func TestCases(metricSizes []int) []BenchCase {
{
Expr: "a_X",
},
// Range vector selector.
{
Expr: "a_X[1m]",
InstantQueryOnly: true,
},
// Simple rate.
{
Expr: "rate(a_X[1m])",
@@ -198,7 +204,7 @@ func TestCases(metricSizes []int) []BenchCase {
tmp = append(tmp, c)
} else {
for _, count := range metricSizes {
-tmp = append(tmp, BenchCase{Expr: strings.ReplaceAll(c.Expr, "X", strconv.Itoa(count)), Steps: c.Steps})
+tmp = append(tmp, BenchCase{Expr: strings.ReplaceAll(c.Expr, "X", strconv.Itoa(count)), Steps: c.Steps, InstantQueryOnly: c.InstantQueryOnly})
}
}
}
@@ -207,7 +213,11 @@ func TestCases(metricSizes []int) []BenchCase {
// No step will be replaced by cases with the standard step.
tmp = []BenchCase{}
for _, c := range cases {
-if c.Steps != 0 {
+if c.Steps != 0 || c.InstantQueryOnly {
+if c.InstantQueryOnly && c.Steps != 0 {
+panic(fmt.Sprintf("invalid test case '%v': configured as instant query with non-zero number of steps %v", c.Expr, c.Steps))
+}

if c.Steps >= NumIntervals {
// Note that this doesn't check we have enough data to cover any range selectors.
panic(fmt.Sprintf("invalid test case '%v' with %v steps: test setup only creates %v steps", c.Expr, c.Steps, NumIntervals))
147 changes: 131 additions & 16 deletions pkg/streamingpromql/engine_test.go
@@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
)
@@ -23,16 +25,16 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
"a + b": "PromQL expression type *parser.BinaryExpr",
"1 + 2": "PromQL expression type *parser.BinaryExpr",
"metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr",
"1": "PromQL expression type *parser.NumberLiteral",
"metric{} offset 2h": "instant vector selector with 'offset'",
"avg(metric{})": "'avg' aggregation",
"sum without(l) (metric{})": "grouping with 'without'",
"rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'",
"avg_over_time(metric{}[5m])": "'avg_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
"a + b": "PromQL expression type *parser.BinaryExpr",
"1 + 2": "scalar value as top-level expression",
"metric{} + other_metric{}": "PromQL expression type *parser.BinaryExpr",
"1": "scalar value as top-level expression",
"metric{} offset 2h": "instant vector selector with 'offset'",
"avg(metric{})": "'avg' aggregation",
"sum without(l) (metric{})": "grouping with 'without'",
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr",
"avg_over_time(metric{}[5m])": "'avg_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
}

for expression, expectedError := range unsupportedExpressions {
@@ -53,12 +55,8 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {

// These expressions are also unsupported, but are only valid as instant queries.
unsupportedInstantQueryExpressions := map[string]string{
"'a'": "PromQL expression type *parser.StringLiteral",
"metric{}[5m]": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] offset 2h": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] @ 123": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] @ start()": "PromQL expression type *parser.MatrixSelector",
"metric{}[5m] @ end()": "PromQL expression type *parser.MatrixSelector",
"'a'": "string value as top-level expression",
"metric{}[5m] offset 2h": "range vector selector with 'offset'",
"metric{}[5m:1m]": "PromQL expression type *parser.SubqueryExpr",
}

@@ -159,3 +157,120 @@ func TestOurTestCases(t *testing.T) {
})
}
}

// Testing instant queries that return a range vector is not supported by Prometheus' PromQL testing framework,
// and adding support for this would be quite involved.
//
// So we test these few cases here instead.
func TestRangeVectorSelectors(t *testing.T) {
opts := NewTestEngineOpts()
streamingEngine, err := NewEngine(opts)
require.NoError(t, err)

prometheusEngine := promql.NewEngine(opts)

baseT := timestamp.Time(0)
storage := promql.LoadedStorage(t, `
load 1m
some_metric{env="1"} 0+1x4
some_metric{env="2"} 0+2x4
some_metric_with_gaps 0 1 _ 3
some_metric_with_stale_marker 0 1 stale 3
`)
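// In the load block above, `0+1x4` means: start at 0 and add 1 per 1m step (samples at 0m through 4m);
// `_` is a missing sample, and `stale` writes a staleness marker.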
t.Cleanup(func() { require.NoError(t, storage.Close()) })

testCases := map[string]struct {
expr string
expected *promql.Result
ts time.Time
}{
"matches series with points in range": {
expr: "some_metric[1m]",
ts: baseT.Add(2 * time.Minute),
expected: &promql.Result{
Value: promql.Matrix{
{
Metric: labels.FromStrings("__name__", "some_metric", "env", "1"),
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1},
{T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 2},
},
},
{
Metric: labels.FromStrings("__name__", "some_metric", "env", "2"),
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 2},
{T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 4},
},
},
},
},
},
"matches no series": {
expr: "some_nonexistent_metric[1m]",
ts: baseT,
expected: &promql.Result{
Value: promql.Matrix{},
},
},
"no samples in range": {
expr: "some_metric[1m]",
ts: baseT.Add(20 * time.Minute),
expected: &promql.Result{
Value: promql.Matrix{},
},
},
"does not return points outside range if last selected point does not align to end of range": {
expr: "some_metric_with_gaps[1m]",
ts: baseT.Add(2 * time.Minute),
expected: &promql.Result{
Value: promql.Matrix{
{
Metric: labels.FromStrings("__name__", "some_metric_with_gaps"),
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1},
},
},
},
},
},
"metric with stale marker": {
expr: "some_metric_with_stale_marker[3m]",
ts: baseT.Add(3 * time.Minute),
expected: &promql.Result{
Value: promql.Matrix{
{
Metric: labels.FromStrings("__name__", "some_metric_with_stale_marker"),
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT), F: 0},
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1},
{T: timestamp.FromTime(baseT.Add(3 * time.Minute)), F: 3},
},
},
},
},
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
runTest := func(t *testing.T, eng promql.QueryEngine, expr string, ts time.Time, expected *promql.Result) {
q, err := eng.NewInstantQuery(context.Background(), storage, nil, expr, ts)
require.NoError(t, err)
defer q.Close()

res := q.Exec(context.Background())
require.Equal(t, expected, res)
}

t.Run("streaming engine", func(t *testing.T) {
runTest(t, streamingEngine, testCase.expr, testCase.ts, testCase.expected)
})

// Run the tests against Prometheus' engine to ensure our test cases are valid.
t.Run("Prometheus' engine", func(t *testing.T) {
runTest(t, prometheusEngine, testCase.expr, testCase.ts, testCase.expected)
})
})
}
}
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operator/aggregation.go
@@ -118,7 +118,7 @@ func (a *Aggregation) labelsForGroup(m labels.Labels, lb *labels.Builder) labels
return lb.Labels()
}

-func (a *Aggregation) Next(ctx context.Context) (InstantVectorSeriesData, error) {
+func (a *Aggregation) NextSeries(ctx context.Context) (InstantVectorSeriesData, error) {
if len(a.remainingGroups) == 0 {
// No more groups left.
return InstantVectorSeriesData{}, EOS
@@ -135,7 +135,7 @@ func (a *Aggregation) Next(ctx context.Context) (InstantVectorSeriesData, error)

// Iterate through inner series until the desired group is complete
for thisGroup.remainingSeriesCount > 0 {
-s, err := a.Inner.Next(ctx)
+s, err := a.Inner.NextSeries(ctx)

if err != nil {
if errors.Is(err, EOS) {
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operator/aggregation_test.go
@@ -106,8 +106,8 @@ func (t *testOperator) SeriesMetadata(_ context.Context) ([]SeriesMetadata, erro
return labelsToSeriesMetadata(t.series), nil
}

-func (t *testOperator) Next(_ context.Context) (InstantVectorSeriesData, error) {
-panic("Next() not supported")
+func (t *testOperator) NextSeries(_ context.Context) (InstantVectorSeriesData, error) {
+panic("NextSeries() not supported")
}

func (t *testOperator) Close() {
8 changes: 3 additions & 5 deletions pkg/streamingpromql/operator/instant_vector_selector.go
@@ -29,13 +29,13 @@ type InstantVectorSelector struct
var _ InstantVectorOperator = &InstantVectorSelector{}

func (v *InstantVectorSelector) SeriesMetadata(ctx context.Context) ([]SeriesMetadata, error) {
-// Compute value we need on every call to Next() once, here.
+// Compute value we need on every call to NextSeries() once, here.
v.numSteps = stepCount(v.Selector.Start, v.Selector.End, v.Selector.Interval)

return v.Selector.SeriesMetadata(ctx)
}

-func (v *InstantVectorSelector) Next(_ context.Context) (InstantVectorSeriesData, error) {
+func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeriesData, error) {
if v.memoizedIterator == nil {
v.memoizedIterator = storage.NewMemoizedEmptyIterator(v.Selector.LookbackDelta.Milliseconds())
}
@@ -100,7 +100,5 @@ func (v *InstantVectorSelector) Next(_ context.Context) (InstantVectorSeriesData
}

func (v *InstantVectorSelector) Close() {
-if v.Selector != nil {
-v.Selector.Close()
-}
+v.Selector.Close()
}