diff --git a/pkg/frontend/querymiddleware/querysharding_test_utils_test.go b/pkg/frontend/querymiddleware/querysharding_test_utils_test.go index b0032d1f84d..4dcaaa539db 100644 --- a/pkg/frontend/querymiddleware/querysharding_test_utils_test.go +++ b/pkg/frontend/querymiddleware/querysharding_test_utils_test.go @@ -73,7 +73,7 @@ func newMockShardedQueryable( sets := genLabels(labelSet, labelBuckets) xs := make([]storage.Series, 0, len(sets)) for _, ls := range sets { - xs = append(xs, series.NewConcreteSeries(ls, samples)) + xs = append(xs, series.NewConcreteSeries(ls, samples, nil)) } return &mockShardedQueryable{ diff --git a/pkg/frontend/querymiddleware/sharded_queryable.go b/pkg/frontend/querymiddleware/sharded_queryable.go index 5a82817dbfd..d4124c01243 100644 --- a/pkg/frontend/querymiddleware/sharded_queryable.go +++ b/pkg/frontend/querymiddleware/sharded_queryable.go @@ -248,7 +248,7 @@ func newSeriesSetFromEmbeddedQueriesResults(results [][]SampleStream, hints *sto }) } - set = append(set, series.NewConcreteSeries(mimirpb.FromLabelAdaptersToLabels(stream.Labels), samples)) + set = append(set, series.NewConcreteSeries(mimirpb.FromLabelAdaptersToLabels(stream.Labels), samples, nil)) } } return series.NewConcreteSeriesSet(set) diff --git a/pkg/querier/matrix.go b/pkg/querier/matrix.go index 4939cbc433b..45275b45fc2 100644 --- a/pkg/querier/matrix.go +++ b/pkg/querier/matrix.go @@ -9,22 +9,37 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/storage/chunk" "github.com/grafana/mimir/pkg/storage/series" - "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/modelutil" ) func mergeChunks(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator { - samples := make([][]model.SamplePair, 0, len(chunks)) + var ( + samples = make([][]model.SamplePair, 0, len(chunks)) + histograms [][]mimirpb.Histogram + mergedSamples []model.SamplePair + mergedHistograms []mimirpb.Histogram + ) for _, c := range chunks { - ss, err := c.Samples(from, through) + sf, sh, err := c.Samples(from, through) if err != nil { return series.NewErrIterator(err) } - - samples = append(samples, ss) + if len(sf) > 0 { + samples = append(samples, sf) + } + if len(sh) > 0 { + histograms = append(histograms, sh) + } + } + if len(histograms) > 0 { + mergedHistograms = modelutil.MergeNHistogramSets(histograms...) + } + if len(samples) > 0 { + mergedSamples = modelutil.MergeNSampleSets(samples...) } - merged := util.MergeNSampleSets(samples...) - return series.NewConcreteSeriesIterator(series.NewConcreteSeries(nil, merged)) + return series.NewConcreteSeriesIterator(series.NewConcreteSeries(nil, mergedSamples, mergedHistograms)) } diff --git a/pkg/storage/chunk/chunk.go b/pkg/storage/chunk/chunk.go index 10a5e371cf2..3e31ec6a523 100644 --- a/pkg/storage/chunk/chunk.go +++ b/pkg/storage/chunk/chunk.go @@ -6,6 +6,7 @@ package chunk import ( + "fmt" "io" "unsafe" @@ -13,6 +14,8 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" + + "github.com/grafana/mimir/pkg/mimirpb" ) const ( @@ -120,24 +123,38 @@ func NewChunk(metric labels.Labels, c EncodedChunk, from, through model.Time) Ch } } -// Samples returns all SamplePairs for the chunk. -func (c *Chunk) Samples(from, through model.Time) ([]model.SamplePair, error) { +// Samples returns all SamplePairs and Histograms for the chunk. +func (c *Chunk) Samples(from, through model.Time) ([]model.SamplePair, []mimirpb.Histogram, error) { it := c.Data.NewIterator(nil) return rangeValues(it, from, through) } // rangeValues is a utility function that retrieves all values within the given // range from an Iterator. -func rangeValues(it Iterator, oldestInclusive, newestInclusive model.Time) ([]model.SamplePair, error) { - result := []model.SamplePair{} - if it.FindAtOrAfter(oldestInclusive) == chunkenc.ValNone { - return result, it.Err() +func rangeValues(it Iterator, oldestInclusive, newestInclusive model.Time) ([]model.SamplePair, []mimirpb.Histogram, error) { + resultFloat := []model.SamplePair{} + resultHist := []mimirpb.Histogram{} + currValType := it.FindAtOrAfter(oldestInclusive) + if currValType == chunkenc.ValNone { + return resultFloat, resultHist, it.Err() } - for !it.Value().Timestamp.After(newestInclusive) { - result = append(result, it.Value()) - if it.Scan() == chunkenc.ValNone { + for !model.Time(it.Timestamp()).After(newestInclusive) { + switch currValType { + case chunkenc.ValFloat: + resultFloat = append(resultFloat, it.Value()) + case chunkenc.ValHistogram: + t, h := it.AtHistogram() + resultHist = append(resultHist, mimirpb.FromHistogramToHistogramProto(t, h)) + case chunkenc.ValFloatHistogram: + t, h := it.AtFloatHistogram() + resultHist = append(resultHist, mimirpb.FromFloatHistogramToHistogramProto(t, h)) + default: + return nil, nil, fmt.Errorf("unknown value type %v in iterator", currValType) + } + currValType = it.Scan() + if currValType == chunkenc.ValNone { break } } - return result, it.Err() + return resultFloat, resultHist, it.Err() } diff --git a/pkg/storage/series/series_set.go b/pkg/storage/series/series_set.go index d1139516aa1..f57c105e788 100644 --- a/pkg/storage/series/series_set.go +++ b/pkg/storage/series/series_set.go @@ -14,6 +14,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + + "github.com/grafana/mimir/pkg/mimirpb" ) // ConcreteSeriesSet implements storage.SeriesSet. @@ -55,15 +57,17 @@ func (c *ConcreteSeriesSet) Warnings() storage.Warnings { // ConcreteSeries implements storage.Series. type ConcreteSeries struct { - labels labels.Labels - samples []model.SamplePair + labels labels.Labels + samples []model.SamplePair + histograms []mimirpb.Histogram } -// NewConcreteSeries instantiates an in memory series from a list of samples & labels -func NewConcreteSeries(ls labels.Labels, samples []model.SamplePair) *ConcreteSeries { +// NewConcreteSeries instantiates an in memory series from a list of samples & histograms & labels +func NewConcreteSeries(ls labels.Labels, samples []model.SamplePair, histograms []mimirpb.Histogram) *ConcreteSeries { return &ConcreteSeries{ - labels: ls, - samples: samples, + labels: ls, + samples: samples, + histograms: histograms, } } @@ -79,52 +83,138 @@ func (c *ConcreteSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator { // concreteSeriesIterator implements chunkenc.Iterator. type concreteSeriesIterator struct { - cur int - series *ConcreteSeries + curFloat int + curHisto int + atHisto bool + series *ConcreteSeries } // NewConcreteSeriesIterator instantiates an in memory chunkenc.Iterator func NewConcreteSeriesIterator(series *ConcreteSeries) chunkenc.Iterator { return &concreteSeriesIterator{ - cur: -1, - series: series, + curFloat: -1, + curHisto: -1, + atHisto: false, + series: series, + } +} + +// atTypeHisto is an internal method to differentiate between histogram and float histogram value types +// Checking that c.curHisto is a valid index in the c.series.histograms array and that +// c.atHisto is true must be done outside of this +func (c *concreteSeriesIterator) atTypeHisto() chunkenc.ValueType { + if c.series.histograms[c.curHisto].IsFloatHistogram() { + return chunkenc.ValFloatHistogram + } + return chunkenc.ValHistogram +} + +// atType returns current timestamp and value type +func (c *concreteSeriesIterator) atType() (int64, chunkenc.ValueType) { + if c.atHisto { + if c.curHisto < 0 || c.curHisto >= len(c.series.histograms) { + return 0, chunkenc.ValNone + } + return c.series.histograms[c.curHisto].Timestamp, c.atTypeHisto() } + if c.curFloat < 0 || c.curFloat >= len(c.series.samples) { + return 0, chunkenc.ValNone + } + return int64(c.series.samples[c.curFloat].Timestamp), chunkenc.ValFloat } func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { - c.cur = sort.Search(len(c.series.samples), func(n int) bool { + oldTime, oldType := c.atType() + if oldTime >= t { // only advance via Seek + return oldType + } + + c.curFloat = sort.Search(len(c.series.samples), func(n int) bool { return c.series.samples[n].Timestamp >= model.Time(t) }) - if c.cur < len(c.series.samples) { + c.curHisto = sort.Search(len(c.series.histograms), func(n int) bool { + return c.series.histograms[n].Timestamp >= t + }) + + if c.curFloat >= len(c.series.samples) && c.curHisto >= len(c.series.histograms) { + return chunkenc.ValNone + } + if c.curFloat >= len(c.series.samples) { + c.atHisto = true + return c.atTypeHisto() + } + if c.curHisto >= len(c.series.histograms) { + c.atHisto = false return chunkenc.ValFloat } - return chunkenc.ValNone + if int64(c.series.samples[c.curFloat].Timestamp) < c.series.histograms[c.curHisto].Timestamp { + c.curHisto-- + c.atHisto = false + return chunkenc.ValFloat + } + c.curFloat-- + c.atHisto = true + return c.atTypeHisto() } func (c *concreteSeriesIterator) At() (t int64, v float64) { - s := c.series.samples[c.cur] + if c.atHisto { + panic(errors.New("concreteSeriesIterator: Calling At() when cursor is at histogram")) + } + s := c.series.samples[c.curFloat] return int64(s.Timestamp), float64(s.Value) } func (c *concreteSeriesIterator) Next() chunkenc.ValueType { - c.cur++ - if c.cur < len(c.series.samples) { + if c.curFloat+1 >= len(c.series.samples) && c.curHisto+1 >= len(c.series.histograms) { + c.curFloat = len(c.series.samples) + c.curHisto = len(c.series.histograms) + return chunkenc.ValNone + } + if c.curFloat+1 >= len(c.series.samples) { + c.curHisto++ + c.atHisto = true + return c.atTypeHisto() + } + if c.curHisto+1 >= len(c.series.histograms) { + c.curFloat++ + c.atHisto = false return chunkenc.ValFloat } - return chunkenc.ValNone + if int64(c.series.samples[c.curFloat+1].Timestamp) < c.series.histograms[c.curHisto+1].Timestamp { + c.curFloat++ + c.atHisto = false + return chunkenc.ValFloat + } + c.curHisto++ + c.atHisto = true + return c.atTypeHisto() } func (c *concreteSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { - panic(errors.New("concreteSeriesIterator: AtHistogram not implemented")) + if !c.atHisto { + panic(errors.New("concreteSeriesIterator: Calling AtHistogram() when cursor is not at histogram")) + } + h := c.series.histograms[c.curHisto] + return h.Timestamp, mimirpb.FromHistogramProtoToHistogram(&h) } func (c *concreteSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - panic(errors.New("concreteSeriesIterator: AtHistogram not implemented")) + if !c.atHisto { + panic(errors.New("concreteSeriesIterator: Calling AtFloatHistogram() when cursor is not at histogram")) + } + h := c.series.histograms[c.curHisto] + if h.IsFloatHistogram() { + return h.Timestamp, mimirpb.FromHistogramProtoToFloatHistogram(&h) + } + return h.Timestamp, mimirpb.FromHistogramProtoToHistogram(&h).ToFloat() } func (c *concreteSeriesIterator) AtT() int64 { - s := c.series.samples[c.cur] - return int64(s.Timestamp) + if c.atHisto { + return c.series.histograms[c.curHisto].Timestamp + } + return int64(c.series.samples[c.curFloat].Timestamp) } func (c *concreteSeriesIterator) Err() error { @@ -177,6 +267,7 @@ func MatrixToSeriesSet(m model.Matrix) storage.SeriesSet { series = append(series, &ConcreteSeries{ labels: metricToLabels(ss.Metric), samples: ss.Values, + // histograms: ss.Histograms, // cannot convert the decoded matrix form to the expected encoded format. this method is only used in tests so ignoring histogram support for now }) } return NewConcreteSeriesSet(series) @@ -187,8 +278,7 @@ func LabelsToSeriesSet(ls []labels.Labels) storage.SeriesSet { series := make([]storage.Series, 0, len(ls)) for _, l := range ls { series = append(series, &ConcreteSeries{ - labels: l, - samples: nil, + labels: l, }) } return NewConcreteSeriesSet(series) diff --git a/pkg/storage/series/series_set_test.go b/pkg/storage/series/series_set_test.go index 9c051afecfa..90c864f8643 100644 --- a/pkg/storage/series/series_set_test.go +++ b/pkg/storage/series/series_set_test.go @@ -11,22 +11,37 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/util/test" +) + +var ( + generateTestHistogram = test.GenerateTestHistogram + generateTestFloatHistogram = test.GenerateTestFloatHistogram ) func TestConcreteSeriesSet(t *testing.T) { series1 := &ConcreteSeries{ labels: labels.FromStrings("foo", "bar"), - samples: []model.SamplePair{{Value: 1, Timestamp: 2}}, + samples: []model.SamplePair{{Timestamp: 1, Value: 2}}, } series2 := &ConcreteSeries{ labels: labels.FromStrings("foo", "baz"), - samples: []model.SamplePair{{Value: 3, Timestamp: 4}}, + samples: []model.SamplePair{{Timestamp: 3, Value: 4}}, } - c := NewConcreteSeriesSet([]storage.Series{series2, series1}) + series3 := &ConcreteSeries{ + labels: labels.FromStrings("foo", "bay"), + histograms: []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(5, generateTestHistogram(6))}, + } + c := NewConcreteSeriesSet([]storage.Series{series3, series2, series1}) require.True(t, c.Next()) require.Equal(t, series1, c.At()) require.True(t, c.Next()) + require.Equal(t, series3, c.At()) + require.True(t, c.Next()) require.Equal(t, series2, c.At()) require.False(t, c.Next()) } @@ -51,3 +66,88 @@ func TestMatrixToSeriesSetSortsMetricLabels(t *testing.T) { l := ss.At().Labels() require.Equal(t, labels.FromStrings(model.MetricNameLabel, "testmetric", "a", "b", "c", "d", "e", "f", "g", "h"), l) } + +func TestConcreteSeriesSetIterator(t *testing.T) { + series := &ConcreteSeries{ + labels: labels.FromStrings("foo", "bar"), + samples: []model.SamplePair{{Timestamp: 1, Value: 2}, {Timestamp: 5, Value: 6}, {Timestamp: 9, Value: 10}}, + histograms: []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(3, generateTestHistogram(4)), mimirpb.FromFloatHistogramToHistogramProto(7, generateTestFloatHistogram(8)), mimirpb.FromHistogramToHistogramProto(11, generateTestHistogram(12))}, + } + + // test next + it := series.Iterator(nil) + require.Equal(t, chunkenc.ValFloat, it.Next()) + ts, v := it.At() + require.Equal(t, int64(1), ts) + require.Equal(t, float64(2), v) + require.Equal(t, chunkenc.ValHistogram, it.Next()) + ts, h := it.AtHistogram() + require.Equal(t, int64(3), ts) + require.Equal(t, generateTestHistogram(4), h) + require.Equal(t, chunkenc.ValFloat, it.Next()) + ts, v = it.At() + require.Equal(t, int64(5), ts) + require.Equal(t, float64(6), v) + require.Equal(t, chunkenc.ValFloatHistogram, it.Next()) + ts, fh := it.AtFloatHistogram() + require.Equal(t, int64(7), ts) + require.Equal(t, generateTestFloatHistogram(8), fh) + require.Equal(t, chunkenc.ValFloat, it.Next()) + ts, v = it.At() + require.Equal(t, int64(9), ts) + require.Equal(t, float64(10), v) + require.Equal(t, chunkenc.ValHistogram, it.Next()) + ts, h = it.AtHistogram() + require.Equal(t, int64(11), ts) + require.Equal(t, generateTestHistogram(12), h) + // You can also call AtFloatHistogram() on ValHistogram. + ts, fh = it.AtFloatHistogram() + require.Equal(t, int64(11), ts) + require.Equal(t, generateTestHistogram(12).ToFloat(), fh) + + require.Equal(t, chunkenc.ValNone, it.Next()) + + // test seek to same and next + it = series.Iterator(nil) + require.Equal(t, chunkenc.ValHistogram, it.Seek(3)) // Seek to middle + ts, h = it.AtHistogram() + require.Equal(t, int64(3), ts) + require.Equal(t, generateTestHistogram(4), h) + require.Equal(t, chunkenc.ValHistogram, it.Seek(3)) // Seek to same place + ts, h = it.AtHistogram() + require.Equal(t, int64(3), ts) + require.Equal(t, generateTestHistogram(4), h) + require.Equal(t, chunkenc.ValFloat, it.Next()) + ts, v = it.At() + require.Equal(t, int64(5), ts) + require.Equal(t, float64(6), v) + + // test seek to earlier and next, then to later and next + it = series.Iterator(nil) + require.Equal(t, chunkenc.ValHistogram, it.Seek(3)) // Seek to middle + ts, h = it.AtHistogram() + require.Equal(t, int64(3), ts) + require.Equal(t, generateTestHistogram(4), h) + require.Equal(t, chunkenc.ValHistogram, it.Seek(1)) // Ensure seek doesn't do anything if already past seek target. + ts, h = it.AtHistogram() + require.Equal(t, int64(3), ts) + require.Equal(t, generateTestHistogram(4), h) + require.Equal(t, chunkenc.ValFloat, it.Next()) + ts, v = it.At() + require.Equal(t, int64(5), ts) + require.Equal(t, float64(6), v) + require.Equal(t, chunkenc.ValFloatHistogram, it.Seek(7)) // Seek to later + ts, fh = it.AtFloatHistogram() + require.Equal(t, int64(7), ts) + require.Equal(t, generateTestFloatHistogram(8), fh) + require.Equal(t, chunkenc.ValFloat, it.Next()) + ts, v = it.At() + require.Equal(t, int64(9), ts) + require.Equal(t, float64(10), v) + require.Equal(t, chunkenc.ValHistogram, it.Seek(11)) // Seek to end + ts, h = it.AtHistogram() + require.Equal(t, int64(11), ts) + require.Equal(t, generateTestHistogram(12), h) + require.Equal(t, chunkenc.ValNone, it.Seek(13)) // Seek to past end + require.Equal(t, chunkenc.ValNone, it.Seek(13)) // Ensure that seeking to same end still returns ValNone +} diff --git a/pkg/util/chunkcompat/compat.go b/pkg/util/chunkcompat/compat.go index 8485fc15def..6dd4834ed8f 100644 --- a/pkg/util/chunkcompat/compat.go +++ b/pkg/util/chunkcompat/compat.go @@ -14,7 +14,7 @@ import ( "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/storage/chunk" - "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/modelutil" ) // StreamsToMatrix converts a slice of QueryStreamResponse to a model.Matrix. @@ -52,18 +52,33 @@ func SeriesChunksToMatrix(from, through model.Time, serieses []client.TimeSeries } samples := []model.SamplePair{} + histograms := []mimirpb.Histogram{} for _, chunk := range chunks { - ss, err := chunk.Samples(from, through) + sf, sh, err := chunk.Samples(from, through) if err != nil { return nil, err } - samples = util.MergeSampleSets(samples, ss) + samples = modelutil.MergeSampleSets(samples, sf) + histograms = modelutil.MergeHistogramSets(histograms, sh) } - result = append(result, &model.SampleStream{ + stream := &model.SampleStream{ Metric: metric, - Values: samples, - }) + } + if len(samples) > 0 { + stream.Values = samples + } + if len(histograms) > 0 { + histogramsDecoded := make([]model.SampleHistogramPair, 0, len(histograms)) + for _, h := range histograms { + histogramsDecoded = append(histogramsDecoded, model.SampleHistogramPair{ + Timestamp: model.Time(h.Timestamp), + Histogram: mimirpb.FromHistogramProtoToPromHistogram(&h), + }) + } + stream.Histograms = histogramsDecoded + } + result = append(result, stream) } return result, nil } @@ -87,6 +102,8 @@ func TimeseriesToMatrix(from, through model.Time, series []mimirpb.TimeSeries) ( Timestamp: model.Time(sam.TimestampMs), Value: model.SampleValue(sam.Value), }) + + // Only used in tests. Add native histogram support later: https://github.com/grafana/mimir/issues/4378 } result = append(result, &model.SampleStream{ diff --git a/pkg/util/modelutil/merger.go b/pkg/util/modelutil/merger.go new file mode 100644 index 00000000000..855c2cdf54d --- /dev/null +++ b/pkg/util/modelutil/merger.go @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/util/merger.go +// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/strutil/merge.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Cortex Authors. +// Provenance-includes-copyright: The Thanos Authors. + +package modelutil + +import ( + "github.com/prometheus/common/model" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +// MergeSampleSets merges and dedupes two sets of already sorted sample pairs. +func MergeSampleSets(a, b []model.SamplePair) []model.SamplePair { + result := make([]model.SamplePair, 0, len(a)+len(b)) + i, j := 0, 0 + for i < len(a) && j < len(b) { + if a[i].Timestamp < b[j].Timestamp { + result = append(result, a[i]) + i++ + } else if a[i].Timestamp > b[j].Timestamp { + result = append(result, b[j]) + j++ + } else { + result = append(result, a[i]) + i++ + j++ + } + } + // Add the rest of a or b. One of them is empty now. + result = append(result, a[i:]...) + result = append(result, b[j:]...) + return result +} + +// MergeNSampleSets merges and dedupes n sets of already sorted sample pairs. +func MergeNSampleSets(sampleSets ...[]model.SamplePair) []model.SamplePair { + l := len(sampleSets) + switch l { + case 0: + return []model.SamplePair{} + case 1: + return sampleSets[0] + } + + n := l / 2 + left := MergeNSampleSets(sampleSets[:n]...) + right := MergeNSampleSets(sampleSets[n:]...) + return MergeSampleSets(left, right) +} + +// MergeHistogramSets merges and dedupes two sets of already sorted histograms. +func MergeHistogramSets(a, b []mimirpb.Histogram) []mimirpb.Histogram { + result := make([]mimirpb.Histogram, 0, len(a)+len(b)) + i, j := 0, 0 + for i < len(a) && j < len(b) { + if a[i].GetTimestamp() < b[j].GetTimestamp() { + result = append(result, a[i]) + i++ + } else if a[i].GetTimestamp() > b[j].GetTimestamp() { + result = append(result, b[j]) + j++ + } else { + result = append(result, a[i]) + i++ + j++ + } + } + // Add the rest of a or b. One of them is empty now. + result = append(result, a[i:]...) + result = append(result, b[j:]...) + return result +} + +// MergeNHistogramSets merges and dedupes n sets of already sorted histograms. +func MergeNHistogramSets(sampleSets ...[]mimirpb.Histogram) []mimirpb.Histogram { + l := len(sampleSets) + switch l { + case 0: + return []mimirpb.Histogram{} + case 1: + return sampleSets[0] + } + + n := l / 2 + left := MergeNHistogramSets(sampleSets[:n]...) + right := MergeNHistogramSets(sampleSets[n:]...) + return MergeHistogramSets(left, right) +} diff --git a/pkg/util/modelutil/merger_test.go b/pkg/util/modelutil/merger_test.go new file mode 100644 index 00000000000..402d9ecadc9 --- /dev/null +++ b/pkg/util/modelutil/merger_test.go @@ -0,0 +1,161 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/util/merger_test.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Cortex Authors. + +package modelutil + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/util/test" +) + +func TestMergeSampleSets(t *testing.T) { + now := model.Now() + sample1 := model.SamplePair{Timestamp: now, Value: 1} + sample2 := model.SamplePair{Timestamp: now.Add(1 * time.Second), Value: 2} + sample3 := model.SamplePair{Timestamp: now.Add(4 * time.Second), Value: 3} + sample4 := model.SamplePair{Timestamp: now.Add(8 * time.Second), Value: 7} + + for _, c := range []struct { + samplesA []model.SamplePair + samplesB []model.SamplePair + expected []model.SamplePair + }{ + { + samplesA: []model.SamplePair{}, + samplesB: []model.SamplePair{}, + expected: []model.SamplePair{}, + }, + { + samplesA: []model.SamplePair{sample1}, + samplesB: []model.SamplePair{}, + expected: []model.SamplePair{sample1}, + }, + { + samplesA: []model.SamplePair{}, + samplesB: []model.SamplePair{sample1}, + expected: []model.SamplePair{sample1}, + }, + { + samplesA: []model.SamplePair{sample1}, + samplesB: []model.SamplePair{sample1}, + expected: []model.SamplePair{sample1}, + }, + { + samplesA: []model.SamplePair{sample1, sample2, sample3}, + samplesB: []model.SamplePair{sample1, sample3, sample4}, + expected: []model.SamplePair{sample1, sample2, sample3, sample4}, + }, + } { + samples := MergeSampleSets(c.samplesA, c.samplesB) + require.Equal(t, c.expected, samples) + } +} + +func TestMergeNSampleSets(t *testing.T) { + now := model.Now() + sample1 := model.SamplePair{Timestamp: now, Value: 1} + sample2 := model.SamplePair{Timestamp: now.Add(1 * time.Second), Value: 2} + sample3 := model.SamplePair{Timestamp: now.Add(4 * time.Second), Value: 3} + sample4 := model.SamplePair{Timestamp: now.Add(8 * time.Second), Value: 7} + + for _, c := range []struct { + sampleSets [][]model.SamplePair + expected []model.SamplePair + }{ + { + sampleSets: [][]model.SamplePair{{}, {}, {}}, + expected: []model.SamplePair{}, + }, + { + sampleSets: [][]model.SamplePair{ + {sample1, sample2}, + {sample2}, + {sample1, sample3, sample4}, + }, + expected: []model.SamplePair{sample1, sample2, sample3, sample4}, + }, + } { + samples := MergeNSampleSets(c.sampleSets...) + require.Equal(t, c.expected, samples) + } +} + +func TestMergeHistogramSets(t *testing.T) { + now := model.Now() + sample1 := mimirpb.FromFloatHistogramToHistogramProto(int64(now), test.GenerateTestFloatHistogram(1)) + sample2 := mimirpb.FromHistogramToHistogramProto(int64(now.Add(1*time.Second)), test.GenerateTestHistogram(2)) + sample3 := mimirpb.FromFloatHistogramToHistogramProto(int64(now.Add(4*time.Second)), test.GenerateTestFloatHistogram(3)) + sample4 := mimirpb.FromHistogramToHistogramProto(int64(now.Add(8*time.Second)), test.GenerateTestHistogram(7)) + + for _, c := range []struct { + samplesA []mimirpb.Histogram + samplesB []mimirpb.Histogram + expected []mimirpb.Histogram + }{ + { + samplesA: []mimirpb.Histogram{}, + samplesB: []mimirpb.Histogram{}, + expected: []mimirpb.Histogram{}, + }, + { + samplesA: []mimirpb.Histogram{sample1}, + samplesB: []mimirpb.Histogram{}, + expected: []mimirpb.Histogram{sample1}, + }, + { + samplesA: []mimirpb.Histogram{}, + samplesB: []mimirpb.Histogram{sample1}, + expected: []mimirpb.Histogram{sample1}, + }, + { + samplesA: []mimirpb.Histogram{sample1}, + samplesB: []mimirpb.Histogram{sample1}, + expected: []mimirpb.Histogram{sample1}, + }, + { + samplesA: []mimirpb.Histogram{sample1, sample2, sample3}, + samplesB: []mimirpb.Histogram{sample1, sample3, sample4}, + expected: []mimirpb.Histogram{sample1, sample2, sample3, sample4}, + }, + } { + samples := MergeHistogramSets(c.samplesA, c.samplesB) + require.Equal(t, c.expected, samples) + } +} + +func TestMergeNHistogramSets(t *testing.T) { + now := model.Now() + sample1 := mimirpb.FromFloatHistogramToHistogramProto(int64(now), test.GenerateTestFloatHistogram(1)) + sample2 := mimirpb.FromHistogramToHistogramProto(int64(now.Add(1*time.Second)), test.GenerateTestHistogram(2)) + sample3 := mimirpb.FromFloatHistogramToHistogramProto(int64(now.Add(4*time.Second)), test.GenerateTestFloatHistogram(3)) + sample4 := mimirpb.FromHistogramToHistogramProto(int64(now.Add(8*time.Second)), test.GenerateTestHistogram(7)) + + for _, c := range []struct { + sampleSets [][]mimirpb.Histogram + expected []mimirpb.Histogram + }{ + { + sampleSets: [][]mimirpb.Histogram{{}, {}, {}}, + expected: []mimirpb.Histogram{}, + }, + { + sampleSets: [][]mimirpb.Histogram{ + {sample1, sample2}, + {sample2}, + {sample1, sample3, sample4}, + }, + expected: []mimirpb.Histogram{sample1, sample2, sample3, sample4}, + }, + } { + samples := MergeNHistogramSets(c.sampleSets...) + require.Equal(t, c.expected, samples) + } +}