Skip to content

Commit

Permalink
[query] Fix using median with aggregateWithWildcards and support more…
Browse files Browse the repository at this point in the history
… aggregate functions (#3469)
  • Loading branch information
yyin-sc authored May 6, 2021
1 parent b770ecf commit c1e540c
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 44 deletions.
88 changes: 59 additions & 29 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,20 +338,44 @@ func sumSeriesWithWildcards(
// aggregateWithWildcards splits the given set of series into sub-groupings
// based on wildcard matches in the hierarchy, then aggregate the values in
// each grouping based on the given function.
// Similar to combineSeriesWithWildcards function but more general, as it
// supports any aggregate functions while combineSeriesWithWildcards only
// support aggregation with existing ts.ConsolidationFunc.
func aggregateWithWildcards(
ctx *common.Context,
series singlePathSpec,
fname string,
positions ...int,
) (ts.SeriesList, error) {
f, fexists := summarizeFuncs[fname]
if !fexists {
err := xerrors.NewInvalidParamsError(fmt.Errorf(
"invalid func %s", fname))
return ts.NewSeriesList(), err
if len(series.Values) == 0 {
return ts.SeriesList(series), nil
}

toAggregate := splitSeriesIntoSubgroups(series, positions)

newSeries := make([]*ts.Series, 0, len(toAggregate))
for name, toAggregateSeries := range toAggregate {
seriesList := ts.SeriesList{
Values: toAggregateSeries,
Metadata: series.Metadata,
}
aggregated, err := aggregate(ctx, singlePathSpec(seriesList), fname)
if err != nil {
return ts.NewSeriesList(), err
}
renamedSeries := aggregated.Values[0].RenamedTo(name)
newSeries = append(newSeries, renamedSeries)
}

return combineSeriesWithWildcards(ctx, series, positions, f.specificationFunc, f.consolidationFunc)
r := ts.SeriesList(series)

r.Values = newSeries

// Ranging over hash map to create results destroys
// any sort order on the incoming series list
r.SortApplied = false

return r, nil
}

// combineSeriesWithWildcards splits the given set of series into sub-groupings
Expand All @@ -368,6 +392,34 @@ func combineSeriesWithWildcards(
return ts.SeriesList(series), nil
}

toCombine := splitSeriesIntoSubgroups(series, positions)

newSeries := make([]*ts.Series, 0, len(toCombine))
for name, toCombineSeries := range toCombine {
seriesList := ts.SeriesList{
Values: toCombineSeries,
Metadata: series.Metadata,
}
combined, err := combineSeries(ctx, multiplePathSpecs(seriesList), name, f)
if err != nil {
return ts.NewSeriesList(), err
}
combined.Values[0].Specification = sf(seriesList)
newSeries = append(newSeries, combined.Values...)
}

r := ts.SeriesList(series)

r.Values = newSeries

// Ranging over hash map to create results destroys
// any sort order on the incoming series list
r.SortApplied = false

return r, nil
}

func splitSeriesIntoSubgroups(series singlePathSpec, positions []int) map[string][]*ts.Series {
var (
toCombine = make(map[string][]*ts.Series)
wildcards = make(map[int]struct{})
Expand All @@ -392,29 +444,7 @@ func combineSeriesWithWildcards(
toCombine[newName] = append(toCombine[newName], series)
}

newSeries := make([]*ts.Series, 0, len(toCombine))
for name, combinedSeries := range toCombine {
seriesList := ts.SeriesList{
Values: combinedSeries,
Metadata: series.Metadata,
}
combined, err := combineSeries(ctx, multiplePathSpecs(seriesList), name, f)
if err != nil {
return ts.NewSeriesList(), err
}
combined.Values[0].Specification = sf(seriesList)
newSeries = append(newSeries, combined.Values...)
}

r := ts.SeriesList(series)

r.Values = newSeries

// Ranging over hash map to create results destroys
// any sort order on the incoming series list
r.SortApplied = false

return r, nil
return toCombine
}

// splits a slice into chunks
Expand Down
54 changes: 39 additions & 15 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,26 +651,50 @@ func TestAggregateWithWildcards(t *testing.T) {
)
defer ctx.Close()

outSeries, err := aggregateWithWildcards(ctx, singlePathSpec{
Values: inputs,
}, "sum", 1, 2)
require.NoError(t, err)
require.Equal(t, 2, len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false)

expectedOutputs := []struct {
type result struct {
name string
sumOfVals float64
}

tests := []struct {
fname string
nodes []int
expectedResults []result
}{
{"servers.status.400", 90 * 12},
{"servers.status.500", 30 * 12},
{"avg", []int{1, 2}, []result{
{"servers.status.400", ((20 + 30 + 40) / 3) * 12},
{"servers.status.500", ((2 + 4 + 6 + 8 + 10) / 5) * 12},
}},
{"max", []int{2, 4}, []result{
{"servers.status.400", 40 * 12},
{"servers.status.500", 10 * 12},
}},
{"min", []int{2, -1}, []result{
{"servers.status.400", 20 * 12},
{"servers.status.500", 2 * 12},
}},
{"median", []int{1, 2}, []result{
{"servers.status.400", 30 * 12},
{"servers.status.500", 6 * 12},
}},
}

for i, expected := range expectedOutputs {
series := outSeries.Values[i]
assert.Equal(t, expected.name, series.Name())
assert.Equal(t, expected.sumOfVals, series.SafeSum())
for _, test := range tests {
outSeries, err := aggregateWithWildcards(ctx, singlePathSpec{
Values: inputs,
}, test.fname, 1, 2)
require.NoError(t, err)
require.Equal(t, 2, len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false)

for i, expected := range test.expectedResults {
series := outSeries.Values[i]
assert.Equal(t, expected.name, series.Name(), "wrong name for %v %s (%d)", test.nodes, test.fname, i)

assert.Equal(t, expected.sumOfVals, series.SafeSum(),
"wrong result for %v %s (%d)", test.nodes, test.fname, i)
}
}
}

Expand Down

0 comments on commit c1e540c

Please sign in to comment.