Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix batch processor metrics reorder, improve performance #3034

Merged
merged 1 commit into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 42 additions & 46 deletions processor/batchprocessor/splitmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,52 @@ import (
)

// splitMetrics removes metrics from the input data and returns a new data of the specified size.
func splitMetrics(size int, toSplit pdata.Metrics) pdata.Metrics {
if toSplit.MetricCount() <= size {
return toSplit
func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
if src.MetricCount() <= size {
return src
}
copiedMetrics := 0
result := pdata.NewMetrics()
result.ResourceMetrics().Resize(toSplit.ResourceMetrics().Len())
rms := toSplit.ResourceMetrics()
totalCopiedMetrics := 0
dest := pdata.NewMetrics()

rmsCount := 0
for i := rms.Len() - 1; i >= 0; i-- {
rmsCount++
rm := rms.At(i)
destRs := result.ResourceMetrics().At(result.ResourceMetrics().Len() - 1 - i)
rm.Resource().CopyTo(destRs.Resource())

destRs.InstrumentationLibraryMetrics().Resize(rm.InstrumentationLibraryMetrics().Len())
src.ResourceMetrics().RemoveIf(func(srcRm pdata.ResourceMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
return false
}

ilmCount := 0
for j := rm.InstrumentationLibraryMetrics().Len() - 1; j >= 0; j-- {
ilmCount++
instMetrics := rm.InstrumentationLibraryMetrics().At(j)
destInstMetrics := destRs.InstrumentationLibraryMetrics().At(destRs.InstrumentationLibraryMetrics().Len() - 1 - j)
instMetrics.InstrumentationLibrary().CopyTo(destInstMetrics.InstrumentationLibrary())
destRs := dest.ResourceMetrics().AppendEmpty()
srcRm.Resource().CopyTo(destRs.Resource())

if size-copiedMetrics >= instMetrics.Metrics().Len() {
destInstMetrics.Metrics().Resize(instMetrics.Metrics().Len())
} else {
destInstMetrics.Metrics().Resize(size - copiedMetrics)
}
for k, destIdx := instMetrics.Metrics().Len()-1, 0; k >= 0 && copiedMetrics < size; k, destIdx = k-1, destIdx+1 {
metric := instMetrics.Metrics().At(k)
metric.CopyTo(destInstMetrics.Metrics().At(destIdx))
copiedMetrics++
// remove metric
instMetrics.Metrics().Resize(instMetrics.Metrics().Len() - 1)
srcRm.InstrumentationLibraryMetrics().RemoveIf(func(srcIm pdata.InstrumentationLibraryMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
return false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return true and return false operations for RemoveIf callbacks here are a bit difficult to understand (return false is not self-explanatory). Please add comments to explain what is happening.
A nicer approach perhaps would be to change RemoveIf's callback to return an enum instead of bool. In that case we could write e.g. return pdata.Remove, return pdata.Keep or return pdata.BreakIteration or similar.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it. The "problem" is that although we would avoid calling the callback, we would still need to iterate over all the remaining elements to shift them to the left in case something was removed.

}
if instMetrics.Metrics().Len() == 0 {
rm.InstrumentationLibraryMetrics().Resize(rm.InstrumentationLibraryMetrics().Len() - 1)
}
if copiedMetrics == size {
result.ResourceMetrics().Resize(rmsCount)
return result

destIms := destRs.InstrumentationLibraryMetrics().AppendEmpty()
srcIm.InstrumentationLibrary().CopyTo(destIms.InstrumentationLibrary())

// If possible to move all metrics do that.
srcMetricsLen := srcIm.Metrics().Len()
if size-totalCopiedMetrics >= srcMetricsLen {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might read a little cleaner:

Suggested change
if size-totalCopiedMetrics >= srcMetricsLen {
remainingSpace := size - totalCopiedMetrics
if srcMetricsLen <= remainingSpace {
totalCopiedMetrics += srcMetricsLen

totalCopiedMetrics += srcMetricsLen
srcIm.Metrics().MoveAndAppendTo(destIms.Metrics())
return true
}
}
destRs.InstrumentationLibraryMetrics().Resize(ilmCount)
if rm.InstrumentationLibraryMetrics().Len() == 0 {
rms.Resize(rms.Len() - 1)
}
}
result.ResourceMetrics().Resize(rmsCount)
return result

srcIm.Metrics().RemoveIf(func(srcMetric pdata.Metric) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
return false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it would be nicer if RemoveIf had a way to break the iteration. In this case we will continue to iterate but there is no need, it is pointless. (BTW, RemoveAt would allow this :-) )

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know. I was thinking of actually returning a slice of pdata to benefit from the fact that we iterate over all elements anyway, but I like your suggestion about returning an enum. I will do a quick benchmark to confirm the switch does not affect performance too much.

}
srcMetric.CopyTo(destIms.Metrics().AppendEmpty())
totalCopiedMetrics++
return true
})
return false
})
return srcRm.InstrumentationLibraryMetrics().Len() == 0
})

return dest
}
99 changes: 78 additions & 21 deletions processor/batchprocessor/splitmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,45 +46,59 @@ func TestSplitMetrics(t *testing.T) {
cp.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).InstrumentationLibrary())
md.ResourceMetrics().At(0).Resource().CopyTo(
cp.ResourceMetrics().At(0).Resource())
metrics.At(19).CopyTo(cpMetrics.At(0))
metrics.At(18).CopyTo(cpMetrics.At(1))
metrics.At(17).CopyTo(cpMetrics.At(2))
metrics.At(16).CopyTo(cpMetrics.At(3))
metrics.At(15).CopyTo(cpMetrics.At(4))
metrics.At(0).CopyTo(cpMetrics.At(0))
metrics.At(1).CopyTo(cpMetrics.At(1))
metrics.At(2).CopyTo(cpMetrics.At(2))
metrics.At(3).CopyTo(cpMetrics.At(3))
metrics.At(4).CopyTo(cpMetrics.At(4))

