From e4e0185df74de79404bbf6d47fe57c4268f0297d Mon Sep 17 00:00:00 2001 From: Theodore Wahle Date: Tue, 22 Sep 2020 07:17:30 -0700 Subject: [PATCH 01/17] added the graphite applyByNode function --- .../graphite/native/aggregation_functions.go | 93 +++++++++++++++++- .../native/aggregation_functions_test.go | 94 ++++++++++++++++++- .../graphite/native/builtin_functions.go | 3 + 3 files changed, 185 insertions(+), 5 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 9d2c1a2c2c..dc8a721f94 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -22,12 +22,13 @@ package native import ( "fmt" - "math" - "strings" - "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/graphite/errors" + "github.com/m3db/m3/src/query/graphite/storage" "github.com/m3db/m3/src/query/graphite/ts" + "math" + "sort" + "strings" ) func wrapPathExpr(wrapper string, series ts.SeriesList) string { @@ -220,6 +221,92 @@ func combineSeriesWithWildcards( return r, nil } +func evaluateTarget(ctx *common.Context, target string) ([]*ts.Series, error) { + fetchOptions := storage.FetchOptions{ + StartTime: ctx.StartTime, + EndTime: ctx.EndTime, + DataOptions: storage.DataOptions{ + Timeout: ctx.Timeout, + }, + } + ctxCopy := ctx.NewChildContext(common.NewChildContextOptions()) + result, err := ctx.Engine.FetchByQuery(ctxCopy, target, fetchOptions) + if err != nil { + return nil, err + } + return result.SeriesList, nil +} + +/* +applyByNode takes a seriesList and applies some complicated function (described by a string), replacing templates with unique +prefixes of keys from the seriesList (the key is all nodes up to the index given as `nodeNum`). + +If the `newName` parameter is provided, the name of the resulting series will be given by that parameter, with any +"%" characters replaced by the unique prefix. + +Example: + +.. code-block:: none + +&target=applyByNode(servers.*.disk.bytes_free,1,"divideSeries(%.disk.bytes_free,sumSeries(%.disk.bytes_*))") + +Would find all series which match `servers.*.disk.bytes_free`, then trim them down to unique series up to the node +given by nodeNum, then fill them into the template function provided (replacing % by the prefixes). + +Additional Examples: + +Given keys of + +- `stats.counts.haproxy.web.2XX` +- `stats.counts.haproxy.web.3XX` +- `stats.counts.haproxy.web.5XX` +- `stats.counts.haproxy.microservice.2XX` +- `stats.counts.haproxy.microservice.3XX` +- `stats.counts.haproxy.microservice.5XX` + +The following will return the rate of 5XX's per service: + +.. code-block:: none + +applyByNode(stats.counts.haproxy.*.*XX, 3, "asPercent(%.5XX, sumSeries(%.*XX))", "%.pct_5XX") + +The output series would have keys `stats.counts.haproxy.web.pct_5XX` and `stats.counts.haproxy.microservice.pct_5XX`. +*/ +func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, templateFunction string, newName string) (ts.SeriesList, error) { + // using this as a set + prefixMap := make(map[string]int) + for _, series := range seriesList.Values { + prefix := strings.Join(strings.Split(series.Name(), ".")[:nodeNum + 1], ".") + prefixMap[prefix] = 1 + } + // transform to slice + prefixes := []string{} + for p := range prefixMap { + prefixes = append(prefixes, p) + } + sort.Strings(prefixes) + + var output []*ts.Series + for _, prefix := range prefixes { + newTarget := strings.ReplaceAll(templateFunction, "%", prefix) + resultSeriesList, err := evaluateTarget(ctx, newTarget) + if err != nil { + return ts.NewSeriesList(), err + } + for _, resultSeries := range resultSeriesList { + if newName != "" { + resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix)) + } + resultSeries.Specification = prefix + output = append(output, resultSeries) + } + } + + r := ts.SeriesList(seriesList) + r.Values = output + return r, nil +} + // groupByNode takes a serieslist and maps a callback to subgroups within as defined by a common node // // &target=groupByNode(foo.by-function.*.*.cpu.load5,2,"sumSeries") diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index fae525aa30..bd0a1ad727 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -22,6 +22,8 @@ package native import ( "fmt" + "github.com/golang/mock/gomock" + xgomock "github.com/m3db/m3/src/x/test" "math" "sort" "testing" @@ -331,6 +333,94 @@ func TestSumSeriesWithWildcards(t *testing.T) { } } +func TestApplyByNode(t *testing.T) { + var ( + ctrl = xgomock.NewController(t) + store = storage.NewMockStorage(ctrl) + engine = NewEngine(store) + start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT") + end, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT") + ctx = common.NewContext(common.ContextOptions{Start: start, End: end, Engine: engine}) + millisPerStep = 60000 + inputs = []*ts.Series{ + ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start, + common.NewTestSeriesValues(ctx, millisPerStep, []float64{10, 20, 30})), + ts.NewSeries(ctx, "servers.s1.disk.bytes_free", start, + common.NewTestSeriesValues(ctx, millisPerStep, []float64{90, 80, 70})), + ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start, + common.NewTestSeriesValues(ctx, millisPerStep, []float64{1, 2, 3})), + ts.NewSeries(ctx, "servers.s2.disk.bytes_free", start, + common.NewTestSeriesValues(ctx, millisPerStep, []float64{99, 98, 97})), + } + ) + + defer ctrl.Finish() + defer ctx.Close() + + + store.EXPECT().FetchByQuery(gomock.Any(), "divideSeries(servers.s1.disk.bytes_used, sumSeries(servers.s1.disk.bytes_*))", gomock.Any()).Return( + &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "divideSeries(servers.s1.disk.bytes_used,sumSeries(servers.s1.disk.bytes_used,servers.s1.disk.bytes_free))", start, + common.NewTestSeriesValues(ctx, 60000, []float64{0.10, 0.20, 0.30})) }}, nil).Times(2) + store.EXPECT().FetchByQuery(gomock.Any(), "divideSeries(servers.s2.disk.bytes_used, sumSeries(servers.s2.disk.bytes_*))", gomock.Any()).Return( + &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "divideSeries(servers.s2.disk.bytes_used,sumSeries(servers.s2.disk.bytes_used,servers.s2.disk.bytes_free))", start, + common.NewTestSeriesValues(ctx, 60000, []float64{0.01, 0.02, 0.03})) }}, nil).Times(2) + + tests := []struct { + nodeNum int + templateFunction string + newName string + expectedResults []common.TestSeries + }{ + { + nodeNum: 1, + templateFunction: "divideSeries(%.disk.bytes_used, sumSeries(%.disk.bytes_*))", + newName: "", + expectedResults: []common.TestSeries{ + { + Name: "divideSeries(servers.s1.disk.bytes_used,sumSeries(servers.s1.disk.bytes_used,servers.s1.disk.bytes_free))", + Data: []float64{0.10, 0.20, 0.30}, + }, + { + Name: "divideSeries(servers.s2.disk.bytes_used,sumSeries(servers.s2.disk.bytes_used,servers.s2.disk.bytes_free))", + Data: []float64{0.01, 0.02, 0.03}, + }, + }, + }, + { + nodeNum: 1, + templateFunction: "divideSeries(%.disk.bytes_used, sumSeries(%.disk.bytes_*))", + newName: "%.disk.pct_used", + expectedResults: []common.TestSeries{ + { + Name: "servers.s1.disk.pct_used", + Data: []float64{0.10, 0.20, 0.30}, + }, + { + Name: "servers.s2.disk.pct_used", + Data: []float64{0.01, 0.02, 0.03}, + }, + }, + }, + } + + for _, test := range tests { + outSeries, err := applyByNode( + ctx, + singlePathSpec{ + Values: inputs, + }, + test.nodeNum, + test.templateFunction, + test.newName, + ) + require.NoError(t, err) + require.Equal(t, len(test.expectedResults), len(outSeries.Values)) + + outSeries, _ = sortByName(ctx, singlePathSpec(outSeries)) + common.CompareOutputsAndExpected(t, 60000, start, test.expectedResults, outSeries.Values) + } +} + func TestGroupByNode(t *testing.T) { var ( start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT") @@ -435,7 +525,7 @@ func TestGroupByNodes(t *testing.T) { tests := []struct { fname string - nodes []int + nodes []int expectedResults []result }{ {"avg", []int{2, 4}, []result{ // test normal group by nodes @@ -457,7 +547,7 @@ func TestGroupByNodes(t *testing.T) { {"pod2.500", 8 * 12}, }}, {"sum", []int{}, []result{ // test empty slice handing. - {"*", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12}, + {"*", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12}, }}, } diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index a8541e7c0f..504a215743 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -1869,6 +1869,9 @@ func init() { MustRegisterFunction(aliasByMetric) MustRegisterFunction(aliasByNode) MustRegisterFunction(aliasSub) + MustRegisterFunction(applyByNode).WithDefaultParams(map[uint8]interface{}{ + 4: "", // newName + }) MustRegisterFunction(asPercent).WithDefaultParams(map[uint8]interface{}{ 2: []*ts.Series(nil), // total }) From a81a3af39fba330f0ee9664806569faf00de520c Mon Sep 17 00:00:00 2001 From: Theodore Wahle Date: Tue, 22 Sep 2020 07:21:09 -0700 Subject: [PATCH 02/17] ran go fmt --- src/query/graphite/native/aggregation_functions.go | 11 +++++------ .../graphite/native/aggregation_functions_test.go | 11 +++++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 4d1e00f440..0342d0ace5 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -22,8 +22,8 @@ package native import ( "fmt" - "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/graphite/storage" "github.com/m3db/m3/src/query/graphite/ts" @@ -112,7 +112,7 @@ func divideSeriesHelper(ctx *common.Context, dividendSeries, divisorSeries *ts.S vals.SetValueAt(i, value) } } - + // The individual series will be named divideSeries(X, X), even if it is generated by divideSeriesLists // Based on Graphite source code (link below) // https://github.com/graphite-project/graphite-web/blob/17a34e7966f7a46eded30c2362765c74eea899cb/webapp/graphite/render/functions.py#L901 @@ -138,7 +138,7 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin divisorSeries := divisorSeriesList.Values[0] results := make([]*ts.Series, len(dividendSeriesList.Values)) for idx, dividendSeries := range dividendSeriesList.Values { - metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata) + metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata) quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata) if err != nil { return ts.NewSeriesList(), err @@ -151,7 +151,6 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin return r, nil } - // divideSeriesLists divides one series list by another series list func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesList singlePathSpec) (ts.SeriesList, error) { if len(dividendSeriesList.Values) != len(divisorSeriesList.Values) { @@ -162,7 +161,7 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis results := make([]*ts.Series, len(dividendSeriesList.Values)) for idx, dividendSeries := range dividendSeriesList.Values { divisorSeries := divisorSeriesList.Values[idx] - metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata) + metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata) quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata) if err != nil { return ts.NewSeriesList(), err @@ -334,7 +333,7 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te // using this as a set prefixMap := make(map[string]int) for _, series := range seriesList.Values { - prefix := strings.Join(strings.Split(series.Name(), ".")[:nodeNum + 1], ".") + prefix := strings.Join(strings.Split(series.Name(), ".")[:nodeNum+1], ".") prefixMap[prefix] = 1 } // transform to slice diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index 449dd32553..728c33c175 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -316,7 +316,6 @@ func TestDivideSeriesLists(t *testing.T) { require.Error(t, err) } - func TestAverageSeriesWithWildcards(t *testing.T) { ctx, _ := newConsolidationTestSeries() defer ctx.Close() @@ -393,7 +392,6 @@ func TestSumSeriesWithWildcards(t *testing.T) { } } - func TestApplyByNode(t *testing.T) { var ( ctrl = xgomock.NewController(t) @@ -418,13 +416,12 @@ func TestApplyByNode(t *testing.T) { defer ctrl.Finish() defer ctx.Close() - store.EXPECT().FetchByQuery(gomock.Any(), "divideSeries(servers.s1.disk.bytes_used, sumSeries(servers.s1.disk.bytes_*))", gomock.Any()).Return( &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "divideSeries(servers.s1.disk.bytes_used,sumSeries(servers.s1.disk.bytes_used,servers.s1.disk.bytes_free))", start, - common.NewTestSeriesValues(ctx, 60000, []float64{0.10, 0.20, 0.30})) }}, nil).Times(2) + common.NewTestSeriesValues(ctx, 60000, []float64{0.10, 0.20, 0.30}))}}, nil).Times(2) store.EXPECT().FetchByQuery(gomock.Any(), "divideSeries(servers.s2.disk.bytes_used, sumSeries(servers.s2.disk.bytes_*))", gomock.Any()).Return( &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "divideSeries(servers.s2.disk.bytes_used,sumSeries(servers.s2.disk.bytes_used,servers.s2.disk.bytes_free))", start, - common.NewTestSeriesValues(ctx, 60000, []float64{0.01, 0.02, 0.03})) }}, nil).Times(2) + common.NewTestSeriesValues(ctx, 60000, []float64{0.01, 0.02, 0.03}))}}, nil).Times(2) tests := []struct { nodeNum int @@ -479,7 +476,9 @@ func TestApplyByNode(t *testing.T) { outSeries, _ = sortByName(ctx, singlePathSpec(outSeries)) common.CompareOutputsAndExpected(t, 60000, start, test.expectedResults, outSeries.Values) - + } +} + func TestAggregateWithWildcards(t *testing.T) { var ( start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT") From 122564be698983338126d70c20182f653fab214a Mon Sep 17 00:00:00 2001 From: teddywahle Date: Mon, 28 Sep 2020 12:52:06 -0700 Subject: [PATCH 03/17] fixed applyByNode --- src/query/graphite/common/engine.go | 6 ++++ .../graphite/native/aggregation_functions.go | 27 +++++--------- .../native/aggregation_functions_test.go | 36 ++++++++++++++----- src/query/graphite/native/engine.go | 4 +++ 4 files changed, 46 insertions(+), 27 deletions(-) diff --git a/src/query/graphite/common/engine.go b/src/query/graphite/common/engine.go index e5023d2c9e..7850222f26 100644 --- a/src/query/graphite/common/engine.go +++ b/src/query/graphite/common/engine.go @@ -32,6 +32,8 @@ type QueryEngine interface { query string, options storage.FetchOptions, ) (*storage.FetchResult, error) + + Storage() storage.Storage } // The Engine for running queries @@ -54,3 +56,7 @@ func (e *Engine) FetchByQuery( ) (*storage.FetchResult, error) { return e.storage.FetchByQuery(ctx, query, options) } + +func (e *Engine) Storage() storage.Storage { + return e.storage; +} diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 0342d0ace5..a837a8b81c 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -25,7 +25,6 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/graphite/errors" - "github.com/m3db/m3/src/query/graphite/storage" "github.com/m3db/m3/src/query/graphite/ts" "math" "sort" @@ -278,22 +277,6 @@ func combineSeriesWithWildcards( return r, nil } -func evaluateTarget(ctx *common.Context, target string) ([]*ts.Series, error) { - fetchOptions := storage.FetchOptions{ - StartTime: ctx.StartTime, - EndTime: ctx.EndTime, - DataOptions: storage.DataOptions{ - Timeout: ctx.Timeout, - }, - } - ctxCopy := ctx.NewChildContext(common.NewChildContextOptions()) - result, err := ctx.Engine.FetchByQuery(ctxCopy, target, fetchOptions) - if err != nil { - return nil, err - } - return result.SeriesList, nil -} - /* applyByNode takes a seriesList and applies some complicated function (described by a string), replacing templates with unique prefixes of keys from the seriesList (the key is all nodes up to the index given as `nodeNum`). @@ -346,11 +329,17 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te var output []*ts.Series for _, prefix := range prefixes { newTarget := strings.ReplaceAll(templateFunction, "%", prefix) - resultSeriesList, err := evaluateTarget(ctx, newTarget) + + eng := NewEngine(ctx.Engine.Storage()) + expression, err := eng.Compile(newTarget) + if err != nil { + return ts.NewSeriesList(), err + } + resultSeriesList, err := expression.Execute(ctx) if err != nil { return ts.NewSeriesList(), err } - for _, resultSeries := range resultSeriesList { + for _, resultSeries := range resultSeriesList.Values { if newName != "" { resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix)) } diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index 728c33c175..ca966fc2eb 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -142,6 +142,8 @@ type mockEngine struct { query string, options storage.FetchOptions, ) (*storage.FetchResult, error) + + storage storage.Storage } func (e mockEngine) FetchByQuery( @@ -152,6 +154,10 @@ func (e mockEngine) FetchByQuery( return e.fn(ctx, query, opts) } +func (e mockEngine) Storage() storage.Storage { + return e.storage; +} + func TestVariadicSumSeries(t *testing.T) { expr, err := Compile("sumSeries(foo.bar.*, foo.baz.*)") require.NoError(t, err) @@ -416,12 +422,26 @@ func TestApplyByNode(t *testing.T) { defer ctrl.Finish() defer ctx.Close() - store.EXPECT().FetchByQuery(gomock.Any(), "divideSeries(servers.s1.disk.bytes_used, sumSeries(servers.s1.disk.bytes_*))", gomock.Any()).Return( - &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "divideSeries(servers.s1.disk.bytes_used,sumSeries(servers.s1.disk.bytes_used,servers.s1.disk.bytes_free))", start, - common.NewTestSeriesValues(ctx, 60000, []float64{0.10, 0.20, 0.30}))}}, nil).Times(2) - store.EXPECT().FetchByQuery(gomock.Any(), "divideSeries(servers.s2.disk.bytes_used, sumSeries(servers.s2.disk.bytes_*))", gomock.Any()).Return( - &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "divideSeries(servers.s2.disk.bytes_used,sumSeries(servers.s2.disk.bytes_used,servers.s2.disk.bytes_free))", start, - common.NewTestSeriesValues(ctx, 60000, []float64{0.01, 0.02, 0.03}))}}, nil).Times(2) + store.EXPECT().FetchByQuery(gomock.Any(), "servers.s1.disk.bytes_used", gomock.Any()).Return( + &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start, + common.NewTestSeriesValues(ctx, 60000, []float64{10, 20, 30}))}}, nil).Times(2) + + store.EXPECT().FetchByQuery(gomock.Any(), "servers.s1.disk.bytes_*", gomock.Any()).Return( + &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s1.disk.bytes_free", start, + common.NewTestSeriesValues(ctx, 60000, []float64{90, 80, 70})), + ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start, + common.NewTestSeriesValues(ctx, 60000, []float64{10, 20, 30}))}}, nil).Times(2) + + store.EXPECT().FetchByQuery(gomock.Any(), "servers.s2.disk.bytes_used", gomock.Any()).Return( + &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start, + common.NewTestSeriesValues(ctx, 60000, []float64{1, 2, 3}))}}, nil).Times(2) + + store.EXPECT().FetchByQuery(gomock.Any(), "servers.s2.disk.bytes_*", gomock.Any()).Return( + &storage.FetchResult{SeriesList: []*ts.Series{ + ts.NewSeries(ctx, "servers.s2.disk.bytes_free", start, + common.NewTestSeriesValues(ctx, 60000, []float64{99, 98, 97})), + ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start, + common.NewTestSeriesValues(ctx, 60000, []float64{1, 2, 3}))}}, nil).Times(2) tests := []struct { nodeNum int @@ -435,11 +455,11 @@ func TestApplyByNode(t *testing.T) { newName: "", expectedResults: []common.TestSeries{ { - Name: "divideSeries(servers.s1.disk.bytes_used,sumSeries(servers.s1.disk.bytes_used,servers.s1.disk.bytes_free))", + Name: "divideSeries(servers.s1.disk.bytes_used,sumSeries(servers.s1.disk.bytes_*))", Data: []float64{0.10, 0.20, 0.30}, }, { - Name: "divideSeries(servers.s2.disk.bytes_used,sumSeries(servers.s2.disk.bytes_used,servers.s2.disk.bytes_free))", + Name: "divideSeries(servers.s2.disk.bytes_used,sumSeries(servers.s2.disk.bytes_*))", Data: []float64{0.01, 0.02, 0.03}, }, }, diff --git a/src/query/graphite/native/engine.go b/src/query/graphite/native/engine.go index a1a9d234b8..5671723cf0 100644 --- a/src/query/graphite/native/engine.go +++ b/src/query/graphite/native/engine.go @@ -52,3 +52,7 @@ func (e *Engine) FetchByQuery( func (e *Engine) Compile(s string) (Expression, error) { return Compile(s) } + +func (e *Engine) Storage() storage.Storage { + return e.storage; +} From 71567f2e3e89b58c2d18ee696d68150ae624699b Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Tue, 29 Sep 2020 10:04:32 -0700 Subject: [PATCH 04/17] Update aggregation_functions.go --- src/query/graphite/native/aggregation_functions.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index a837a8b81c..d4dab0c127 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -22,13 +22,14 @@ package native import ( "fmt" + "math" + "sort" + "strings" + "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/graphite/ts" - "math" - "sort" - "strings" ) func wrapPathExpr(wrapper string, series ts.SeriesList) string { From 081c11f86e374a27fd7aa824d54cbaa0f287d143 Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Tue, 29 Sep 2020 10:05:37 -0700 Subject: [PATCH 05/17] Apply suggestions from code review --- src/query/graphite/common/engine.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/graphite/common/engine.go b/src/query/graphite/common/engine.go index 7850222f26..e70fd58845 100644 --- a/src/query/graphite/common/engine.go +++ b/src/query/graphite/common/engine.go @@ -57,6 +57,7 @@ func (e *Engine) FetchByQuery( return e.storage.FetchByQuery(ctx, query, options) } +// Storage returns the engine's storage object func (e *Engine) Storage() storage.Storage { return e.storage; } From 5b91c51b65d11cf626d1a9073fa75c16ea7c387f Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Tue, 29 Sep 2020 10:06:29 -0700 Subject: [PATCH 06/17] Update src/query/graphite/native/engine.go --- src/query/graphite/native/engine.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/graphite/native/engine.go b/src/query/graphite/native/engine.go index 5671723cf0..6a78535f4e 100644 --- a/src/query/graphite/native/engine.go +++ b/src/query/graphite/native/engine.go @@ -53,6 +53,7 @@ func (e *Engine) Compile(s string) (Expression, error) { return Compile(s) } +// Storage returns the engine's storage object func (e *Engine) Storage() storage.Storage { return e.storage; } From c57bd8486d7948e70f3ab8d411f2d62410c88268 Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Tue, 29 Sep 2020 10:07:43 -0700 Subject: [PATCH 07/17] Update aggregation_functions.go --- src/query/graphite/native/aggregation_functions.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index d4dab0c127..a136843352 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -287,9 +287,7 @@ If the `newName` parameter is provided, the name of the resulting series will be Example: -.. code-block:: none - -&target=applyByNode(servers.*.disk.bytes_free,1,"divideSeries(%.disk.bytes_free,sumSeries(%.disk.bytes_*))") +`applyByNode(servers.*.disk.bytes_free,1,"divideSeries(%.disk.bytes_free,sumSeries(%.disk.bytes_*))")` Would find all series which match `servers.*.disk.bytes_free`, then trim them down to unique series up to the node given by nodeNum, then fill them into the template function provided (replacing % by the prefixes). @@ -307,9 +305,7 @@ Given keys of The following will return the rate of 5XX's per service: -.. code-block:: none - -applyByNode(stats.counts.haproxy.*.*XX, 3, "asPercent(%.5XX, sumSeries(%.*XX))", "%.pct_5XX") +`applyByNode(stats.counts.haproxy.*.*XX, 3, "asPercent(%.5XX, sumSeries(%.*XX))", "%.pct_5XX")` The output series would have keys `stats.counts.haproxy.web.pct_5XX` and `stats.counts.haproxy.microservice.pct_5XX`. */ From 8a4d9cfddd8195bc1ce87eb360d3fe86247e847e Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Tue, 29 Sep 2020 10:07:54 -0700 Subject: [PATCH 08/17] Update aggregation_functions.go From 8095b6c886aaad08aef4de0e00ffe78901f114b9 Mon Sep 17 00:00:00 2001 From: teddywahle Date: Tue, 29 Sep 2020 11:36:49 -0700 Subject: [PATCH 09/17] added changes for artem --- .../graphite/native/aggregation_functions.go | 132 +++++++++++++++--- 1 file changed, 112 insertions(+), 20 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index a136843352..4a46188af2 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -23,8 +23,10 @@ package native import ( "fmt" "math" + "runtime" "sort" "strings" + "sync" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/graphite/common" @@ -278,6 +280,38 @@ func combineSeriesWithWildcards( return r, nil } +// splits a slice into chunks +func chunkArrayHelper(slice []string, numChunks int) [][]string { + var divided [][]string + + chunkSize := (len(slice) + numChunks - 1) / numChunks + + for i := 0; i < len(slice); i += chunkSize { + end := i + chunkSize + + if end > len(slice) { + end = len(slice) + } + + divided = append(divided, slice[i:end]) + } + + return divided +} + +func evaluateTarget(ctx *common.Context, target string) (ts.SeriesList, error) { + eng := NewEngine(ctx.Engine.Storage()) + expression, err := eng.Compile(target) + if err != nil { + return ts.NewSeriesList(), err + } + return expression.Execute(ctx) +} + + +// WaitGroup is used to wait for the program to finish goroutines. +var wg sync.WaitGroup + /* applyByNode takes a seriesList and applies some complicated function (described by a string), replacing templates with unique prefixes of keys from the seriesList (the key is all nodes up to the index given as `nodeNum`). @@ -311,45 +345,103 @@ The output series would have keys `stats.counts.haproxy.web.pct_5XX` and `stats. */ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, templateFunction string, newName string) (ts.SeriesList, error) { // using this as a set - prefixMap := make(map[string]int) + prefixMap := map[string]struct{}{} for _, series := range seriesList.Values { - prefix := strings.Join(strings.Split(series.Name(), ".")[:nodeNum+1], ".") - prefixMap[prefix] = 1 + var ( + name = series.Name() + + partsSeen int + prefix string + ) + + for i, c := range name { + if c == '.' { + partsSeen++ + if partsSeen == nodeNum+1 { + prefix = name[:i] + break + } + } + } + + if len(prefix) == 0 { + continue + } + + prefixMap[prefix] = struct{}{} } + + // transform to slice - prefixes := []string{} + var prefixes []string for p := range prefixMap { prefixes = append(prefixes, p) } sort.Strings(prefixes) + maxConcurrency := runtime.NumCPU() / 2 var output []*ts.Series - for _, prefix := range prefixes { - newTarget := strings.ReplaceAll(templateFunction, "%", prefix) + for _, prefixChunk := range chunkArrayHelper(prefixes, maxConcurrency) { + for _, prefix := range prefixChunk { + newTarget := strings.ReplaceAll(templateFunction, "%", prefix) + + var resultSeriesList ts.SeriesList + nums := make(chan int) // Declare a unbuffered channel + wg.Add(1) + go evaluateTarget(ctx, newTarget) + fmt.Println(<-nums) // Read the value from unbuffered channel + resultSeriesList <- nums + wg.Wait() + close(nums) // Closes the channel - eng := NewEngine(ctx.Engine.Storage()) - expression, err := eng.Compile(newTarget) - if err != nil { - return ts.NewSeriesList(), err - } - resultSeriesList, err := expression.Execute(ctx) - if err != nil { - return ts.NewSeriesList(), err - } - for _, resultSeries := range resultSeriesList.Values { - if newName != "" { - resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix)) + if err != nil { + return ts.NewSeriesList(), err + } + + for _, resultSeries := range resultSeriesList.Values { + if newName != "" { + resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix)) + } + resultSeries.Specification = prefix + output = append(output, resultSeries) } - resultSeries.Specification = prefix - output = append(output, resultSeries) } } + + r := ts.SeriesList(seriesList) r.Values = output return r, nil } +/* +for _, series := range seriesList.Values { + var ( + name = series.Name() + + partsSeen int + prefix string + ) + + for i, c := range name { + if c == '.' { + partsSeen++ + if partsSeen == nodeNum+1 { + prefix = name[:i] + break + } + } + } + + if len(prefix) == 0 { + continue + } + + prefixMap[prefix] = struct{}{} + +*/ + // groupByNode takes a serieslist and maps a callback to subgroups within as defined by a common node // // &target=groupByNode(foo.by-function.*.*.cpu.load5,2,"sumSeries") From fdd37c1a3f664c71f6773289769e3e555c20fc55 Mon Sep 17 00:00:00 2001 From: teddywahle Date: Tue, 29 Sep 2020 13:33:43 -0700 Subject: [PATCH 10/17] updated useSeriesAbove --- .../graphite/native/aggregation_functions.go | 88 +++++++------------ 1 file changed, 32 insertions(+), 56 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 4a46188af2..2acb803b2b 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -22,6 +22,7 @@ package native import ( "fmt" + xerrors "github.com/m3db/m3/src/x/errors" "math" "runtime" "sort" @@ -308,10 +309,6 @@ func evaluateTarget(ctx *common.Context, target string) (ts.SeriesList, error) { return expression.Execute(ctx) } - -// WaitGroup is used to wait for the program to finish goroutines. -var wg sync.WaitGroup - /* applyByNode takes a seriesList and applies some complicated function (described by a string), replacing templates with unique prefixes of keys from the seriesList (the key is all nodes up to the index given as `nodeNum`). @@ -379,68 +376,47 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te } sort.Strings(prefixes) - maxConcurrency := runtime.NumCPU() / 2 var output []*ts.Series + maxConcurrency := runtime.NumCPU() / 2 for _, prefixChunk := range chunkArrayHelper(prefixes, maxConcurrency) { - for _, prefix := range prefixChunk { - newTarget := strings.ReplaceAll(templateFunction, "%", prefix) - - var resultSeriesList ts.SeriesList - nums := make(chan int) // Declare a unbuffered channel - wg.Add(1) - go evaluateTarget(ctx, newTarget) - fmt.Println(<-nums) // Read the value from unbuffered channel - resultSeriesList <- nums - wg.Wait() - close(nums) // Closes the channel - - if err != nil { - return ts.NewSeriesList(), err - } + var ( + mu sync.Mutex + wg sync.WaitGroup + multiErr xerrors.MultiError + ) - for _, resultSeries := range resultSeriesList.Values { - if newName != "" { - resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix)) - } - resultSeries.Specification = prefix - output = append(output, resultSeries) + for i, prefix := range prefixChunk { + _, prefix := i, prefix + newTarget := strings.ReplaceAll(templateFunction, "%", prefix) + wg.Add(1) + go func() { + resultSeriesList, err := evaluateTarget(ctx, newTarget) + + if err != nil { + mu.Lock() + multiErr = multiErr.Add(err) + mu.Unlock() + } + + for _, resultSeries := range resultSeriesList.Values { + if newName != "" { + resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix)) + } + resultSeries.Specification = prefix + output = append(output, resultSeries) + } + + wg.Done() + }() } + wg.Wait() } - } - - - r := ts.SeriesList(seriesList) + r := ts.NewSeriesList() r.Values = output return r, nil } -/* -for _, series := range seriesList.Values { - var ( - name = series.Name() - - partsSeen int - prefix string - ) - - for i, c := range name { - if c == '.' { - partsSeen++ - if partsSeen == nodeNum+1 { - prefix = name[:i] - break - } - } - } - - if len(prefix) == 0 { - continue - } - - prefixMap[prefix] = struct{}{} - -*/ // groupByNode takes a serieslist and maps a callback to subgroups within as defined by a common node // From 5dc9b9e941e068a728567c33d97e9c673740f565 Mon Sep 17 00:00:00 2001 From: teddywahle Date: Tue, 29 Sep 2020 13:36:10 -0700 Subject: [PATCH 11/17] ran go fmt --- .../graphite/native/aggregation_functions.go | 58 +++++++++---------- .../native/aggregation_functions_test.go | 10 ++-- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 2acb803b2b..b1a42e861e 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -345,7 +345,7 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te prefixMap := map[string]struct{}{} for _, series := range seriesList.Values { var ( - name = series.Name() + name = series.Name() partsSeen int prefix string @@ -368,7 +368,6 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te prefixMap[prefix] = struct{}{} } - // transform to slice var prefixes []string for p := range prefixMap { @@ -379,45 +378,44 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te var output []*ts.Series maxConcurrency := runtime.NumCPU() / 2 for _, prefixChunk := range chunkArrayHelper(prefixes, maxConcurrency) { - var ( - mu sync.Mutex - wg sync.WaitGroup - multiErr xerrors.MultiError - ) + var ( + mu sync.Mutex + wg sync.WaitGroup + multiErr xerrors.MultiError + ) - for i, prefix := range prefixChunk { - _, prefix := i, prefix - newTarget := strings.ReplaceAll(templateFunction, "%", prefix) - wg.Add(1) - go func() { - resultSeriesList, err := evaluateTarget(ctx, newTarget) - - if err != nil { - mu.Lock() - multiErr = multiErr.Add(err) - mu.Unlock() - } + for i, prefix := range prefixChunk { + _, prefix := i, prefix + newTarget := strings.ReplaceAll(templateFunction, "%", prefix) + wg.Add(1) + go func() { + resultSeriesList, err := evaluateTarget(ctx, newTarget) + + if err != nil { + mu.Lock() + multiErr = multiErr.Add(err) + mu.Unlock() + } - for _, resultSeries := range resultSeriesList.Values { - if newName != "" { - resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix)) - } - resultSeries.Specification = prefix - output = append(output, resultSeries) + for _, resultSeries := range resultSeriesList.Values { + if newName != "" { + resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix)) } + resultSeries.Specification = prefix + output = append(output, resultSeries) + } - wg.Done() - }() - } - wg.Wait() + wg.Done() + }() } + wg.Wait() + } r := ts.NewSeriesList() r.Values = output return r, nil } - // groupByNode takes a serieslist and maps a callback to subgroups within as defined by a common node // // &target=groupByNode(foo.by-function.*.*.cpu.load5,2,"sumSeries") diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index ca966fc2eb..35aa03857c 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -155,7 +155,7 @@ func (e mockEngine) FetchByQuery( } func (e mockEngine) Storage() storage.Storage { - return e.storage; + return e.storage } func TestVariadicSumSeries(t *testing.T) { @@ -429,8 +429,8 @@ func TestApplyByNode(t *testing.T) { store.EXPECT().FetchByQuery(gomock.Any(), "servers.s1.disk.bytes_*", gomock.Any()).Return( &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s1.disk.bytes_free", start, common.NewTestSeriesValues(ctx, 60000, []float64{90, 80, 70})), - ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start, - common.NewTestSeriesValues(ctx, 60000, []float64{10, 20, 30}))}}, nil).Times(2) + ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start, + common.NewTestSeriesValues(ctx, 60000, []float64{10, 20, 30}))}}, nil).Times(2) store.EXPECT().FetchByQuery(gomock.Any(), "servers.s2.disk.bytes_used", gomock.Any()).Return( &storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start, @@ -439,9 +439,9 @@ func TestApplyByNode(t *testing.T) { store.EXPECT().FetchByQuery(gomock.Any(), "servers.s2.disk.bytes_*", gomock.Any()).Return( &storage.FetchResult{SeriesList: []*ts.Series{ ts.NewSeries(ctx, "servers.s2.disk.bytes_free", start, - common.NewTestSeriesValues(ctx, 60000, []float64{99, 98, 97})), + common.NewTestSeriesValues(ctx, 60000, []float64{99, 98, 97})), ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start, - common.NewTestSeriesValues(ctx, 60000, []float64{1, 2, 3}))}}, nil).Times(2) + common.NewTestSeriesValues(ctx, 60000, []float64{1, 2, 3}))}}, nil).Times(2) tests := []struct { nodeNum int From 540bc13e46e5304cadb3f3f7ebffaf250c4f368c Mon Sep 17 00:00:00 2001 From: teddywahle Date: Wed, 30 Sep 2020 11:43:29 -0700 Subject: [PATCH 12/17] added update from code review --- src/query/graphite/common/engine.go | 7 ++--- .../graphite/native/aggregation_functions.go | 26 ++++++++++++------- .../native/aggregation_functions_test.go | 6 ++--- src/query/graphite/native/engine.go | 2 +- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/query/graphite/common/engine.go b/src/query/graphite/common/engine.go index e70fd58845..fa77718467 100644 --- a/src/query/graphite/common/engine.go +++ b/src/query/graphite/common/engine.go @@ -27,12 +27,15 @@ import ( // QueryEngine is the generic engine interface. type QueryEngine interface { + + // FetchByQuery retrieves one or more time series based on a query. FetchByQuery( ctx context.Context, query string, options storage.FetchOptions, ) (*storage.FetchResult, error) + // Storage returns the engine's storage object. Storage() storage.Storage } @@ -48,7 +51,6 @@ func NewEngine(storage storage.Storage) *Engine { } } -// FetchByQuery retrieves one or more time series based on a query func (e *Engine) FetchByQuery( ctx context.Context, query string, @@ -57,7 +59,6 @@ func (e *Engine) FetchByQuery( return e.storage.FetchByQuery(ctx, query, options) } -// Storage returns the engine's storage object func (e *Engine) Storage() storage.Storage { - return e.storage; + return e.storage } diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index b1a42e861e..86d6a0245f 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -22,7 +22,6 @@ package native import ( "fmt" - xerrors "github.com/m3db/m3/src/x/errors" "math" "runtime" "sort" @@ -33,6 +32,7 @@ import ( "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/graphite/ts" + xerrors "github.com/m3db/m3/src/x/errors" ) func wrapPathExpr(wrapper string, series ts.SeriesList) string { @@ -375,28 +375,35 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te } sort.Strings(prefixes) - var output []*ts.Series - maxConcurrency := runtime.NumCPU() / 2 + var ( + mu sync.Mutex + wg sync.WaitGroup + multiErr xerrors.MultiError + + output = make([]*ts.Series, len(prefixes)) + maxConcurrency = runtime.NumCPU() / 2 + ) for _, prefixChunk := range chunkArrayHelper(prefixes, maxConcurrency) { - var ( - mu sync.Mutex - wg sync.WaitGroup - multiErr xerrors.MultiError - ) + if multiErr.LastError() != nil { + return ts.NewSeriesList(), multiErr.LastError() + } for i, prefix := range prefixChunk { _, prefix := i, prefix newTarget := strings.ReplaceAll(templateFunction, "%", prefix) wg.Add(1) go func() { + defer wg.Done() resultSeriesList, err := evaluateTarget(ctx, newTarget) if err != nil { mu.Lock() multiErr = multiErr.Add(err) mu.Unlock() + return } + mu.Lock() for _, resultSeries := range resultSeriesList.Values { if newName != "" { resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix)) @@ -404,8 +411,7 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te resultSeries.Specification = prefix output = append(output, resultSeries) } - - wg.Done() + mu.Unlock() }() } wg.Wait() diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index 35aa03857c..f3d81a6def 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -22,8 +22,6 @@ package native import ( "fmt" - "github.com/golang/mock/gomock" - xgomock "github.com/m3db/m3/src/x/test" "math" "sort" "testing" @@ -34,9 +32,11 @@ import ( "github.com/m3db/m3/src/query/graphite/context" "github.com/m3db/m3/src/query/graphite/storage" "github.com/m3db/m3/src/query/graphite/ts" - + xgomock "github.com/m3db/m3/src/x/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/golang/mock/gomock" ) var ( diff --git a/src/query/graphite/native/engine.go b/src/query/graphite/native/engine.go index 6a78535f4e..34a6efb643 100644 --- a/src/query/graphite/native/engine.go +++ b/src/query/graphite/native/engine.go @@ -55,5 +55,5 @@ func (e *Engine) Compile(s string) (Expression, error) { // Storage returns the engine's storage object func (e *Engine) Storage() storage.Storage { - return e.storage; + return e.storage } From 97f4014a48120a0b22303e9318e42071ca362d21 Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Wed, 30 Sep 2020 11:45:00 -0700 Subject: [PATCH 13/17] Update src/query/graphite/native/aggregation_functions_test.go --- src/query/graphite/native/aggregation_functions_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index f3d81a6def..2dae34f374 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -155,7 +155,7 @@ func (e mockEngine) FetchByQuery( } func (e mockEngine) Storage() storage.Storage { - return e.storage + return nil } func TestVariadicSumSeries(t *testing.T) { From 6913aa0156be97891e835c07346701355a6efbbd Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Mon, 5 Oct 2020 11:30:15 -0700 Subject: [PATCH 14/17] Apply suggestions from code review --- src/query/graphite/native/aggregation_functions.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 86d6a0245f..2f3b1c1145 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -283,7 +283,7 @@ func combineSeriesWithWildcards( // splits a slice into chunks func chunkArrayHelper(slice []string, numChunks int) [][]string { - var divided [][]string + divided := make([][]string, 0, numChunks) chunkSize := (len(slice) + numChunks - 1) / numChunks @@ -389,7 +389,7 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te } for i, prefix := range prefixChunk { - _, prefix := i, prefix + prefix := prefix newTarget := strings.ReplaceAll(templateFunction, "%", prefix) wg.Add(1) go func() { From 6cb2739133c4e33598a21575bcbc9706fc4ca546 Mon Sep 17 00:00:00 2001 From: teddywahle Date: Mon, 5 Oct 2020 11:34:16 -0700 Subject: [PATCH 15/17] added nit changes and ran go fmt --- src/query/graphite/native/aggregation_functions.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 2f3b1c1145..419452eb71 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -388,8 +388,7 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te return ts.NewSeriesList(), multiErr.LastError() } - for i, prefix := range prefixChunk { - prefix := prefix + for _, prefix := range prefixChunk { newTarget := strings.ReplaceAll(templateFunction, "%", prefix) wg.Add(1) go func() { From 59a29b1d8651a2521e03188e5258920276412854 Mon Sep 17 00:00:00 2001 From: teddywahle Date: Mon, 5 Oct 2020 13:03:56 -0700 Subject: [PATCH 16/17] fixed output init --- src/query/graphite/native/aggregation_functions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 419452eb71..a52108fc90 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -379,8 +379,8 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te mu sync.Mutex wg sync.WaitGroup multiErr xerrors.MultiError + output []*ts.Series - output = make([]*ts.Series, len(prefixes)) maxConcurrency = runtime.NumCPU() / 2 ) for _, prefixChunk := range chunkArrayHelper(prefixes, maxConcurrency) { From 34b60d6d2c8108de410f6140688a184fd20d45bd Mon Sep 17 00:00:00 2001 From: teddywahle Date: Mon, 5 Oct 2020 13:13:27 -0700 Subject: [PATCH 17/17] proper allocation --- src/query/graphite/native/aggregation_functions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index a52108fc90..34f72dbeeb 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -379,8 +379,8 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te mu sync.Mutex wg sync.WaitGroup multiErr xerrors.MultiError - output []*ts.Series + output = make([]*ts.Series, 0, len(prefixes)) maxConcurrency = runtime.NumCPU() / 2 ) for _, prefixChunk := range chunkArrayHelper(prefixes, maxConcurrency) {