Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Implemented the Graphite delay function #2567

Merged
merged 38 commits into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
4e901d7
Added the moving movingMin function
teddywahle Aug 27, 2020
f42685a
Merge branch 'master' into twahle-moving-min
teddywahle Aug 27, 2020
f0df4ce
worked on moving min
teddywahle Aug 27, 2020
bef6377
Merge branch 'twahle-moving-min' of https://github.com/teddywahle/m3 …
teddywahle Aug 27, 2020
8ce6dc3
more work on the moving min func
teddywahle Aug 27, 2020
30ef946
Merge branch 'master' into twahle-moving-min
teddywahle Aug 28, 2020
cf9288f
updated movingMedian
teddywahle Aug 28, 2020
50b0ae9
Merge branch 'twahle-moving-min' of https://github.com/teddywahle/m3 …
teddywahle Aug 28, 2020
1ebe629
Apply suggestions from code review
teddywahle Aug 28, 2020
ebaafce
testMovingFunction
teddywahle Aug 28, 2020
2a0643c
wrote test movingSum function
teddywahle Aug 28, 2020
cb088b2
Added delay function
teddywahle Aug 28, 2020
2e183a5
built out more of the core logic for delay function
teddywahle Aug 28, 2020
dbe7b33
fixed up delay test
teddywahle Aug 28, 2020
53222b9
Merge branch 'master' into twahle-moving-min
teddywahle Aug 31, 2020
8c2b6bf
updated tests
teddywahle Aug 31, 2020
01792e7
Fixed up tests
teddywahle Aug 31, 2020
a1f7cad
Merge branch 'master' into graphite-delay-function
teddywahle Aug 31, 2020
2ea3fa8
Merge branch 'graphite-delay-function' of https://github.com/teddywah…
teddywahle Aug 31, 2020
29a032d
Update percentiles.go
teddywahle Aug 31, 2020
dba69f4
Apply suggestions from code review
teddywahle Aug 31, 2020
28799cb
Merge branch 'master' into graphite-delay-function
teddywahle Aug 31, 2020
117e24f
Update builtin_functions.go
teddywahle Aug 31, 2020
a3b7abb
Apply suggestions from code review
teddywahle Aug 31, 2020
a2ab5aa
Apply suggestions from code review
teddywahle Aug 31, 2020
5ff388b
Merge branch 'master' into graphite-delay-function
teddywahle Sep 1, 2020
b9a5ecc
clean up delay and add comments
teddywahle Sep 1, 2020
09ed1ce
Merge branch 'master' into graphite-delay-function
teddywahle Sep 2, 2020
9e70f82
Merge branch 'master' into graphite-delay-function
teddywahle Sep 3, 2020
9be8bfa
removed contextShift from delay function
teddywahle Sep 3, 2020
99852b2
updated testdelay function
teddywahle Sep 3, 2020
0184b23
Merge branch 'master' into graphite-delay-function
teddywahle Sep 3, 2020
ad6fc10
Merge branch 'master' into graphite-delay-function
teddywahle Sep 4, 2020
f5e30d3
Update src/query/graphite/native/builtin_functions.go
teddywahle Sep 4, 2020
1d54439
Updated delay after PR review
teddywahle Sep 4, 2020
c545823
Merge branch 'master' into graphite-delay-function
teddywahle Sep 7, 2020
d20b425
Merge branch 'master' into graphite-delay-function
robskillington Sep 8, 2020
5aa39ee
Update src/query/graphite/native/builtin_functions.go
robskillington Sep 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 124 additions & 1 deletion src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func limit(_ *common.Context, series singlePathSpec, n int) (ts.SeriesList, erro
return r, nil
}

// timeShift draws the selected metrics shifted in time. If no sign is given, a minus sign ( - ) is
// implied which will shift the metric back in time. If a plus sign ( + ) is given, the metric will
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
// be shifted forward in time
func timeShift(
Expand All @@ -215,6 +214,7 @@ func timeShift(
}

shift, err := common.ParseInterval(timeShiftS)

teddywahle marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.NewInvalidParamsError(fmt.Errorf("invalid timeShift parameter %s: %v", timeShiftS, err))
}
Expand Down Expand Up @@ -242,6 +242,50 @@ func timeShift(
}, nil
}

