Skip to content

Commit

Permalink
[query] Implemented the Graphite divideSeriesLists function (#2585)
Browse files Browse the repository at this point in the history
  • Loading branch information
teddywahle authored Sep 21, 2020
1 parent 35cac59 commit 9ea5682
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 18 deletions.
75 changes: 57 additions & 18 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"

"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/graphite/ts"
)
Expand Down Expand Up @@ -89,7 +90,37 @@ func maxSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, er
return combineSeries(ctx, series, wrapPathExpr("maxSeries", ts.SeriesList(series)), ts.Max)
}

// divideSeries divides one series list by another series
func divideSeriesHelper(ctx *common.Context, dividendSeries, divisorSeries *ts.Series, metadata block.ResultMetadata) (*ts.Series, error) {
normalized, minBegin, _, lcmMillisPerStep, err := common.Normalize(ctx, ts.SeriesList{
Values: []*ts.Series{dividendSeries, divisorSeries},
Metadata: metadata,
})
if err != nil {
return nil, err
}

// NB(bl): Normalized must give back exactly two series of the same length.
dividend, divisor := normalized.Values[0], normalized.Values[1]
numSteps := dividend.Len()
vals := ts.NewValues(ctx, lcmMillisPerStep, numSteps)
for i := 0; i < numSteps; i++ {
dividendVal := dividend.ValueAt(i)
divisorVal := divisor.ValueAt(i)
if !math.IsNaN(dividendVal) && !math.IsNaN(divisorVal) && divisorVal != 0 {
value := dividendVal / divisorVal
vals.SetValueAt(i, value)
}
}

// The individual series will be named divideSeries(X, X), even if it is generated by divideSeriesLists
// Based on Graphite source code (link below)
// https://github.com/graphite-project/graphite-web/blob/17a34e7966f7a46eded30c2362765c74eea899cb/webapp/graphite/render/functions.py#L901
name := fmt.Sprintf("divideSeries(%s,%s)", dividend.Name(), divisor.Name())
quotientSeries := ts.NewSeries(ctx, name, minBegin, vals)
return quotientSeries, nil
}

