Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Implemented the Graphite applyByNode function #2654

Merged
merged 31 commits into from
Oct 5, 2020
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e4e0185
added the graphite applyByNode function
teddywahle Sep 22, 2020
744a685
Merge branch 'master' into graphite-apply-by-node
teddywahle Sep 22, 2020
a81a3af
ran go fmt
teddywahle Sep 22, 2020
04c878f
Merge branch 'master' into graphite-apply-by-node
teddywahle Sep 23, 2020
b269fed
Merge branch 'master' into graphite-apply-by-node
teddywahle Sep 23, 2020
a6cc182
Merge branch 'master' into graphite-apply-by-node
teddywahle Sep 28, 2020
122564b
fixed applyByNode
teddywahle Sep 28, 2020
17f0f0b
Merge branch 'master' into graphite-apply-by-node
teddywahle Sep 28, 2020
71567f2
Update aggregation_functions.go
teddywahle Sep 29, 2020
081c11f
Apply suggestions from code review
teddywahle Sep 29, 2020
5b91c51
Update src/query/graphite/native/engine.go
teddywahle Sep 29, 2020
c57bd84
Update aggregation_functions.go
teddywahle Sep 29, 2020
8a4d9cf
Update aggregation_functions.go
teddywahle Sep 29, 2020
8095b6c
added changes for artem
teddywahle Sep 29, 2020
fdd37c1
updated useSeriesAbove
teddywahle Sep 29, 2020
ac0a7d9
Merge branch 'master' into graphite-apply-by-node
teddywahle Sep 29, 2020
5dc9b9e
ran go fmt
teddywahle Sep 29, 2020
6153026
Merge branch 'graphite-apply-by-node' of https://github.com/teddywa…
teddywahle Sep 29, 2020
40d8ffd
Merge branch 'master' into graphite-apply-by-node
teddywahle Sep 30, 2020
540bc13
added update from code review
teddywahle Sep 30, 2020
97f4014
Update src/query/graphite/native/aggregation_functions_test.go
teddywahle Sep 30, 2020
02ee52c
Merge branch 'master' into graphite-apply-by-node
teddywahle Oct 1, 2020
7c70f70
Merge branch 'master' into graphite-apply-by-node
teddywahle Oct 1, 2020
6913aa0
Apply suggestions from code review
teddywahle Oct 5, 2020
5adc578
Merge branch 'master' into graphite-apply-by-node
teddywahle Oct 5, 2020
6cb2739
added nit changes and ran go fmt
teddywahle Oct 5, 2020
b728e99
Merge branch 'master' into graphite-apply-by-node
teddywahle Oct 5, 2020
7c3eae0
Merge branch 'master' into graphite-apply-by-node
teddywahle Oct 5, 2020
59a29b1
fixed output init
teddywahle Oct 5, 2020
34b60d6
proper allocation
teddywahle Oct 5, 2020
da2d280
Merge branch 'master' into graphite-apply-by-node
teddywahle Oct 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/query/graphite/common/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ import (

// QueryEngine is the generic engine interface.
type QueryEngine interface {

	// FetchByQuery retrieves one or more time series based on a query.
	FetchByQuery(
		ctx context.Context,
		query string,
		options storage.FetchOptions,
	) (*storage.FetchResult, error)

	// Storage returns the engine's storage object, allowing callers to
	// construct additional engines backed by the same storage.
	Storage() storage.Storage
}

// The Engine for running queries
Expand All @@ -46,11 +51,14 @@ func NewEngine(storage storage.Storage) *Engine {
}
}

// FetchByQuery retrieves one or more time series based on a query.
func (e *Engine) FetchByQuery(
	ctx context.Context,
	query string,
	options storage.FetchOptions,
) (*storage.FetchResult, error) {
	// Delegate directly to the underlying storage implementation.
	return e.storage.FetchByQuery(ctx, query, options)
}

teddywahle marked this conversation as resolved.
Show resolved Hide resolved
// Storage returns the engine's underlying storage object.
func (e *Engine) Storage() storage.Storage {
	return e.storage
}
154 changes: 149 additions & 5 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ package native
import (
"fmt"
"math"
"runtime"
"sort"
"strings"
"sync"

"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/graphite/ts"
xerrors "github.com/m3db/m3/src/x/errors"
)

func wrapPathExpr(wrapper string, series ts.SeriesList) string {
Expand Down Expand Up @@ -111,7 +115,7 @@ func divideSeriesHelper(ctx *common.Context, dividendSeries, divisorSeries *ts.S
vals.SetValueAt(i, value)
}
}

