From e5348a33dcde5bbafbdbd7775f395ec749d2317d Mon Sep 17 00:00:00 2001 From: Evan Yin Date: Wed, 5 May 2021 17:43:28 -0700 Subject: [PATCH 1/4] Fix bugs in aggregateWithWildcards and update it to support more aggregate functions --- .ci | 2 +- .../graphite/native/aggregation_functions.go | 89 ++++++++++++------- .../native/aggregation_functions_test.go | 53 ++++++++--- 3 files changed, 99 insertions(+), 45 deletions(-) diff --git a/.ci b/.ci index b045911f33..96907c2669 160000 --- a/.ci +++ b/.ci @@ -1 +1 @@ -Subproject commit b045911f33a3396918b4120a59bd83c54e2d9034 +Subproject commit 96907c2669187b166eead31d9e9a5bc4fcbb9b52 diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index b51acf90a2..818cd514fa 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -338,21 +338,44 @@ func sumSeriesWithWildcards( // aggregateWithWildcards splits the given set of series into sub-groupings // based on wildcard matches in the hierarchy, then aggregate the values in // each grouping based on the given function. +// Similar to combineSeriesWithWildcards function but more general, as it +// supports any aggregate functions while combineSeriesWithWildcards only +// support aggregation with existing ts.ConsolidationFunc. func aggregateWithWildcards( ctx *common.Context, series singlePathSpec, fname string, positions ...int, ) (ts.SeriesList, error) { - f, fexists := summarizeFuncs[fname] - if !fexists { - err := xerrors.NewInvalidParamsError(fmt.Errorf( - "invalid func %s", fname)) - return ts.NewSeriesList(), err + if len(series.Values) == 0 { + return ts.SeriesList(series), nil } - return combineSeriesWithWildcards(ctx, series, positions, f.specificationFunc, f.consolidationFunc) -} + toAggregate, _ := splitSeriesIntoSubgroups(series, positions) + + newSeries := make([]*ts.Series, 0, len(toAggregate)) + for name, toAggregateSeries := range toAggregate { + seriesList := ts.SeriesList{ + Values: toAggregateSeries, + Metadata: series.Metadata, + } + aggregated, err := aggregate(ctx, singlePathSpec(seriesList), fname) + if err != nil { + return ts.NewSeriesList(), err + } + renamedSeries := aggregated.Values[0].RenamedTo(name) + newSeries = append(newSeries, renamedSeries) + } + + r := ts.SeriesList(series) + + r.Values = newSeries + + // Ranging over hash map to create results destroys + // any sort order on the incoming series list + r.SortApplied = false + + return r, nil} // combineSeriesWithWildcards splits the given set of series into sub-groupings // based on wildcard matches in the hierarchy, then combines the values in each @@ -368,6 +391,34 @@ func combineSeriesWithWildcards( return ts.SeriesList(series), nil } + toCombine, _ := splitSeriesIntoSubgroups(series, positions) + + newSeries := make([]*ts.Series, 0, len(toCombine)) + for name, toCombineSeries := range toCombine { + seriesList := ts.SeriesList{ + Values: toCombineSeries, + Metadata: series.Metadata, + } + combined, err := combineSeries(ctx, multiplePathSpecs(seriesList), name, f) + if err != nil { + return ts.NewSeriesList(), err + } + combined.Values[0].Specification = sf(seriesList) + newSeries = append(newSeries, combined.Values...) + } + + r := ts.SeriesList(series) + + r.Values = newSeries + + // Ranging over hash map to create results destroys + // any sort order on the incoming series list + r.SortApplied = false + + return r, nil +} + +func splitSeriesIntoSubgroups(series singlePathSpec, positions []int) (map[string][]*ts.Series, error) { var ( toCombine = make(map[string][]*ts.Series) wildcards = make(map[int]struct{}) @@ -392,29 +443,7 @@ func combineSeriesWithWildcards( toCombine[newName] = append(toCombine[newName], series) } - newSeries := make([]*ts.Series, 0, len(toCombine)) - for name, combinedSeries := range toCombine { - seriesList := ts.SeriesList{ - Values: combinedSeries, - Metadata: series.Metadata, - } - combined, err := combineSeries(ctx, multiplePathSpecs(seriesList), name, f) - if err != nil { - return ts.NewSeriesList(), err - } - combined.Values[0].Specification = sf(seriesList) - newSeries = append(newSeries, combined.Values...) - } - - r := ts.SeriesList(series) - - r.Values = newSeries - - // Ranging over hash map to create results destroys - // any sort order on the incoming series list - r.SortApplied = false - - return r, nil + return toCombine, nil } // splits a slice into chunks diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index 8e9af03045..abdcb4b8e6 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -651,26 +651,51 @@ func TestAggregateWithWildcards(t *testing.T) { ) defer ctx.Close() - outSeries, err := aggregateWithWildcards(ctx, singlePathSpec{ - Values: inputs, - }, "sum", 1, 2) - require.NoError(t, err) - require.Equal(t, 2, len(outSeries.Values)) - - outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false) - expectedOutputs := []struct { + type result struct { name string sumOfVals float64 + } + + tests := []struct { + fname string + nodes []int + expectedResults []result }{ - {"servers.status.400", 90 * 12}, - {"servers.status.500", 30 * 12}, + {"avg", []int{1, 2}, []result{ + {"servers.status.400", ((20 + 30 + 40 ) / 3) * 12}, + {"servers.status.500", ((2 + 4 + 6 + 8 + 10) / 5) * 12}, + }}, + {"max", []int{2, 4}, []result{ + {"servers.status.400", 40 * 12}, + {"servers.status.500", 10 * 12}, + }}, + {"min", []int{2, -1}, []result{ + {"servers.status.400", 20 * 12}, + {"servers.status.500", 2 * 12}, + }}, + {"median", []int{1, 2}, []result{ + {"servers.status.400", 30 * 12}, + {"servers.status.500", 6 * 12}, + }}, } - for i, expected := range expectedOutputs { - series := outSeries.Values[i] - assert.Equal(t, expected.name, series.Name()) - assert.Equal(t, expected.sumOfVals, series.SafeSum()) + for _, test := range tests { + outSeries, err := aggregateWithWildcards(ctx, singlePathSpec{ + Values: inputs, + }, test.fname, 1, 2) + require.NoError(t, err) + require.Equal(t, 2, len(outSeries.Values)) + + outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false) + + for i, expected := range test.expectedResults { + series := outSeries.Values[i] + assert.Equal(t, expected.name, series.Name(),"wrong name for %v %s (%d)", test.nodes, test.fname, i) + + assert.Equal(t, expected.sumOfVals, series.SafeSum(), + "wrong result for %v %s (%d)", test.nodes, test.fname, i) + } } } From 8496f43bd4a57786ed5877bd033e0d732c5843ba Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Thu, 6 May 2021 11:29:07 -0400 Subject: [PATCH 2/4] Update .ci --- .ci | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci b/.ci index 96907c2669..b045911f33 160000 --- a/.ci +++ b/.ci @@ -1 +1 @@ -Subproject commit 96907c2669187b166eead31d9e9a5bc4fcbb9b52 +Subproject commit b045911f33a3396918b4120a59bd83c54e2d9034 From 6b1c24970f5beeddee362bae5c568ecad3d295f4 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Thu, 6 May 2021 11:32:49 -0400 Subject: [PATCH 3/4] Remove unused error return value from splitSeriesIntoSubgroups --- src/query/graphite/native/aggregation_functions.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 818cd514fa..c2561034b4 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -351,7 +351,7 @@ func aggregateWithWildcards( return ts.SeriesList(series), nil } - toAggregate, _ := splitSeriesIntoSubgroups(series, positions) + toAggregate := splitSeriesIntoSubgroups(series, positions) newSeries := make([]*ts.Series, 0, len(toAggregate)) for name, toAggregateSeries := range toAggregate { @@ -375,7 +375,8 @@ func aggregateWithWildcards( // any sort order on the incoming series list r.SortApplied = false - return r, nil} + return r, nil +} // combineSeriesWithWildcards splits the given set of series into sub-groupings // based on wildcard matches in the hierarchy, then combines the values in each @@ -391,7 +392,7 @@ func combineSeriesWithWildcards( return ts.SeriesList(series), nil } - toCombine, _ := splitSeriesIntoSubgroups(series, positions) + toCombine := splitSeriesIntoSubgroups(series, positions) newSeries := make([]*ts.Series, 0, len(toCombine)) for name, toCombineSeries := range toCombine { @@ -418,7 +419,7 @@ func combineSeriesWithWildcards( return r, nil } -func splitSeriesIntoSubgroups(series singlePathSpec, positions []int) (map[string][]*ts.Series, error) { +func splitSeriesIntoSubgroups(series singlePathSpec, positions []int) map[string][]*ts.Series { var ( toCombine = make(map[string][]*ts.Series) wildcards = make(map[int]struct{}) @@ -443,7 +444,7 @@ func splitSeriesIntoSubgroups(series singlePathSpec, positions []int) (map[strin toCombine[newName] = append(toCombine[newName], series) } - return toCombine, nil + return toCombine } // splits a slice into chunks From 5308cb1b880b89c6f2b0674516b7982b29cdd0a6 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Thu, 6 May 2021 11:33:51 -0400 Subject: [PATCH 4/4] Fix lint --- src/query/graphite/native/aggregation_functions_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index abdcb4b8e6..4b0b9ed6a0 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -651,7 +651,6 @@ func TestAggregateWithWildcards(t *testing.T) { ) defer ctx.Close() - type result struct { name string sumOfVals float64 @@ -663,7 +662,7 @@ func TestAggregateWithWildcards(t *testing.T) { expectedResults []result }{ {"avg", []int{1, 2}, []result{ - {"servers.status.400", ((20 + 30 + 40 ) / 3) * 12}, + {"servers.status.400", ((20 + 30 + 40) / 3) * 12}, {"servers.status.500", ((2 + 4 + 6 + 8 + 10) / 5) * 12}, }}, {"max", []int{2, 4}, []result{ @@ -691,7 +690,7 @@ func TestAggregateWithWildcards(t *testing.T) { for i, expected := range test.expectedResults { series := outSeries.Values[i] - assert.Equal(t, expected.name, series.Name(),"wrong name for %v %s (%d)", test.nodes, test.fname, i) + assert.Equal(t, expected.name, series.Name(), "wrong name for %v %s (%d)", test.nodes, test.fname, i) assert.Equal(t, expected.sumOfVals, series.SafeSum(), "wrong result for %v %s (%d)", test.nodes, test.fname, i)