diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go
index b4b82b0634..a0cd8e569c 100644
--- a/src/query/graphite/native/aggregation_functions.go
+++ b/src/query/graphite/native/aggregation_functions.go
@@ -105,8 +105,7 @@ func medianSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList,
 		return ts.NewSeriesList(), err
 	}
 	numSteps := ts.NumSteps(start, end, millisPerStep)
-	values := ts.NewValues(ctx, millisPerStep, numSteps)
-
+	values := ts.NewValues(ctx, millisPerStep, numSteps)
 	valuesAtTime := make([]float64, len(normalized.Values))
 	for i := 0; i < numSteps; i++ {
@@ -561,24 +560,59 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te
 //
 // sumSeries(foo.by-function.server1.*.cpu.load5),sumSeries(foo.by-function.server2.*.cpu.load5),...
 func groupByNode(ctx *common.Context, series singlePathSpec, node int, fname string) (ts.SeriesList, error) {
-	metaSeries := make(map[string][]*ts.Series)
-	for _, s := range series.Values {
-		parts := strings.Split(s.Name(), ".")
+	return groupByNodes(ctx, series, fname, []int{node}...)
+}
+
+func findFirstMetricExpression(seriesName string) (string, bool) {
+	idxOfRightParen := strings.Index(seriesName, ")")
+	if idxOfRightParen == -1 {
+		return "", false
+	}
+	substring := seriesName[:idxOfRightParen]
+	idxOfLeftParen := strings.LastIndex(substring, "(")
+	if idxOfLeftParen == -1 {
+		return "", false
+	}
+	return seriesName[idxOfLeftParen+1 : idxOfRightParen], true
+}
 
-		n := node
+func getParts(series *ts.Series) []string {
+	seriesName := series.Name()
+	if metricExpr, ok := findFirstMetricExpression(seriesName); ok {
+		seriesName = metricExpr
+	}
+	return strings.Split(seriesName, ".")
+}
+
+func getAggregationKey(series *ts.Series, nodes []int) string {
+	parts := getParts(series)
+
+	keys := make([]string, 0, len(nodes))
+	for _, n := range nodes {
 		if n < 0 {
 			n = len(parts) + n
 		}
 
-		if n >= len(parts) || n < 0 {
-			return aggregate(ctx, series, fname)
+		if n < len(parts) {
+			keys = append(keys, parts[n])
+		} else {
+			keys = append(keys, "")
 		}
+	}
+	return strings.Join(keys, ".")
+}
+
+func getMetaSeriesGrouping(seriesList singlePathSpec, nodes []int) map[string][]*ts.Series {
+	metaSeries := make(map[string][]*ts.Series)
 
-		key := parts[n]
-		metaSeries[key] = append(metaSeries[key], s)
+	if len(nodes) > 0 {
+		for _, s := range seriesList.Values {
+			key := getAggregationKey(s, nodes)
+			metaSeries[key] = append(metaSeries[key], s)
+		}
 	}
 
-	return applyFnToMetaSeries(ctx, series, metaSeries, fname)
+	return metaSeries
 }
 
 // Takes a serieslist and maps a callback to subgroups within as defined by multiple nodes
@@ -592,37 +626,16 @@ func groupByNode(ctx *common.Context, series singlePathSpec, node int, fname str
 // sumSeries(ganglia.server2.*.cpu.load5),sumSeries(ganglia.server2.*.cpu.load10),sumSeries(ganglia.server2.*.cpu.load15),...
 //
 // NOTE: if len(nodes) = 0, aggregate all series into 1 series.
-func groupByNodes(ctx *common.Context, series singlePathSpec, fname string, nodes ...int) (ts.SeriesList, error) {
-	metaSeries := make(map[string][]*ts.Series)
-
-	nodeLen := len(nodes)
-	if nodeLen == 0 {
-		key := "*" // put into single group, not ideal, but more graphite-ish.
-		for _, s := range series.Values {
-			metaSeries[key] = append(metaSeries[key], s)
-		}
-	} else {
-		for _, s := range series.Values {
-			parts := strings.Split(s.Name(), ".")
+func groupByNodes(ctx *common.Context, seriesList singlePathSpec, fname string, nodes ...int) (ts.SeriesList, error) {
+	metaSeries := getMetaSeriesGrouping(seriesList, nodes)
 
-			var keys []string
-			for _, n := range nodes {
-				if n < 0 {
-					n = len(parts) + n
-				}
-
-				if n >= len(parts) || n < 0 {
-					return aggregate(ctx, series, fname)
-				}
-
-				keys = append(keys, parts[n])
-			}
-			key := strings.Join(keys, ".")
-			metaSeries[key] = append(metaSeries[key], s)
-		}
+	if len(metaSeries) == 0 {
+		// if nodes is an empty slice or every node in nodes exceeds the number
+		// of parts in each series, just treat it like aggregate
+		return aggregate(ctx, seriesList, fname)
 	}
 
-	return applyFnToMetaSeries(ctx, series, metaSeries, fname)
+	return applyFnToMetaSeries(ctx, seriesList, metaSeries, fname)
 }
 
 func applyFnToMetaSeries(ctx *common.Context, series singlePathSpec, metaSeries map[string][]*ts.Series, fname string) (ts.SeriesList, error) {
diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go
index 45d7054487..cfdd9d1cd5 100644
--- a/src/query/graphite/native/aggregation_functions_test.go
+++ b/src/query/graphite/native/aggregation_functions_test.go
@@ -712,7 +712,7 @@ func TestGroupByNodes(t *testing.T) {
 		end, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT")
 		ctx = common.NewContext(common.ContextOptions{Start: start, End: end})
 		inputs = []*ts.Series{
-			ts.NewSeries(ctx, "servers.foo-1.pod1.status.500", start,
+			ts.NewSeries(ctx, "transformNull(servers.foo-1.pod1.status.500)", start,
 				ts.NewConstantValues(ctx, 2, 12, 10000)),
 			ts.NewSeries(ctx, "servers.foo-2.pod1.status.500", start,
 				ts.NewConstantValues(ctx, 4, 12, 10000)),
@@ -755,6 +755,12 @@
 			{"pod2.400", 40 * 12},
 			{"pod2.500", 10 * 12},
 		}},
+		{"max", []int{2, 4, 100}, []result{ // test with a node number that exceeds num parts
+			{"pod1.400.", 30 * 12},
+			{"pod1.500.", 6 * 12},
+			{"pod2.400.", 40 * 12},
+			{"pod2.500.", 10 * 12},
+		}},
 		{"min", []int{2, -1}, []result{ // test negative index handling
 			{"pod1.400", 20 * 12},
 			{"pod1.500", 2 * 12},
@@ -762,7 +768,10 @@
 			{"pod2.500", 8 * 12},
 		}},
 		{"sum", []int{}, []result{ // test empty slice handing.
-			{"*", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12},
+			{"sumSeries(transformNull(servers.foo-1.pod1.status.500),servers.foo-2.pod1.status.500,servers.foo-3.pod1.status.500,servers.foo-1.pod2.status.500,servers.foo-2.pod2.status.500,servers.foo-1.pod1.status.400,servers.foo-2.pod1.status.400,servers.foo-3.pod2.status.400)", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12},
+		}},
+		{"sum", []int{100}, []result{ // test all nodes out of bounds
+			{"", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12},
 		}},
 	}
 
diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go
index 79fa1e493f..4528c07c1d 100644
--- a/src/query/graphite/native/builtin_functions.go
+++ b/src/query/graphite/native/builtin_functions.go
@@ -990,7 +990,7 @@ func exponentialMovingAverage(ctx *common.Context, input singlePathSpec, windowS
 }
 
 // totalFunc takes an index and returns a total value for that index
-type totalFunc func(int) float64
+type totalFunc func(int, *ts.Series) float64
 
 func totalBySum(seriesList []*ts.Series, index int) float64 {
 	s, hasValue := 0.0, false
@@ -1008,7 +1008,7 @@
 }
 
 // asPercent calculates a percentage of the total of a wildcard series.
-func asPercent(ctx *common.Context, input singlePathSpec, total genericInterface) (ts.SeriesList, error) {
+func asPercent(ctx *common.Context, input singlePathSpec, total genericInterface, nodes ...int) (ts.SeriesList, error) {
 	if len(input.Values) == 0 {
 		return ts.SeriesList(input), nil
 	}
@@ -1029,24 +1029,56 @@
 		if total.Len() == 0 {
 			// normalize input and sum up input as the total series
 			toNormalize = input.Values
-			tf = func(idx int) float64 { return totalBySum(normalized, idx) }
+			tf = func(idx int, _ *ts.Series) float64 { return totalBySum(normalized, idx) }
 		} else {
 			// check total is a single-series list and normalize all of them
 			if total.Len() != 1 {
 				err := errors.NewInvalidParamsError(errors.New("total must be a single series"))
 				return ts.NewSeriesList(), err
 			}
+			if len(nodes) > 0 {
+				// group the series by specified nodes and then sum those groups
+				groupedTotal, err := groupByNodes(ctx, input, "sum", nodes...)
+				if err != nil {
+					return ts.NewSeriesList(), err
+				}
+				toNormalize = append(input.Values, groupedTotal.Values[0])
+				metaSeriesSumByKey := make(map[string]*ts.Series)
 
-			toNormalize = append(input.Values, total.Values[0])
-			tf = func(idx int) float64 { return normalized[len(normalized)-1].ValueAt(idx) }
-			totalText = total.Values[0].Name()
+				// map the aggregation key to the aggregated series
+				for _, series := range groupedTotal.Values {
+					metaSeriesSumByKey[series.Name()] = series
+				}
+
+				tf = func(idx int, series *ts.Series) float64 {
+					// find which aggregation key this series falls under
+					// and return the sum for that aggregated group
+					key := getAggregationKey(series, nodes)
+					return metaSeriesSumByKey[key].ValueAt(idx)
+				}
+				totalText = groupedTotal.Values[0].Name()
+			} else {
+				toNormalize = append(input.Values, total.Values[0])
+				tf = func(idx int, _ *ts.Series) float64 { return normalized[len(normalized)-1].ValueAt(idx) }
+				totalText = total.Values[0].Name()
+			}
 		}
 	case float64:
 		toNormalize = input.Values
-		tf = func(idx int) float64 { return totalArg }
+		tf = func(idx int, _ *ts.Series) float64 { return totalArg }
 		totalText = fmt.Sprintf(common.FloatingPointFormat, totalArg)
+	case nil:
+		// if total is nil, the total is the sum of all the input series
+		toNormalize = input.Values
+		var err error
+		summedSeries, err := sumSeries(ctx, multiplePathSpecs(input))
+		if err != nil {
+			return ts.NewSeriesList(), err
+		}
+		tf = func(idx int, _ *ts.Series) float64 { return summedSeries.Values[0].ValueAt(idx) }
+		totalText = summedSeries.Values[0].Name()
 	default:
-		err := errors.NewInvalidParamsError(errors.New("total is neither an int nor a series"))
+		err := errors.NewInvalidParamsError(errors.New("total must be either an int, a series, or nil"))
 		return ts.NewSeriesList(), err
 	}
 
@@ -1066,8 +1098,8 @@
 		values = append(values, percents)
 	}
 	for i := 0; i < normalized[0].Len(); i++ {
-		t := tf(i)
 		for j := 0; j < numInputSeries; j++ {
+			t := tf(i, normalized[j])
 			v := normalized[j].ValueAt(i)
 			if !math.IsNaN(v) && !math.IsNaN(t) && t != 0 {
 				values[j].SetValueAt(i, 100.0*v/t)
@@ -2348,6 +2380,7 @@ func init() {
 	})
 	MustRegisterFunction(asPercent).WithDefaultParams(map[uint8]interface{}{
 		2: []*ts.Series(nil), // total
+		3: nil, // nodes
 	})
 	MustRegisterFunction(averageAbove)
 	MustRegisterFunction(averageSeries)
@@ -2378,7 +2411,9 @@
 	MustRegisterFunction(groupByNode).WithDefaultParams(map[uint8]interface{}{
 		3: "average", // fname
 	})
-	MustRegisterFunction(groupByNodes)
+	MustRegisterFunction(groupByNodes).WithDefaultParams(map[uint8]interface{}{
+		3: nil, // nodes
+	})
 	MustRegisterFunction(highest).WithDefaultParams(map[uint8]interface{}{
 		2: 1, // n,
 		3: "average", // f
diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go
index 23855073d6..68e0108b42 100644
--- a/src/query/graphite/native/builtin_functions_test.go
+++ b/src/query/graphite/native/builtin_functions_test.go
@@ -1966,6 +1966,46 @@ func TestAsPercentWithFloatTotal(t *testing.T) {
 	}
 }
 
+func TestAsPercentWithNilTotal(t *testing.T) {
+	ctx := common.NewTestContext()
+	defer ctx.Close()
+
+	nan := math.NaN()
+	tests := []struct {
+		valuesStep int
+		values     []float64
+		outputStep int
+		output     []float64
+	}{
+		{
+			60,
+			[]float64{12.0, 14.0, 16.0, nan, 20.0},
+			60,
+			[]float64{100, 100, 100, nan, 100},
+		},
+	}
+
+	for _, test := range tests {
+		timeSeries := ts.NewSeries(ctx, "", ctx.StartTime,
+			common.NewTestSeriesValues(ctx, test.valuesStep, test.values))
+		r, err := asPercent(ctx, singlePathSpec{
+			Values: []*ts.Series{timeSeries},
+		}, nil)
+		require.NoError(t, err)
+
+		output := r.Values
+		require.Equal(t, 1, len(output))
+		require.Equal(t, output[0].MillisPerStep(), test.outputStep)
+		expectedName := fmt.Sprintf("asPercent(, sumSeries())")
+		assert.Equal(t, expectedName, output[0].Name())
+
+		for step := 0; step < output[0].Len(); step++ {
+			v := output[0].ValueAt(step)
+			xtest.Equalish(t, math.Trunc(v), test.output[step])
+		}
+	}
+}
+
 func TestAsPercentWithSeriesList(t *testing.T) {
 	ctx := common.NewTestContext()
 	defer ctx.Close()
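The grouping logic added above derives an aggregation key by first unwrapping the innermost metric expression from a wrapped series name (e.g. "transformNull(servers.foo-1.pod1.status.500)" becomes "servers.foo-1.pod1.status.500") and then joining the requested dot-separated nodes, with negative nodes counted from the end and out-of-range nodes contributing an empty part. A minimal standalone sketch of that behaviour, using only the Go standard library (the helper names here are local to this sketch and are not part of the m3 API):

package main

import (
	"fmt"
	"strings"
)

// firstMetricExpression unwraps the innermost metric path of the first
// function call in a series name, e.g. "transformNull(a.b.c)" -> "a.b.c".
func firstMetricExpression(seriesName string) (string, bool) {
	right := strings.Index(seriesName, ")")
	if right == -1 {
		return "", false
	}
	left := strings.LastIndex(seriesName[:right], "(")
	if left == -1 {
		return "", false
	}
	return seriesName[left+1 : right], true
}

// aggregationKey joins the requested dot-separated parts of a series name.
// Negative nodes count from the end; nodes that remain out of range after
// adjustment contribute an empty string (this sketch additionally guards
// against indexes that stay negative).
func aggregationKey(seriesName string, nodes []int) string {
	if expr, ok := firstMetricExpression(seriesName); ok {
		seriesName = expr
	}
	parts := strings.Split(seriesName, ".")
	keys := make([]string, 0, len(nodes))
	for _, n := range nodes {
		if n < 0 {
			n = len(parts) + n
		}
		if n >= 0 && n < len(parts) {
			keys = append(keys, parts[n])
		} else {
			keys = append(keys, "")
		}
	}
	return strings.Join(keys, ".")
}

func main() {
	name := "transformNull(servers.foo-1.pod1.status.500)"
	fmt.Println(aggregationKey(name, []int{2, 4}))      // pod1.500
	fmt.Println(aggregationKey(name, []int{2, -1}))     // pod1.500
	fmt.Println(aggregationKey(name, []int{2, 4, 100})) // pod1.500. (out-of-range node adds an empty part)
}

These keys correspond to the group names asserted in TestGroupByNodes above; asPercent uses the same key to look up the per-group total when its new nodes argument is supplied.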