// The individual series will be named divideSeries(X, X), even if it is generated by divideSeriesLists
// Based on Graphite source code (link below)
// https://github.com/graphite-project/graphite-web/blob/17a34e7966f7a46eded30c2362765c74eea899cb/webapp/graphite/render/functions.py#L901
Expand All @@ -137,7 +141,7 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin
divisorSeries := divisorSeriesList.Values[0]
results := make([]*ts.Series, len(dividendSeriesList.Values))
for idx, dividendSeries := range dividendSeriesList.Values {
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata)
if err != nil {
return ts.NewSeriesList(), err
Expand All @@ -150,7 +154,6 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin
return r, nil
}


// divideSeriesLists divides one series list by another series list
func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesList singlePathSpec) (ts.SeriesList, error) {
if len(dividendSeriesList.Values) != len(divisorSeriesList.Values) {
Expand All @@ -161,7 +164,7 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis
results := make([]*ts.Series, len(dividendSeriesList.Values))
for idx, dividendSeries := range dividendSeriesList.Values {
divisorSeries := divisorSeriesList.Values[idx]
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata)
if err != nil {
return ts.NewSeriesList(), err
Expand Down Expand Up @@ -278,6 +281,147 @@ func combineSeriesWithWildcards(
return r, nil
}

// chunkArrayHelper splits slice into at most numChunks contiguous chunks of
// near-equal size. The final chunk absorbs any remainder. A numChunks value
// below 1 is treated as 1: without this guard the chunk-size computation
// divides by zero (the caller derives numChunks from runtime.NumCPU()/2,
// which is 0 on a single-CPU host).
func chunkArrayHelper(slice []string, numChunks int) [][]string {
	if numChunks < 1 {
		numChunks = 1
	}

	// Pre-size to avoid repeated slice growth while appending.
	divided := make([][]string, 0, numChunks)

	// Ceiling division so that numChunks chunks always cover the slice.
	chunkSize := (len(slice) + numChunks - 1) / numChunks

	for i := 0; i < len(slice); i += chunkSize {
		end := i + chunkSize

		if end > len(slice) {
			end = len(slice)
		}

		divided = append(divided, slice[i:end])
	}

	return divided
}

// evaluateTarget compiles the given graphite target expression using a fresh
// engine backed by the context's storage, then executes it under ctx.
func evaluateTarget(ctx *common.Context, target string) (ts.SeriesList, error) {
	engine := NewEngine(ctx.Engine.Storage())
	compiled, compileErr := engine.Compile(target)
	if compileErr != nil {
		return ts.NewSeriesList(), compileErr
	}
	return compiled.Execute(ctx)
}

/*
applyByNode takes a seriesList and applies some complicated function (described by a string), replacing templates with unique
prefixes of keys from the seriesList (the key is all nodes up to the index given as `nodeNum`).

If the `newName` parameter is provided, the name of the resulting series will be given by that parameter, with any
"%" characters replaced by the unique prefix.

Example:

`applyByNode(servers.*.disk.bytes_free,1,"divideSeries(%.disk.bytes_free,sumSeries(%.disk.bytes_*))")`

Would find all series which match `servers.*.disk.bytes_free`, then trim them down to unique series up to the node
given by nodeNum, then fill them into the template function provided (replacing % by the prefixes).

Additional Examples:

Given keys of

- `stats.counts.haproxy.web.2XX`
- `stats.counts.haproxy.web.3XX`
- `stats.counts.haproxy.web.5XX`
- `stats.counts.haproxy.microservice.2XX`
- `stats.counts.haproxy.microservice.3XX`
- `stats.counts.haproxy.microservice.5XX`

The following will return the rate of 5XX's per service:

`applyByNode(stats.counts.haproxy.*.*XX, 3, "asPercent(%.5XX, sumSeries(%.*XX))", "%.pct_5XX")`

The output series would have keys `stats.counts.haproxy.web.pct_5XX` and `stats.counts.haproxy.microservice.pct_5XX`.
*/
func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, templateFunction string, newName string) (ts.SeriesList, error) {
// using this as a set
prefixMap := map[string]struct{}{}
for _, series := range seriesList.Values {
var (
name = series.Name()

partsSeen int
prefix string
)

for i, c := range name {
if c == '.' {
partsSeen++
if partsSeen == nodeNum+1 {
prefix = name[:i]
break
}
}
}

if len(prefix) == 0 {
continue
}

prefixMap[prefix] = struct{}{}
}

// transform to slice
var prefixes []string
for p := range prefixMap {
prefixes = append(prefixes, p)
}
sort.Strings(prefixes)

var (
mu sync.Mutex
wg sync.WaitGroup
multiErr xerrors.MultiError

output = make([]*ts.Series, len(prefixes))
maxConcurrency = runtime.NumCPU() / 2
)
for _, prefixChunk := range chunkArrayHelper(prefixes, maxConcurrency) {
if multiErr.LastError() != nil {
return ts.NewSeriesList(), multiErr.LastError()
}

arnikola marked this conversation as resolved.
Show resolved Hide resolved
for i, prefix := range prefixChunk {
_, prefix := i, prefix
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; Can probably drop using i here, and just capture the prefix itself

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just deleted this line altogether. I don't know why I included it. I am not modifying prefix at all.

teddywahle marked this conversation as resolved.
Show resolved Hide resolved
newTarget := strings.ReplaceAll(templateFunction, "%", prefix)
wg.Add(1)
go func() {
defer wg.Done()
resultSeriesList, err := evaluateTarget(ctx, newTarget)

if err != nil {
mu.Lock()
multiErr = multiErr.Add(err)
mu.Unlock()
return
}

mu.Lock()
for _, resultSeries := range resultSeriesList.Values {
arnikola marked this conversation as resolved.
Show resolved Hide resolved
if newName != "" {
resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix))
}
resultSeries.Specification = prefix
output = append(output, resultSeries)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be added within the lock since it's appending rather than inserting at an index. Probably better to lock the entire loop across values here

Copy link
Contributor Author

@teddywahle teddywahle Sep 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

                                 mu.Lock()
				for _, resultSeries := range resultSeriesList.Values {
					if newName != "" {
						resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix))
					}
					resultSeries.Specification = prefix
					output = append(output, resultSeries)
				}
				mu.Unlock()

does this look right?

}
mu.Unlock()
}()
}
wg.Wait()
}

