diff --git a/.chloggen/27083-spanmetrics-prune.yaml b/.chloggen/27083-spanmetrics-prune.yaml
new file mode 100644
index 000000000000..9846e18f4559
--- /dev/null
+++ b/.chloggen/27083-spanmetrics-prune.yaml
@@ -0,0 +1,29 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: processor/spanmetrics
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Prune histograms when dimension cache is pruned.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [27080]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  Dimension cache was always pruned but histograms were not being pruned. This caused metric series created
+  by processor/spanmetrics to grow unbounded.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/processor/spanmetricsprocessor/internal/cache/cache.go b/processor/spanmetricsprocessor/internal/cache/cache.go
index e0daebc9546c..768943873079 100644
--- a/processor/spanmetricsprocessor/internal/cache/cache.go
+++ b/processor/spanmetricsprocessor/internal/cache/cache.go
@@ -64,6 +64,10 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
 	return val, ok
 }
 
+func (c *Cache[K, V]) Contains(key K) bool {
+	return c.lru.Contains(key)
+}
+
 // Len returns the number of items in the cache.
 func (c *Cache[K, V]) Len() int {
 	return c.lru.Len()
diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go
index 5f06f67f3161..291d83902c3a 100644
--- a/processor/spanmetricsprocessor/processor.go
+++ b/processor/spanmetricsprocessor/processor.go
@@ -258,6 +258,11 @@ func (p *processorImp) exportMetrics(ctx context.Context) {
 		p.metricKeyToDimensions.Purge()
 	} else {
 		p.metricKeyToDimensions.RemoveEvictedItems()
+		for key := range p.histograms {
+			if !p.metricKeyToDimensions.Contains(key) {
+				delete(p.histograms, key)
+			}
+		}
 	}
 
 	// This component no longer needs to read the metrics once built, so it is safe to unlock.
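The loop added to exportMetrics above is the core of the fix: after the dimension cache has handled evictions, any histogram whose key is no longer in metricKeyToDimensions is deleted, so the histograms map can no longer outgrow the dimension cache. The standalone sketch below illustrates that pattern outside the collector. It is an illustration only, not code from this patch: it uses hashicorp/golang-lru/v2 directly in place of the processor's internal cache wrapper (which keeps evicted items around until RemoveEvictedItems is called), and the keys and latency values are invented.

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// Stand-in for metricKeyToDimensions: an LRU bounded at 2 entries.
	dims, err := lru.New[string, struct{}](2)
	if err != nil {
		panic(err)
	}

	// Stand-in for p.histograms: grows with every new key unless pruned.
	histograms := map[string][]float64{}

	record := func(key string, latency float64) {
		dims.Add(key, struct{}{})
		histograms[key] = append(histograms[key], latency)
	}

	record("svc-a", 11)
	record("svc-b", 12)
	record("svc-c", 13) // evicts "svc-a" from the LRU

	// The pattern from exportMetrics: after the dimension cache has been
	// pruned, drop histogram series whose keys are no longer cached.
	for key := range histograms {
		if !dims.Contains(key) {
			delete(histograms, key)
		}
	}

	fmt.Println(len(histograms)) // 2: bounded by the cache size
}
```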
diff --git a/processor/spanmetricsprocessor/processor_test.go b/processor/spanmetricsprocessor/processor_test.go
index 8f90b7d095e7..52c25e7ca39f 100644
--- a/processor/spanmetricsprocessor/processor_test.go
+++ b/processor/spanmetricsprocessor/processor_test.go
@@ -47,7 +47,7 @@ const (
 	notInSpanAttrName0     = "shouldBeInMetric"
 	notInSpanAttrName1     = "shouldNotBeInMetric"
 	regionResourceAttrName = "region"
-	DimensionsCacheSize    = 2
+	DimensionsCacheSize    = 1000
 
 	sampleRegion  = "us-east-1"
 	sampleLatency = float64(11)
@@ -144,7 +144,7 @@ func TestProcessorConcurrentShutdown(t *testing.T) {
 	ticker := mockClock.NewTicker(time.Nanosecond)
 
 	// Test
-	p := newProcessorImp(mexp, tcon, nil, cumulative, logger, ticker)
+	p := newProcessorImp(mexp, tcon, nil, cumulative, logger, ticker, DimensionsCacheSize)
 	err := p.Start(ctx, mhost)
 	require.NoError(t, err)
 
@@ -230,7 +230,7 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
 	tcon := &mocks.TracesConsumer{}
 	tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(fakeErr)
 
-	p := newProcessorImp(mexp, tcon, nil, cumulative, logger, nil)
+	p := newProcessorImp(mexp, tcon, nil, cumulative, logger, nil, DimensionsCacheSize)
 
 	traces := buildSampleTrace()
 
@@ -262,7 +262,7 @@ func TestProcessorConsumeMetricsErrors(t *testing.T) {
 	mockClock := clock.NewMock(time.Now())
 	ticker := mockClock.NewTicker(time.Nanosecond)
 
-	p := newProcessorImp(mexp, tcon, nil, cumulative, logger, ticker)
+	p := newProcessorImp(mexp, tcon, nil, cumulative, logger, ticker, DimensionsCacheSize)
 
 	exporters := map[component.DataType]map[component.ID]component.Component{}
 	mhost := &mocks.Host{}
@@ -362,7 +362,7 @@ func TestProcessorConsumeTraces(t *testing.T) {
 			mockClock := clock.NewMock(time.Now())
 			ticker := mockClock.NewTicker(time.Nanosecond)
 
-			p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, zaptest.NewLogger(t), ticker)
+			p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, zaptest.NewLogger(t), ticker, DimensionsCacheSize)
 
 			exporters := map[component.DataType]map[component.ID]component.Component{}
 			mhost := &mocks.Host{}
@@ -387,39 +387,61 @@ func TestProcessorConsumeTraces(t *testing.T) {
 	}
 }
 
-func TestMetricKeyCache(t *testing.T) {
+func TestMetricCache(t *testing.T) {
+	var wg sync.WaitGroup
+
 	mexp := &mocks.MetricsConsumer{}
-	tcon := &mocks.TracesConsumer{}
+	mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pmetric.Metrics) bool {
+		wg.Done()
+		return true
+	})).Return(nil)
 
-	mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil)
+	tcon := &mocks.TracesConsumer{}
 	tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
 	defaultNullValue := pcommon.NewValueStr("defaultNullValue")
-	p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t), nil)
-	traces := buildSampleTrace()
+	mockClock := clock.NewMock(time.Now())
+	ticker := mockClock.NewTicker(time.Nanosecond)
+	dimensionsCacheSize := 2
+
+	p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t), ticker, dimensionsCacheSize)
+
+	exporters := map[component.DataType]map[component.ID]component.Component{}
+	mhost := &mocks.Host{}
+	mhost.On("GetExporters").Return(exporters)
 
 	// Test
 	ctx := metadata.NewIncomingContext(context.Background(), nil)
+	err := p.Start(ctx, mhost)
+	require.NoError(t, err)
 
 	// 0 key was cached at beginning
-	assert.Zero(t, p.metricKeyToDimensions.Len())
+	assert.Zero(t, len(p.histograms))
+
+	traces := buildSampleTrace()
+	require.Condition(t, func() bool {
+		return traces.SpanCount() >= dimensionsCacheSize
+	})
+
+	err = p.ConsumeTraces(ctx, traces)
+	wg.Add(1)
+	mockClock.Add(time.Nanosecond)
+	wg.Wait()
 
-	err := p.ConsumeTraces(ctx, traces)
 	// Validate
 	require.NoError(t, err)
 
 	// 2 key was cached, 1 key was evicted and cleaned after the processing
-	assert.Eventually(t, func() bool {
-		return p.metricKeyToDimensions.Len() == DimensionsCacheSize
-	}, 10*time.Second, time.Millisecond*100)
+	assert.Equal(t, len(p.histograms), dimensionsCacheSize)
 
 	// consume another batch of traces
 	err = p.ConsumeTraces(ctx, traces)
 	require.NoError(t, err)
+	wg.Add(1)
+	mockClock.Add(time.Nanosecond)
+	wg.Wait()
 
 	// 2 key was cached, other keys were evicted and cleaned after the processing
-	assert.Eventually(t, func() bool {
-		return p.metricKeyToDimensions.Len() == DimensionsCacheSize
-	}, 10*time.Second, time.Millisecond*100)
+	assert.Equal(t, len(p.histograms), dimensionsCacheSize)
 }
 
 func BenchmarkProcessorConsumeTraces(b *testing.B) {
@@ -431,7 +453,7 @@
 	tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
 
 	defaultNullValue := pcommon.NewValueStr("defaultNullValue")
-	p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(b), nil)
+	p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(b), nil, DimensionsCacheSize)
 
 	traces := buildSampleTrace()
 
@@ -442,10 +464,10 @@
 	}
 }
 
-func newProcessorImp(mexp *mocks.MetricsConsumer, tcon *mocks.TracesConsumer, defaultNullValue *pcommon.Value, temporality string, logger *zap.Logger, ticker *clock.Ticker) *processorImp {
+func newProcessorImp(mexp *mocks.MetricsConsumer, tcon *mocks.TracesConsumer, defaultNullValue *pcommon.Value, temporality string, logger *zap.Logger, ticker *clock.Ticker, cacheSize int) *processorImp {
 	defaultNotInSpanAttrVal := pcommon.NewValueStr("defaultNotInSpanAttrVal")
 	// use size 2 for LRU cache for testing purpose
-	metricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize)
+	metricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](cacheSize)
 	if err != nil {
 		panic(err)
 	}
@@ -979,8 +1001,7 @@ func TestConsumeTracesEvictedCacheKey(t *testing.T) {
 	mockClock := clock.NewMock(time.Now())
 	ticker := mockClock.NewTicker(time.Nanosecond)
 
-	// Note: default dimension key cache size is 2.
-	p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t), ticker)
+	p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t), ticker, DimensionsCacheSize)
 
 	exporters := map[component.DataType]map[component.ID]component.Component{}