From df1710daacca9914b77b8f681474c2a30b17f4b7 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 27 Apr 2021 13:09:03 -0700 Subject: [PATCH] Fix batch processor metrics reorder, improve performance Benchmarks Before: ``` goos: darwin goarch: amd64 pkg: go.opentelemetry.io/collector/processor/batchprocessor cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz BenchmarkSplitMetrics BenchmarkSplitMetrics-16 6660 170653 ns/op 182889 B/op 3309 allocs/op PASS Process finished with the exit code 0 ``` Benchmarks After: ``` goos: darwin goarch: amd64 pkg: go.opentelemetry.io/collector/processor/batchprocessor cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz BenchmarkSplitMetrics BenchmarkSplitMetrics-16 7858 134259 ns/op 141881 B/op 2596 allocs/op PASS Process finished with the exit code 0 ``` Benchmarks Reference Clone: ``` goos: darwin goarch: amd64 pkg: go.opentelemetry.io/collector/processor/batchprocessor cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz BenchmarkCloneMetrics BenchmarkCloneMetrics-16 8726 127948 ns/op 137816 B/op 2503 allocs/op PASS Process finished with the exit code 0 ``` Signed-off-by: Bogdan Drutu --- processor/batchprocessor/splitmetrics.go | 88 ++++++++--------- processor/batchprocessor/splitmetrics_test.go | 99 +++++++++++++++---- 2 files changed, 120 insertions(+), 67 deletions(-) diff --git a/processor/batchprocessor/splitmetrics.go b/processor/batchprocessor/splitmetrics.go index f45641c5e48..7853c5ab99e 100644 --- a/processor/batchprocessor/splitmetrics.go +++ b/processor/batchprocessor/splitmetrics.go @@ -19,56 +19,52 @@ import ( ) // splitMetrics removes metrics from the input data and returns a new data of the specified size. -func splitMetrics(size int, toSplit pdata.Metrics) pdata.Metrics { - if toSplit.MetricCount() <= size { - return toSplit +func splitMetrics(size int, src pdata.Metrics) pdata.Metrics { + if src.MetricCount() <= size { + return src } - copiedMetrics := 0 - result := pdata.NewMetrics() - result.ResourceMetrics().Resize(toSplit.ResourceMetrics().Len()) - rms := toSplit.ResourceMetrics() + totalCopiedMetrics := 0 + dest := pdata.NewMetrics() - rmsCount := 0 - for i := rms.Len() - 1; i >= 0; i-- { - rmsCount++ - rm := rms.At(i) - destRs := result.ResourceMetrics().At(result.ResourceMetrics().Len() - 1 - i) - rm.Resource().CopyTo(destRs.Resource()) - - destRs.InstrumentationLibraryMetrics().Resize(rm.InstrumentationLibraryMetrics().Len()) + src.ResourceMetrics().RemoveIf(func(srcRm pdata.ResourceMetrics) bool { + // If we are done skip everything else. + if totalCopiedMetrics == size { + return false + } - ilmCount := 0 - for j := rm.InstrumentationLibraryMetrics().Len() - 1; j >= 0; j-- { - ilmCount++ - instMetrics := rm.InstrumentationLibraryMetrics().At(j) - destInstMetrics := destRs.InstrumentationLibraryMetrics().At(destRs.InstrumentationLibraryMetrics().Len() - 1 - j) - instMetrics.InstrumentationLibrary().CopyTo(destInstMetrics.InstrumentationLibrary()) + destRs := dest.ResourceMetrics().AppendEmpty() + srcRm.Resource().CopyTo(destRs.Resource()) - if size-copiedMetrics >= instMetrics.Metrics().Len() { - destInstMetrics.Metrics().Resize(instMetrics.Metrics().Len()) - } else { - destInstMetrics.Metrics().Resize(size - copiedMetrics) - } - for k, destIdx := instMetrics.Metrics().Len()-1, 0; k >= 0 && copiedMetrics < size; k, destIdx = k-1, destIdx+1 { - metric := instMetrics.Metrics().At(k) - metric.CopyTo(destInstMetrics.Metrics().At(destIdx)) - copiedMetrics++ - // remove metric - instMetrics.Metrics().Resize(instMetrics.Metrics().Len() - 1) + srcRm.InstrumentationLibraryMetrics().RemoveIf(func(srcIm pdata.InstrumentationLibraryMetrics) bool { + // If we are done skip everything else. + if totalCopiedMetrics == size { + return false } - if instMetrics.Metrics().Len() == 0 { - rm.InstrumentationLibraryMetrics().Resize(rm.InstrumentationLibraryMetrics().Len() - 1) - } - if copiedMetrics == size { - result.ResourceMetrics().Resize(rmsCount) - return result + + destIms := destRs.InstrumentationLibraryMetrics().AppendEmpty() + srcIm.InstrumentationLibrary().CopyTo(destIms.InstrumentationLibrary()) + + // If possible to move all metrics do that. + srcMetricsLen := srcIm.Metrics().Len() + if size-totalCopiedMetrics >= srcMetricsLen { + totalCopiedMetrics += srcMetricsLen + srcIm.Metrics().MoveAndAppendTo(destIms.Metrics()) + return true } - } - destRs.InstrumentationLibraryMetrics().Resize(ilmCount) - if rm.InstrumentationLibraryMetrics().Len() == 0 { - rms.Resize(rms.Len() - 1) - } - } - result.ResourceMetrics().Resize(rmsCount) - return result + + srcIm.Metrics().RemoveIf(func(srcMetric pdata.Metric) bool { + // If we are done skip everything else. + if totalCopiedMetrics == size { + return false + } + srcMetric.CopyTo(destIms.Metrics().AppendEmpty()) + totalCopiedMetrics++ + return true + }) + return false + }) + return srcRm.InstrumentationLibraryMetrics().Len() == 0 + }) + + return dest } diff --git a/processor/batchprocessor/splitmetrics_test.go b/processor/batchprocessor/splitmetrics_test.go index 95153bee852..8d7e1c8743b 100644 --- a/processor/batchprocessor/splitmetrics_test.go +++ b/processor/batchprocessor/splitmetrics_test.go @@ -46,45 +46,59 @@ func TestSplitMetrics(t *testing.T) { cp.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).InstrumentationLibrary()) md.ResourceMetrics().At(0).Resource().CopyTo( cp.ResourceMetrics().At(0).Resource()) - metrics.At(19).CopyTo(cpMetrics.At(0)) - metrics.At(18).CopyTo(cpMetrics.At(1)) - metrics.At(17).CopyTo(cpMetrics.At(2)) - metrics.At(16).CopyTo(cpMetrics.At(3)) - metrics.At(15).CopyTo(cpMetrics.At(4)) + metrics.At(0).CopyTo(cpMetrics.At(0)) + metrics.At(1).CopyTo(cpMetrics.At(1)) + metrics.At(2).CopyTo(cpMetrics.At(2)) + metrics.At(3).CopyTo(cpMetrics.At(3)) + metrics.At(4).CopyTo(cpMetrics.At(4)) splitSize := 5 split := splitMetrics(splitSize, md) assert.Equal(t, splitSize, split.MetricCount()) assert.Equal(t, cp, split) assert.Equal(t, 15, md.MetricCount()) - assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) - assert.Equal(t, "test-metric-int-0-15", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) + assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) + + split = splitMetrics(splitSize, md) + assert.Equal(t, 10, md.MetricCount()) + assert.Equal(t, "test-metric-int-0-5", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + assert.Equal(t, "test-metric-int-0-9", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) + + split = splitMetrics(splitSize, md) + assert.Equal(t, 5, md.MetricCount()) + assert.Equal(t, "test-metric-int-0-10", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + assert.Equal(t, "test-metric-int-0-14", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) + + split = splitMetrics(splitSize, md) + assert.Equal(t, 5, md.MetricCount()) + assert.Equal(t, "test-metric-int-0-15", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) } func TestSplitMetricsMultipleResourceSpans(t *testing.T) { - td := testdata.GenerateMetricsManyMetricsSameResource(20) - metrics := td.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() + md := testdata.GenerateMetricsManyMetricsSameResource(20) + metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() for i := 0; i < metrics.Len(); i++ { metrics.At(i).SetName(getTestMetricName(0, i)) } - td.ResourceMetrics().Resize(2) // add second index to resource metrics testdata.GenerateMetricsManyMetricsSameResource(20). - ResourceMetrics().At(0).CopyTo(td.ResourceMetrics().At(1)) - metrics = td.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics() + ResourceMetrics().At(0).CopyTo(md.ResourceMetrics().AppendEmpty()) + metrics = md.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics() for i := 0; i < metrics.Len(); i++ { metrics.At(i).SetName(getTestMetricName(1, i)) } splitSize := 5 - split := splitMetrics(splitSize, td) + split := splitMetrics(splitSize, md) assert.Equal(t, splitSize, split.MetricCount()) - assert.Equal(t, 35, td.MetricCount()) - assert.Equal(t, "test-metric-int-1-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) - assert.Equal(t, "test-metric-int-1-15", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) + assert.Equal(t, 35, md.MetricCount()) + assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) } -func TestSplitMetricsMultipleResourceSpans_split_size_greater_than_metric_size(t *testing.T) { +func TestSplitMetricsMultipleResourceSpans_SplitSizeGreaterThanMetricSize(t *testing.T) { td := testdata.GenerateMetricsManyMetricsSameResource(20) metrics := td.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() for i := 0; i < metrics.Len(); i++ { @@ -104,8 +118,51 @@ func TestSplitMetricsMultipleResourceSpans_split_size_greater_than_metric_size(t assert.Equal(t, splitSize, split.MetricCount()) assert.Equal(t, 40-splitSize, td.MetricCount()) assert.Equal(t, 1, td.ResourceMetrics().Len()) - assert.Equal(t, "test-metric-int-1-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) - assert.Equal(t, "test-metric-int-1-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(19).Name()) - assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) - assert.Equal(t, "test-metric-int-0-15", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) + assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(19).Name()) + assert.Equal(t, "test-metric-int-1-0", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + assert.Equal(t, "test-metric-int-1-4", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) +} + +func BenchmarkSplitMetrics(b *testing.B) { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + for i := 0; i < 20; i++ { + testdata.GenerateMetricsManyMetricsSameResource(20).ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics()) + ms := rms.At(rms.Len() - 1).InstrumentationLibraryMetrics().At(0).Metrics() + for i := 0; i < ms.Len(); i++ { + ms.At(i).SetName(getTestMetricName(1, i)) + } + } + + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + cloneReq := md.Clone() + split := splitMetrics(128, cloneReq) + if split.MetricCount() != 128 || cloneReq.MetricCount() != 400-128 { + b.Fail() + } + } +} + +func BenchmarkCloneMetrics(b *testing.B) { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + for i := 0; i < 20; i++ { + testdata.GenerateMetricsManyMetricsSameResource(20).ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics()) + ms := rms.At(rms.Len() - 1).InstrumentationLibraryMetrics().At(0).Metrics() + for i := 0; i < ms.Len(); i++ { + ms.At(i).SetName(getTestMetricName(1, i)) + } + } + + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + cloneReq := md.Clone() + if cloneReq.MetricCount() != 400 { + b.Fail() + } + } }