From a9a5ff3dd4f97440ae0d43265aeebcebd3789954 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Tue, 26 Nov 2024 15:25:47 +1100
Subject: [PATCH] MQE: mangle native histograms in returned slices to catch
 use-after-return bugs + fix issues found (#10010)

* Mangle histograms in returned slices to catch use-after-return bugs

* Fix issue where native histograms returned from an instant query could be corrupted

* Ensure queries evaluated in `TestSubqueries` are cleaned up

* Fix issue where native histograms returned from `sum` or `avg` could be corrupted

* Add changelog entry

# Conflicts:
#	CHANGELOG.md

* Add test for mangling behaviour

* Address PR feedback
---
 CHANGELOG.md                               |  2 +-
 pkg/streamingpromql/engine_test.go         |  5 ++
 .../operators/aggregations/avg.go          |  3 ++
 .../operators/aggregations/sum.go          |  3 ++
 pkg/streamingpromql/query.go               |  3 ++
 pkg/streamingpromql/types/limiting_pool.go | 47 ++++++++++++++++++-
 .../types/limiting_pool_test.go            | 38 +++++++++++++++
 7 files changed, 99 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4b993c3e115..cb3c177074a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,7 +19,7 @@
 * [CHANGE] Ingester: remove experimental flags `-ingest-storage.kafka.ongoing-records-per-fetch` and `-ingest-storage.kafka.startup-records-per-fetch`. They are removed in favour of `-ingest-storage.kafka.max-buffered-bytes`. #9906
 * [CHANGE] Ingester: Replace `cortex_discarded_samples_total` label from `sample-out-of-bounds` to `sample-timestamp-too-old`. #9885
 * [CHANGE] Ruler: the `/prometheus/config/v1/rules` does not return an error anymore if a rule group is missing in the object storage after been successfully returned by listing the storage, because it could have been deleted in the meanwhile. #9936
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9998 #10007
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9998 #10007 #10010
 * [FEATURE] Distributor: Add support for `lz4` OTLP compression. #9763
 * [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
 * [FEATURE] Query-frontend: add middleware to control access to specific PromQL experimental functions on a per-tenant basis. #9798
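A note on the bug class this patch targets: MQE recycles point slices through pools, so code that keeps a pointer into a slice after returning it to the pool observes whatever the slice's next user writes. A minimal, runnable sketch of the failure mode (toy free list and hypothetical names, not Mimir's LimitingBucketedPool):

// Sketch: use-after-return corruption with a pooled slice of pointers.
package main

import "fmt"

var freeList [][]*float64 // trivial stand-in for a slice pool

func get() []*float64 {
	if n := len(freeList); n > 0 {
		s := freeList[n-1]
		freeList = freeList[:n-1]
		return s
	}
	return make([]*float64, 0, 4)
}

func put(s []*float64) { freeList = append(freeList, s[:0]) }

func main() {
	s := get()
	v := 1.0
	s = append(s, &v)
	kept := s[0] // a caller retains a pointer from the result...
	put(s)       // ...but the slice goes back to the pool

	s2 := get()[:1] // the next user reuses the same backing array, stale element included
	*s2[0] = 999    // and mutates the value the first caller still references

	fmt.Println(*kept) // prints 999, not 1: silent corruption
}

Mangling slices on Put turns this kind of silent corruption into an obviously wrong sentinel value that tests catch deterministically, which is what the changes below wire up.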
diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go
index 49c6098117f..fa4a3555ccd 100644
--- a/pkg/streamingpromql/engine_test.go
+++ b/pkg/streamingpromql/engine_test.go
@@ -40,6 +40,10 @@ import (
 	"github.com/grafana/mimir/pkg/util/globalerror"
 )
 
+func init() {
+	types.EnableManglingReturnedSlices = true
+}
+
 func TestUnsupportedPromQLFeatures(t *testing.T) {
 	featureToggles := EnableAllFeatures
 
@@ -1191,6 +1195,7 @@ func TestSubqueries(t *testing.T) {
 
 			res := qry.Exec(context.Background())
 			testutils.RequireEqualResults(t, testCase.Query, &testCase.Result, res)
+			qry.Close()
 		}
 
 		// Ensure our test cases are correct by running them against Prometheus' engine too.
diff --git a/pkg/streamingpromql/operators/aggregations/avg.go b/pkg/streamingpromql/operators/aggregations/avg.go
index 063d99105ea..a1e783d617f 100644
--- a/pkg/streamingpromql/operators/aggregations/avg.go
+++ b/pkg/streamingpromql/operators/aggregations/avg.go
@@ -292,6 +292,9 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange
 			if h != nil && h != invalidCombinationOfHistograms {
 				t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
 				histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})
+
+				// Remove histogram from slice to ensure it's not mutated when the slice is reused.
+				g.histograms[i] = nil
 			}
 		}
 	}
diff --git a/pkg/streamingpromql/operators/aggregations/sum.go b/pkg/streamingpromql/operators/aggregations/sum.go
index 2bf0d78b73f..d3bea361462 100644
--- a/pkg/streamingpromql/operators/aggregations/sum.go
+++ b/pkg/streamingpromql/operators/aggregations/sum.go
@@ -194,6 +194,9 @@ func (g *SumAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange
 			if h != nil && h != invalidCombinationOfHistograms {
 				t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
 				histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})
+
+				// Remove histogram from slice to ensure it's not mutated when the slice is reused.
+				g.histogramSums[i] = nil
 			}
 		}
 	}
diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go
index 1f3868126ae..6eac62c8305 100644
--- a/pkg/streamingpromql/query.go
+++ b/pkg/streamingpromql/query.go
@@ -659,6 +659,9 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o t
 				T:      ts,
 				H:      point.H,
 			})
+
+			// Remove histogram from slice to ensure it's not mutated when the slice is reused.
+			d.Histograms[0].H = nil
 		} else {
 			types.PutInstantVectorSeriesData(d, q.memoryConsumptionTracker)
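The avg.go, sum.go and query.go fixes above all apply the same pattern: once a histogram pointer escapes into the output (an `HPoint` or `promql.Sample`), its slot in the pooled scratch slice is cleared, so returning that slice to the pool can no longer mangle a histogram the caller now owns. A simplified sketch of the pattern (hypothetical `drainHistograms` helper, not this patch's code; the real logic lives in `ComputeOutputSeries` and `populateVectorFromInstantVectorOperator`):

package example

import (
	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/promql"
)

// drainHistograms copies histogram pointers into output points and nils out the
// pooled scratch slots, transferring ownership of each histogram to the output.
func drainHistograms(scratch []*histogram.FloatHistogram, startT, intervalMs int64, out []promql.HPoint) []promql.HPoint {
	for i, h := range scratch {
		if h == nil {
			continue
		}
		out = append(out, promql.HPoint{T: startT + int64(i)*intervalMs, H: h})
		scratch[i] = nil // the output owns h now; reuse of scratch must not see it
	}
	return out // scratch is now safe to return to its pool
}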
diff --git a/pkg/streamingpromql/types/limiting_pool.go b/pkg/streamingpromql/types/limiting_pool.go
index 5fe1563f6aa..54a1da345f0 100644
--- a/pkg/streamingpromql/types/limiting_pool.go
+++ b/pkg/streamingpromql/types/limiting_pool.go
@@ -29,12 +29,17 @@ const (
 )
 
 var (
+	// EnableManglingReturnedSlices enables mangling values in slices returned to the pool, to aid in detecting use-after-return bugs.
+	// Only used in tests.
+	EnableManglingReturnedSlices = false
+
 	FPointSlicePool = NewLimitingBucketedPool(
 		pool.NewBucketedPool(1, maxExpectedPointsPerSeries, pointsPerSeriesBucketFactor, func(size int) []promql.FPoint {
 			return make([]promql.FPoint, 0, size)
 		}),
 		FPointSize,
 		false,
+		nil,
 	)
 
 	HPointSlicePool = NewLimitingBucketedPool(
@@ -43,6 +48,10 @@ var (
 		}),
 		HPointSize,
 		false,
+		func(point promql.HPoint) promql.HPoint {
+			point.H = mangleHistogram(point.H)
+			return point
+		},
 	)
 
 	VectorPool = NewLimitingBucketedPool(
@@ -51,6 +60,7 @@ var (
 		}),
 		VectorSampleSize,
 		false,
+		nil,
 	)
 
 	Float64SlicePool = NewLimitingBucketedPool(
@@ -59,6 +69,7 @@ var (
 		}),
 		Float64Size,
 		true,
+		nil,
 	)
 
 	BoolSlicePool = NewLimitingBucketedPool(
@@ -67,6 +78,7 @@ var (
 		}),
 		BoolSize,
 		true,
+		nil,
 	)
 
 	HistogramSlicePool = NewLimitingBucketedPool(
@@ -75,9 +87,34 @@ var (
 		}),
 		HistogramPointerSize,
 		true,
+		mangleHistogram,
 	)
 )
 
+func mangleHistogram(h *histogram.FloatHistogram) *histogram.FloatHistogram {
+	if h == nil {
+		return nil
+	}
+
+	h.ZeroCount = 12345678
+	h.Count = 12345678
+	h.Sum = 12345678
+
+	for i := range h.NegativeBuckets {
+		h.NegativeBuckets[i] = 12345678
+	}
+
+	for i := range h.PositiveBuckets {
+		h.PositiveBuckets[i] = 12345678
+	}
+
+	for i := range h.CustomValues {
+		h.CustomValues[i] = 12345678
+	}
+
+	return h
+}
+
 // LimitingBucketedPool pools slices across multiple query evaluations, and applies any max in-memory bytes limit.
 //
 // LimitingBucketedPool only estimates the in-memory size of the slices it returns. For example, it ignores the overhead of slice headers,
@@ -86,13 +123,15 @@
 type LimitingBucketedPool[S ~[]E, E any] struct {
 	inner       *pool.BucketedPool[S, E]
 	elementSize uint64
 	clearOnGet  bool
+	mangle      func(E) E
 }
 
-func NewLimitingBucketedPool[S ~[]E, E any](inner *pool.BucketedPool[S, E], elementSize uint64, clearOnGet bool) *LimitingBucketedPool[S, E] {
+func NewLimitingBucketedPool[S ~[]E, E any](inner *pool.BucketedPool[S, E], elementSize uint64, clearOnGet bool, mangle func(E) E) *LimitingBucketedPool[S, E] {
 	return &LimitingBucketedPool[S, E]{
 		inner:       inner,
 		elementSize: elementSize,
 		clearOnGet:  clearOnGet,
+		mangle:      mangle,
 	}
 }
 
@@ -131,6 +170,12 @@ func (p *LimitingBucketedPool[S, E]) Put(s S, tracker *limiting.MemoryConsumptionTracker) {
 		return
 	}
 
+	if EnableManglingReturnedSlices && p.mangle != nil {
+		for i, e := range s {
+			s[i] = p.mangle(e)
+		}
+	}
+
 	tracker.DecreaseMemoryConsumption(uint64(cap(s)) * p.elementSize)
 	p.inner.Put(s)
 }
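The mangle hook above only fires when `EnableManglingReturnedSlices` is set, so production `Put` calls pay a single boolean check; the flag is switched on for the whole test binary by the `init` function added to engine_test.go. Once it is on, a use-after-return stops being a flaky data race: every numeric field of a histogram read after its slice went back to the pool carries the sentinel 12345678. A hypothetical assertion helper built on that property (a sketch, not part of this patch):

package example

import (
	"testing"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/stretchr/testify/require"
)

// requireNotMangled fails loudly if a histogram in a query result was already
// returned to the pool, i.e. if it carries mangleHistogram's poison value.
func requireNotMangled(t *testing.T, h *histogram.FloatHistogram) {
	t.Helper()
	require.NotEqual(t, float64(12345678), h.Sum, "histogram used after being returned to pool")
	require.NotEqual(t, float64(12345678), h.Count, "histogram used after being returned to pool")
}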
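Because the flag is package-global, any test that flips it must restore the previous value, as `TestLimitingPool_Mangling` below does with a saved copy and a defer. The same bookkeeping could be wrapped in a `t.Cleanup`-based helper, sketched here as a hypothetical alternative in the same package:

package types

import "testing"

// setMangling flips the global for one test and restores it automatically.
func setMangling(t *testing.T, enabled bool) {
	t.Helper()
	old := EnableManglingReturnedSlices
	EnableManglingReturnedSlices = enabled
	t.Cleanup(func() { EnableManglingReturnedSlices = old })
}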
diff --git a/pkg/streamingpromql/types/limiting_pool_test.go b/pkg/streamingpromql/types/limiting_pool_test.go
index 14777e6be2b..418af2efcda 100644
--- a/pkg/streamingpromql/types/limiting_pool_test.go
+++ b/pkg/streamingpromql/types/limiting_pool_test.go
@@ -28,6 +28,7 @@ func TestLimitingBucketedPool_Unlimited(t *testing.T) {
 		pool.NewBucketedPool(1, 1000, 2, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
 		FPointSize,
 		false,
+		nil,
 	)
 
 	// Get a slice from the pool, the current and peak stats should be updated based on the capacity of the slice returned, not the size requested.
@@ -80,6 +81,7 @@ func TestLimitingPool_Limited(t *testing.T) {
 		pool.NewBucketedPool(1, 1000, 2, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
 		FPointSize,
 		false,
+		nil,
 	)
 
 	// Get a slice from the pool beneath the limit.
@@ -190,6 +192,42 @@ func TestLimitingPool_ClearsReturnedSlices(t *testing.T) {
 	})
 }
 
+func TestLimitingPool_Mangling(t *testing.T) {
+	currentEnableManglingReturnedSlices := EnableManglingReturnedSlices
+	defer func() {
+		// Ensure we reset this back to the default state given it applies globally.
+		EnableManglingReturnedSlices = currentEnableManglingReturnedSlices
+	}()
+
+	_, metric := createRejectedMetric()
+	tracker := limiting.NewMemoryConsumptionTracker(0, metric)
+
+	p := NewLimitingBucketedPool(
+		pool.NewBucketedPool(1, 1000, 2, func(size int) []int { return make([]int, 0, size) }),
+		1,
+		false,
+		func(_ int) int { return 123 },
+	)
+
+	// Test with mangling disabled.
+	EnableManglingReturnedSlices = false
+	s, err := p.Get(4, tracker)
+	require.NoError(t, err)
+	s = append(s, 1000, 2000, 3000, 4000)
+
+	p.Put(s, tracker)
+	require.Equal(t, []int{1000, 2000, 3000, 4000}, s, "returned slice should not be mangled when mangling is disabled")
+
+	// Test with mangling enabled.
+	EnableManglingReturnedSlices = true
+	s, err = p.Get(4, tracker)
+	require.NoError(t, err)
+	s = append(s, 1000, 2000, 3000, 4000)
+
+	p.Put(s, tracker)
+	require.Equal(t, []int{123, 123, 123, 123}, s, "returned slice should be mangled when mangling is enabled")
+}
+
 func assertRejectedQueryCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) {
 	expected := fmt.Sprintf(`
 		# TYPE %s counter