splitSize := 5
split := splitMetrics(splitSize, md)
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, cp, split)
assert.Equal(t, 15, md.MetricCount())
assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-15", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())

split = splitMetrics(splitSize, md)
assert.Equal(t, 10, md.MetricCount())
assert.Equal(t, "test-metric-int-0-5", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-9", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())

split = splitMetrics(splitSize, md)
assert.Equal(t, 5, md.MetricCount())
assert.Equal(t, "test-metric-int-0-10", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-14", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())

split = splitMetrics(splitSize, md)
assert.Equal(t, 5, md.MetricCount())
assert.Equal(t, "test-metric-int-0-15", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
}

func TestSplitMetricsMultipleResourceSpans(t *testing.T) {
td := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := td.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
md := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
for i := 0; i < metrics.Len(); i++ {
metrics.At(i).SetName(getTestMetricName(0, i))
}
td.ResourceMetrics().Resize(2)
// add second index to resource metrics
testdata.GenerateMetricsManyMetricsSameResource(20).
ResourceMetrics().At(0).CopyTo(td.ResourceMetrics().At(1))
metrics = td.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics()
ResourceMetrics().At(0).CopyTo(md.ResourceMetrics().AppendEmpty())
metrics = md.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics()
for i := 0; i < metrics.Len(); i++ {
metrics.At(i).SetName(getTestMetricName(1, i))
}

splitSize := 5
split := splitMetrics(splitSize, td)
split := splitMetrics(splitSize, md)
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, 35, td.MetricCount())
assert.Equal(t, "test-metric-int-1-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-1-15", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
assert.Equal(t, 35, md.MetricCount())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
}

func TestSplitMetricsMultipleResourceSpans_split_size_greater_than_metric_size(t *testing.T) {
func TestSplitMetricsMultipleResourceSpans_SplitSizeGreaterThanMetricSize(t *testing.T) {
td := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := td.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
for i := 0; i < metrics.Len(); i++ {
Expand All @@ -104,8 +118,51 @@ func TestSplitMetricsMultipleResourceSpans_split_size_greater_than_metric_size(t
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, 40-splitSize, td.MetricCount())
assert.Equal(t, 1, td.ResourceMetrics().Len())
assert.Equal(t, "test-metric-int-1-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-1-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(19).Name())
assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-15", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(19).Name())
assert.Equal(t, "test-metric-int-1-0", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-1-4", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
}

// BenchmarkSplitMetrics measures cloning a multi-resource metrics payload and
// splitting a fixed-size batch off the clone.
//
// The clone happens inside the timed loop because splitMetrics mutates its
// input; BenchmarkCloneMetrics can be used as the baseline for the cloning
// cost alone.
func BenchmarkSplitMetrics(b *testing.B) {
	const (
		resourceCount      = 20
		metricsPerResource = 20
		// totalMetrics is the metric count the assembled payload is expected
		// to have; the check in the timed loop relies on it.
		totalMetrics = 400
		batchSize    = 128
	)

	md := pdata.NewMetrics()
	rms := md.ResourceMetrics()
	for r := 0; r < resourceCount; r++ {
		generated := testdata.GenerateMetricsManyMetricsSameResource(metricsPerResource)
		generated.ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())

		// Rename the metrics of the resource that was just appended.
		appended := rms.At(rms.Len() - 1)
		ms := appended.InstrumentationLibraryMetrics().At(0).Metrics()
		for m := 0; m < ms.Len(); m++ {
			ms.At(m).SetName(getTestMetricName(1, m))
		}
	}

	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cloneReq := md.Clone()
		split := splitMetrics(batchSize, cloneReq)

		// The batch must hold exactly batchSize metrics and the remainder must
		// stay in the clone; anything else marks the benchmark as failed.
		batchOK := split.MetricCount() == batchSize
		restOK := cloneReq.MetricCount() == totalMetrics-batchSize
		if batchOK && restOK {
			continue
		}
		b.Fail()
	}
}

// BenchmarkCloneMetrics measures only the cost of cloning the multi-resource
// metrics payload, built the same way as in BenchmarkSplitMetrics, so the
// cloning overhead can be told apart from the splitting work.
func BenchmarkCloneMetrics(b *testing.B) {
	const (
		resourceCount      = 20
		metricsPerResource = 20
		// totalMetrics is the metric count every clone is expected to report.
		totalMetrics = 400
	)

	md := pdata.NewMetrics()
	rms := md.ResourceMetrics()
	for r := 0; r < resourceCount; r++ {
		generated := testdata.GenerateMetricsManyMetricsSameResource(metricsPerResource)
		generated.ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())

		// Rename the metrics of the resource that was just appended.
		appended := rms.At(rms.Len() - 1)
		ms := appended.InstrumentationLibraryMetrics().At(0).Metrics()
		for m := 0; m < ms.Len(); m++ {
			ms.At(m).SetName(getTestMetricName(1, m))
		}
	}

	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cloneReq := md.Clone()
		if cloneReq.MetricCount() == totalMetrics {
			continue
		}
		// A clone that lost or gained metrics marks the benchmark as failed.
		b.Fail()
	}
}