r := ts.NewSeriesList()
r.Values = output
return r, nil
}

// groupByNode takes a serieslist and maps a callback to subgroups within as defined by a common node
//
// &target=groupByNode(foo.by-function.*.*.cpu.load5,2,"sumSeries")
Expand Down
112 changes: 110 additions & 2 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import (
"github.com/m3db/m3/src/query/graphite/context"
"github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/graphite/ts"

xgomock "github.com/m3db/m3/src/x/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/golang/mock/gomock"
)

var (
Expand Down Expand Up @@ -140,6 +142,8 @@ type mockEngine struct {
query string,
options storage.FetchOptions,
) (*storage.FetchResult, error)

storage storage.Storage
}

func (e mockEngine) FetchByQuery(
Expand All @@ -150,6 +154,10 @@ func (e mockEngine) FetchByQuery(
return e.fn(ctx, query, opts)
}

// Storage returns the mock engine's configured storage so tests can inject
// their own implementation via the struct's storage field. When the field is
// unset its zero value is nil, preserving the previous hard-coded behavior.
func (e mockEngine) Storage() storage.Storage {
	return e.storage
}

func TestVariadicSumSeries(t *testing.T) {
expr, err := Compile("sumSeries(foo.bar.*, foo.baz.*)")
require.NoError(t, err)
Expand Down Expand Up @@ -314,7 +322,6 @@ func TestDivideSeriesLists(t *testing.T) {
require.Error(t, err)
}


func TestAverageSeriesWithWildcards(t *testing.T) {
ctx, _ := newConsolidationTestSeries()
defer ctx.Close()
Expand Down Expand Up @@ -391,6 +398,107 @@ func TestSumSeriesWithWildcards(t *testing.T) {
}
}

// TestApplyByNode exercises applyByNode end to end against a gomock-backed
// storage: it verifies the template function is evaluated once per unique
// prefix and that the optional newName parameter renames the output series.
func TestApplyByNode(t *testing.T) {
	var (
		ctrl          = xgomock.NewController(t)
		store         = storage.NewMockStorage(ctrl)
		engine        = NewEngine(store)
		start, _      = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT")
		end, _        = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT")
		ctx           = common.NewContext(common.ContextOptions{Start: start, End: end, Engine: engine})
		millisPerStep = 60000
		inputs        = []*ts.Series{
			ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start,
				common.NewTestSeriesValues(ctx, millisPerStep, []float64{10, 20, 30})),
			ts.NewSeries(ctx, "servers.s1.disk.bytes_free", start,
				common.NewTestSeriesValues(ctx, millisPerStep, []float64{90, 80, 70})),
			ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start,
				common.NewTestSeriesValues(ctx, millisPerStep, []float64{1, 2, 3})),
			ts.NewSeries(ctx, "servers.s2.disk.bytes_free", start,
				common.NewTestSeriesValues(ctx, millisPerStep, []float64{99, 98, 97})),
		}
	)

	defer ctrl.Finish()
	defer ctx.Close()

	// Expectations for the sub-queries produced by expanding the template:
	// one numerator fetch and one wildcard denominator fetch per prefix.
	// Times(2) because both test cases below issue the same fetches.
	store.EXPECT().FetchByQuery(gomock.Any(), "servers.s1.disk.bytes_used", gomock.Any()).Return(
		&storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start,
			common.NewTestSeriesValues(ctx, 60000, []float64{10, 20, 30}))}}, nil).Times(2)

	store.EXPECT().FetchByQuery(gomock.Any(), "servers.s1.disk.bytes_*", gomock.Any()).Return(
		&storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s1.disk.bytes_free", start,
			common.NewTestSeriesValues(ctx, 60000, []float64{90, 80, 70})),
			ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start,
				common.NewTestSeriesValues(ctx, 60000, []float64{10, 20, 30}))}}, nil).Times(2)

	store.EXPECT().FetchByQuery(gomock.Any(), "servers.s2.disk.bytes_used", gomock.Any()).Return(
		&storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start,
			common.NewTestSeriesValues(ctx, 60000, []float64{1, 2, 3}))}}, nil).Times(2)

	store.EXPECT().FetchByQuery(gomock.Any(), "servers.s2.disk.bytes_*", gomock.Any()).Return(
		&storage.FetchResult{SeriesList: []*ts.Series{
			ts.NewSeries(ctx, "servers.s2.disk.bytes_free", start,
				common.NewTestSeriesValues(ctx, 60000, []float64{99, 98, 97})),
			ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start,
				common.NewTestSeriesValues(ctx, 60000, []float64{1, 2, 3}))}}, nil).Times(2)

	// Case 1: no newName, output keeps the expanded template names.
	// Case 2: newName renames each result, % replaced with the prefix.
	tests := []struct {
		nodeNum          int
		templateFunction string
		newName          string
		expectedResults  []common.TestSeries
	}{
		{
			nodeNum:          1,
			templateFunction: "divideSeries(%.disk.bytes_used, sumSeries(%.disk.bytes_*))",
			newName:          "",
			expectedResults: []common.TestSeries{
				{
					Name: "divideSeries(servers.s1.disk.bytes_used,sumSeries(servers.s1.disk.bytes_*))",
					Data: []float64{0.10, 0.20, 0.30},
				},
				{
					Name: "divideSeries(servers.s2.disk.bytes_used,sumSeries(servers.s2.disk.bytes_*))",
					Data: []float64{0.01, 0.02, 0.03},
				},
			},
		},
		{
			nodeNum:          1,
			templateFunction: "divideSeries(%.disk.bytes_used, sumSeries(%.disk.bytes_*))",
			newName:          "%.disk.pct_used",
			expectedResults: []common.TestSeries{
				{
					Name: "servers.s1.disk.pct_used",
					Data: []float64{0.10, 0.20, 0.30},
				},
				{
					Name: "servers.s2.disk.pct_used",
					Data: []float64{0.01, 0.02, 0.03},
				},
			},
		},
	}

	for _, test := range tests {
		outSeries, err := applyByNode(
			ctx,
			singlePathSpec{
				Values: inputs,
			},
			test.nodeNum,
			test.templateFunction,
			test.newName,
		)
		require.NoError(t, err)
		require.Equal(t, len(test.expectedResults), len(outSeries.Values))

		// applyByNode evaluates prefixes concurrently, so sort by name
		// before comparing against the ordered expected results.
		outSeries, _ = sortByName(ctx, singlePathSpec(outSeries))
		common.CompareOutputsAndExpected(t, 60000, start, test.expectedResults, outSeries.Values)
	}
}

func TestAggregateWithWildcards(t *testing.T) {
var (
start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT")
Expand Down
3 changes: 3 additions & 0 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,9 @@ func init() {
MustRegisterFunction(aliasByMetric)
MustRegisterFunction(aliasByNode)
MustRegisterFunction(aliasSub)
MustRegisterFunction(applyByNode).WithDefaultParams(map[uint8]interface{}{
4: "", // newName
})
MustRegisterFunction(asPercent).WithDefaultParams(map[uint8]interface{}{
2: []*ts.Series(nil), // total
})
Expand Down
Loading