MQE: mangle native histograms in returned slices to catch use-after-return bugs + fix issues found (#10010)

* Mangle histograms in returned slices to catch use-after-return bugs

* Fix issue where native histograms returned from an instant query could be corrupted

* Ensure queries evaluated in `TestSubqueries` are cleaned up

* Fix issue where native histograms returned from `sum` or `avg` could be corrupted

* Add changelog entry

# Conflicts:
#	CHANGELOG.md

* Add test for mangling behaviour

* Address PR feedback
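
For context on the first bullet: the technique is to overwrite ("mangle") the contents of a slice at the moment it is handed back to a pool, so that any code still holding a reference reads an obviously bogus sentinel instead of plausible stale data. The sketch below is a minimal, self-contained illustration of that idea; the pool, helpers, and `enableMangling` flag are hypothetical stand-ins, much simpler than Mimir's `LimitingBucketedPool`.

```go
package main

import (
	"fmt"
	"sync"
)

// enableMangling mirrors the test-only switch described above; in real code it
// would stay false outside of tests.
var enableMangling = true

var floatSlicePool = sync.Pool{
	New: func() any { return make([]float64, 0, 16) },
}

func getSlice() []float64 {
	return floatSlicePool.Get().([]float64)[:0]
}

// putSlice hands s back to the pool. With mangling enabled, every element is
// overwritten first, so a caller that keeps reading s afterwards sees an
// obviously bogus sentinel instead of plausible stale data.
func putSlice(s []float64) {
	if enableMangling {
		for i := range s {
			s[i] = 12345678 // sentinel value, matching the one used in this commit
		}
	}
	floatSlicePool.Put(s)
}

func main() {
	s := getSlice()
	s = append(s, 1.5, 2.5)
	putSlice(s)

	// Use-after-return bug: s has already been handed back to the pool.
	// With mangling enabled this prints the sentinel, making the bug easy to spot in tests.
	fmt.Println(s)
}
```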
charleskorn authored Nov 26, 2024
1 parent f173783 commit a9a5ff3
Showing 7 changed files with 99 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -19,7 +19,7 @@
* [CHANGE] Ingester: remove experimental flags `-ingest-storage.kafka.ongoing-records-per-fetch` and `-ingest-storage.kafka.startup-records-per-fetch`. They are removed in favour of `-ingest-storage.kafka.max-buffered-bytes`. #9906
* [CHANGE] Ingester: Replace `cortex_discarded_samples_total` label from `sample-out-of-bounds` to `sample-timestamp-too-old`. #9885
* [CHANGE] Ruler: the `/prometheus/config/v1/rules` endpoint no longer returns an error if a rule group is missing from the object storage after being successfully returned when listing the storage, because it could have been deleted in the meantime. #9936
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9998 #10007
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9998 #10007 #10010
* [FEATURE] Distributor: Add support for `lz4` OTLP compression. #9763
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true`, error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] Query-frontend: add middleware to control access to specific PromQL experimental functions on a per-tenant basis. #9798
5 changes: 5 additions & 0 deletions pkg/streamingpromql/engine_test.go
@@ -40,6 +40,10 @@ import (
"github.com/grafana/mimir/pkg/util/globalerror"
)

func init() {
types.EnableManglingReturnedSlices = true
}

func TestUnsupportedPromQLFeatures(t *testing.T) {
featureToggles := EnableAllFeatures

@@ -1191,6 +1195,7 @@ func TestSubqueries(t *testing.T) {

res := qry.Exec(context.Background())
testutils.RequireEqualResults(t, testCase.Query, &testCase.Result, res)
qry.Close()
}

// Ensure our test cases are correct by running them against Prometheus' engine too.
3 changes: 3 additions & 0 deletions pkg/streamingpromql/operators/aggregations/avg.go
@@ -292,6 +292,9 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange
if h != nil && h != invalidCombinationOfHistograms {
t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})

// Remove histogram from slice to ensure it's not mutated when the slice is reused.
g.histograms[i] = nil
}
}
}
3 changes: 3 additions & 0 deletions pkg/streamingpromql/operators/aggregations/sum.go
@@ -194,6 +194,9 @@ func (g *SumAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange
if h != nil && h != invalidCombinationOfHistograms {
t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})

// Remove histogram from slice to ensure it's not mutated when the slice is reused.
g.histogramSums[i] = nil
}
}
}
3 changes: 3 additions & 0 deletions pkg/streamingpromql/query.go
@@ -659,6 +659,9 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o t
T: ts,
H: point.H,
})

// Remove histogram from slice to ensure it's not mutated when the slice is reused.
d.Histograms[0].H = nil
} else {
types.PutInstantVectorSeriesData(d, q.memoryConsumptionTracker)

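
The `avg.go`, `sum.go`, and `query.go` hunks above all apply the same fix: once a `*histogram.FloatHistogram` pointer has been copied into an output that outlives the reusable slice (an `HPoint`, or the vector returned from an instant query), the reference kept in that slice is set to `nil`, so mangling the slice on its way back to the pool cannot reach the histogram the output now owns. Below is a standalone sketch of the hazard and the fix, with hypothetical names standing in for the aggregation group's buffers.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
)

// manglePooled simulates what happens when a []*histogram.FloatHistogram is
// returned to a pool with mangling enabled: every histogram still reachable
// through the slice has its fields overwritten.
func manglePooled(hs []*histogram.FloatHistogram) {
	for _, h := range hs {
		if h != nil {
			h.Sum = 12345678
			h.Count = 12345678
		}
	}
}

func main() {
	// A reusable buffer of intermediate histograms, standing in for g.histograms
	// (avg) or g.histogramSums (sum) in the aggregation code above.
	buffer := []*histogram.FloatHistogram{{Sum: 42, Count: 7}}

	// The output takes ownership of the pointer, as in promql.HPoint{T: t, H: h}.
	output := buffer[0]

	// The fix from this commit: drop the buffer's reference before the buffer is
	// returned to the pool, so mangling cannot reach the histogram the output owns.
	buffer[0] = nil

	manglePooled(buffer)

	// Still 42 and 7; without `buffer[0] = nil`, both would now read 12345678.
	fmt.Println(output.Sum, output.Count)
}
```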
47 changes: 46 additions & 1 deletion pkg/streamingpromql/types/limiting_pool.go
@@ -29,12 +29,17 @@ const (
)

var (
// EnableManglingReturnedSlices enables mangling values in slices returned to pool to aid in detecting use-after-return bugs.
// Only used in tests.
EnableManglingReturnedSlices = false

FPointSlicePool = NewLimitingBucketedPool(
pool.NewBucketedPool(1, maxExpectedPointsPerSeries, pointsPerSeriesBucketFactor, func(size int) []promql.FPoint {
return make([]promql.FPoint, 0, size)
}),
FPointSize,
false,
nil,
)

HPointSlicePool = NewLimitingBucketedPool(
@@ -43,6 +48,10 @@ var (
}),
HPointSize,
false,
func(point promql.HPoint) promql.HPoint {
point.H = mangleHistogram(point.H)
return point
},
)

VectorPool = NewLimitingBucketedPool(
@@ -51,6 +60,7 @@ var (
}),
VectorSampleSize,
false,
nil,
)

Float64SlicePool = NewLimitingBucketedPool(
@@ -59,6 +69,7 @@ var (
}),
Float64Size,
true,
nil,
)

BoolSlicePool = NewLimitingBucketedPool(
@@ -67,6 +78,7 @@ var (
}),
BoolSize,
true,
nil,
)

HistogramSlicePool = NewLimitingBucketedPool(
@@ -75,9 +87,34 @@ var (
}),
HistogramPointerSize,
true,
mangleHistogram,
)
)

func mangleHistogram(h *histogram.FloatHistogram) *histogram.FloatHistogram {
if h == nil {
return nil
}

h.ZeroCount = 12345678
h.Count = 12345678
h.Sum = 12345678

for i := range h.NegativeBuckets {
h.NegativeBuckets[i] = 12345678
}

for i := range h.PositiveBuckets {
h.PositiveBuckets[i] = 12345678
}

for i := range h.CustomValues {
h.CustomValues[i] = 12345678
}

return h
}

// LimitingBucketedPool pools slices across multiple query evaluations, and applies any max in-memory bytes limit.
//
// LimitingBucketedPool only estimates the in-memory size of the slices it returns. For example, it ignores the overhead of slice headers,
@@ -86,13 +123,15 @@ type LimitingBucketedPool[S ~[]E, E any] struct {
inner *pool.BucketedPool[S, E]
elementSize uint64
clearOnGet bool
mangle func(E) E
}

func NewLimitingBucketedPool[S ~[]E, E any](inner *pool.BucketedPool[S, E], elementSize uint64, clearOnGet bool) *LimitingBucketedPool[S, E] {
func NewLimitingBucketedPool[S ~[]E, E any](inner *pool.BucketedPool[S, E], elementSize uint64, clearOnGet bool, mangle func(E) E) *LimitingBucketedPool[S, E] {
return &LimitingBucketedPool[S, E]{
inner: inner,
elementSize: elementSize,
clearOnGet: clearOnGet,
mangle: mangle,
}
}

@@ -131,6 +170,12 @@ func (p *LimitingBucketedPool[S, E]) Put(s S, tracker *limiting.MemoryConsumptio
return
}

if EnableManglingReturnedSlices && p.mangle != nil {
for i, e := range s {
s[i] = p.mangle(e)
}
}

tracker.DecreaseMemoryConsumption(uint64(cap(s)) * p.elementSize)
p.inner.Put(s)
}
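
The new final argument to `NewLimitingBucketedPool` is a per-element mangle callback, and `Put` applies it only when `EnableManglingReturnedSlices` is set and the callback is non-nil, so production behaviour is unchanged. Only the pools whose elements can carry `*histogram.FloatHistogram` pointers (`HPointSlicePool` and `HistogramSlicePool`) pass a callback; the rest pass `nil`. As a hedged sketch of how another pool could opt in, assuming access to the same `pool` and `types` packages shown above (the string pool itself is hypothetical):

```go
// Hypothetical pool of label-value slices that blanks its elements on return,
// so use-after-return reads stand out in tests.
var labelValueSlicePool = types.NewLimitingBucketedPool(
	pool.NewBucketedPool(1, 1024, 2, func(size int) []string {
		return make([]string, 0, size)
	}),
	16,   // rough per-element size estimate in bytes, like the other pools' element sizes
	true, // clear slices on Get
	func(string) string { return "mangled!" }, // applied by Put only when mangling is enabled
)
```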
38 changes: 38 additions & 0 deletions pkg/streamingpromql/types/limiting_pool_test.go
@@ -28,6 +28,7 @@ func TestLimitingBucketedPool_Unlimited(t *testing.T) {
pool.NewBucketedPool(1, 1000, 2, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
FPointSize,
false,
nil,
)

// Get a slice from the pool, the current and peak stats should be updated based on the capacity of the slice returned, not the size requested.
@@ -80,6 +81,7 @@ func TestLimitingPool_Limited(t *testing.T) {
pool.NewBucketedPool(1, 1000, 2, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
FPointSize,
false,
nil,
)

// Get a slice from the pool beneath the limit.
@@ -190,6 +192,42 @@ func TestLimitingPool_ClearsReturnedSlices(t *testing.T) {
})
}

func TestLimitingPool_Mangling(t *testing.T) {
currentEnableManglingReturnedSlices := EnableManglingReturnedSlices
defer func() {
// Ensure we reset this back to the default state given it applies globally.
EnableManglingReturnedSlices = currentEnableManglingReturnedSlices
}()

_, metric := createRejectedMetric()
tracker := limiting.NewMemoryConsumptionTracker(0, metric)

p := NewLimitingBucketedPool(
pool.NewBucketedPool(1, 1000, 2, func(size int) []int { return make([]int, 0, size) }),
1,
false,
func(_ int) int { return 123 },
)

// Test with mangling disabled.
EnableManglingReturnedSlices = false
s, err := p.Get(4, tracker)
require.NoError(t, err)
s = append(s, 1000, 2000, 3000, 4000)

p.Put(s, tracker)
require.Equal(t, []int{1000, 2000, 3000, 4000}, s, "returned slice should not be mangled when mangling is disabled")

// Test with mangling enabled.
EnableManglingReturnedSlices = true
s, err = p.Get(4, tracker)
require.NoError(t, err)
s = append(s, 1000, 2000, 3000, 4000)

p.Put(s, tracker)
require.Equal(t, []int{123, 123, 123, 123}, s, "returned slice should be mangled when mangling is enabled")
}

func assertRejectedQueryCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) {
expected := fmt.Sprintf(`
# TYPE %s counter
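
`TestLimitingPool_Mangling` saves the previous value of `EnableManglingReturnedSlices` and restores it in a `defer`, since the flag is package-global and other tests may depend on its default. An equivalent pattern using `t.Cleanup`, sketched here as an alternative helper within the same test package (hypothetical, not something this PR adds):

```go
// withManglingEnabled turns mangling on for the duration of a single test and
// restores the previous global value when the test finishes.
func withManglingEnabled(t *testing.T) {
	t.Helper()
	previous := EnableManglingReturnedSlices
	EnableManglingReturnedSlices = true
	t.Cleanup(func() { EnableManglingReturnedSlices = previous })
}
```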