// implied which will shift the metric back in time. If a plus sign ( + ) is given, the metric will
// be shifted forward in time
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
func delay(
ctx *common.Context,
_ singlePathSpec,
steps int,
) (*unaryContextShifter, error) {

contextShiftingFn := func(c *common.Context) *common.Context {
opts := common.NewChildContextOptions()
opts.AdjustTimeRange(0, 0, 0, 0)
childCtx := c.NewChildContext(opts)
return childCtx
}

transformerFn := func(input ts.SeriesList) (ts.SeriesList, error) {
output := make([]*ts.Series, input.Len())

for i, series := range input.Values {

delayedVals := delayValues(ctx, *series, steps)
delayedSeries := ts.NewSeries(ctx, series.Name(), series.StartTime(), delayedVals)
renamedSeries := delayedSeries.RenamedTo(fmt.Sprintf("delay(%s, %s)", delayedSeries.Name(), steps))
output[i] = renamedSeries
Copy link
Collaborator

@robskillington robskillington Sep 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Prefer to pre-allocate slice but use append for collecting values.

e.g.

output := make([]*ts.Series, 0, input.Len())
// ...
output = append(output, renamedSeries)

We find using this wherever possible causes less bugs in general (i.e. if you have a continue that skips some entries in future in the middle of the loop, etc).


}
input.Values = output
return input, nil
}

return &unaryContextShifter{
ContextShiftFunc: contextShiftingFn,
UnaryTransformer: transformerFn,
}, nil
}

func delayValues(ctx *common.Context, series ts.Series, steps int ) (ts.Values) {
output := ts.NewValues(ctx, series.MillisPerStep(), series.Len())
for i := steps; i < series.Len(); i++ {
output.SetValueAt(i, series.ValueAt(i - steps))
}
return output
}

// absolute returns the absolute value of each element in the series.
func absolute(ctx *common.Context, input singlePathSpec) (ts.SeriesList, error) {
return transform(ctx, input,
Expand Down Expand Up @@ -1658,6 +1702,83 @@ func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*bi
}, nil
}

// movingSum takes one metric or a wildcard seriesList followed by a a quoted string
// with a length of time like '1hour' or '5min'. Graphs the sum of the preceding
// datapoints for each point on the graph. All previous datapoints are set to None at
// the beginning of the graph.
func movingSum(ctx *common.Context, _ singlePathSpec, windowSize string) (*binaryContextShifter, error) {
interval, err := common.ParseInterval(windowSize)
if err != nil {
return nil, err
}

if interval <= 0 {
return nil, common.ErrInvalidIntervalFormat
}

contextShiftingFn := func(c *common.Context) *common.Context {
opts := common.NewChildContextOptions()
opts.AdjustTimeRange(0, 0, interval, 0)
childCtx := c.NewChildContext(opts)
return childCtx
}

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
transformerFn := func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
}

results := make([]*ts.Series, 0, original.Len())

for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
windowPoints := int(interval / (time.Duration(series.MillisPerStep()) * time.Millisecond))
if windowPoints <= 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
"non positive window points, windowSize=%s, stepSize=%d",
windowSize, series.MillisPerStep()))
return ts.NewSeriesList(), err
}
window := make([]float64, windowPoints)
util.Memset(window, math.NaN())
numSteps := series.Len()
offset := bootstrap.Len() - numSteps
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
for i := 0; i < numSteps; i++ {
for j := i + offset - windowPoints; j < i+offset; j++ {
if j < 0 || j >= bootstrap.Len() {
continue
}

idx := j - i - offset + windowPoints
if idx < 0 || idx > len(window)-1 {
continue
}

window[idx] = bootstrap.ValueAt(j)
}
vals.SetValueAt(i, common.SafeSum(window))
}
name := fmt.Sprintf("movingSum(%s,%q)", series.Name(), windowSize)
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
results = append(results, newSeries)
}

original.Values = results
return original, nil
}

return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: transformerFn,
}, nil
}


// legendValue takes one metric or a wildcard seriesList and a string in quotes.
// Appends a value to the metric name in the legend. Currently one or several of:
// "last", "avg", "total", "min", "max".
Expand Down Expand Up @@ -1872,6 +1993,7 @@ func init() {
MustRegisterFunction(dashed).WithDefaultParams(map[uint8]interface{}{
2: 5.0, // dashLength
})
MustRegisterFunction(delay)
MustRegisterFunction(derivative)
MustRegisterFunction(diffSeries)
MustRegisterFunction(divideSeries)
Expand Down Expand Up @@ -1906,6 +2028,7 @@ func init() {
MustRegisterFunction(mostDeviant)
MustRegisterFunction(movingAverage)
MustRegisterFunction(movingMedian)
MustRegisterFunction(movingSum)
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
MustRegisterFunction(multiplySeries)
MustRegisterFunction(nonNegativeDerivative).WithDefaultParams(map[uint8]interface{}{
2: math.NaN(), // maxValue
Expand Down
84 changes: 81 additions & 3 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,22 @@ func TestMovingAverageError(t *testing.T) {
testMovingFunctionError(t, "movingAverage(foo.bar.baz, 0)")
}

func TestMovingSumSuccess(t *testing.T) {
values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
bootstrap := []float64{3.0, 4.0, 5.0}
expected := []float64{12.0, 21.0, 36.0, 21.0, 9.0} // (3+4+5), (4+5+12), (5+12+19), (12+19-10), (19-10+Nan)

testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,\"30s\")", values, bootstrap, expected)
testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,3)", nil, nil, nil)

bootstrapEntireSeries := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0}
testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,\"30s\")", values, bootstrapEntireSeries, expected)
}

