Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] clean up logic of groupByNodes, implement nodes param in asPercent #2816

Merged
merged 20 commits into from
Nov 1, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6e5bbdc
Added the graphite sortByMinima function
teddywahle Sep 23, 2020
253ff2a
Merge branch 'master' of github.com:m3db/m3
teddywahle Sep 30, 2020
1eff3a7
Merge branch 'master' of github.com:m3db/m3
teddywahle Oct 5, 2020
447b7b8
Merge branch 'master' of github.com:m3db/m3
teddywahle Oct 8, 2020
c8c7dfc
complete merge
teddywahle Oct 9, 2020
ca2e4f3
Merge branch 'master' of github.com:m3db/m3
teddywahle Oct 12, 2020
6a94fda
Merge branch 'master' of https://github.com/teddywahle/m3
teddywahle Oct 14, 2020
1fe1a35
added line to fix time bounds bug
teddywahle Oct 16, 2020
0335a6f
resolved merge conflicts
teddywahle Oct 19, 2020
6f8a890
fixed bug in percentileOfSeries
teddywahle Oct 20, 2020
9ccfd78
Merge branch 'master' of github.com:m3db/m3
teddywahle Oct 20, 2020
b021795
Merge branch 'master' of https://github.com/teddywahle/m3
teddywahle Oct 26, 2020
37def27
fixed bugs in asPercent and groupbyNodes
teddywahle Oct 28, 2020
ad3bcca
Merge branch 'master' into twahle-fix-aggregate-funcs
teddywahle Oct 28, 2020
b0c0371
Merge branch 'master' into twahle-fix-aggregate-funcs
teddywahle Oct 29, 2020
28f3334
Update src/query/graphite/native/aggregation_functions.go
teddywahle Oct 29, 2020
9391a58
Apply suggestions from code review
teddywahle Oct 29, 2020
a96a3ff
updated groupByNodes logic
teddywahle Oct 29, 2020
46125b5
Merge branch 'master' into twahle-fix-aggregate-funcs
teddywahle Oct 29, 2020
d5cff29
Merge branch 'master' into twahle-fix-aggregate-funcs
teddywahle Nov 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 46 additions & 37 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,24 +561,54 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te
//
// sumSeries(foo.by-function.server1.*.cpu.load5),sumSeries(foo.by-function.server2.*.cpu.load5),...
func groupByNode(ctx *common.Context, series singlePathSpec, node int, fname string) (ts.SeriesList, error) {
metaSeries := make(map[string][]*ts.Series)
for _, s := range series.Values {
parts := strings.Split(s.Name(), ".")
return groupByNodes(ctx, series, fname, []int{node}...)
}

func findFirstMetricExpression(seriesName string) string {
idxOfRightParen := strings.Index(seriesName, ")")
substring := seriesName[:idxOfRightParen]
idxOfLeftParen := strings.LastIndex(substring, "(")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make findFirstMetricExpression potentially return an error if either strings.Index or strings.LastIndex return a -1?

Otherwise it will cause out of bounds panic, better to always optionally return an error rather than panic.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe return (string, bool) and if either of them return a -1 then the second bool param value returns false.

That way you can do:

func getParts(series *ts.Series) {
  seriesName := series.Name()
  if metricExpr, ok := findFirstMetricExpression(seriesName); ok {
    seriesName = metricExpr
  }
  return strings.Split(seriesName, ".")
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! Great idea.

return seriesName[idxOfLeftParen+1:idxOfRightParen]
}

func getParts(series *ts.Series) []string {
seriesName := series.Name()
if strings.Contains(seriesName, ")") {
seriesName = findFirstMetricExpression(seriesName)
}

return strings.Split(seriesName, ".")
teddywahle marked this conversation as resolved.
Show resolved Hide resolved

n := node
}
func getAggregationKey(series *ts.Series, nodes []int) string {
parts := getParts(series)

var keys []string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you pre-allocate this perhaps?

keys := make([]string, 0, len(nodes))

teddywahle marked this conversation as resolved.
Show resolved Hide resolved
for _, n := range nodes {
if n < 0 {
n = len(parts) + n
}

if n >= len(parts) || n < 0 {
return aggregate(ctx, series, fname)
if n < len(parts) {
keys = append(keys, parts[n])
Copy link
Collaborator

@robskillington robskillington Oct 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For graphite-web just an empty string is used if n is not within the range (see IndexError simply will "pass" which then tries to get the value from graphite tags or if not found returns empty string""):
https://github.com/graphite-project/graphite-web/blob/53a50b584016b889310eff346a2de01779613644/webapp/graphite/render/functions.py#L98-L115

To mimic same behavior instead of leaving it from the final key produced, I believe you want an else here:

if n < len(parts) {
  keys = append(keys, parts[n])
} else {
  keys = append(keys, "")
}

That way you'll get "foo...bar" for two missing in the middle which better represents the true key since that part was missing and now won't collide with other values missing a value in a different key.

}
}
return strings.Join(keys, ".")
}

func getMetaSeriesGrouping(seriesList singlePathSpec, nodes []int) map[string][]*ts.Series {
metaSeries := make(map[string][]*ts.Series)

key := parts[n]
metaSeries[key] = append(metaSeries[key], s)
if len(nodes) > 0 {
for _, s := range seriesList.Values {
key := getAggregationKey(s, nodes)
if key != "" {
Copy link
Collaborator

@robskillington robskillington Oct 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

graphite-web does not seem to perform this behavior, let's make this match graphite-web and always add the key to the grouping.

metaSeries[key] = append(metaSeries[key], s)
}
}
}

return applyFnToMetaSeries(ctx, series, metaSeries, fname)
return metaSeries
}

// Takes a serieslist and maps a callback to subgroups within as defined by multiple nodes
Expand All @@ -592,37 +622,16 @@ func groupByNode(ctx *common.Context, series singlePathSpec, node int, fname str
// sumSeries(ganglia.server2.*.cpu.load5),sumSeries(ganglia.server2.*.cpu.load10),sumSeries(ganglia.server2.*.cpu.load15),...
//
// NOTE: if len(nodes) = 0, aggregate all series into 1 series.
func groupByNodes(ctx *common.Context, series singlePathSpec, fname string, nodes ...int) (ts.SeriesList, error) {
metaSeries := make(map[string][]*ts.Series)

nodeLen := len(nodes)
if nodeLen == 0 {
key := "*" // put into single group, not ideal, but more graphite-ish.
for _, s := range series.Values {
metaSeries[key] = append(metaSeries[key], s)
}
} else {
for _, s := range series.Values {
parts := strings.Split(s.Name(), ".")
func groupByNodes(ctx *common.Context, seriesList singlePathSpec, fname string, nodes ...int) (ts.SeriesList, error) {
metaSeries := getMetaSeriesGrouping(seriesList, nodes)

var keys []string
for _, n := range nodes {
if n < 0 {
n = len(parts) + n
}

if n >= len(parts) || n < 0 {
return aggregate(ctx, series, fname)
}

keys = append(keys, parts[n])
}
key := strings.Join(keys, ".")
metaSeries[key] = append(metaSeries[key], s)
}
if len(metaSeries) == 0 {
// if nodes is an empty slice or every node in nodes exceeds the number
// of parts in each series, just treat it like aggregate
return aggregate(ctx, seriesList, fname)
}

return applyFnToMetaSeries(ctx, series, metaSeries, fname)
return applyFnToMetaSeries(ctx, seriesList, metaSeries, fname)
}

func applyFnToMetaSeries(ctx *common.Context, series singlePathSpec, metaSeries map[string][]*ts.Series, fname string) (ts.SeriesList, error) {
Expand Down
13 changes: 11 additions & 2 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func TestGroupByNodes(t *testing.T) {
end, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT")
ctx = common.NewContext(common.ContextOptions{Start: start, End: end})
inputs = []*ts.Series{
ts.NewSeries(ctx, "servers.foo-1.pod1.status.500", start,
ts.NewSeries(ctx, "transformNull(servers.foo-1.pod1.status.500)", start,
ts.NewConstantValues(ctx, 2, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-2.pod1.status.500", start,
ts.NewConstantValues(ctx, 4, 12, 10000)),
Expand Down Expand Up @@ -755,14 +755,23 @@ func TestGroupByNodes(t *testing.T) {
{"pod2.400", 40 * 12},
{"pod2.500", 10 * 12},
}},
{"max", []int{2, 4, 100}, []result{ // test with a node number that exceeds num parts
{"pod1.400", 30 * 12},
{"pod1.500", 6 * 12},
{"pod2.400", 40 * 12},
{"pod2.500", 10 * 12},
}},
{"min", []int{2, -1}, []result{ // test negative index handling
{"pod1.400", 20 * 12},
{"pod1.500", 2 * 12},
{"pod2.400", 40 * 12},
{"pod2.500", 8 * 12},
}},
{"sum", []int{}, []result{ // test empty slice handing.
{"*", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12},
{"sumSeries(transformNull(servers.foo-1.pod1.status.500),servers.foo-2.pod1.status.500,servers.foo-3.pod1.status.500,servers.foo-1.pod2.status.500,servers.foo-2.pod2.status.500,servers.foo-1.pod1.status.400,servers.foo-2.pod1.status.400,servers.foo-3.pod2.status.400)", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12},
}},
{"sum", []int{100}, []result{ // test all nodes out of bounds
{"sumSeries(transformNull(servers.foo-1.pod1.status.500),servers.foo-2.pod1.status.500,servers.foo-3.pod1.status.500,servers.foo-1.pod2.status.500,servers.foo-2.pod2.status.500,servers.foo-1.pod1.status.400,servers.foo-2.pod1.status.400,servers.foo-3.pod2.status.400)", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12},
}},
}

Expand Down
55 changes: 45 additions & 10 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ func exponentialMovingAverage(ctx *common.Context, input singlePathSpec, windowS
}

// totalFunc takes an index and returns a total value for that index
type totalFunc func(int) float64
type totalFunc func(int, *ts.Series) float64

func totalBySum(seriesList []*ts.Series, index int) float64 {
s, hasValue := 0.0, false
Expand All @@ -1008,7 +1008,7 @@ func totalBySum(seriesList []*ts.Series, index int) float64 {
}

// asPercent calculates a percentage of the total of a wildcard series.
func asPercent(ctx *common.Context, input singlePathSpec, total genericInterface) (ts.SeriesList, error) {
func asPercent(ctx *common.Context, input singlePathSpec, total genericInterface, nodes ...int) (ts.SeriesList, error) {
if len(input.Values) == 0 {
return ts.SeriesList(input), nil
}
Expand All @@ -1029,24 +1029,56 @@ func asPercent(ctx *common.Context, input singlePathSpec, total genericInterface
if total.Len() == 0 {
// normalize input and sum up input as the total series
toNormalize = input.Values
tf = func(idx int) float64 { return totalBySum(normalized, idx) }
tf = func(idx int, _ *ts.Series) float64 { return totalBySum(normalized, idx) }
} else {
// check total is a single-series list and normalize all of them
if total.Len() != 1 {
err := errors.NewInvalidParamsError(errors.New("total must be a single series"))
return ts.NewSeriesList(), err
}
if len(nodes) > 0 {
// group the series by specified nodes and then sum those groups
groupedTotal, err := groupByNodes(ctx, input, "sum", nodes...)
if err != nil {
return ts.NewSeriesList(), err
}
toNormalize = append(input.Values, groupedTotal.Values[0])
metaSeriesSumByKey := make(map[string]*ts.Series)

toNormalize = append(input.Values, total.Values[0])
tf = func(idx int) float64 { return normalized[len(normalized)-1].ValueAt(idx) }
totalText = total.Values[0].Name()
// map the aggregation key to the aggregated series
for _, series := range groupedTotal.Values {
metaSeriesSumByKey[series.Name()] = series
}

tf = func(idx int, series *ts.Series) float64 {
// find which aggregation key this series falls under
// and return the sum for that aggregated group
key := getAggregationKey(series, nodes)
return metaSeriesSumByKey[key].ValueAt(idx)
}
totalText = groupedTotal.Values[0].Name()
} else {
toNormalize = append(input.Values, total.Values[0])
tf = func(idx int, _ *ts.Series) float64 { return normalized[len(normalized)-1].ValueAt(idx) }
totalText = total.Values[0].Name()
}
}
case float64:
toNormalize = input.Values
tf = func(idx int) float64 { return totalArg }
tf = func(idx int, _ *ts.Series) float64 { return totalArg }
totalText = fmt.Sprintf(common.FloatingPointFormat, totalArg)
case nil:
// if total is nil, the total is the sum of all the input series
toNormalize = input.Values
var err error = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need to initialize to nil, just var err error will get default value.

summedSeries, err := sumSeries(ctx, multiplePathSpecs(input))
if err != nil {
return ts.NewSeriesList(), err
}
tf = func(idx int, _ *ts.Series) float64 { return summedSeries.Values[0].ValueAt(idx) }
totalText = summedSeries.Values[0].Name()
default:
err := errors.NewInvalidParamsError(errors.New("total is neither an int nor a series"))
err := errors.NewInvalidParamsError(errors.New("total must be either an int, a series, or nil"))
return ts.NewSeriesList(), err
}

Expand All @@ -1066,8 +1098,8 @@ func asPercent(ctx *common.Context, input singlePathSpec, total genericInterface
values = append(values, percents)
}
for i := 0; i < normalized[0].Len(); i++ {
t := tf(i)
for j := 0; j < numInputSeries; j++ {
t := tf(i, normalized[j])
v := normalized[j].ValueAt(i)
if !math.IsNaN(v) && !math.IsNaN(t) && t != 0 {
values[j].SetValueAt(i, 100.0*v/t)
Expand Down Expand Up @@ -2348,6 +2380,7 @@ func init() {
})
MustRegisterFunction(asPercent).WithDefaultParams(map[uint8]interface{}{
2: []*ts.Series(nil), // total
3: nil, // nodes
})
MustRegisterFunction(averageAbove)
MustRegisterFunction(averageSeries)
Expand Down Expand Up @@ -2378,7 +2411,9 @@ func init() {
MustRegisterFunction(groupByNode).WithDefaultParams(map[uint8]interface{}{
3: "average", // fname
})
MustRegisterFunction(groupByNodes)
MustRegisterFunction(groupByNodes).WithDefaultParams(map[uint8]interface{}{
3: nil, // nodes
})
MustRegisterFunction(highest).WithDefaultParams(map[uint8]interface{}{
2: 1, // n,
3: "average", // f
Expand Down
40 changes: 40 additions & 0 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1966,6 +1966,46 @@ func TestAsPercentWithFloatTotal(t *testing.T) {
}
}

func TestAsPercentWithNilTotal(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()

nan := math.NaN()
tests := []struct {
valuesStep int
values []float64
outputStep int
output []float64
}{
{
60,
[]float64{12.0, 14.0, 16.0, nan, 20.0},
60,
[]float64{100, 100, 100, nan, 100},
},
}

for _, test := range tests {
timeSeries := ts.NewSeries(ctx, "<values>", ctx.StartTime,
common.NewTestSeriesValues(ctx, test.valuesStep, test.values))
r, err := asPercent(ctx, singlePathSpec{
Values: []*ts.Series{timeSeries},
}, nil)
require.NoError(t, err)

output := r.Values
require.Equal(t, 1, len(output))
require.Equal(t, output[0].MillisPerStep(), test.outputStep)
expectedName := fmt.Sprintf("asPercent(<values>, sumSeries(<values>))")
assert.Equal(t, expectedName, output[0].Name())

for step := 0; step < output[0].Len(); step++ {
v := output[0].ValueAt(step)
xtest.Equalish(t, math.Trunc(v), test.output[step])
}
}
}

func TestAsPercentWithSeriesList(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down