Skip to content

Commit

Permalink
[query] Implemented the Graphite integralByInterval function (#2596)
Browse files Browse the repository at this point in the history
  • Loading branch information
teddywahle authored Sep 21, 2020
1 parent a66fb7d commit f2ebf5c
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 23 deletions.
56 changes: 47 additions & 9 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func delay(
func delayValuesHelper(ctx *common.Context, series *ts.Series, steps int) ts.Values {
output := ts.NewValues(ctx, series.MillisPerStep(), series.Len())
for i := steps; i < series.Len(); i++ {
output.SetValueAt(i, series.ValueAt(i - steps))
output.SetValueAt(i, series.ValueAt(i-steps))
}
return output
}
Expand All @@ -281,7 +281,7 @@ func delayValuesHelper(ctx *common.Context, series *ts.Series, steps int) ts.Val
// Useful for filtering out a part of a series of data from a wider range of data.
func timeSlice(ctx *common.Context, inputPath singlePathSpec, start string, end string) (ts.SeriesList, error) {
var (
now = time.Now()
now = time.Now()
tzOffsetForAbsoluteTime time.Duration
)
startTime, err := graphite.ParseTime(start, now, tzOffsetForAbsoluteTime)
Expand Down Expand Up @@ -633,8 +633,8 @@ func lowestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesLis
type windowSizeFunc func(stepSize int) int

type windowSizeParsed struct {
deltaValue time.Duration
stringValue string
deltaValue time.Duration
stringValue string
windowSizeFunc windowSizeFunc
}

Expand Down Expand Up @@ -841,7 +841,7 @@ func exponentialMovingAverage(ctx *common.Context, input singlePathSpec, windowS
curr := bootstrap.ValueAt(i + offset)
if !math.IsNaN(curr) {
// formula: ema(current) = constant * (Current Value) + (1 - constant) * ema(previous)
ema = emaConstant * curr + (1 - emaConstant) * ema
ema = emaConstant*curr + (1-emaConstant)*ema
vals.SetValueAt(i, ema)
} else {
vals.SetValueAt(i, math.NaN())
Expand Down Expand Up @@ -1086,6 +1086,46 @@ func integral(ctx *common.Context, input singlePathSpec) (ts.SeriesList, error)
return r, nil
}

// integralByInterval will do the same as integral funcion, except it resets the total to 0
// at the given time in the parameter “from”. Useful for finding totals per hour/day/week.
func integralByInterval(ctx *common.Context, input singlePathSpec, intervalString string) (ts.SeriesList, error) {
intervalUnit, err := common.ParseInterval(intervalString)
if err != nil {
return ts.NewSeriesList(), err
}
results := make([]*ts.Series, 0, len(input.Values))

for _, series := range input.Values {
var (
stepsPerInterval = intervalUnit.Milliseconds() / int64(series.MillisPerStep())
outVals = ts.NewValues(ctx, series.MillisPerStep(), series.Len())
stepCounter int64
currentSum float64
)

for i := 0; i < series.Len(); i++ {
if stepCounter == stepsPerInterval {
// startNewInterval
stepCounter = 0
currentSum = 0.0
}
n := series.ValueAt(i)
if !math.IsNaN(n) {
currentSum += n
}
outVals.SetValueAt(i, currentSum)
stepCounter += 1
}

newName := fmt.Sprintf("integralByInterval(%s, %s)", series.Name(), intervalString)
results = append(results, ts.NewSeries(ctx, newName, series.StartTime(), outVals))
}

r := ts.SeriesList(input)
r.Values = results
return r, nil
}

// This is the opposite of the integral function. This is useful for taking a
// running total metric and calculating the delta between subsequent data
// points.
Expand Down Expand Up @@ -1798,8 +1838,6 @@ func movingMinHelper(window []float64, vals ts.MutableValues, windowPoints int,
}
}



func newMovingBinaryTransform(
ctx *common.Context,
input singlePathSpec,
Expand Down Expand Up @@ -1830,7 +1868,7 @@ func newMovingBinaryTransform(

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
Expand Down Expand Up @@ -1910,7 +1948,6 @@ func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInte
return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", movingMinHelper)
}


// legendValue takes one metric or a wildcard seriesList and a string in quotes.
// Appends a value to the metric name in the legend. Currently one or several of:
// "last", "avg", "total", "min", "max".
Expand Down Expand Up @@ -2145,6 +2182,7 @@ func init() {
MustRegisterFunction(holtWintersForecast)
MustRegisterFunction(identity)
MustRegisterFunction(integral)
MustRegisterFunction(integralByInterval)
MustRegisterFunction(isNonNull)
MustRegisterFunction(keepLastValue).WithDefaultParams(map[uint8]interface{}{
2: -1, // limit
Expand Down
63 changes: 49 additions & 14 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,8 @@ func testMovingFunction(t *testing.T, target, expectedName string, values, boots
}

var (
testGeneralFunctionStart = time.Now().Add(time.Minute * -11).Truncate(time.Minute)
testGeneralFunctionEnd = time.Now().Add(time.Minute * -3).Truncate(time.Minute)
testGeneralFunctionStart = time.Now().Add(time.Minute * -11).Truncate(time.Minute)
testGeneralFunctionEnd = time.Now().Add(time.Minute * -3).Truncate(time.Minute)
)

// testGeneralFunction is a copy of testMovingFunction but without any logic for bootstrapping values
Expand All @@ -654,8 +654,8 @@ func testGeneralFunction(t *testing.T, target, expectedName string, values, outp

engine := NewEngine(
&common.MovingFunctionStorage{
StepMillis: 60000,
Values: values,
StepMillis: 60000,
Values: values,
},
)
phonyContext := common.NewContext(common.ContextOptions{
Expand Down Expand Up @@ -694,11 +694,11 @@ func TestMovingAverageSuccess(t *testing.T) {

func TestExponentialMovingAverageSuccess(t *testing.T) {
tests := []struct {
target string
target string
expectedName string
bootstrap []float64
inputs []float64
expected []float64
bootstrap []float64
inputs []float64
expected []float64
}{
{
"exponentialMovingAverage(foo.bar.baz, 3)",
Expand Down Expand Up @@ -1823,6 +1823,40 @@ func TestIntegral(t *testing.T) {
}
}

/*
seriesList = self._gen_series_list_with_data(key='test',start=0,end=600,step=60,data=[None, 1, 2, 3, 4, 5, None, 6, 7, 8])
expected = [TimeSeries("integralByInterval(test,'2min')", 0, 600, 60, [0, 1, 2, 5, 4, 9, 0, 6, 7, 15])]
*/
func TestIntegralByInterval(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()

invals := []float64{
math.NaN(), 1, 2, 3, 4, 5, math.NaN(), 6, 7, 8,
}

outvals := []float64{
0, 1, 2, 5, 4, 9, 0, 6, 7, 15,
}

series := ts.NewSeries(ctx, "hello", time.Now(),
common.NewTestSeriesValues(ctx, 60000, invals))

r, err := integralByInterval(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, "2min")
require.NoError(t, err)

output := r.Values
require.Equal(t, 1, len(output))
assert.Equal(t, "integralByInterval(hello, 2min)", output[0].Name())
assert.Equal(t, series.StartTime(), output[0].StartTime())
require.Equal(t, len(outvals), output[0].Len())
for i, expected := range outvals {
xtest.Equalish(t, expected, output[0].ValueAt(i), "incorrect value at %d", i)
}
}

func TestDerivative(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -2983,8 +3017,8 @@ func TestDelay(t *testing.T) {
}

var (
testDelayStart = time.Now().Truncate(time.Minute)
testDelayEnd = testMovingFunctionEnd.Add(time.Minute)
testDelayStart = time.Now().Truncate(time.Minute)
testDelayEnd = testMovingFunctionEnd.Add(time.Minute)
)

func testDelay(t *testing.T, target, expectedName string, values, output []float64) {
Expand All @@ -2993,8 +3027,8 @@ func testDelay(t *testing.T, target, expectedName string, values, output []float

engine := NewEngine(
&common.MovingFunctionStorage{
StepMillis: 10000,
Values: values,
StepMillis: 10000,
Values: values,
},
)
phonyContext := common.NewContext(common.ContextOptions{
Expand All @@ -3020,8 +3054,8 @@ func testDelay(t *testing.T, target, expectedName string, values, output []float
}

func TestTimeSlice(t *testing.T) {
values := []float64{math.NaN(),1.0,2.0,3.0,math.NaN(),5.0,6.0,math.NaN(),7.0,8.0,9.0}
expected := []float64{math.NaN(),math.NaN(),math.NaN(),3.0,math.NaN(),5.0,6.0,math.NaN(),7.0,math.NaN(),math.NaN()}
values := []float64{math.NaN(), 1.0, 2.0, 3.0, math.NaN(), 5.0, 6.0, math.NaN(), 7.0, 8.0, 9.0}
expected := []float64{math.NaN(), math.NaN(), math.NaN(), 3.0, math.NaN(), 5.0, 6.0, math.NaN(), 7.0, math.NaN(), math.NaN()}

testGeneralFunction(t, "timeSlice(foo.bar.baz, '-9min','-3min')", "timeSlice(foo.bar.baz, -9min, -3min)", values, expected)
}
Expand Down Expand Up @@ -3134,6 +3168,7 @@ func TestFunctionsRegistered(t *testing.T) {
"holtWintersForecast",
"identity",
"integral",
"integralByInterval",
"isNonNull",
"keepLastValue",
"legendValue",
Expand Down

0 comments on commit f2ebf5c

Please sign in to comment.