Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Implemented the Graphite highest and lowest functions #2623

Merged
merged 16 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 53 additions & 21 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,36 +562,52 @@ func takeByFunction(input singlePathSpec, n int, sr ts.SeriesReducer, sort ts.Di
return common.Head(r, n)
}

func getReducer(f string) (ts.SeriesReducer, error) {
sa := ts.SeriesReducerApproach(f)
r, ok := sa.SafeReducer()
if !ok {
return r, errors.NewInvalidParamsError(fmt.Errorf("invalid function %s", f))
}
return r, nil
}

// highest takes one metric or a wildcard seriesList followed by an integer N and an aggregation function.
// Out of all metrics passed, draws only the N metrics with the highest
// aggregated value over the time period specified.
func highest(_ *common.Context, input singlePathSpec, n int, f string) (ts.SeriesList, error) {
reducer, err := getReducer(f)
if err != nil {
return ts.NewSeriesList(), err
}
return takeByFunction(input, n, reducer, ts.Descending)
}

// highestSum takes one metric or a wildcard seriesList followed by an integer
// n. Out of all metrics passed, draws only the N metrics with the highest
// total value in the time period specified.
func highestSum(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerSum.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestSum(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "sum")
}

// highestMax takes one metric or a wildcard seriesList followed by an integer
// n. Out of all metrics passed, draws only the N metrics with the highest
// maximum value in the time period specified.
func highestMax(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerMax.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestMax(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "max")
}

// highestCurrent takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the N metrics with the
// highest value at the end of the time period specified.
func highestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerLast.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestCurrent(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "current")
}

// highestAverage takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the top N metrics with the
// highest average value for the time period specified.
func highestAverage(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerAvg.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestAverage(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "average")
}

