From 97c290105a9791b1cf8fbde055831a530b2c42b9 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Mon, 11 Feb 2019 16:32:37 -0500 Subject: [PATCH 1/5] [query] Add histogram quantile function --- src/cmd/services/m3query/config/config.go | 9 + .../services/m3query/config/config_test.go | 4 +- src/query/functions/aggregation/base_test.go | 22 +- .../functions/linear/histogram_quantile.go | 375 ++++++++++++++++++ .../linear/histogram_quantile_test.go | 142 +++++++ src/query/models/options.go | 20 +- src/query/models/options_test.go | 15 +- src/query/models/tags.go | 10 + src/query/models/tags_test.go | 38 ++ src/query/models/types.go | 8 +- src/query/parser/promql/parse_test.go | 2 + src/query/parser/promql/types.go | 4 + src/query/storage/converter.go | 41 +- src/query/storage/converter_test.go | 31 ++ src/query/storage/validator/storage.go | 3 + 15 files changed, 701 insertions(+), 23 deletions(-) create mode 100644 src/query/functions/linear/histogram_quantile.go create mode 100644 src/query/functions/linear/histogram_quantile_test.go diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 20d8e48be4..caf051fd94 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -299,6 +299,10 @@ type TagOptionsConfiguration struct { // If not provided, defaults to `__name__`. MetricName string `yaml:"metricName"` + // BucketName specifies the tag name that corresponds to the metric's bucket. + // If not provided, defaults to `le`. + BucketName string `yaml:"bucketName"` + // Scheme determines the default ID generation scheme. Defaults to TypeLegacy. Scheme models.IDSchemeType `yaml:"idScheme"` } @@ -311,6 +315,11 @@ func TagOptionsFromConfig(cfg TagOptionsConfiguration) (models.TagOptions, error opts = opts.SetMetricName([]byte(name)) } + bucket := cfg.BucketName + if bucket != "" { + opts = opts.SetBucketName([]byte(bucket)) + } + if cfg.Scheme == models.TypeDefault { cfg.Scheme = models.TypeLegacy } diff --git a/src/cmd/services/m3query/config/config_test.go b/src/cmd/services/m3query/config/config_test.go index 061c76cb76..3ebae18410 100644 --- a/src/cmd/services/m3query/config/config_test.go +++ b/src/cmd/services/m3query/config/config_test.go @@ -103,15 +103,17 @@ func TestDefaultTagOptionsConfig(t *testing.T) { opts, err := TagOptionsFromConfig(cfg) require.NoError(t, err) assert.Equal(t, []byte("__name__"), opts.MetricName()) + assert.Equal(t, []byte("le"), opts.BucketName()) assert.Equal(t, models.TypeLegacy, opts.IDSchemeType()) } func TestTagOptionsConfig(t *testing.T) { var cfg TagOptionsConfiguration - config := "metricName: abcdefg\nidScheme: prepend_meta" + config := "metricName: abcdefg\nidScheme: prepend_meta\nbucketName: foo" require.NoError(t, yaml.Unmarshal([]byte(config), &cfg)) opts, err := TagOptionsFromConfig(cfg) require.NoError(t, err) assert.Equal(t, []byte("abcdefg"), opts.MetricName()) + assert.Equal(t, []byte("foo"), opts.BucketName()) assert.Equal(t, models.TypePrependMeta, opts.IDSchemeType()) } diff --git a/src/query/functions/aggregation/base_test.go b/src/query/functions/aggregation/base_test.go index 0af94abfb9..c2731cfae5 100644 --- a/src/query/functions/aggregation/base_test.go +++ b/src/query/functions/aggregation/base_test.go @@ -38,12 +38,12 @@ import ( var ( seriesMetas = []block.SeriesMeta{ - {Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"d", "4"}})}, - {Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"d", "4"}})}, - {Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"b", "2"}, {"d", "4"}})}, - {Tags: test.StringTagsToTags(test.StringTags{{"a", "2"}, {"b", "2"}, {"d", "4"}})}, - {Tags: test.StringTagsToTags(test.StringTags{{"b", "2"}, {"d", "4"}})}, - {Tags: test.StringTagsToTags(test.StringTags{{"c", "3"}, {"d", "4"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "a", V: "1"}, {N: "d", V: "4"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "a", V: "1"}, {N: "d", V: "4"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "a", V: "1"}, {N: "b", V: "2"}, {N: "d", V: "4"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "a", V: "2"}, {N: "b", V: "2"}, {N: "d", V: "4"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "b", V: "2"}, {N: "d", V: "4"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "c", V: "3"}, {N: "d", V: "4"}})}, } v = [][]float64{ {0, math.NaN(), 2, 3, 4}, @@ -168,11 +168,11 @@ func TestFunctionFilteringWithoutD(t *testing.T) { } expectedMetas := []block.SeriesMeta{ - {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}})}, - {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "1"}, {"b", "2"}})}, - {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"a", "2"}, {"b", "2"}})}, - {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"b", "2"}})}, - {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{"c", "3"}})}, + {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{N: "a", V: "1"}})}, + {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{N: "a", V: "1"}, {N: "b", V: "2"}})}, + {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{N: "a", V: "2"}, {N: "b", V: "2"}})}, + {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{N: "b", V: "2"}})}, + {Name: typeBytes, Tags: test.StringTagsToTags(test.StringTags{{N: "c", V: "3"}})}, } expectedMetaTags := models.EmptyTags() diff --git a/src/query/functions/linear/histogram_quantile.go b/src/query/functions/linear/histogram_quantile.go new file mode 100644 index 0000000000..cc6d9f6ec8 --- /dev/null +++ b/src/query/functions/linear/histogram_quantile.go @@ -0,0 +1,375 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package linear + +import ( + "fmt" + "math" + "sort" + "strconv" + + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/ts" +) + +const ( + // HistogramQuantileType calculates the quantile for histogram buckets. + // + // NB: each sample must contain a tag with a bucket name (given by tag + // options) that denotes the upper bound of that bucket; series without this + // tag are ignored. + HistogramQuantileType = "histogram_quantile" + initIndexBucketLength = 10 +) + +// NewHistogramQuantileOp creates a new histogram quantile operation. +func NewHistogramQuantileOp( + args []interface{}, + opType string, +) (parser.Params, error) { + if len(args) != 1 { + return emptyOp, fmt.Errorf("invalid number of args for clamp: %d", len(args)) + } + + if opType != HistogramQuantileType { + return emptyOp, fmt.Errorf("operator not supported: %s", opType) + } + + q, ok := args[0].(float64) + if !ok { + return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v", args[0]) + } + + return newHistogramQuantileOp(q, opType), nil +} + +// histogramQuantileOp stores required properties for histogram quantile ops. +type histogramQuantileOp struct { + q float64 + opType string +} + +// OpType for the operator. +func (o histogramQuantileOp) OpType() string { + return o.opType +} + +// String representation. +func (o histogramQuantileOp) String() string { + return fmt.Sprintf("type: %s", o.OpType()) +} + +// Node creates an execution node. +func (o histogramQuantileOp) Node( + controller *transform.Controller, + _ transform.Options, +) transform.OpNode { + return &histogramQuantileNode{ + op: o, + controller: controller, + } +} + +func newHistogramQuantileOp( + q float64, + opType string, +) histogramQuantileOp { + return histogramQuantileOp{ + q: q, + opType: opType, + } +} + +type histogramQuantileNode struct { + op histogramQuantileOp + controller *transform.Controller +} + +type bucketValue struct { + upperBound float64 + value float64 +} + +type indexedBucket struct { + upperBound float64 + idx int +} + +type indexedBuckets struct { + buckets []indexedBucket + tags models.Tags +} + +func (b indexedBuckets) Len() int { return len(b.buckets) } +func (b indexedBuckets) Swap(i, j int) { + b.buckets[i], b.buckets[j] = b.buckets[j], b.buckets[i] +} +func (b indexedBuckets) Less(i, j int) bool { + return b.buckets[i].upperBound < b.buckets[j].upperBound +} + +type bucketedSeries map[string]indexedBuckets + +func gatherSeriesToBuckets(metas []block.SeriesMeta) bucketedSeries { + bucketsForID := make(bucketedSeries, initIndexBucketLength) + for i, meta := range metas { + tags := meta.Tags + value, found := tags.Bucket() + if !found { + // This series does not have a bucket tag; drop it from the output. + continue + } + + bound, err := strconv.ParseFloat(string(value), 64) + if err != nil { + // invalid bounds value for the bucket; drop it from the output. + continue + } + + excludeTags := [][]byte{tags.Opts.MetricName(), tags.Opts.BucketName()} + tagsWithoutKeys := tags.TagsWithoutKeys(excludeTags) + id := tagsWithoutKeys.ID() + if buckets, found := bucketsForID[string(id)]; !found { + // Add a single indexed bucket for this ID with the current index only. + newBuckets := make([]indexedBucket, 0, initIndexBucketLength) + newBucket := indexedBucket{ + upperBound: bound, + idx: i, + } + + newBuckets = append(newBuckets, newBucket) + bucketsForID[string(id)] = indexedBuckets{ + buckets: newBuckets, + tags: tagsWithoutKeys, + } + } else { + newBucket := indexedBucket{ + upperBound: bound, + idx: i, + } + + buckets.buckets = append(buckets.buckets, newBucket) + bucketsForID[string(id)] = buckets + } + } + + return bucketsForID +} + +// sanitize sorts the bucket maps by upper bound, dropping any series which +// have less than two buckets, or any that do not have an upper bound of +Inf +func sanitizeBuckets(bucketMap bucketedSeries) { + for k, buckets := range bucketMap { + if len(buckets.buckets) < 2 { + delete(bucketMap, k) + } + + sort.Sort(buckets) + maxBound := buckets.buckets[len(buckets.buckets)-1].upperBound + if !math.IsInf(maxBound, +1) { + delete(bucketMap, k) + } + } +} + +func bucketQuantile(q float64, buckets []bucketValue) float64 { + // NB: some valid buckets may have been purged if the values at the current + // step for that series are not present. + if len(buckets) < 2 { + return math.NaN() + } + + // NB: similar situation here if the max bound bucket does not have a value + // at this point, it is necessary to re-check. + if !math.IsInf(buckets[len(buckets)-1].upperBound, +1) { + return math.NaN() + } + + rank := q * buckets[len(buckets)-1].value + + bucketIndex := sort.Search(len(buckets)-1, func(i int) bool { + return buckets[i].value >= rank + }) + + if bucketIndex == len(buckets)-1 { + return buckets[len(buckets)-2].upperBound + } + + if bucketIndex == 0 && buckets[0].upperBound <= 0 { + return buckets[0].upperBound + } + + var ( + bucketStart float64 + bucketEnd = buckets[bucketIndex].upperBound + count = buckets[bucketIndex].value + ) + + if bucketIndex > 0 { + bucketStart = buckets[bucketIndex-1].upperBound + count -= buckets[bucketIndex-1].value + rank -= buckets[bucketIndex-1].value + } + + return bucketStart + (bucketEnd-bucketStart)*rank/count +} + +// Process the block +func (n *histogramQuantileNode) Process(ID parser.NodeID, b block.Block) error { + stepIter, err := b.StepIter() + if err != nil { + return err + } + + meta := stepIter.Meta() + seriesMetas := utils.FlattenMetadata(meta, stepIter.SeriesMeta()) + bucketedSeries := gatherSeriesToBuckets(seriesMetas) + + q := n.op.q + if q < 0 || q > 1 { + return processInvalidQuantile(q, bucketedSeries, meta, stepIter, n.controller) + } + + return processValidQuantile(q, bucketedSeries, meta, stepIter, n.controller) +} + +func setupBuilder( + bucketedSeries bucketedSeries, + meta block.Metadata, + stepIter block.StepIter, + controller *transform.Controller, +) (block.Builder, error) { + metas := make([]block.SeriesMeta, len(bucketedSeries)) + idx := 0 + for _, v := range bucketedSeries { + metas[idx] = block.SeriesMeta{ + Tags: v.tags, + } + + idx++ + } + + meta.Tags, metas = utils.DedupeMetadata(metas) + builder, err := controller.BlockBuilder(meta, metas) + if err != nil { + return nil, err + } + + if err = builder.AddCols(stepIter.StepCount()); err != nil { + return nil, err + } + + return builder, nil +} + +func processValidQuantile( + q float64, + bucketedSeries bucketedSeries, + meta block.Metadata, + stepIter block.StepIter, + controller *transform.Controller, +) error { + sanitizeBuckets(bucketedSeries) + + builder, err := setupBuilder(bucketedSeries, meta, stepIter, controller) + if err != nil { + return err + } + + for index := 0; stepIter.Next(); index++ { + step := stepIter.Current() + values := step.Values() + bucketValues := make([]bucketValue, 0, initIndexBucketLength) + + aggregatedValues := make([]float64, 0, len(bucketedSeries)) + for _, b := range bucketedSeries { + buckets := b.buckets + // clear previous bucket values. + bucketValues = bucketValues[:0] + for _, bucket := range buckets { + // Only add non-NaN values to contention for the calculation. + val := values[bucket.idx] + if !math.IsNaN(val) { + bucketValues = append( + bucketValues, bucketValue{ + upperBound: bucket.upperBound, + value: val, + }, + ) + } + } + + aggregatedValues = append(aggregatedValues, bucketQuantile(q, bucketValues)) + } + + builder.AppendValues(index, aggregatedValues) + } + + if err = stepIter.Err(); err != nil { + return err + } + + nextBlock := builder.Build() + defer nextBlock.Close() + return controller.Process(nextBlock) +} + +func processInvalidQuantile( + q float64, + bucketedSeries bucketedSeries, + meta block.Metadata, + stepIter block.StepIter, + controller *transform.Controller, +) error { + builder, err := setupBuilder(bucketedSeries, meta, stepIter, controller) + if err != nil { + return err + } + + // Set the values to an infinity of the appropriate sign; anything less than 0 + // becomes -Inf, anything greather than one becomes +Inf. + sign := 1 + if q < 0 { + sign = -1 + } + + setValue := math.Inf(sign) + outValues := make([]float64, len(bucketedSeries)) + ts.Memset(outValues, setValue) + + for index := 0; stepIter.Next(); index++ { + cloned := make([]float64, len(outValues)) + copy(cloned, outValues) + builder.AppendValues(index, cloned) + } + + if err = stepIter.Err(); err != nil { + return err + } + + nextBlock := builder.Build() + defer nextBlock.Close() + return controller.Process(nextBlock) +} diff --git a/src/query/functions/linear/histogram_quantile_test.go b/src/query/functions/linear/histogram_quantile_test.go new file mode 100644 index 0000000000..26640267f8 --- /dev/null +++ b/src/query/functions/linear/histogram_quantile_test.go @@ -0,0 +1,142 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package linear + +import ( + "math" + "testing" + + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" + + "github.com/stretchr/testify/assert" +) + +func TestGatherSeriesToBuckets(t *testing.T) { + name := []byte("name") + bucket := []byte("bucket") + tagOpts := models.NewTagOptions(). + SetIDSchemeType(models.TypeQuoted). + SetMetricName(name). + SetBucketName(bucket) + + tags := models.NewTags(3, tagOpts).SetName([]byte("foo")).AddTag(models.Tag{ + Name: []byte("bar"), + Value: []byte("baz"), + }) + + noBucketMeta := block.SeriesMeta{Tags: tags} + invalidBucketMeta := block.SeriesMeta{Tags: tags.Clone().SetBucket([]byte("string"))} + validMeta := block.SeriesMeta{Tags: tags.Clone().SetBucket([]byte("0.1"))} + validMeta2 := block.SeriesMeta{Tags: tags.Clone().SetBucket([]byte("0.1"))} + validMeta3 := block.SeriesMeta{Tags: tags.Clone().SetBucket([]byte("10"))} + infMeta := block.SeriesMeta{Tags: tags.Clone().SetBucket([]byte("Inf"))} + validMetaMoreTags := block.SeriesMeta{Tags: tags.Clone().SetBucket([]byte("0.1")).AddTag(models.Tag{ + Name: []byte("qux"), + Value: []byte("qar"), + })} + + metas := []block.SeriesMeta{ + validMeta, noBucketMeta, invalidBucketMeta, validMeta2, validMetaMoreTags, validMeta3, infMeta, + } + + actual := gatherSeriesToBuckets(metas) + expected := bucketedSeries{ + `{bar="baz"}`: indexedBuckets{ + buckets: []indexedBucket{ + {upperBound: 0.1, idx: 0}, + {upperBound: 0.1, idx: 3}, + {upperBound: 10, idx: 5}, + {upperBound: math.Inf(1), idx: 6}, + }, + tags: models.NewTags(1, tagOpts).AddTag(models.Tag{ + Name: []byte("bar"), + Value: []byte("baz"), + }), + }, + `{bar="baz",qux="qar"}`: indexedBuckets{ + buckets: []indexedBucket{ + {upperBound: 0.1, idx: 4}, + }, + tags: models.NewTags(1, tagOpts).AddTag(models.Tag{ + Name: []byte("bar"), + Value: []byte("baz"), + }).AddTag(models.Tag{ + Name: []byte("qux"), + Value: []byte("qar"), + }), + }, + } + + assert.Equal(t, expected, actual) +} + +func TestSanitizeBuckets(t *testing.T) { + bucketed := bucketedSeries{ + `{bar="baz"}`: indexedBuckets{ + buckets: []indexedBucket{ + {upperBound: 10, idx: 5}, + {upperBound: math.Inf(1), idx: 6}, + {upperBound: 1, idx: 0}, + {upperBound: 2, idx: 3}, + }, + }, + `{with="neginf"}`: indexedBuckets{ + buckets: []indexedBucket{ + {upperBound: 10, idx: 5}, + {upperBound: math.Inf(-1), idx: 6}, + {upperBound: 1, idx: 0}, + {upperBound: 2, idx: 3}, + }, + }, + `{no="infinity"}`: indexedBuckets{ + buckets: []indexedBucket{ + {upperBound: 0.1, idx: 4}, + {upperBound: 0.2, idx: 14}, + {upperBound: 0.3, idx: 114}, + }, + }, + `{just="infinity"}`: indexedBuckets{ + buckets: []indexedBucket{ + {upperBound: math.Inf(1), idx: 4}, + }, + }, + `{just="neg-infinity"}`: indexedBuckets{ + buckets: []indexedBucket{ + {upperBound: math.Inf(-1), idx: 4}, + }, + }, + } + + actual := bucketedSeries{ + `{bar="baz"}`: indexedBuckets{ + buckets: []indexedBucket{ + {upperBound: 1, idx: 0}, + {upperBound: 2, idx: 3}, + {upperBound: 10, idx: 5}, + {upperBound: math.Inf(1), idx: 6}, + }, + }, + } + + sanitizeBuckets(bucketed) + assert.Equal(t, actual, bucketed) +} diff --git a/src/query/models/options.go b/src/query/models/options.go index 43dc923af7..f0617e0eb2 100644 --- a/src/query/models/options.go +++ b/src/query/models/options.go @@ -26,13 +26,16 @@ import ( var ( defaultMetricName = []byte("__name__") + defaultBucketName = []byte("le") - errNoName = errors.New("metric name is missing or empty") + errNoName = errors.New("metric name is missing or empty") + errNoBucket = errors.New("bucket name is missing or empty") ) type tagOptions struct { version int idScheme IDSchemeType + bucketName []byte metricName []byte } @@ -41,6 +44,7 @@ func NewTagOptions() TagOptions { return &tagOptions{ version: 0, metricName: defaultMetricName, + bucketName: defaultBucketName, idScheme: TypeLegacy, } } @@ -50,6 +54,10 @@ func (o *tagOptions) Validate() error { return errNoName } + if o.bucketName == nil || len(o.bucketName) == 0 { + return errNoBucket + } + return o.idScheme.Validate() } @@ -63,6 +71,16 @@ func (o *tagOptions) MetricName() []byte { return o.metricName } +func (o *tagOptions) SetBucketName(bucketName []byte) TagOptions { + opts := *o + opts.bucketName = bucketName + return &opts +} + +func (o *tagOptions) BucketName() []byte { + return o.bucketName +} + func (o *tagOptions) SetIDSchemeType(scheme IDSchemeType) TagOptions { opts := *o opts.idScheme = scheme diff --git a/src/query/models/options_test.go b/src/query/models/options_test.go index ef33719448..de27b54fda 100644 --- a/src/query/models/options_test.go +++ b/src/query/models/options_test.go @@ -36,10 +36,12 @@ func TestDefaultTagOptions(t *testing.T) { func TestValidTagOptions(t *testing.T) { opts := NewTagOptions(). SetIDSchemeType(TypePrependMeta). - SetMetricName([]byte("name")) + SetMetricName([]byte("name")). + SetBucketName([]byte("bucket")) assert.NoError(t, opts.Validate()) assert.Equal(t, []byte("name"), opts.MetricName()) + assert.Equal(t, []byte("bucket"), opts.BucketName()) assert.Equal(t, TypePrependMeta, opts.IDSchemeType()) } @@ -54,6 +56,17 @@ func TestBadNameTagOptions(t *testing.T) { assert.EqualError(t, opts.Validate(), msg) } +func TestBadBucketTagOptions(t *testing.T) { + msg := errNoBucket.Error() + opts := NewTagOptions(). + SetBucketName(nil) + assert.EqualError(t, opts.Validate(), msg) + + opts = NewTagOptions(). + SetBucketName([]byte{}) + assert.EqualError(t, opts.Validate(), msg) +} + func TestBadSchemeTagOptions(t *testing.T) { msg := "invalid config id schema type 'unknown': should be one of" + " [legacy quoted prepend_meta graphite]" diff --git a/src/query/models/tags.go b/src/query/models/tags.go index 65e73060fa..a32348fe58 100644 --- a/src/query/models/tags.go +++ b/src/query/models/tags.go @@ -346,6 +346,16 @@ func (t Tags) Name() ([]byte, bool) { return t.Get(t.Opts.MetricName()) } +// SetBucket sets the bucket tag value. +func (t Tags) SetBucket(value []byte) Tags { + return t.AddOrUpdateTag(Tag{Name: t.Opts.BucketName(), Value: value}) +} + +// Bucket gets the bucket tag value. +func (t Tags) Bucket() ([]byte, bool) { + return t.Get(t.Opts.BucketName()) +} + // AddTags is used to add a list of tags and maintain sorted order. func (t Tags) AddTags(tags []Tag) Tags { t.Tags = append(t.Tags, tags...) diff --git a/src/query/models/tags_test.go b/src/query/models/tags_test.go index 57b35244ac..4a6080930f 100644 --- a/src/query/models/tags_test.go +++ b/src/query/models/tags_test.go @@ -226,18 +226,56 @@ func TestUpdateName(t *testing.T) { actual, found := tags.Get(name) assert.False(t, found) assert.Nil(t, actual) + actual, found = tags.Name() + assert.False(t, found) + assert.Nil(t, actual) value := []byte("n") tags = tags.SetName(value) actual, found = tags.Get(name) assert.True(t, found) assert.Equal(t, value, actual) + actual, found = tags.Name() + assert.True(t, found) + assert.Equal(t, value, actual) value2 := []byte("abc") tags = tags.SetName(value2) actual, found = tags.Get(name) assert.True(t, found) assert.Equal(t, value2, actual) + actual, found = tags.Name() + assert.True(t, found) + assert.Equal(t, value2, actual) +} + +func TestUpdateBucket(t *testing.T) { + name := []byte("!") + tags := NewTags(1, NewTagOptions().SetBucketName(name)) + actual, found := tags.Get(name) + assert.False(t, found) + assert.Nil(t, actual) + actual, found = tags.Bucket() + assert.False(t, found) + assert.Nil(t, actual) + + value := []byte("n") + tags = tags.SetBucket(value) + actual, found = tags.Get(name) + assert.True(t, found) + assert.Equal(t, value, actual) + actual, found = tags.Bucket() + assert.True(t, found) + assert.Equal(t, value, actual) + + value2 := []byte("abc") + tags = tags.SetBucket(value2) + actual, found = tags.Get(name) + assert.True(t, found) + assert.Equal(t, value2, actual) + actual, found = tags.Bucket() + assert.True(t, found) + assert.Equal(t, value2, actual) } func TestAddOrUpdateTags(t *testing.T) { diff --git a/src/query/models/types.go b/src/query/models/types.go index 9031a130f8..d85361713c 100644 --- a/src/query/models/types.go +++ b/src/query/models/types.go @@ -78,10 +78,14 @@ const ( type TagOptions interface { // Validate validates these tag options. Validate() error - // SetMetricName sets the name for the `metric name` metric. + // SetMetricName sets the name for the `metric name` tag. SetMetricName(metricName []byte) TagOptions - // MetricName gets the name for the metric name `metric`. + // MetricName gets the name for the `metric name` tag. MetricName() []byte + // SetBucketName sets the name for the `bucket label` tag. + SetBucketName(metricName []byte) TagOptions + // BucketName gets the name for the `bucket label` tag. + BucketName() []byte // SetIDSchemeType sets the ID generation scheme type. SetIDSchemeType(scheme IDSchemeType) TagOptions // IDSchemeType gets the ID generation scheme type. diff --git a/src/query/parser/promql/parse_test.go b/src/query/parser/promql/parse_test.go index f8aa574a47..79f34f4b8e 100644 --- a/src/query/parser/promql/parse_test.go +++ b/src/query/parser/promql/parse_test.go @@ -129,6 +129,8 @@ var linearParseTests = []struct { {"minute(up)", linear.MinuteType}, {"month(up)", linear.MonthType}, {"year(up)", linear.YearType}, + + {"histogram_quantile(1,up)", linear.HistogramQuantileType}, } func TestLinearParses(t *testing.T) { diff --git a/src/query/parser/promql/types.go b/src/query/parser/promql/types.go index f67e63dacd..f7bd292dd2 100644 --- a/src/query/parser/promql/types.go +++ b/src/query/parser/promql/types.go @@ -189,6 +189,10 @@ func NewFunctionExpr( p, err = linear.NewClampOp(argValues, name) return p, true, err + case linear.HistogramQuantileType: + p, err = linear.NewHistogramQuantileOp(argValues, name) + return p, true, err + case linear.RoundType: p, err = linear.NewRoundOp(argValues) return p, true, err diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index 06b118e670..5cf11d0f80 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -23,6 +23,7 @@ package storage import ( "bytes" "fmt" + "sort" "sync" "time" @@ -55,10 +56,11 @@ func PromWriteTSToM3( } } -// The default name for the name tag in Prometheus metrics. -// This can be overwritten by setting tagOptions in the config +// The default name for the name and bucket tags in Prometheus metrics. +// This can be overwritten by setting tagOptions in the config. var ( - promDefaultName = []byte("__name__") + promDefaultName = []byte("__name__") + promDefaultBucketName = []byte("le") ) // PromLabelsToM3Tags converts Prometheus labels to M3 tags @@ -69,13 +71,16 @@ func PromLabelsToM3Tags( tags := models.NewTags(len(labels), tagOptions) tagList := make([]models.Tag, 0, len(labels)) for _, label := range labels { - // If this label corresponds to the Prometheus name, + name := label.Name + // If this label corresponds to the Prometheus name or bucket name, // instead set it as the given name tag from the config file. - if bytes.Equal(promDefaultName, label.Name) { + if bytes.Equal(promDefaultName, name) { tags = tags.SetName(label.Value) + } else if bytes.Equal(promDefaultBucketName, name) { + tags = tags.SetBucket(label.Value) } else { tagList = append(tagList, models.Tag{ - Name: label.Name, + Name: name, Value: label.Value, }) } @@ -192,6 +197,14 @@ func SeriesToPromTS(series *ts.Series) prompb.TimeSeries { return prompb.TimeSeries{Labels: labels, Samples: samples} } +type sortableLabels []prompb.Label + +func (t sortableLabels) Len() int { return len(t) } +func (t sortableLabels) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t sortableLabels) Less(i, j int) bool { + return bytes.Compare(t[i].Name, t[j].Name) == -1 +} + // TagsToPromLabels converts tags to prometheus labels. func TagsToPromLabels(tags models.Tags) []*prompb.Label { // Perform bulk allocation upfront then convert to pointers afterwards @@ -199,10 +212,24 @@ func TagsToPromLabels(tags models.Tags) []*prompb.Label { // if modifying. l := tags.Len() labels := make([]prompb.Label, 0, l) + + metricName := tags.Opts.MetricName() + bucketName := tags.Opts.BucketName() for _, t := range tags.Tags { - labels = append(labels, prompb.Label{Name: t.Name, Value: t.Value}) + if bytes.Equal(t.Name, metricName) { + labels = append(labels, + prompb.Label{Name: promDefaultName, Value: t.Value}) + } else if bytes.Equal(t.Name, bucketName) { + labels = append(labels, + prompb.Label{Name: promDefaultBucketName, Value: t.Value}) + } else { + labels = append(labels, prompb.Label{Name: t.Name, Value: t.Value}) + } } + // Sort here since name and label may be added in a different order in tags + // if default metric name or bucket names are overridden. + sort.Sort(sortableLabels(labels)) labelsPointers := make([]*prompb.Label, 0, l) for i := range labels { labelsPointers = append(labelsPointers, &labels[i]) diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index 97cb7799c4..72da7001e6 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -40,6 +40,37 @@ import ( "github.com/stretchr/testify/require" ) +func TestLabelConversion(t *testing.T) { + // NB: sorted order (__name__, foo, le) + labels := []*prompb.Label{ + &prompb.Label{Name: promDefaultName, Value: []byte("name-val")}, + &prompb.Label{Name: []byte("foo"), Value: []byte("bar")}, + &prompb.Label{Name: promDefaultBucketName, Value: []byte("bucket-val")}, + } + + opts := models.NewTagOptions(). + SetMetricName([]byte("name")). + SetBucketName([]byte("bucket")) + + tags := PromLabelsToM3Tags(labels, opts) + name, found := tags.Name() + assert.True(t, found) + assert.Equal(t, []byte("name-val"), name) + name, found = tags.Get([]byte("name")) + assert.True(t, found) + assert.Equal(t, []byte("name-val"), name) + + bucket, found := tags.Bucket() + assert.True(t, found) + assert.Equal(t, []byte("bucket-val"), bucket) + bucket, found = tags.Get([]byte("bucket")) + assert.True(t, found) + assert.Equal(t, []byte("bucket-val"), bucket) + + reverted := TagsToPromLabels(tags) + assert.Equal(t, labels, reverted) +} + func verifyExpandSeries(t *testing.T, ctrl *gomock.Controller, num int, pools xsync.PooledWorkerPool) { testTags := seriesiter.GenerateTag() iters := seriesiter.NewMockSeriesIters(ctrl, testTags, num, 2) diff --git a/src/query/storage/validator/storage.go b/src/query/storage/validator/storage.go index d994e058db..d934ed76b7 100644 --- a/src/query/storage/validator/storage.go +++ b/src/query/storage/validator/storage.go @@ -117,10 +117,13 @@ func PromResultToSeriesList(promReadResp prometheus.PromResp, tagOptions models. } metricName := string(tagOptions.MetricName()) + bucketName := string(tagOptions.BucketName()) tags := models.NewTags(len(result.Metric), tagOptions) for name, val := range result.Metric { if name == metricName { tags = tags.SetName([]byte(val)) + } else if name == bucketName { + tags = tags.SetBucket([]byte(val)) } else { tags = tags.AddTag(models.Tag{ Name: []byte(name), From 1001948c72bfabad5aa878cf6c9f73b82cad2943 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Tue, 12 Feb 2019 13:39:22 -0500 Subject: [PATCH 2/5] Additional test for the entire operation. --- .../linear/histogram_quantile_test.go | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/src/query/functions/linear/histogram_quantile_test.go b/src/query/functions/linear/histogram_quantile_test.go index 26640267f8..a4bb994500 100644 --- a/src/query/functions/linear/histogram_quantile_test.go +++ b/src/query/functions/linear/histogram_quantile_test.go @@ -23,11 +23,17 @@ package linear import ( "math" "testing" + "time" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/test" + "github.com/m3db/m3/src/query/test/executor" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestGatherSeriesToBuckets(t *testing.T) { @@ -140,3 +146,138 @@ func TestSanitizeBuckets(t *testing.T) { sanitizeBuckets(bucketed) assert.Equal(t, actual, bucketed) } + +func TestBucketQuantile(t *testing.T) { + // single bucket returns nan + actual := bucketQuantile(0.5, []bucketValue{{upperBound: 1, value: 1}}) + assert.True(t, math.IsNaN(actual)) + + // bucket with no infinity returns nan + actual = bucketQuantile(0.5, []bucketValue{ + {upperBound: 1, value: 1}, + {upperBound: 2, value: 2}, + }) + assert.True(t, math.IsNaN(actual)) + + actual = bucketQuantile(0.5, []bucketValue{ + {upperBound: 1, value: 1}, + {upperBound: math.Inf(1), value: 22}, + }) + assert.Equal(t, float64(1), actual) + + // NB: tested against Prom + buckets := []bucketValue{ + {upperBound: 1, value: 1}, + {upperBound: 2, value: 2}, + {upperBound: 5, value: 5}, + {upperBound: 10, value: 10}, + {upperBound: 20, value: 15}, + {upperBound: math.Inf(1), value: 16}, + } + + actual = bucketQuantile(0, buckets) + assert.InDelta(t, float64(0), actual, 0.0001) + + actual = bucketQuantile(0.15, buckets) + assert.InDelta(t, 2.4, actual, 0.0001) + + actual = bucketQuantile(0.2, buckets) + assert.InDelta(t, float64(3.2), actual, 0.0001) + + actual = bucketQuantile(0.5, buckets) + assert.InDelta(t, float64(8), actual, 0.0001) + + actual = bucketQuantile(0.8, buckets) + assert.InDelta(t, float64(15.6), actual, 0.0001) + + actual = bucketQuantile(1, buckets) + assert.InDelta(t, float64(20), actual, 0.0001) +} + +func TestNewOp(t *testing.T) { + args := make([]interface{}, 0, 1) + _, err := NewHistogramQuantileOp(args, HistogramQuantileType) + assert.Error(t, err) + + args = append(args, "invalid") + _, err = NewHistogramQuantileOp(args, HistogramQuantileType) + assert.Error(t, err) + + args[0] = 2.0 + _, err = NewHistogramQuantileOp(args, ClampMaxType) + assert.Error(t, err) + + op, err := NewHistogramQuantileOp(args, HistogramQuantileType) + assert.NoError(t, err) + + assert.Equal(t, HistogramQuantileType, op.OpType()) + assert.Equal(t, "type: histogram_quantile", op.String()) +} + +func testQuantileFunctionWithQ(t *testing.T, q float64) [][]float64 { + args := make([]interface{}, 0, 1) + args = append(args, q) + op, err := NewHistogramQuantileOp(args, HistogramQuantileType) + require.NoError(t, err) + + name := []byte("name") + bucket := []byte("bucket") + tagOpts := models.NewTagOptions(). + SetIDSchemeType(models.TypeQuoted). + SetMetricName(name). + SetBucketName(bucket) + + tags := models.NewTags(3, tagOpts).SetName([]byte("foo")).AddTag(models.Tag{ + Name: []byte("bar"), + Value: []byte("baz"), + }) + + seriesMetas := []block.SeriesMeta{ + {Tags: tags.Clone().SetBucket([]byte("1"))}, + {Tags: tags.Clone().SetBucket([]byte("2"))}, + {Tags: tags.Clone().SetBucket([]byte("5"))}, + {Tags: tags.Clone().SetBucket([]byte("10"))}, + {Tags: tags.Clone().SetBucket([]byte("20"))}, + {Tags: tags.Clone().SetBucket([]byte("Inf"))}, + // this series should not be part of the output, since it has no bucket tag. + {Tags: tags.Clone()}, + } + + v := [][]float64{ + {1, 1, 11, 12, 1}, + {2, 2, 12, 13, 2}, + {5, 5, 15, 40, 5}, + {10, 10, 20, 50, 10}, + {15, 15, 25, 70, 15}, + {16, 19, 26, 71, 1}, + } + + bounds := models.Bounds{ + Start: time.Now(), + Duration: time.Minute * 5, + StepSize: time.Minute, + } + + bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v) + c, sink := executor.NewControllerWithSink(parser.NodeID(1)) + node := op.(histogramQuantileOp).Node(c, transform.Options{}) + err = node.Process(parser.NodeID(0), bl) + require.NoError(t, err) + + return sink.Values +} + +var ( + inf = math.Inf(+1) + ninf = math.Inf(-1) +) + +func TestQuantileFunctionFilteringWithoutA(t *testing.T) { + actual := testQuantileFunctionWithQ(t, -1) + assert.Equal(t, [][]float64{{ninf, ninf, ninf, ninf, ninf}}, actual) + actual = testQuantileFunctionWithQ(t, 1.1) + assert.Equal(t, [][]float64{{inf, inf, inf, inf, inf}}, actual) + + actual = testQuantileFunctionWithQ(t, 0.8) + test.EqualsWithNansWithDelta(t, [][]float64{{15.6, 20, 11.6, 13.4, 0.8}}, actual, 0.00001) +} From 8865edbcf99c4fd431d0162a0cbf0254a8f7c7fc Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Tue, 12 Feb 2019 14:28:34 -0500 Subject: [PATCH 3/5] Fix to a couple of nits. --- src/query/functions/linear/histogram_quantile.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/query/functions/linear/histogram_quantile.go b/src/query/functions/linear/histogram_quantile.go index cc6d9f6ec8..7f7180ca1b 100644 --- a/src/query/functions/linear/histogram_quantile.go +++ b/src/query/functions/linear/histogram_quantile.go @@ -50,7 +50,8 @@ func NewHistogramQuantileOp( opType string, ) (parser.Params, error) { if len(args) != 1 { - return emptyOp, fmt.Errorf("invalid number of args for clamp: %d", len(args)) + return emptyOp, fmt.Errorf( + "invalid number of args for histogram_quantile: %d", len(args)) } if opType != HistogramQuantileType { @@ -188,7 +189,7 @@ func sanitizeBuckets(bucketMap bucketedSeries) { sort.Sort(buckets) maxBound := buckets.buckets[len(buckets.buckets)-1].upperBound - if !math.IsInf(maxBound, +1) { + if !math.IsInf(maxBound, 1) { delete(bucketMap, k) } } @@ -203,7 +204,7 @@ func bucketQuantile(q float64, buckets []bucketValue) float64 { // NB: similar situation here if the max bound bucket does not have a value // at this point, it is necessary to re-check. - if !math.IsInf(buckets[len(buckets)-1].upperBound, +1) { + if !math.IsInf(buckets[len(buckets)-1].upperBound, 1) { return math.NaN() } From 7c39eb434ba14df240d9d8f5d534661e52998a21 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Wed, 13 Feb 2019 16:19:23 -0500 Subject: [PATCH 4/5] PR response. --- src/query/functions/linear/histogram_quantile.go | 11 +++++------ src/query/functions/linear/histogram_quantile_test.go | 10 +++++++++- src/query/models/tags_test.go | 9 +++++++++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/query/functions/linear/histogram_quantile.go b/src/query/functions/linear/histogram_quantile.go index 7f7180ca1b..1d5fb4463e 100644 --- a/src/query/functions/linear/histogram_quantile.go +++ b/src/query/functions/linear/histogram_quantile.go @@ -304,7 +304,8 @@ func processValidQuantile( values := step.Values() bucketValues := make([]bucketValue, 0, initIndexBucketLength) - aggregatedValues := make([]float64, 0, len(bucketedSeries)) + aggregatedValues := make([]float64, len(bucketedSeries)) + idx := 0 for _, b := range bucketedSeries { buckets := b.buckets // clear previous bucket values. @@ -322,7 +323,8 @@ func processValidQuantile( } } - aggregatedValues = append(aggregatedValues, bucketQuantile(q, bucketValues)) + aggregatedValues[idx] = bucketQuantile(q, bucketValues) + idx++ } builder.AppendValues(index, aggregatedValues) @@ -359,11 +361,8 @@ func processInvalidQuantile( setValue := math.Inf(sign) outValues := make([]float64, len(bucketedSeries)) ts.Memset(outValues, setValue) - for index := 0; stepIter.Next(); index++ { - cloned := make([]float64, len(outValues)) - copy(cloned, outValues) - builder.AppendValues(index, cloned) + builder.AppendValues(index, outValues) } if err = stepIter.Err(); err != nil { diff --git a/src/query/functions/linear/histogram_quantile_test.go b/src/query/functions/linear/histogram_quantile_test.go index a4bb994500..4d55c026e8 100644 --- a/src/query/functions/linear/histogram_quantile_test.go +++ b/src/query/functions/linear/histogram_quantile_test.go @@ -159,6 +159,14 @@ func TestBucketQuantile(t *testing.T) { }) assert.True(t, math.IsNaN(actual)) + // bucket with negative infinity bound returns nan + actual = bucketQuantile(0.5, []bucketValue{ + {upperBound: 1, value: 1}, + {upperBound: 2, value: 2}, + {upperBound: math.Inf(-1), value: 22}, + }) + assert.True(t, math.IsNaN(actual)) + actual = bucketQuantile(0.5, []bucketValue{ {upperBound: 1, value: 1}, {upperBound: math.Inf(1), value: 22}, @@ -272,7 +280,7 @@ var ( ninf = math.Inf(-1) ) -func TestQuantileFunctionFilteringWithoutA(t *testing.T) { +func TestQuantileFunctionForInvalidQValues(t *testing.T) { actual := testQuantileFunctionWithQ(t, -1) assert.Equal(t, [][]float64{{ninf, ninf, ninf, ninf, ninf}}, actual) actual = testQuantileFunctionWithQ(t, 1.1) diff --git a/src/query/models/tags_test.go b/src/query/models/tags_test.go index 4a6080930f..56594f8008 100644 --- a/src/query/models/tags_test.go +++ b/src/query/models/tags_test.go @@ -276,6 +276,15 @@ func TestUpdateBucket(t *testing.T) { actual, found = tags.Bucket() assert.True(t, found) assert.Equal(t, value2, actual) + + value3 := []byte("") + tags = tags.SetBucket(value3) + actual, found = tags.Get(name) + assert.True(t, found) + assert.Equal(t, value3, actual) + actual, found = tags.Bucket() + assert.True(t, found) + assert.Equal(t, value3, actual) } func TestAddOrUpdateTags(t *testing.T) { From f6df282b2d3a9a17bb15e47155e7a18d84df037b Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Thu, 14 Feb 2019 12:30:48 -0500 Subject: [PATCH 5/5] PR response --- .../functions/linear/histogram_quantile.go | 15 +++++--------- .../linear/histogram_quantile_test.go | 20 ++++++++++++------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/query/functions/linear/histogram_quantile.go b/src/query/functions/linear/histogram_quantile.go index 1d5fb4463e..377c9365fc 100644 --- a/src/query/functions/linear/histogram_quantile.go +++ b/src/query/functions/linear/histogram_quantile.go @@ -152,25 +152,20 @@ func gatherSeriesToBuckets(metas []block.SeriesMeta) bucketedSeries { excludeTags := [][]byte{tags.Opts.MetricName(), tags.Opts.BucketName()} tagsWithoutKeys := tags.TagsWithoutKeys(excludeTags) id := tagsWithoutKeys.ID() + newBucket := indexedBucket{ + upperBound: bound, + idx: i, + } + if buckets, found := bucketsForID[string(id)]; !found { // Add a single indexed bucket for this ID with the current index only. newBuckets := make([]indexedBucket, 0, initIndexBucketLength) - newBucket := indexedBucket{ - upperBound: bound, - idx: i, - } - newBuckets = append(newBuckets, newBucket) bucketsForID[string(id)] = indexedBuckets{ buckets: newBuckets, tags: tagsWithoutKeys, } } else { - newBucket := indexedBucket{ - upperBound: bound, - idx: i, - } - buckets.buckets = append(buckets.buckets, newBucket) bucketsForID[string(id)] = buckets } diff --git a/src/query/functions/linear/histogram_quantile_test.go b/src/query/functions/linear/histogram_quantile_test.go index 4d55c026e8..714623b3cf 100644 --- a/src/query/functions/linear/histogram_quantile_test.go +++ b/src/query/functions/linear/histogram_quantile_test.go @@ -173,6 +173,12 @@ func TestBucketQuantile(t *testing.T) { }) assert.Equal(t, float64(1), actual) + actual = bucketQuantile(0.8, []bucketValue{ + {upperBound: 2, value: 13}, + {upperBound: math.Inf(1), value: 71}, + }) + assert.Equal(t, float64(2), actual) + // NB: tested against Prom buckets := []bucketValue{ {upperBound: 1, value: 1}, @@ -252,12 +258,12 @@ func testQuantileFunctionWithQ(t *testing.T, q float64) [][]float64 { } v := [][]float64{ - {1, 1, 11, 12, 1}, - {2, 2, 12, 13, 2}, - {5, 5, 15, 40, 5}, - {10, 10, 20, 50, 10}, - {15, 15, 25, 70, 15}, - {16, 19, 26, 71, 1}, + {1, 1, 11, math.NaN(), math.NaN()}, + {2, 2, 12, 13, math.NaN()}, + {5, 5, 15, math.NaN(), math.NaN()}, + {10, 10, 20, math.NaN(), math.NaN()}, + {15, 15, 25, math.NaN(), math.NaN()}, + {16, 19, math.NaN(), 71, 1}, } bounds := models.Bounds{ @@ -287,5 +293,5 @@ func TestQuantileFunctionForInvalidQValues(t *testing.T) { assert.Equal(t, [][]float64{{inf, inf, inf, inf, inf}}, actual) actual = testQuantileFunctionWithQ(t, 0.8) - test.EqualsWithNansWithDelta(t, [][]float64{{15.6, 20, 11.6, 13.4, 0.8}}, actual, 0.00001) + test.EqualsWithNansWithDelta(t, [][]float64{{15.6, 20, math.NaN(), 2, math.NaN()}}, actual, 0.00001) }