diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index 22117f5c3a..8a6cc1cfcc 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -26,8 +26,10 @@ import ( "math" "math/rand" "regexp" + "runtime" "sort" "strings" + "sync" "time" "github.com/m3db/m3/src/query/graphite/common" @@ -35,6 +37,7 @@ import ( "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/graphite/ts" "github.com/m3db/m3/src/query/util" + xerrors "github.com/m3db/m3/src/x/errors" ) const ( @@ -94,6 +97,66 @@ func sortByMaxima(ctx *common.Context, series singlePathSpec) (ts.SeriesList, er return highestMax(ctx, series, len(series.Values)) } +// useSeriesAbove compares the maximum of each series against the given `value`. If the series +// maximum is greater than `value`, the regular expression search and replace is +// applied against the series name to plot a related metric. +// +// e.g. given useSeriesAbove(ganglia.metric1.reqs,10,'reqs','time'), +// the response time metric will be plotted only when the maximum value of the +// corresponding request/s metric is > 10 +// Example: useSeriesAbove(ganglia.metric1.reqs,10,"reqs","time") +func useSeriesAbove(ctx *common.Context, seriesList singlePathSpec, maxAllowedValue float64, search, replace string) (ts.SeriesList, error) { + var ( + mu sync.Mutex + wg sync.WaitGroup + multiErr xerrors.MultiError + newNames []string + + output = make([]*ts.Series, 0, len(seriesList.Values)) + maxConcurrency = runtime.NumCPU() / 2 + ) + + for _, series := range seriesList.Values { + if series.SafeMax() > maxAllowedValue { + seriesName := strings.Replace(series.Name(), search, replace, -1) + newNames = append(newNames, seriesName) + } + } + + for _, newNameChunk := range chunkArrayHelper(newNames, maxConcurrency) { + if multiErr.LastError() != nil { + return ts.NewSeriesList(), multiErr.LastError() + } + + for _, newTarget := range newNameChunk { + wg.Add(1) + go func() { + defer wg.Done() + resultSeriesList, err := evaluateTarget(ctx, newTarget) + + if err != nil { + mu.Lock() + multiErr = multiErr.Add(err) + mu.Unlock() + return + } + + mu.Lock() + for _, resultSeries := range resultSeriesList.Values { + resultSeries.Specification = newTarget + output = append(output, resultSeries) + } + mu.Unlock() + }() + } + wg.Wait() + } + + r := ts.NewSeriesList() + r.Values = output + return r, nil +} + // sortByMinima sorts timeseries by the minimum value across the time period specified. func sortByMinima(ctx *common.Context, series singlePathSpec) (ts.SeriesList, error) { return lowest(ctx, series, len(series.Values), "min") @@ -2399,6 +2462,7 @@ func init() { MustRegisterFunction(transformNull).WithDefaultParams(map[uint8]interface{}{ 2: 0.0, // defaultValue }) + MustRegisterFunction(useSeriesAbove) MustRegisterFunction(weightedAverage) // alias functions - in alpha ordering diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index cdd104c6e3..c9d82510a7 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -302,6 +302,73 @@ func TestScale(t *testing.T) { } } +func TestUseSeriesAbove(t *testing.T) { + var ( + ctrl = xgomock.NewController(t) + store = storage.NewMockStorage(ctrl) + now = time.Now().Truncate(time.Hour) + engine = NewEngine(store) + startTime = now.Add(-3 * time.Minute) + endTime = now.Add(-time.Minute) + ctx = common.NewContext(common.ContextOptions{Start: startTime, End: endTime, Engine: engine}) + stepSize = 60000 + ) + + defer ctrl.Finish() + defer ctx.Close() + + store.EXPECT().FetchByQuery(gomock.Any(), "foo.bar.q.zed", gomock.Any()).DoAndReturn( + buildTestSeriesFn(stepSize, "foo.bar.q.zed")) + store.EXPECT().FetchByQuery(gomock.Any(), "foo.bar.g.zed", gomock.Any()).DoAndReturn( + buildTestSeriesFn(stepSize, "foo.bar.g.zed")) + store.EXPECT().FetchByQuery(gomock.Any(), "foo.bar.x.zed", gomock.Any()).DoAndReturn( + buildTestSeriesFn(stepSize, "foo.bar.x.zed")).Times(2) + store.EXPECT().FetchByQuery(gomock.Any(), "foo.bar.g.zed.g", gomock.Any()).Return( + &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "foo.bar.g.zed.g", startTime, + common.NewTestSeriesValues(ctx, 60000, []float64{10, 20, 30}))}}, nil) + store.EXPECT().FetchByQuery(gomock.Any(), "foo.bar.q.zed.q", gomock.Any()).Return( + &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "foo.bar.q.zed.q", startTime, + common.NewTestSeriesValues(ctx, 60000, []float64{1, 2, 3}))}}, nil) + + tests := []struct { + target string + expected common.TestSeries + }{ + { + "useSeriesAbove(foo.bar.q.zed, -1, 'q', 'g')", + common.TestSeries{ + Name: "foo.bar.g.zed", + Data: []float64{1.0, 1.0}, + }, + }, + // two replacements + { + "useSeriesAbove(foo.bar.g.zed.g, 15, 'g', 'q')", + common.TestSeries{ + Name: "foo.bar.q.zed.q", + Data: []float64{1.0, 2.0, 3.0}, + }, + }, + // no replacments + { + "useSeriesAbove(foo.bar.x.zed, 1, 'p', 'g')", + common.TestSeries{ + Name: "foo.bar.x.zed", + Data: []float64{2.0, 2.0}, + }, + }, + } + + for _, test := range tests { + expr, err := engine.Compile(test.target) + require.NoError(t, err) + res, err := expr.Execute(ctx) + require.NoError(t, err) + common.CompareOutputsAndExpected(t, stepSize, startTime, + []common.TestSeries{test.expected}, res.Values) + } +} + func TestPercentileOfSeriesErrors(t *testing.T) { ctx := common.NewTestContext() @@ -3421,6 +3488,7 @@ func TestFunctionsRegistered(t *testing.T) { "timeShift", "timeSlice", "transformNull", + "useSeriesAbove", "weightedAverage", }