From d890df04f0e3147f3de08ef95be754103c51f8aa Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Fri, 31 Mar 2023 11:26:26 -0400 Subject: [PATCH] Clarify that memory limiter refuses data, doesn't drop it Contributes to https://github.com/open-telemetry/opentelemetry-collector/issues/1084 - Clarify what the memory limiter does. - Set expectations from receivers, how they are supposed to react when the memory limiter refuses the data. All receivers must adhere to this contract. See for example an issue opened against filelog receiver: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/20511 Future work: one additional thing we can do is implement a backoff logic in the memory limiter. When in memory limited mode the processor can introduce pauses before it returns from the ConsumeLogs/Traces/Metrics call. This will allow to slow down the inflow of data into the Collector and give time for the pipeline to clear and memory usage to return to the normal. This needs to be explored further. --- processor/memorylimiterprocessor/README.md | 28 +++++++--- processor/memorylimiterprocessor/config.go | 3 +- .../memorylimiterprocessor/memorylimiter.go | 46 ++++++++-------- .../memorylimiter_test.go | 54 +++++++++---------- 4 files changed, 71 insertions(+), 60 deletions(-) diff --git a/processor/memorylimiterprocessor/README.md b/processor/memorylimiterprocessor/README.md index 0e896d0e271..e0e4f22a22e 100644 --- a/processor/memorylimiterprocessor/README.md +++ b/processor/memorylimiterprocessor/README.md @@ -13,21 +13,32 @@ on the configured processors, it is important to put checks in place regarding memory usage. The memory_limiter processor allows to perform periodic checks of memory -usage if it exceeds defined limits will begin dropping data and forcing GC to reduce +usage if it exceeds defined limits will begin refusing data and forcing GC to reduce memory consumption. The memory_limiter uses soft and hard memory limits. Hard limit is always above or equal the soft limit. -When the memory usage exceeds the soft limit the processor will start dropping the data and -return errors to the preceding component it in the pipeline (which should be normally a -receiver). +When the memory usage exceeds the soft limit the processor will enter the memory limited +mode and will start refusing the data by returning errors to the preceding component +in the pipeline that made the ConsumeLogs/Trace/Metrics function call. +The preceding component should be normally a receiver. -When the memory usage is above the hard limit in addition to dropping the data the +In memory limited mode the error returned by ConsumeLogs/Trace/Metrics function is a +non-permanent error. When receivers see this error they are expected to retry sending +the same data. The receivers may also apply a backpressure to their data sources +in order to slow down the inflow of data into the Collector and allow the memory usage +to go below the limits. + +>Warning: if the component preceding the memory limiter in the pipeline does not correctly +retry and send the data again after ConsumeLogs/Trace/Metrics functions return then that +data will be permanently lost. We consider such components incorrectly implemented. + +When the memory usage is above the hard limit in addition to refusing the data the processor will forcedly perform garbage collection in order to try to free memory. When the memory usage drop below the soft limit, the normal operation is resumed (data -will not longer be dropped and no forced garbage collection will be performed). +will no longer be refused and no forced garbage collection will be performed). The difference between the soft limit and hard limits is defined via `spike_limit_mib` configuration option. The value of this option should be selected in a way that ensures @@ -39,8 +50,9 @@ A good starting point for `spike_limit_mib` is 20% of the hard limit. Bigger Note that while the processor can help mitigate out of memory situations, it is not a replacement for properly sizing and configuring the collector. Keep in mind that if the soft limit is crossed, the collector will -return errors to all receive operations until enough memory is freed. This will -result in dropped data. +return errors to all receive operations until enough memory is freed. This may +eventually result in dropped data since the receivers may not be able to hold back +and retry the data indefinitely. It is highly recommended to configure `ballastextension` as well as the `memory_limiter` processor on every collector. The ballast should be configured to diff --git a/processor/memorylimiterprocessor/config.go b/processor/memorylimiterprocessor/config.go index 5bf18b0d2d9..b81f2f84ecf 100644 --- a/processor/memorylimiterprocessor/config.go +++ b/processor/memorylimiterprocessor/config.go @@ -13,8 +13,7 @@ // limitations under the License. // Package memorylimiterprocessor provides a processor for OpenTelemetry Service pipeline -// that drops data on the pipeline according to the current state of memory -// usage. +// that refuses data on the pipeline according to the current state of memory usage. package memorylimiterprocessor // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor" import ( diff --git a/processor/memorylimiterprocessor/memorylimiter.go b/processor/memorylimiterprocessor/memorylimiter.go index 09c6042cb4f..6cad937a20e 100644 --- a/processor/memorylimiterprocessor/memorylimiter.go +++ b/processor/memorylimiterprocessor/memorylimiter.go @@ -39,9 +39,9 @@ const ( ) var ( - // errForcedDrop will be returned to callers of ConsumeTraceData to indicate - // that data is being dropped due to high memory usage. - errForcedDrop = errors.New("data dropped due to high memory usage") + // errDataRefused will be returned to callers of ConsumeTraceData to indicate + // that data is being refused due to high memory usage. + errDataRefused = errors.New("data refused due to high memory usage") // Construction errors @@ -70,8 +70,8 @@ type memoryLimiter struct { memCheckWait time.Duration ballastSize uint64 - // forceDrop is used atomically to indicate when data should be dropped. - forceDrop *atomic.Bool + // mustRefuse is used to indicate when data should be refused. + mustRefuse *atomic.Bool ticker *time.Ticker @@ -129,7 +129,7 @@ func newMemoryLimiter(set processor.CreateSettings, cfg *Config) (*memoryLimiter ticker: time.NewTicker(cfg.CheckInterval), readMemStatsFn: runtime.ReadMemStats, logger: logger, - forceDrop: &atomic.Bool{}, + mustRefuse: &atomic.Bool{}, obsrep: obsrep, } @@ -180,15 +180,15 @@ func (ml *memoryLimiter) shutdown(context.Context) error { func (ml *memoryLimiter) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { numSpans := td.SpanCount() - if ml.forceDrop.Load() { + if ml.mustRefuse.Load() { // TODO: actually to be 100% sure that this is "refused" and not "dropped" // it is necessary to check the pipeline to see if this is directly connected // to a receiver (ie.: a receiver is on the call stack). For now it // assumes that the pipeline is properly configured and a receiver is on the - // callstack. + // callstack and that the receiver will correctly retry the refused data again. ml.obsrep.TracesRefused(ctx, numSpans) - return td, errForcedDrop + return td, errDataRefused } // Even if the next consumer returns error record the data as accepted by @@ -199,14 +199,14 @@ func (ml *memoryLimiter) processTraces(ctx context.Context, td ptrace.Traces) (p func (ml *memoryLimiter) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { numDataPoints := md.DataPointCount() - if ml.forceDrop.Load() { + if ml.mustRefuse.Load() { // TODO: actually to be 100% sure that this is "refused" and not "dropped" // it is necessary to check the pipeline to see if this is directly connected // to a receiver (ie.: a receiver is on the call stack). For now it // assumes that the pipeline is properly configured and a receiver is on the // callstack. ml.obsrep.MetricsRefused(ctx, numDataPoints) - return md, errForcedDrop + return md, errDataRefused } // Even if the next consumer returns error record the data as accepted by @@ -217,7 +217,7 @@ func (ml *memoryLimiter) processMetrics(ctx context.Context, md pmetric.Metrics) func (ml *memoryLimiter) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { numRecords := ld.LogRecordCount() - if ml.forceDrop.Load() { + if ml.mustRefuse.Load() { // TODO: actually to be 100% sure that this is "refused" and not "dropped" // it is necessary to check the pipeline to see if this is directly connected // to a receiver (ie.: a receiver is on the call stack). For now it @@ -225,7 +225,7 @@ func (ml *memoryLimiter) processLogs(ctx context.Context, ld plog.Logs) (plog.Lo // callstack. ml.obsrep.LogsRefused(ctx, numRecords) - return ld, errForcedDrop + return ld, errDataRefused } // Even if the next consumer returns error record the data as accepted by @@ -288,33 +288,33 @@ func (ml *memoryLimiter) checkMemLimits() { ms = ml.doGCandReadMemStats() } - // Remember current dropping state. - wasForcingDrop := ml.forceDrop.Load() + // Remember current state. + wasRefusing := ml.mustRefuse.Load() // Check if the memory usage is above the soft limit. - mustForceDrop := ml.usageChecker.aboveSoftLimit(ms) + mustRefuse := ml.usageChecker.aboveSoftLimit(ms) - if wasForcingDrop && !mustForceDrop { - // Was previously dropping but enough memory is available now, no need to limit. + if wasRefusing && !mustRefuse { + // Was previously refusing but enough memory is available now, no need to limit. ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms)) } - if !wasForcingDrop && mustForceDrop { + if !wasRefusing && mustRefuse { // We are above soft limit, do a GC if it wasn't done recently and see if // it brings memory usage below the soft limit. if time.Since(ml.lastGCDone) > minGCIntervalWhenSoftLimited { ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms)) ms = ml.doGCandReadMemStats() // Check the limit again to see if GC helped. - mustForceDrop = ml.usageChecker.aboveSoftLimit(ms) + mustRefuse = ml.usageChecker.aboveSoftLimit(ms) } - if mustForceDrop { - ml.logger.Warn("Memory usage is above soft limit. Dropping data.", memstatToZapField(ms)) + if mustRefuse { + ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms)) } } - ml.forceDrop.Store(mustForceDrop) + ml.mustRefuse.Store(mustRefuse) } type memUsageChecker struct { diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 19b910f728c..7377b43c774 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -112,7 +112,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { usageChecker: memUsageChecker{ memAllocLimit: 1024, }, - forceDrop: &atomic.Bool{}, + mustRefuse: &atomic.Bool{}, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, @@ -140,7 +140,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Above memAllocLimit. currentMemAlloc = 1800 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) + assert.Equal(t, errDataRefused, mp.ConsumeMetrics(ctx, md)) // Check ballast effect ml.ballastSize = 1000 @@ -153,7 +153,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize ml.checkMemLimits() - assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) + assert.Equal(t, errDataRefused, mp.ConsumeMetrics(ctx, md)) // Restore ballast to default. ml.ballastSize = 0 @@ -169,7 +169,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Above memSpikeLimit. currentMemAlloc = 550 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) + assert.Equal(t, errDataRefused, mp.ConsumeMetrics(ctx, md)) } @@ -181,7 +181,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { usageChecker: memUsageChecker{ memAllocLimit: 1024, }, - forceDrop: &atomic.Bool{}, + mustRefuse: &atomic.Bool{}, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, @@ -209,7 +209,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Above memAllocLimit. currentMemAlloc = 1800 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) + assert.Equal(t, errDataRefused, tp.ConsumeTraces(ctx, td)) // Check ballast effect ml.ballastSize = 1000 @@ -222,7 +222,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize ml.checkMemLimits() - assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) + assert.Equal(t, errDataRefused, tp.ConsumeTraces(ctx, td)) // Restore ballast to default. ml.ballastSize = 0 @@ -238,7 +238,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Above memSpikeLimit. currentMemAlloc = 550 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) + assert.Equal(t, errDataRefused, tp.ConsumeTraces(ctx, td)) } @@ -250,7 +250,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { usageChecker: memUsageChecker{ memAllocLimit: 1024, }, - forceDrop: &atomic.Bool{}, + mustRefuse: &atomic.Bool{}, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, @@ -278,7 +278,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Above memAllocLimit. currentMemAlloc = 1800 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) + assert.Equal(t, errDataRefused, lp.ConsumeLogs(ctx, ld)) // Check ballast effect ml.ballastSize = 1000 @@ -291,7 +291,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize ml.checkMemLimits() - assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) + assert.Equal(t, errDataRefused, lp.ConsumeLogs(ctx, ld)) // Restore ballast to default. ml.ballastSize = 0 @@ -307,7 +307,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Above memSpikeLimit. currentMemAlloc = 550 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) + assert.Equal(t, errDataRefused, lp.ConsumeLogs(ctx, ld)) } func TestGetDecision(t *testing.T) { @@ -349,7 +349,7 @@ func TestGetDecision(t *testing.T) { }) } -func TestDropDecision(t *testing.T) { +func TestRefuseDecision(t *testing.T) { decison1000Limit30Spike30, err := newPercentageMemUsageChecker(1000, 60, 30) require.NoError(t, err) decison1000Limit60Spike50, err := newPercentageMemUsageChecker(1000, 60, 50) @@ -364,46 +364,46 @@ func TestDropDecision(t *testing.T) { name string usageChecker memUsageChecker ms *runtime.MemStats - shouldDrop bool + shouldRefuse bool }{ { - name: "should drop over limit", + name: "should refuse over limit", usageChecker: *decison1000Limit30Spike30, ms: &runtime.MemStats{Alloc: 600}, - shouldDrop: true, + shouldRefuse: true, }, { - name: "should not drop", + name: "should not refuse", usageChecker: *decison1000Limit30Spike30, ms: &runtime.MemStats{Alloc: 100}, - shouldDrop: false, + shouldRefuse: false, }, { - name: "should not drop spike, fixed usageChecker", + name: "should not refuse spike, fixed usageChecker", usageChecker: memUsageChecker{ memAllocLimit: 600, memSpikeLimit: 500, }, - ms: &runtime.MemStats{Alloc: 300}, - shouldDrop: true, + ms: &runtime.MemStats{Alloc: 300}, + shouldRefuse: true, }, { - name: "should drop, spike, percentage usageChecker", + name: "should refuse, spike, percentage usageChecker", usageChecker: *decison1000Limit60Spike50, ms: &runtime.MemStats{Alloc: 300}, - shouldDrop: true, + shouldRefuse: true, }, { - name: "should drop, spike, percentage usageChecker", + name: "should refuse, spike, percentage usageChecker", usageChecker: *decison1000Limit40Spike20, ms: &runtime.MemStats{Alloc: 250}, - shouldDrop: true, + shouldRefuse: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - shouldDrop := test.usageChecker.aboveSoftLimit(test.ms) - assert.Equal(t, test.shouldDrop, shouldDrop) + shouldRefuse := test.usageChecker.aboveSoftLimit(test.ms) + assert.Equal(t, test.shouldRefuse, shouldRefuse) }) } }