func TestMovingSumError(t *testing.T) {
testMovingFunctionError(t, "movingSum(foo.bar.baz, '-30s')")
}

teddywahle marked this conversation as resolved.
Show resolved Hide resolved
func TestIsNonNull(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -2547,14 +2563,14 @@ func TestMovingMedianInvalidLimits(t *testing.T) {
func TestMovingMismatchedLimits(t *testing.T) {
// NB: this tests the behavior when query limits do not snap exactly to data
// points. When limits do not snap exactly, the first point should be omitted.
for _, fn := range []string{"movingAverage", "movingMedian"} {
for _, fn := range []string{"movingAverage", "movingMedian", "movingSum"} {
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
for i := time.Duration(0); i < time.Minute; i += time.Second {
testMovingAverageInvalidLimits(t, fn, i)
testMovingFunctionInvalidLimits(t, fn, i)
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func testMovingAverageInvalidLimits(t *testing.T, fn string, offset time.Duration) {
func testMovingFunctionInvalidLimits(t *testing.T, fn string, offset time.Duration) {
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

Expand Down Expand Up @@ -2840,6 +2856,66 @@ func TestTimeShift(t *testing.T) {
[]common.TestSeries{expected}, res.Values)
}

func TestDelay(t *testing.T) {
var values = [3][]float64{
{54.0, 48.0, 92.0, 54.0, 14.0, 1.2},
{4.0, 5.0, math.NaN(), 6.4, 7.2, math.NaN()},
{math.NaN(), 8.0, 9.0, 10.6, 11.2, 12.2},
}
expected := [3][]float64{
{math.NaN(), math.NaN(), math.NaN(), 54.0, 48.0, 92.0},
{math.NaN(), math.NaN(), math.NaN(), 4.0, 5.0, math.NaN()},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 8.0, 9.0},
}

for index, value := range values {
e := expected[index]
testDelay(t, "delay(foo.bar.baz, 3)", "delay(foo.bar.baz,3)", value, e)
}
}

var (
testDelayBootstrap = testMovingAverageStart.Add(-30 * time.Second)
testDelayStart = time.Now().Truncate(time.Minute)
testDelayEnd = testMovingAverageStart.Add(time.Minute)
)

func testDelay(t *testing.T, target, expectedName string, values, output []float64) {
ctx := common.NewTestContext()
defer ctx.Close()

emptyBootstrap := []float64{}
engine := NewEngine(
&common.MovingAverageStorage{
StepMillis: 10000,
Values: values,
Bootstrap: emptyBootstrap,
BootstrapStart: testDelayBootstrap,
},
)
phonyContext := common.NewContext(common.ContextOptions{
Start: testDelayStart,
End: testDelayEnd,
Engine: engine,
})

expr, err := phonyContext.Engine.(*Engine).Compile(target)
require.NoError(t, err)
res, err := expr.Execute(phonyContext)
require.NoError(t, err)
var expected []common.TestSeries

if output != nil {
expectedSeries := common.TestSeries{
Name: expectedName,
Data: output,
}
expected = append(expected, expectedSeries)
}
common.CompareOutputsAndExpected(t, 10000, testDelayStart,
expected, res.Values)
}

func TestDashed(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -2927,6 +3003,7 @@ func TestFunctionsRegistered(t *testing.T) {
"currentAbove",
"currentBelow",
"dashed",
"delay",
"derivative",
"diffSeries",
"divideSeries",
Expand Down Expand Up @@ -2960,6 +3037,7 @@ func TestFunctionsRegistered(t *testing.T) {
"mostDeviant",
"movingAverage",
"movingMedian",
"movingSum",
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
"multiplySeries",
"nonNegativeDerivative",
"nPercentile",
Expand Down