Skip to content

Commit

Permalink
[query] Fix histogram grouping bug (#2247)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Apr 3, 2020
1 parent c756f62 commit 934a636
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 37 deletions.
79 changes: 47 additions & 32 deletions src/query/functions/linear/histogram_quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,30 @@ func (b indexedBuckets) Less(i, j int) bool {

type bucketedSeries map[string]indexedBuckets

func gatherSeriesToBuckets(metas []block.SeriesMeta) bucketedSeries {
type validSeriesBuckets []indexedBuckets

func (b validSeriesBuckets) Len() int { return len(b) }
func (b validSeriesBuckets) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b validSeriesBuckets) Less(i, j int) bool {
if len(b[i].buckets) == 0 {
return false
}

if len(b[j].buckets) == 0 {
return true
}

// An arbitrarily chosen sort that guarantees deterministic results.
return b[i].buckets[0].idx < b[j].buckets[0].idx
}

func gatherSeriesToBuckets(metas []block.SeriesMeta) validSeriesBuckets {
bucketsForID := make(bucketedSeries, initIndexBucketLength)
for i, meta := range metas {
tags := meta.Tags
value, found := tags.Bucket()
if !found {
// This series does not have a bucket tag; drop it from the output.
// this series does not have a bucket tag; drop it from the output.
continue
}

Expand All @@ -158,7 +175,7 @@ func gatherSeriesToBuckets(metas []block.SeriesMeta) bucketedSeries {
}

if buckets, found := bucketsForID[string(id)]; !found {
// Add a single indexed bucket for this ID with the current index only.
// add a single indexed bucket for this ID with the current index only.
newBuckets := make([]indexedBucket, 0, initIndexBucketLength)
newBuckets = append(newBuckets, newBucket)
bucketsForID[string(id)] = indexedBuckets{
Expand All @@ -171,23 +188,29 @@ func gatherSeriesToBuckets(metas []block.SeriesMeta) bucketedSeries {
}
}

return bucketsForID
return sanitizeBuckets(bucketsForID)
}

// sanitize sorts the bucket maps by upper bound, dropping any series which
// have less than two buckets, or any that do not have an upper bound of +Inf
func sanitizeBuckets(bucketMap bucketedSeries) {
for k, buckets := range bucketMap {
func sanitizeBuckets(bucketMap bucketedSeries) validSeriesBuckets {
validSeriesBuckets := make(validSeriesBuckets, 0, len(bucketMap))
for _, buckets := range bucketMap {
if len(buckets.buckets) < 2 {
delete(bucketMap, k)
continue
}

sort.Sort(buckets)
maxBound := buckets.buckets[len(buckets.buckets)-1].upperBound
if !math.IsInf(maxBound, 1) {
delete(bucketMap, k)
continue
}

validSeriesBuckets = append(validSeriesBuckets, buckets)
}

sort.Sort(validSeriesBuckets)
return validSeriesBuckets
}

func bucketQuantile(q float64, buckets []bucketValue) float64 {
Expand Down Expand Up @@ -257,34 +280,30 @@ func (n *histogramQuantileNode) ProcessBlock(

meta := b.Meta()
seriesMetas := utils.FlattenMetadata(meta, stepIter.SeriesMeta())
bucketedSeries := gatherSeriesToBuckets(seriesMetas)
seriesBuckets := gatherSeriesToBuckets(seriesMetas)

q := n.op.q
if q < 0 || q > 1 {
return processInvalidQuantile(queryCtx, q, bucketedSeries, meta, stepIter, n.controller)
return processInvalidQuantile(queryCtx, q, seriesBuckets, meta, stepIter, n.controller)
}

return processValidQuantile(queryCtx, q, bucketedSeries, meta, stepIter, n.controller)
return processValidQuantile(queryCtx, q, seriesBuckets, meta, stepIter, n.controller)
}

func setupBuilder(
queryCtx *models.QueryContext,
bucketedSeries bucketedSeries,
seriesBuckets validSeriesBuckets,
meta block.Metadata,
stepIter block.StepIter,
controller *transform.Controller,
) (block.Builder, error) {
metas := make([]block.SeriesMeta, len(bucketedSeries))
idx := 0
for _, v := range bucketedSeries {
metas[idx] = block.SeriesMeta{
metas := make([]block.SeriesMeta, 0, len(seriesBuckets))
for _, v := range seriesBuckets {
metas = append(metas, block.SeriesMeta{
Tags: v.tags,
}

idx++
})
}

meta.Tags, metas = utils.DedupeMetadata(metas, meta.Tags.Opts)
builder, err := controller.BlockBuilder(queryCtx, meta, metas)
if err != nil {
return nil, err
Expand All @@ -300,14 +319,12 @@ func setupBuilder(
func processValidQuantile(
queryCtx *models.QueryContext,
q float64,
bucketedSeries bucketedSeries,
seriesBuckets validSeriesBuckets,
meta block.Metadata,
stepIter block.StepIter,
controller *transform.Controller,
) (block.Block, error) {
sanitizeBuckets(bucketedSeries)

builder, err := setupBuilder(queryCtx, bucketedSeries, meta, stepIter, controller)
builder, err := setupBuilder(queryCtx, seriesBuckets, meta, stepIter, controller)
if err != nil {
return nil, err
}
Expand All @@ -317,9 +334,8 @@ func processValidQuantile(
values := step.Values()
bucketValues := make([]bucketValue, 0, initIndexBucketLength)

aggregatedValues := make([]float64, len(bucketedSeries))
idx := 0
for _, b := range bucketedSeries {
aggregatedValues := make([]float64, 0, len(seriesBuckets))
for _, b := range seriesBuckets {
buckets := b.buckets
// clear previous bucket values.
bucketValues = bucketValues[:0]
Expand All @@ -336,8 +352,7 @@ func processValidQuantile(
}
}

aggregatedValues[idx] = bucketQuantile(q, bucketValues)
idx++
aggregatedValues = append(aggregatedValues, bucketQuantile(q, bucketValues))
}

if err := builder.AppendValues(index, aggregatedValues); err != nil {
Expand All @@ -355,12 +370,12 @@ func processValidQuantile(
func processInvalidQuantile(
queryCtx *models.QueryContext,
q float64,
bucketedSeries bucketedSeries,
seriesBuckets validSeriesBuckets,
meta block.Metadata,
stepIter block.StepIter,
controller *transform.Controller,
) (block.Block, error) {
builder, err := setupBuilder(queryCtx, bucketedSeries, meta, stepIter, controller)
builder, err := setupBuilder(queryCtx, seriesBuckets, meta, stepIter, controller)
if err != nil {
return nil, err
}
Expand All @@ -373,7 +388,7 @@ func processInvalidQuantile(
}

setValue := math.Inf(sign)
outValues := make([]float64, len(bucketedSeries))
outValues := make([]float64, len(seriesBuckets))
util.Memset(outValues, setValue)
for index := 0; stepIter.Next(); index++ {
if err := builder.AppendValues(index, outValues); err != nil {
Expand Down
89 changes: 84 additions & 5 deletions src/query/functions/linear/histogram_quantile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestGatherSeriesToBuckets(t *testing.T) {
},
}

assert.Equal(t, expected, actual)
assert.Equal(t, sanitizeBuckets(expected), actual)
}

func TestSanitizeBuckets(t *testing.T) {
Expand Down Expand Up @@ -132,8 +132,8 @@ func TestSanitizeBuckets(t *testing.T) {
},
}

actual := bucketedSeries{
`{bar="baz"}`: indexedBuckets{
expected := validSeriesBuckets{
indexedBuckets{
buckets: []indexedBucket{
{upperBound: 1, idx: 0},
{upperBound: 2, idx: 3},
Expand All @@ -143,8 +143,7 @@ func TestSanitizeBuckets(t *testing.T) {
},
}

sanitizeBuckets(bucketed)
assert.Equal(t, actual, bucketed)
assert.Equal(t, expected, sanitizeBuckets(bucketed))
}

func TestBucketQuantile(t *testing.T) {
Expand Down Expand Up @@ -295,3 +294,83 @@ func TestQuantileFunctionForInvalidQValues(t *testing.T) {
actual = testQuantileFunctionWithQ(t, 0.8)
test.EqualsWithNansWithDelta(t, [][]float64{{15.6, 20, math.NaN(), 2, math.NaN()}}, actual, 0.00001)
}

func testWithMultipleBuckets(t *testing.T, q float64) [][]float64 {
args := make([]interface{}, 0, 1)
args = append(args, q)
op, err := NewHistogramQuantileOp(args, HistogramQuantileType)
require.NoError(t, err)

name := []byte("name")
bucket := []byte("bucket")
tagOpts := models.NewTagOptions().
SetIDSchemeType(models.TypeQuoted).
SetMetricName(name).
SetBucketName(bucket)

tags := models.NewTags(3, tagOpts).SetName([]byte("foo")).AddTag(models.Tag{
Name: []byte("bar"),
Value: []byte("baz"),
})

tagsTwo := models.NewTags(3, tagOpts).SetName([]byte("qux")).AddTag(models.Tag{
Name: []byte("quaz"),
Value: []byte("quail"),
})

seriesMetas := []block.SeriesMeta{
{Tags: tags.Clone().SetBucket([]byte("1"))},
{Tags: tags.Clone().SetBucket([]byte("2"))},
{Tags: tags.Clone().SetBucket([]byte("5"))},
{Tags: tags.Clone().SetBucket([]byte("10"))},
{Tags: tags.Clone().SetBucket([]byte("20"))},
{Tags: tags.Clone().SetBucket([]byte("Inf"))},
{Tags: tagsTwo.Clone().SetBucket([]byte("1"))},
{Tags: tagsTwo.Clone().SetBucket([]byte("2"))},
{Tags: tagsTwo.Clone().SetBucket([]byte("5"))},
{Tags: tagsTwo.Clone().SetBucket([]byte("10"))},
{Tags: tagsTwo.Clone().SetBucket([]byte("20"))},
{Tags: tagsTwo.Clone().SetBucket([]byte("Inf"))},
}

v := [][]float64{
{1, 1, 11, math.NaN(), math.NaN()},
{2, 2, 12, 13, math.NaN()},
{5, 5, 15, math.NaN(), math.NaN()},
{10, 10, 20, math.NaN(), math.NaN()},
{15, 15, 25, math.NaN(), math.NaN()},
{16, 19, math.NaN(), 71, 1},
{21, 31, 411, math.NaN(), math.NaN()},
{22, 32, 412, 513, math.NaN()},
{25, 35, 415, math.NaN(), math.NaN()},
{210, 310, 420, math.NaN(), math.NaN()},
{215, 315, 425, math.NaN(), math.NaN()},
{216, 319, math.NaN(), 571, 601},
}

bounds := models.Bounds{
Start: time.Now(),
Duration: time.Minute * 5,
StepSize: time.Minute,
}

bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v)
c, sink := executor.NewControllerWithSink(parser.NodeID(1))
node := op.(histogramQuantileOp).Node(c, transform.Options{})
err = node.Process(models.NoopQueryContext(), parser.NodeID(0), bl)
require.NoError(t, err)

return sink.Values
}

func TestQuantileFunctionForMultipleBuckets(t *testing.T) {
for i := 0; i < 100; i++ {
actual := testWithMultipleBuckets(t, 0.8)
expected := [][]float64{
{15.6, 20, math.NaN(), 2, math.NaN()},
{8.99459, 9.00363, math.NaN(), 1.78089, math.NaN()},
}

test.EqualsWithNansWithDelta(t, expected, actual, 0.00001)
}
}

0 comments on commit 934a636

Please sign in to comment.