// divideSeries divides one series list by another single series
func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList singlePathSpec) (ts.SeriesList, error) {
if len(divisorSeriesList.Values) != 1 {
err := errors.NewInvalidParamsError(fmt.Errorf(
Expand All @@ -106,27 +137,35 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin
divisorSeries := divisorSeriesList.Values[0]
results := make([]*ts.Series, len(dividendSeriesList.Values))
for idx, dividendSeries := range dividendSeriesList.Values {
normalized, minBegin, _, lcmMillisPerStep, err := common.Normalize(ctx, ts.SeriesList{
Values: []*ts.Series{dividendSeries, divisorSeries},
Metadata: divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata),
})
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata)
if err != nil {
return ts.NewSeriesList(), err
}
// NB(bl): Normalized must give back exactly two series of the same length.
dividend, divisor := normalized.Values[0], normalized.Values[1]
numSteps := dividend.Len()
vals := ts.NewValues(ctx, lcmMillisPerStep, numSteps)
for i := 0; i < numSteps; i++ {
dividendVal := dividend.ValueAt(i)
divisorVal := divisor.ValueAt(i)
if !math.IsNaN(dividendVal) && !math.IsNaN(divisorVal) && divisorVal != 0 {
value := dividendVal / divisorVal
vals.SetValueAt(i, value)
}
results[idx] = quotientSeries
}

r := ts.SeriesList(dividendSeriesList)
r.Values = results
return r, nil
}


// divideSeriesLists divides one series list by another series list
func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesList singlePathSpec) (ts.SeriesList, error) {
if len(dividendSeriesList.Values) != len(divisorSeriesList.Values) {
err := errors.NewInvalidParamsError(fmt.Errorf(
"divideSeriesLists both SeriesLists must have exactly the same length"))
return ts.NewSeriesList(), err
}
results := make([]*ts.Series, len(dividendSeriesList.Values))
for idx, dividendSeries := range dividendSeriesList.Values {
divisorSeries := divisorSeriesList.Values[idx]
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata)
if err != nil {
return ts.NewSeriesList(), err
}
name := fmt.Sprintf("divideSeries(%s,%s)", dividend.Name(), divisor.Name())
quotientSeries := ts.NewSeries(ctx, name, minBegin, vals)
results[idx] = quotientSeries
}

Expand Down
60 changes: 60 additions & 0 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,66 @@ func TestDivideSeries(t *testing.T) {
require.Error(t, err)
}

func TestDivideSeriesLists(t *testing.T) {
ctx, consolidationTestSeries := newConsolidationTestSeries()
defer ctx.Close()

// multiple series, different start/end times
nan := math.NaN()
series, err := divideSeriesLists(ctx, singlePathSpec{
Values: consolidationTestSeries[:2],
}, singlePathSpec{
Values: consolidationTestSeries[2:],
})
require.Nil(t, err)
expected := []common.TestSeries{
{
Name: "divideSeries(a,c)",
Data: []float64{nan, nan, nan, 0.5882, 0.5882, 0.5882, nan, nan, nan},
},
{
Name: "divideSeries(b,d)",
Data: []float64{nan, nan, nan, 5, 5, 5, nan, nan, nan},
},
}

common.CompareOutputsAndExpected(t, 10000, consolidationStartTime,
[]common.TestSeries{expected[0]}, []*ts.Series{series.Values[0]})
common.CompareOutputsAndExpected(t, 10000, consolidationStartTime.Add(-30*time.Second),
[]common.TestSeries{expected[1]}, []*ts.Series{series.Values[1]})

// different millisPerStep, same start/end times
consolidationTestSeries[0], consolidationTestSeries[2] = consolidationTestSeries[2], consolidationTestSeries[0]
consolidationTestSeries[1], consolidationTestSeries[3] = consolidationTestSeries[3], consolidationTestSeries[1]
series, err = divideSeriesLists(ctx, singlePathSpec{
Values: consolidationTestSeries[:2],
}, singlePathSpec{
Values: consolidationTestSeries[2:],
})
require.Nil(t, err)
expected = []common.TestSeries{
{
Name: "divideSeries(c,a)",
Data: []float64{nan, nan, nan, 1.7, 1.7, 1.7, nan, nan, nan},
},
{
Name: "divideSeries(d,b)",
Data: []float64{nan, nan, nan, 0.2, 0.2, 0.2, nan, nan, nan},
},
}
common.CompareOutputsAndExpected(t, 10000, consolidationStartTime,
[]common.TestSeries{expected[0]}, []*ts.Series{series.Values[0]})

// error - multiple divisor series
series, err = divideSeries(ctx, singlePathSpec{
Values: consolidationTestSeries,
}, singlePathSpec{
Values: consolidationTestSeries,
})
require.Error(t, err)
}


func TestAverageSeriesWithWildcards(t *testing.T) {
ctx, _ := newConsolidationTestSeries()
defer ctx.Close()
Expand Down
1 change: 1 addition & 0 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2129,6 +2129,7 @@ func init() {
MustRegisterFunction(derivative)
MustRegisterFunction(diffSeries)
MustRegisterFunction(divideSeries)
MustRegisterFunction(divideSeriesLists)
MustRegisterFunction(exclude)
MustRegisterFunction(exponentialMovingAverage)
MustRegisterFunction(fallbackSeries)
Expand Down
1 change: 1 addition & 0 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3118,6 +3118,7 @@ func TestFunctionsRegistered(t *testing.T) {
"derivative",
"diffSeries",
"divideSeries",
"divideSeriesLists",
"exclude",
"exponentialMovingAverage",
"fallbackSeries",
Expand Down

0 comments on commit 9ea5682

Please sign in to comment.