// fallbackSeries takes one metric or a wildcard seriesList, and a second fallback metric.
Expand All @@ -608,25 +624,33 @@ func fallbackSeries(_ *common.Context, input singlePathSpec, fallback singlePath
// N. Draws the N most deviant metrics. To find the deviants, the standard
// deviation (sigma) of each series is taken and ranked. The top N standard
// deviations are returned.
func mostDeviant(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerStdDev.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func mostDeviant(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "stddev")
}

// lowest takes one metric or a wildcard seriesList followed by an integer N and an aggregation function.
// Out of all metrics passed, draws only the N metrics with the lowest
// aggregated value over the time period specified.
func lowest(_ *common.Context, input singlePathSpec, n int, f string) (ts.SeriesList, error) {
reducer, err := getReducer(f)
if err != nil {
return ts.NewSeriesList(), err
}
return takeByFunction(input, n, reducer, ts.Ascending)
}

// lowestAverage takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the top N metrics with the
// lowest average value for the time period specified.
func lowestAverage(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerAvg.Reducer()
return takeByFunction(input, n, sr, ts.Ascending)
func lowestAverage(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return lowest(ctx, input, n, "average")
}

// lowestCurrent takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the N metrics with the
// lowest value at the end of the time period specified.
func lowestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerLast.Reducer()
return takeByFunction(input, n, sr, ts.Ascending)
func lowestCurrent(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return lowest(ctx, input, n, "current")
}

// windowSizeFunc calculates window size for moving average calculation
Expand Down Expand Up @@ -2173,6 +2197,10 @@ func init() {
MustRegisterFunction(group)
MustRegisterFunction(groupByNode)
MustRegisterFunction(groupByNodes)
MustRegisterFunction(highest).WithDefaultParams(map[uint8]interface{}{
2: 1, // n,
3: "average", // f
})
MustRegisterFunction(highestAverage)
MustRegisterFunction(highestCurrent)
MustRegisterFunction(highestMax)
Expand All @@ -2192,6 +2220,10 @@ func init() {
MustRegisterFunction(logarithm).WithDefaultParams(map[uint8]interface{}{
2: 10, // base
})
MustRegisterFunction(lowest).WithDefaultParams(map[uint8]interface{}{
2: 1, // n,
3: "average", // f
})
MustRegisterFunction(lowestAverage)
MustRegisterFunction(lowestCurrent)
MustRegisterFunction(maxSeries)
Expand Down
120 changes: 120 additions & 0 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,10 +1090,20 @@ type nIntParamGoldenData struct {
outputs []common.TestSeries
}

// nIntParamGoldenDataWithAgg holds test data for functions that take an additional "n" int parameter
// It also holds an aggregation function
type nIntParamGoldenDataWithAgg struct {
nIntParamGoldenData
aggFunc string
}

// rankingFunc selects the n lowest or highest series based on certain metric of the
// series (e.g., maximum, minimum, average).
type rankingFunc func(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error)

// testRanking can be used to test the ranking alias functions
// (e.g. lowestAverage, highestMax, highestAverage, lowestCurrent)
// these functions are all aliases of the "meta-ranking" functions (i.e. highest and lowest)
func testRanking(t *testing.T, ctx *common.Context, tests []nIntParamGoldenData, f rankingFunc) {
start := time.Now()
step := 100
Expand All @@ -1113,6 +1123,73 @@ func testRanking(t *testing.T, ctx *common.Context, tests []nIntParamGoldenData,
}
}

// testOrderedAggregationFunc is a helper function for testing lowest and highest
func testOrderedAggregationFunc(t *testing.T, ctx *common.Context, tests []nIntParamGoldenDataWithAgg, isLowest bool) {
f := highest
if isLowest {
f = lowest
}

start := time.Now()
step := 100
for _, test := range tests {
input := singlePathSpec{Values: generateSeriesList(ctx, start, test.inputs, step)}
outputs, err := f(ctx, input, test.n, test.aggFunc)

if test.n < 0 {
require.NotNil(t, err)
require.Equal(t, "n must be positive", err.Error())
assert.Nil(t, outputs.Values, "Nil timeseries should be returned")
continue
}

require.NoError(t, err)
common.CompareOutputsAndExpected(t, step, start,
test.outputs, outputs.Values)
}
}

func TestHighest(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()

tests := []nIntParamGoldenDataWithAgg{
{
nIntParamGoldenData{
testInput,
0,
nil,
},
"sum",
},
{
nIntParamGoldenData{
testInput,
1,
[]common.TestSeries{testInput[0]},
},
"current",
},
{
nIntParamGoldenData{
testInput,
2,
[]common.TestSeries{testInput[4], testInput[2]},
},
"average",
},
{
nIntParamGoldenData{
testInput,
len(testInput) + 10, // force sort
[]common.TestSeries{testInput[0], testInput[3], testInput[4], testInput[2], testInput[1]},
},
"last",
},
}
testOrderedAggregationFunc(t, ctx, tests, false)
}

func TestHighestCurrent(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -1283,6 +1360,47 @@ func TestMostDeviant(t *testing.T) {
testRanking(t, ctx, tests, mostDeviant)
}

func TestLowest(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()

tests := []nIntParamGoldenDataWithAgg{
{
nIntParamGoldenData{
testInput,
0,
nil,
},
"max",
},
{
nIntParamGoldenData{
testInput,
2,
[]common.TestSeries{testInput[1], testInput[3]},
},
"sum",
},
{
nIntParamGoldenData{
testInput,
2,
[]common.TestSeries{testInput[1], testInput[2]},
},
"current",
},
{
nIntParamGoldenData{
testInput,
3,
[]common.TestSeries{testInput[1], testInput[3], testInput[0]},
},
"average",
},
}
testOrderedAggregationFunc(t, ctx, tests, true)
}

func TestLowestAverage(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -3159,6 +3277,7 @@ func TestFunctionsRegistered(t *testing.T) {
"group",
"groupByNode",
"groupByNodes",
"highest",
"highestAverage",
"highestCurrent",
"highestMax",
Expand All @@ -3175,6 +3294,7 @@ func TestFunctionsRegistered(t *testing.T) {
"limit",
"log",
"logarithm",
"lowest",
"lowestAverage",
"lowestCurrent",
"max",
Expand Down
21 changes: 14 additions & 7 deletions src/query/graphite/ts/series_reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ type SeriesReducerApproach string
// The standard set of reducers
const (
SeriesReducerAvg SeriesReducerApproach = "avg"
SeriesReducerSum SeriesReducerApproach = "total"
SeriesReducerSum SeriesReducerApproach = "sum"
SeriesReducerMin SeriesReducerApproach = "min"
SeriesReducerMax SeriesReducerApproach = "max"
SeriesReducerStdDev SeriesReducerApproach = "stddev"
SeriesReducerLast SeriesReducerApproach = "last"

SeriesReducerAverage SeriesReducerApproach = "average" // alias for "avg"
SeriesReducerTotal SeriesReducerApproach = "total" // alias for "sum"
SeriesReducerCurrent SeriesReducerApproach = "current" // alias for "last"
)

// SeriesReducer reduces a series to a single value.
Expand All @@ -55,10 +59,13 @@ func (sa SeriesReducerApproach) Reducer() SeriesReducer {
}

var seriesReducers = map[SeriesReducerApproach]SeriesReducer{
SeriesReducerAvg: func(b *Series) float64 { return b.SafeAvg() },
SeriesReducerSum: func(b *Series) float64 { return b.SafeSum() },
SeriesReducerMin: func(b *Series) float64 { return b.SafeMin() },
SeriesReducerMax: func(b *Series) float64 { return b.SafeMax() },
SeriesReducerStdDev: func(b *Series) float64 { return b.SafeStdDev() },
SeriesReducerLast: func(b *Series) float64 { return b.SafeLastValue() },
SeriesReducerAvg: func(b *Series) float64 { return b.SafeAvg() },
SeriesReducerAverage: func(b *Series) float64 { return b.SafeAvg() },
SeriesReducerTotal: func(b *Series) float64 { return b.SafeSum() },
SeriesReducerSum: func(b *Series) float64 { return b.SafeSum() },
SeriesReducerMin: func(b *Series) float64 { return b.SafeMin() },
SeriesReducerMax: func(b *Series) float64 { return b.SafeMax() },
SeriesReducerStdDev: func(b *Series) float64 { return b.SafeStdDev() },
SeriesReducerLast: func(b *Series) float64 { return b.SafeLastValue() },
SeriesReducerCurrent: func(b *Series) float64 { return b.SafeLastValue() },
}