Skip to content

Commit

Permalink
[query] Fix graphite summarize/summarize to add missing agg functions (
Browse files Browse the repository at this point in the history
  • Loading branch information
yyin-sc authored Jul 2, 2021
1 parent c35fa2d commit aa932d9
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 86 deletions.
11 changes: 11 additions & 0 deletions src/query/graphite/common/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var SafeAggregationFns = map[string]SafeAggregationFn{
"range": SafeRange,
"multiply": SafeMul,
"last": SafeLast,
"count": SafeCount,
}

// SafeSort sorts the input slice and returns the number of NaNs in the input.
Expand Down Expand Up @@ -246,6 +247,16 @@ func SafeLast(input []float64) (float64, int, bool) {
return safeValues[len(safeValues)-1], nans, true
}

// SafeCount returns the number of valid values in the input slice and the number of NaNs in the input.
func SafeCount(input []float64) (float64, int, bool) {
safeValues, nans, ok := safeValues(input)
if !ok {
return 0, 0, false
}

return float64(len(safeValues)), nans, true
}

func safeValues(input []float64) ([]float64, int, bool) {
nans := 0
safeValues := make([]float64, 0, len(input))
Expand Down
10 changes: 10 additions & 0 deletions src/query/graphite/common/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ func TestAggregationFunctions(t *testing.T) {
1,
},
},
{
input: input{
"count",
[]float64{1, 2, 3, math.NaN()},
},
output: output{
3,
1,
},
},
}

for _, test := range tests {
Expand Down
100 changes: 14 additions & 86 deletions src/query/graphite/native/summarize.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,19 @@ func summarize(
fname = "sum"
}

safeAggFn, ok := common.SafeAggregationFns[fname]
if !ok {
return ts.NewSeriesList(), errors.NewInvalidParamsError(fmt.Errorf(
"aggregate function not supported: %s", fname))
}

interval, err := common.ParseInterval(intervalS)
if err != nil || interval <= 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
"invalid interval %s: %v", interval, err))
return ts.NewSeriesList(), err
}

f, fexists := summarizeFuncs[fname]
if !fexists {
err := errors.NewInvalidParamsError(fmt.Errorf(
"invalid func %s", fname))
return ts.NewSeriesList(), err
}

alignString := ""
if alignToFrom {
alignString = ", true"
Expand All @@ -64,7 +63,7 @@ func summarize(
results := make([]*ts.Series, len(series.Values))
for i, series := range series.Values {
name := fmt.Sprintf("summarize(%s, \"%s\", \"%s\"%s)", series.Name(), intervalS, fname, alignString)
results[i] = summarizeTimeSeries(ctx, name, series, interval, f, alignToFrom)
results[i] = summarizeTimeSeries(ctx, name, series, interval, safeAggFn, alignToFrom)
}

r := ts.SeriesList(series)
Expand All @@ -73,26 +72,22 @@ func summarize(
}

type summarizeBucket struct {
count int
accum float64
vals []float64
vals []float64
}

func summarizeTimeSeries(
ctx *common.Context,
newName string,
series *ts.Series,
interval time.Duration,
funcInfo funcInfo,
safeAggFn common.SafeAggregationFn,
alignToFrom bool,
) *ts.Series {
var (
startTimeInSecs = int(series.StartTime().Unix())
intervalInSecs = int(interval / time.Second)
intervalInMsecs = intervalInSecs * 1000
buckets = make(map[int]*summarizeBucket)
f = funcInfo.consolidationFunc
fname = funcInfo.fname
)

for i := 0; i < series.Len(); i++ {
Expand All @@ -107,15 +102,9 @@ func summarizeTimeSeries(
}

if bucket, exists := buckets[bucketInterval]; exists {
if fname == "median" {
bucket.vals = append(bucket.vals, n)
} else {
bucket.accum = f(bucket.accum, n, bucket.count)
}
bucket.count++
bucket.vals = append(bucket.vals, n)
} else {
vals := []float64{n}
buckets[bucketInterval] = &summarizeBucket{1, n, vals}
buckets[bucketInterval] = &summarizeBucket{[]float64{n}}
}
}

Expand Down Expand Up @@ -146,10 +135,9 @@ func summarizeTimeSeries(

bucket, bucketExists := buckets[bucketInterval]
if bucketExists {
if fname == "median" {
newValues.SetValueAt(i, ts.Median(bucket.vals, bucket.count))
} else {
newValues.SetValueAt(i, bucket.accum)
safeValue, _, safe := safeAggFn(bucket.vals)
if safe {
newValues.SetValueAt(i, safeValue)
}
}
}
Expand Down Expand Up @@ -193,63 +181,3 @@ func sumSpecificationFunc(series ts.SeriesList) string {
func averageSpecificationFunc(series ts.SeriesList) string {
return wrapPathExpr("averageSeries", series)
}

func maxSpecificationFunc(series ts.SeriesList) string {
return wrapPathExpr("maxSeries", series)
}

func minSpecificationFunc(series ts.SeriesList) string {
return wrapPathExpr("minSeries", series)
}

func lastSpecificationFunc(series ts.SeriesList) string {
return wrapPathExpr("lastSeries", series)
}

type funcInfo struct {
fname string
consolidationFunc ts.ConsolidationFunc
specificationFunc specificationFunc
}

var (
sumFuncInfo = funcInfo{
fname: "sum",
consolidationFunc: ts.Sum,
specificationFunc: sumSpecificationFunc,
}
maxFuncInfo = funcInfo{
fname: "max",
consolidationFunc: ts.Max,
specificationFunc: maxSpecificationFunc,
}
minFuncInfo = funcInfo{
fname: "min",
consolidationFunc: ts.Min,
specificationFunc: minSpecificationFunc,
}
lastFuncInfo = funcInfo{
fname: "last",
consolidationFunc: ts.Last,
specificationFunc: lastSpecificationFunc,
}
avgFuncInfo = funcInfo{
fname: "avg",
consolidationFunc: ts.Avg,
specificationFunc: averageSpecificationFunc,
}

summarizeFuncs = map[string]funcInfo{
"sum": sumFuncInfo,
"max": maxFuncInfo,
"min": minFuncInfo,
"last": lastFuncInfo,
"avg": avgFuncInfo,
"average": avgFuncInfo,
"sumSeries": sumFuncInfo,
"maxSeries": maxFuncInfo,
"minSeries": minFuncInfo,
"averageSeries": avgFuncInfo,
"": sumFuncInfo,
}
)
14 changes: 14 additions & 0 deletions src/query/graphite/native/summarize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ func TestSmartSummarize(t *testing.T) {
end,
[]float64{15, 51},
},
{"smartSummarize(foo, \"40s\", \"median\")",
"40s",
"median",
start,
end,
[]float64{1.5, 5.5, 9.5},
},
{"smartSummarize(foo, \"30s\", \"median\")",
"30s",
"median",
start,
end,
[]float64{1, 4, 7, 10},
},
}

for _, test := range tests {
Expand Down

0 comments on commit aa932d9

Please sign in to comment.