From 934a636e7bbfb46cc84ff4f8b381afb143402bbc Mon Sep 17 00:00:00 2001 From: arnikola Date: Fri, 3 Apr 2020 16:44:46 -0400 Subject: [PATCH] [query] Fix histogram grouping bug (#2247) --- .../functions/linear/histogram_quantile.go | 79 +++++++++------- .../linear/histogram_quantile_test.go | 89 +++++++++++++++++-- 2 files changed, 131 insertions(+), 37 deletions(-) diff --git a/src/query/functions/linear/histogram_quantile.go b/src/query/functions/linear/histogram_quantile.go index e8990af506..d2f56c90ba 100644 --- a/src/query/functions/linear/histogram_quantile.go +++ b/src/query/functions/linear/histogram_quantile.go @@ -133,13 +133,30 @@ func (b indexedBuckets) Less(i, j int) bool { type bucketedSeries map[string]indexedBuckets -func gatherSeriesToBuckets(metas []block.SeriesMeta) bucketedSeries { +type validSeriesBuckets []indexedBuckets + +func (b validSeriesBuckets) Len() int { return len(b) } +func (b validSeriesBuckets) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b validSeriesBuckets) Less(i, j int) bool { + if len(b[i].buckets) == 0 { + return false + } + + if len(b[j].buckets) == 0 { + return true + } + + // An arbitrarily chosen sort that guarantees deterministic results. + return b[i].buckets[0].idx < b[j].buckets[0].idx +} + +func gatherSeriesToBuckets(metas []block.SeriesMeta) validSeriesBuckets { bucketsForID := make(bucketedSeries, initIndexBucketLength) for i, meta := range metas { tags := meta.Tags value, found := tags.Bucket() if !found { - // This series does not have a bucket tag; drop it from the output. + // this series does not have a bucket tag; drop it from the output. continue } @@ -158,7 +175,7 @@ func gatherSeriesToBuckets(metas []block.SeriesMeta) bucketedSeries { } if buckets, found := bucketsForID[string(id)]; !found { - // Add a single indexed bucket for this ID with the current index only. + // add a single indexed bucket for this ID with the current index only. newBuckets := make([]indexedBucket, 0, initIndexBucketLength) newBuckets = append(newBuckets, newBucket) bucketsForID[string(id)] = indexedBuckets{ @@ -171,23 +188,29 @@ func gatherSeriesToBuckets(metas []block.SeriesMeta) bucketedSeries { } } - return bucketsForID + return sanitizeBuckets(bucketsForID) } // sanitize sorts the bucket maps by upper bound, dropping any series which // have less than two buckets, or any that do not have an upper bound of +Inf -func sanitizeBuckets(bucketMap bucketedSeries) { - for k, buckets := range bucketMap { +func sanitizeBuckets(bucketMap bucketedSeries) validSeriesBuckets { + validSeriesBuckets := make(validSeriesBuckets, 0, len(bucketMap)) + for _, buckets := range bucketMap { if len(buckets.buckets) < 2 { - delete(bucketMap, k) + continue } sort.Sort(buckets) maxBound := buckets.buckets[len(buckets.buckets)-1].upperBound if !math.IsInf(maxBound, 1) { - delete(bucketMap, k) + continue } + + validSeriesBuckets = append(validSeriesBuckets, buckets) } + + sort.Sort(validSeriesBuckets) + return validSeriesBuckets } func bucketQuantile(q float64, buckets []bucketValue) float64 { @@ -257,34 +280,30 @@ func (n *histogramQuantileNode) ProcessBlock( meta := b.Meta() seriesMetas := utils.FlattenMetadata(meta, stepIter.SeriesMeta()) - bucketedSeries := gatherSeriesToBuckets(seriesMetas) + seriesBuckets := gatherSeriesToBuckets(seriesMetas) q := n.op.q if q < 0 || q > 1 { - return processInvalidQuantile(queryCtx, q, bucketedSeries, meta, stepIter, n.controller) + return processInvalidQuantile(queryCtx, q, seriesBuckets, meta, stepIter, n.controller) } - return processValidQuantile(queryCtx, q, bucketedSeries, meta, stepIter, n.controller) + return processValidQuantile(queryCtx, q, seriesBuckets, meta, stepIter, n.controller) } func setupBuilder( queryCtx *models.QueryContext, - bucketedSeries bucketedSeries, + seriesBuckets validSeriesBuckets, meta block.Metadata, stepIter block.StepIter, controller *transform.Controller, ) (block.Builder, error) { - metas := make([]block.SeriesMeta, len(bucketedSeries)) - idx := 0 - for _, v := range bucketedSeries { - metas[idx] = block.SeriesMeta{ + metas := make([]block.SeriesMeta, 0, len(seriesBuckets)) + for _, v := range seriesBuckets { + metas = append(metas, block.SeriesMeta{ Tags: v.tags, - } - - idx++ + }) } - meta.Tags, metas = utils.DedupeMetadata(metas, meta.Tags.Opts) builder, err := controller.BlockBuilder(queryCtx, meta, metas) if err != nil { return nil, err @@ -300,14 +319,12 @@ func setupBuilder( func processValidQuantile( queryCtx *models.QueryContext, q float64, - bucketedSeries bucketedSeries, + seriesBuckets validSeriesBuckets, meta block.Metadata, stepIter block.StepIter, controller *transform.Controller, ) (block.Block, error) { - sanitizeBuckets(bucketedSeries) - - builder, err := setupBuilder(queryCtx, bucketedSeries, meta, stepIter, controller) + builder, err := setupBuilder(queryCtx, seriesBuckets, meta, stepIter, controller) if err != nil { return nil, err } @@ -317,9 +334,8 @@ func processValidQuantile( values := step.Values() bucketValues := make([]bucketValue, 0, initIndexBucketLength) - aggregatedValues := make([]float64, len(bucketedSeries)) - idx := 0 - for _, b := range bucketedSeries { + aggregatedValues := make([]float64, 0, len(seriesBuckets)) + for _, b := range seriesBuckets { buckets := b.buckets // clear previous bucket values. bucketValues = bucketValues[:0] @@ -336,8 +352,7 @@ func processValidQuantile( } } - aggregatedValues[idx] = bucketQuantile(q, bucketValues) - idx++ + aggregatedValues = append(aggregatedValues, bucketQuantile(q, bucketValues)) } if err := builder.AppendValues(index, aggregatedValues); err != nil { @@ -355,12 +370,12 @@ func processValidQuantile( func processInvalidQuantile( queryCtx *models.QueryContext, q float64, - bucketedSeries bucketedSeries, + seriesBuckets validSeriesBuckets, meta block.Metadata, stepIter block.StepIter, controller *transform.Controller, ) (block.Block, error) { - builder, err := setupBuilder(queryCtx, bucketedSeries, meta, stepIter, controller) + builder, err := setupBuilder(queryCtx, seriesBuckets, meta, stepIter, controller) if err != nil { return nil, err } @@ -373,7 +388,7 @@ func processInvalidQuantile( } setValue := math.Inf(sign) - outValues := make([]float64, len(bucketedSeries)) + outValues := make([]float64, len(seriesBuckets)) util.Memset(outValues, setValue) for index := 0; stepIter.Next(); index++ { if err := builder.AppendValues(index, outValues); err != nil { diff --git a/src/query/functions/linear/histogram_quantile_test.go b/src/query/functions/linear/histogram_quantile_test.go index 99a3e0695c..b328568210 100644 --- a/src/query/functions/linear/histogram_quantile_test.go +++ b/src/query/functions/linear/histogram_quantile_test.go @@ -92,7 +92,7 @@ func TestGatherSeriesToBuckets(t *testing.T) { }, } - assert.Equal(t, expected, actual) + assert.Equal(t, sanitizeBuckets(expected), actual) } func TestSanitizeBuckets(t *testing.T) { @@ -132,8 +132,8 @@ func TestSanitizeBuckets(t *testing.T) { }, } - actual := bucketedSeries{ - `{bar="baz"}`: indexedBuckets{ + expected := validSeriesBuckets{ + indexedBuckets{ buckets: []indexedBucket{ {upperBound: 1, idx: 0}, {upperBound: 2, idx: 3}, @@ -143,8 +143,7 @@ func TestSanitizeBuckets(t *testing.T) { }, } - sanitizeBuckets(bucketed) - assert.Equal(t, actual, bucketed) + assert.Equal(t, expected, sanitizeBuckets(bucketed)) } func TestBucketQuantile(t *testing.T) { @@ -295,3 +294,83 @@ func TestQuantileFunctionForInvalidQValues(t *testing.T) { actual = testQuantileFunctionWithQ(t, 0.8) test.EqualsWithNansWithDelta(t, [][]float64{{15.6, 20, math.NaN(), 2, math.NaN()}}, actual, 0.00001) } + +func testWithMultipleBuckets(t *testing.T, q float64) [][]float64 { + args := make([]interface{}, 0, 1) + args = append(args, q) + op, err := NewHistogramQuantileOp(args, HistogramQuantileType) + require.NoError(t, err) + + name := []byte("name") + bucket := []byte("bucket") + tagOpts := models.NewTagOptions(). + SetIDSchemeType(models.TypeQuoted). + SetMetricName(name). + SetBucketName(bucket) + + tags := models.NewTags(3, tagOpts).SetName([]byte("foo")).AddTag(models.Tag{ + Name: []byte("bar"), + Value: []byte("baz"), + }) + + tagsTwo := models.NewTags(3, tagOpts).SetName([]byte("qux")).AddTag(models.Tag{ + Name: []byte("quaz"), + Value: []byte("quail"), + }) + + seriesMetas := []block.SeriesMeta{ + {Tags: tags.Clone().SetBucket([]byte("1"))}, + {Tags: tags.Clone().SetBucket([]byte("2"))}, + {Tags: tags.Clone().SetBucket([]byte("5"))}, + {Tags: tags.Clone().SetBucket([]byte("10"))}, + {Tags: tags.Clone().SetBucket([]byte("20"))}, + {Tags: tags.Clone().SetBucket([]byte("Inf"))}, + {Tags: tagsTwo.Clone().SetBucket([]byte("1"))}, + {Tags: tagsTwo.Clone().SetBucket([]byte("2"))}, + {Tags: tagsTwo.Clone().SetBucket([]byte("5"))}, + {Tags: tagsTwo.Clone().SetBucket([]byte("10"))}, + {Tags: tagsTwo.Clone().SetBucket([]byte("20"))}, + {Tags: tagsTwo.Clone().SetBucket([]byte("Inf"))}, + } + + v := [][]float64{ + {1, 1, 11, math.NaN(), math.NaN()}, + {2, 2, 12, 13, math.NaN()}, + {5, 5, 15, math.NaN(), math.NaN()}, + {10, 10, 20, math.NaN(), math.NaN()}, + {15, 15, 25, math.NaN(), math.NaN()}, + {16, 19, math.NaN(), 71, 1}, + {21, 31, 411, math.NaN(), math.NaN()}, + {22, 32, 412, 513, math.NaN()}, + {25, 35, 415, math.NaN(), math.NaN()}, + {210, 310, 420, math.NaN(), math.NaN()}, + {215, 315, 425, math.NaN(), math.NaN()}, + {216, 319, math.NaN(), 571, 601}, + } + + bounds := models.Bounds{ + Start: time.Now(), + Duration: time.Minute * 5, + StepSize: time.Minute, + } + + bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v) + c, sink := executor.NewControllerWithSink(parser.NodeID(1)) + node := op.(histogramQuantileOp).Node(c, transform.Options{}) + err = node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) + require.NoError(t, err) + + return sink.Values +} + +func TestQuantileFunctionForMultipleBuckets(t *testing.T) { + for i := 0; i < 100; i++ { + actual := testWithMultipleBuckets(t, 0.8) + expected := [][]float64{ + {15.6, 20, math.NaN(), 2, math.NaN()}, + {8.99459, 9.00363, math.NaN(), 1.78089, math.NaN()}, + } + + test.EqualsWithNansWithDelta(t, expected, actual, 0.00001) + } +}