Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Implemented the Graphite highest and lowest functions #2623

Merged
merged 16 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 55 additions & 23 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,36 +562,52 @@ func takeByFunction(input singlePathSpec, n int, sr ts.SeriesReducer, sort ts.Di
return common.Head(r, n)
}

func getReducer(f string) (ts.SeriesReducer, error) {
sa := ts.SeriesReducerApproach(f)
r, ok := sa.SafeReducer()
if !ok {
return r, errors.NewInvalidParamsError(fmt.Errorf("invalid function %s", f))
}
return r, nil
}

// highest takes one metric or a wildcard seriesList followed by an integer N and an aggregation function.
// Out of all metrics passed, draws only the N metrics with the highest
// aggregated value over the time period specified.
func highest(_ *common.Context, input singlePathSpec, n int, f string) (ts.SeriesList, error) {
reducer, err := getReducer(f)
if err != nil {
return ts.NewSeriesList(), err
}
return takeByFunction(input, n, reducer, ts.Descending)
}

// highestSum takes one metric or a wildcard seriesList followed by an integer
// n. Out of all metrics passed, draws only the N metrics with the highest
// total value in the time period specified.
func highestSum(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerSum.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestSum(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "sum")
}

// highestMax takes one metric or a wildcard seriesList followed by an integer
// n. Out of all metrics passed, draws only the N metrics with the highest
// maximum value in the time period specified.
func highestMax(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerMax.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestMax(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "max")
}

// highestCurrent takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the N metrics with the
// highest value at the end of the time period specified.
func highestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerLast.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestCurrent(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "current")
}

// highestAverage takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the top N metrics with the
// highest average value for the time period specified.
func highestAverage(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerAvg.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestAverage(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "average")
}

// fallbackSeries takes one metric or a wildcard seriesList, and a second fallback metric.
Expand All @@ -608,25 +624,35 @@ func fallbackSeries(_ *common.Context, input singlePathSpec, fallback singlePath
// N. Draws the N most deviant metrics. To find the deviants, the standard
// deviation (sigma) of each series is taken and ranked. The top N standard
// deviations are returned.
func mostDeviant(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerStdDev.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func mostDeviant(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "stddev")
}

// lowest takes one metric or a wildcard seriesList followed by an integer N and an aggregation function.
// Out of all metrics passed, draws only the N metrics with the lowest
// aggregated value over the time period specified.
func lowest(_ *common.Context, input singlePathSpec, n int, f string) (ts.SeriesList, error) {
reducer, err := getReducer(f)
if err != nil {
return ts.NewSeriesList(), err
}
return takeByFunction(input, n, reducer, ts.Ascending)
}

// lowestAverage takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the top N metrics with the
// lowest average value for the time period specified.
func lowestAverage(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerAvg.Reducer()
return takeByFunction(input, n, sr, ts.Ascending)
func lowestAverage(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return lowest(ctx, input, n, "average")
}

// lowestCurrent takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the N metrics with the
// lowest value at the end of the time period specified.
func lowestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerLast.Reducer()
return takeByFunction(input, n, sr, ts.Ascending)
func lowestCurrent(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return lowest(ctx, input, n, "current")
}

teddywahle marked this conversation as resolved.
Show resolved Hide resolved
}

// windowSizeFunc calculates window size for moving average calculation
Expand Down Expand Up @@ -1657,8 +1683,6 @@ func aggregateLine(ctx *common.Context, seriesList singlePathSpec, f string) (ts
Values: []*ts.Series{renamed},
Metadata: seriesList.Metadata,
}, nil
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
}

// changed takes one metric or a wildcard seriesList.
// Output 1 when the value changed, 0 when null or the same.
func changed(ctx *common.Context, seriesList singlePathSpec) (ts.SeriesList, error) {
Expand Down Expand Up @@ -1970,6 +1994,10 @@ func init() {
MustRegisterFunction(group)
MustRegisterFunction(groupByNode)
MustRegisterFunction(groupByNodes)
MustRegisterFunction(highest).WithDefaultParams(map[uint8]interface{}{
2: 1, // n,
3: "average", // f
})
MustRegisterFunction(highestAverage)
MustRegisterFunction(highestCurrent)
MustRegisterFunction(highestMax)
Expand All @@ -1988,6 +2016,10 @@ func init() {
MustRegisterFunction(logarithm).WithDefaultParams(map[uint8]interface{}{
2: 10, // base
})
MustRegisterFunction(lowest).WithDefaultParams(map[uint8]interface{}{
2: 1, // n,
3: "average", // f
})
MustRegisterFunction(lowestAverage)
MustRegisterFunction(lowestCurrent)
MustRegisterFunction(maxSeries)
Expand Down
11 changes: 10 additions & 1 deletion src/query/graphite/ts/series_reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ type SeriesReducerApproach string
// The standard set of reducers
const (
SeriesReducerAvg SeriesReducerApproach = "avg"
SeriesReducerSum SeriesReducerApproach = "total"
SeriesReducerSum SeriesReducerApproach = "sum"
SeriesReducerMin SeriesReducerApproach = "min"
SeriesReducerMax SeriesReducerApproach = "max"
SeriesReducerStdDev SeriesReducerApproach = "stddev"
SeriesReducerLast SeriesReducerApproach = "last"

SeriesReducerAverage SeriesReducerApproach = "average" // alias for "avg"
SeriesReducerTotal SeriesReducerApproach = "total" // alias for "sum"
SeriesReducerCurrent SeriesReducerApproach = "current" // alias for "last"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No empty endline before end of const block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

teddywahle marked this conversation as resolved.
Show resolved Hide resolved
)

// SeriesReducer reduces a series to a single value.
Expand All @@ -56,9 +61,13 @@ func (sa SeriesReducerApproach) Reducer() SeriesReducer {

var seriesReducers = map[SeriesReducerApproach]SeriesReducer{
SeriesReducerAvg: func(b *Series) float64 { return b.SafeAvg() },
SeriesReducerAverage: func(b *Series) float64 { return b.SafeAvg() },
SeriesReducerTotal: func(b *Series) float64 { return b.SafeSum() },
SeriesReducerSum: func(b *Series) float64 { return b.SafeSum() },
SeriesReducerMin: func(b *Series) float64 { return b.SafeMin() },
SeriesReducerMax: func(b *Series) float64 { return b.SafeMax() },
SeriesReducerStdDev: func(b *Series) float64 { return b.SafeStdDev() },
SeriesReducerLast: func(b *Series) float64 { return b.SafeLastValue() },
SeriesReducerCurrent: func(b *Series) float64 { return b.SafeLastValue() },
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you run gofmt on this file? Seems to be not aligned (which gofmt fixes ":" alignment).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!


}