From 9ea5682541759bbeb5d67e115c55ad6ccbe84eae Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Sun, 20 Sep 2020 21:50:31 -0700 Subject: [PATCH] [query] Implemented the Graphite `divideSeriesLists` function (#2585) --- .../graphite/native/aggregation_functions.go | 75 ++++++++++++++----- .../native/aggregation_functions_test.go | 60 +++++++++++++++ .../graphite/native/builtin_functions.go | 1 + .../graphite/native/builtin_functions_test.go | 1 + 4 files changed, 119 insertions(+), 18 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 5ea1949431..4a3069795b 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -26,6 +26,7 @@ import ( "strings" "github.com/m3db/m3/src/query/graphite/common" + "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/graphite/ts" ) @@ -89,7 +90,37 @@ func maxSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, er return combineSeries(ctx, series, wrapPathExpr("maxSeries", ts.SeriesList(series)), ts.Max) } -// divideSeries divides one series list by another series +func divideSeriesHelper(ctx *common.Context, dividendSeries, divisorSeries *ts.Series, metadata block.ResultMetadata) (*ts.Series, error) { + normalized, minBegin, _, lcmMillisPerStep, err := common.Normalize(ctx, ts.SeriesList{ + Values: []*ts.Series{dividendSeries, divisorSeries}, + Metadata: metadata, + }) + if err != nil { + return nil, err + } + + // NB(bl): Normalized must give back exactly two series of the same length. + dividend, divisor := normalized.Values[0], normalized.Values[1] + numSteps := dividend.Len() + vals := ts.NewValues(ctx, lcmMillisPerStep, numSteps) + for i := 0; i < numSteps; i++ { + dividendVal := dividend.ValueAt(i) + divisorVal := divisor.ValueAt(i) + if !math.IsNaN(dividendVal) && !math.IsNaN(divisorVal) && divisorVal != 0 { + value := dividendVal / divisorVal + vals.SetValueAt(i, value) + } + } + + // The individual series will be named divideSeries(X, X), even if it is generated by divideSeriesLists + // Based on Graphite source code (link below) + // https://github.com/graphite-project/graphite-web/blob/17a34e7966f7a46eded30c2362765c74eea899cb/webapp/graphite/render/functions.py#L901 + name := fmt.Sprintf("divideSeries(%s,%s)", dividend.Name(), divisor.Name()) + quotientSeries := ts.NewSeries(ctx, name, minBegin, vals) + return quotientSeries, nil +} + +// divideSeries divides one series list by another single series func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList singlePathSpec) (ts.SeriesList, error) { if len(divisorSeriesList.Values) != 1 { err := errors.NewInvalidParamsError(fmt.Errorf( @@ -106,27 +137,35 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin divisorSeries := divisorSeriesList.Values[0] results := make([]*ts.Series, len(dividendSeriesList.Values)) for idx, dividendSeries := range dividendSeriesList.Values { - normalized, minBegin, _, lcmMillisPerStep, err := common.Normalize(ctx, ts.SeriesList{ - Values: []*ts.Series{dividendSeries, divisorSeries}, - Metadata: divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata), - }) + metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata) + quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata) if err != nil { return ts.NewSeriesList(), err } - // NB(bl): Normalized must give back exactly two series of the same length. - dividend, divisor := normalized.Values[0], normalized.Values[1] - numSteps := dividend.Len() - vals := ts.NewValues(ctx, lcmMillisPerStep, numSteps) - for i := 0; i < numSteps; i++ { - dividendVal := dividend.ValueAt(i) - divisorVal := divisor.ValueAt(i) - if !math.IsNaN(dividendVal) && !math.IsNaN(divisorVal) && divisorVal != 0 { - value := dividendVal / divisorVal - vals.SetValueAt(i, value) - } + results[idx] = quotientSeries + } + + r := ts.SeriesList(dividendSeriesList) + r.Values = results + return r, nil +} + + +// divideSeriesLists divides one series list by another series list +func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesList singlePathSpec) (ts.SeriesList, error) { + if len(dividendSeriesList.Values) != len(divisorSeriesList.Values) { + err := errors.NewInvalidParamsError(fmt.Errorf( + "divideSeriesLists both SeriesLists must have exactly the same length")) + return ts.NewSeriesList(), err + } + results := make([]*ts.Series, len(dividendSeriesList.Values)) + for idx, dividendSeries := range dividendSeriesList.Values { + divisorSeries := divisorSeriesList.Values[idx] + metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata) + quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata) + if err != nil { + return ts.NewSeriesList(), err } - name := fmt.Sprintf("divideSeries(%s,%s)", dividend.Name(), divisor.Name()) - quotientSeries := ts.NewSeries(ctx, name, minBegin, vals) results[idx] = quotientSeries } diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index 8ea1343ee4..772ea2965f 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -255,6 +255,66 @@ func TestDivideSeries(t *testing.T) { require.Error(t, err) } +func TestDivideSeriesLists(t *testing.T) { + ctx, consolidationTestSeries := newConsolidationTestSeries() + defer ctx.Close() + + // multiple series, different start/end times + nan := math.NaN() + series, err := divideSeriesLists(ctx, singlePathSpec{ + Values: consolidationTestSeries[:2], + }, singlePathSpec{ + Values: consolidationTestSeries[2:], + }) + require.Nil(t, err) + expected := []common.TestSeries{ + { + Name: "divideSeries(a,c)", + Data: []float64{nan, nan, nan, 0.5882, 0.5882, 0.5882, nan, nan, nan}, + }, + { + Name: "divideSeries(b,d)", + Data: []float64{nan, nan, nan, 5, 5, 5, nan, nan, nan}, + }, + } + + common.CompareOutputsAndExpected(t, 10000, consolidationStartTime, + []common.TestSeries{expected[0]}, []*ts.Series{series.Values[0]}) + common.CompareOutputsAndExpected(t, 10000, consolidationStartTime.Add(-30*time.Second), + []common.TestSeries{expected[1]}, []*ts.Series{series.Values[1]}) + + // different millisPerStep, same start/end times + consolidationTestSeries[0], consolidationTestSeries[2] = consolidationTestSeries[2], consolidationTestSeries[0] + consolidationTestSeries[1], consolidationTestSeries[3] = consolidationTestSeries[3], consolidationTestSeries[1] + series, err = divideSeriesLists(ctx, singlePathSpec{ + Values: consolidationTestSeries[:2], + }, singlePathSpec{ + Values: consolidationTestSeries[2:], + }) + require.Nil(t, err) + expected = []common.TestSeries{ + { + Name: "divideSeries(c,a)", + Data: []float64{nan, nan, nan, 1.7, 1.7, 1.7, nan, nan, nan}, + }, + { + Name: "divideSeries(d,b)", + Data: []float64{nan, nan, nan, 0.2, 0.2, 0.2, nan, nan, nan}, + }, + } + common.CompareOutputsAndExpected(t, 10000, consolidationStartTime, + []common.TestSeries{expected[0]}, []*ts.Series{series.Values[0]}) + + // error - multiple divisor series + series, err = divideSeries(ctx, singlePathSpec{ + Values: consolidationTestSeries, + }, singlePathSpec{ + Values: consolidationTestSeries, + }) + require.Error(t, err) +} + + func TestAverageSeriesWithWildcards(t *testing.T) { ctx, _ := newConsolidationTestSeries() defer ctx.Close() diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index 41fa704ef0..19c43d0458 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -2129,6 +2129,7 @@ func init() { MustRegisterFunction(derivative) MustRegisterFunction(diffSeries) MustRegisterFunction(divideSeries) + MustRegisterFunction(divideSeriesLists) MustRegisterFunction(exclude) MustRegisterFunction(exponentialMovingAverage) MustRegisterFunction(fallbackSeries) diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index bfe4b691d4..cd7a4017ca 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -3118,6 +3118,7 @@ func TestFunctionsRegistered(t *testing.T) { "derivative", "diffSeries", "divideSeries", + "divideSeriesLists", "exclude", "exponentialMovingAverage", "fallbackSeries",