From 01d2fbeee46e374d6f141d99493eb0d88abb6cab Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Thu, 3 Dec 2020 13:12:10 -0500 Subject: [PATCH] Introduce soft and hard limits for memory limiter Contributes to https://github.com/open-telemetry/opentelemetry-collector/issues/1121 Memory limiter processor previously had only one limit. When exceeding this limit it was previously continuously calling GC. This resulted in huge CPU consumption if the check interval was small and this was forcing the use of large check intervals. This in turn was resulting in lethargic response to growing memory usage and the memory limiter was not very effective in situations when memory usage was growing rapidly (e.g. when there was a big spike or when the backend was down). I changed the logic of memory limiter to be based on 2 thresholds: soft and hard. While below the soft threshold the memory limiter is fully disabled. Between the soft and hard limits the limiter begins dropping incoming data but does not perform GC. Only when we exceed the hard limit do we perform GC. The net result is that the actually used memory is limited at the level set by soft limit and fluctuates between soft and hard limit as the garbage is created and collected. Correspondingly GC runs much more infrequently, only when the hard limit is reached and such GC immediately collects significant amount of garbage (reduces memory usage close to soft limit) and thus does not require subsequent GC calls for quite some time. I did some performance tests with the old and new approaches with 4000 Mib limit, 100,000 spans per second and with exporter completely blocked (no backend). With the old approach an interval of 100 ms causes about 450% of CPU usage once the memory limit is hit (while below limit the CPU usage is around 50%). 
Here is an extract of performance test output showing the moment when the limiter is hit: ``` 2020/12/03 20:20:47 Agent RAM (RES):3296 MiB, CPU:44.4% | Sent: 7022700 items | Received: 0 items (0/sec) 2020/12/03 20:20:50 Agent RAM (RES):3481 MiB, CPU:43.0% | Sent: 7322500 items | Received: 0 items (0/sec) 2020/12/03 20:20:53 Agent RAM (RES):3681 MiB, CPU:41.6% | Sent: 7614100 items | Received: 0 items (0/sec) 2020/12/03 20:20:56 Agent RAM (RES):3703 MiB, CPU:47.7% | Sent: 7863600 items | Received: 0 items (0/sec) 2020/12/03 20:20:59 Agent RAM (RES):3028 MiB, CPU:47.0% | Sent: 8062700 items | Received: 0 items (0/sec) 2020/12/03 20:21:02 Agent RAM (RES):3644 MiB, CPU:246.9% | Sent: 8331600 items | Received: 0 items (0/sec) <-- likely a regular GC, not at limit yet 2020/12/03 20:21:05 Agent RAM (RES):3555 MiB, CPU:72.8% | Sent: 8620500 items | Received: 0 items (0/sec) 2020/12/03 20:21:08 Agent RAM (RES):3717 MiB, CPU:57.5% | Sent: 8895500 items | Received: 0 items (0/sec) 2020/12/03 20:21:11 Agent RAM (RES):3877 MiB, CPU:126.9% | Sent: 9172900 items | Received: 0 items (0/sec) <-- hit limit 2020/12/03 20:21:14 Agent RAM (RES):3900 MiB, CPU:127.6% | Sent: 9461100 items | Received: 0 items (0/sec) 2020/12/03 20:21:17 Agent RAM (RES):3918 MiB, CPU:201.7% | Sent: 9728900 items | Received: 0 items (0/sec) 2020/12/03 20:21:20 Agent RAM (RES):3938 MiB, CPU:326.0% | Sent: 9994700 items | Received: 0 items (0/sec) 2020/12/03 20:21:23 Agent RAM (RES):3951 MiB, CPU:470.8% | Sent: 10253200 items | Received: 0 items (0/sec) 2020/12/03 20:21:26 Agent RAM (RES):3955 MiB, CPU:440.0% | Sent: 10504400 items | Received: 0 items (0/sec) 2020/12/03 20:21:29 Agent RAM (RES):3961 MiB, CPU:451.0% | Sent: 10766200 items | Received: 0 items (0/sec) 2020/12/03 20:21:32 Agent RAM (RES):3965 MiB, CPU:465.8% | Sent: 11008400 items | Received: 0 items (0/sec) 2020/12/03 20:21:35 Agent RAM (RES):3974 MiB, CPU:423.6% | Sent: 11272700 items | Received: 0 items (0/sec) ``` Even the interval of 1 
second was unusable with the old approach and we had to choose a longer interval to avoid performance degradation. With the new approach under the exact same conditions when using 100ms check interval the CPU usage is 50% when below memory limits and when the hard memory limits are hit the CPU usage increases to 68%. With 1 second check interval there is no measurable increase in CPU usage when memory limiter is hit (unlike 9x CPU increase with the old approach). Here is an extract of performance test output showing the moment when the limiter is hit: ``` 2020/12/03 20:28:35 Agent RAM (RES):1888 MiB, CPU:48.2% | Sent: 3796400 items | Received: 0 items (0/sec) 2020/12/03 20:28:38 Agent RAM (RES):2029 MiB, CPU:47.1% | Sent: 4088600 items | Received: 0 items (0/sec) 2020/12/03 20:28:41 Agent RAM (RES):2197 MiB, CPU:48.3% | Sent: 4388200 items | Received: 0 items (0/sec) 2020/12/03 20:28:44 Agent RAM (RES):2370 MiB, CPU:45.7% | Sent: 4679900 items | Received: 0 items (0/sec) 2020/12/03 20:28:47 Agent RAM (RES):2558 MiB, CPU:49.0% | Sent: 4972200 items | Received: 0 items (0/sec) 2020/12/03 20:28:50 Agent RAM (RES):2771 MiB, CPU:47.4% | Sent: 5260700 items | Received: 0 items (0/sec) 2020/12/03 20:28:53 Agent RAM (RES):2921 MiB, CPU:133.3% | Sent: 5547500 items | Received: 0 items (0/sec) 2020/12/03 20:28:56 Agent RAM (RES):2922 MiB, CPU:50.1% | Sent: 5846700 items | Received: 0 items (0/sec) 2020/12/03 20:28:59 Agent RAM (RES):2957 MiB, CPU:43.6% | Sent: 6131700 items | Received: 0 items (0/sec) 2020/12/03 20:29:02 Agent RAM (RES):3144 MiB, CPU:50.0% | Sent: 6419400 items | Received: 0 items (0/sec) 2020/12/03 20:29:05 Agent RAM (RES):3328 MiB, CPU:49.0% | Sent: 6719100 items | Received: 0 items (0/sec) 2020/12/03 20:29:08 Agent RAM (RES):3488 MiB, CPU:38.6% | Sent: 7007300 items | Received: 0 items (0/sec) 2020/12/03 20:29:11 Agent RAM (RES):3667 MiB, CPU:42.0% | Sent: 7306700 items | Received: 0 items (0/sec) 2020/12/03 20:29:14 Agent RAM (RES):3813 MiB, CPU:37.4% | 
Sent: 7577700 items | Received: 0 items (0/sec) 2020/12/03 20:29:17 Agent RAM (RES):3802 MiB, CPU:170.9% | Sent: 7860100 items | Received: 0 items (0/sec) <-- hit hard limit 2020/12/03 20:29:20 Agent RAM (RES):3882 MiB, CPU:68.1% | Sent: 8160000 items | Received: 0 items (0/sec) 2020/12/03 20:29:23 Agent RAM (RES):4007 MiB, CPU:42.3% | Sent: 8447900 items | Received: 0 items (0/sec) 2020/12/03 20:29:26 Agent RAM (RES):4007 MiB, CPU:39.3% | Sent: 8747800 items | Received: 0 items (0/sec) 2020/12/03 20:29:29 Agent RAM (RES):4008 MiB, CPU:34.3% | Sent: 9038400 items | Received: 0 items (0/sec) 2020/12/03 20:29:32 Agent RAM (RES):4009 MiB, CPU:39.9% | Sent: 9317200 items | Received: 0 items (0/sec) ``` This is a dramatically better picture compared to the old approach. With 1 second interval memory limiter's impact on CPU is not measurable with the new approach, whereas with the old approach it was still showing several times higher CPU when limit was hit. This makes small check intervals practically useful and allows to effectively suppress incoming surges of data. --- processor/memorylimiter/README.md | 49 +++++--- processor/memorylimiter/memorylimiter.go | 98 +++++++++++---- processor/memorylimiter/memorylimiter_test.go | 119 +++++++++--------- testbed/tests/trace_test.go | 38 ++---- 4 files changed, 180 insertions(+), 124 deletions(-) diff --git a/processor/memorylimiter/README.md b/processor/memorylimiter/README.md index 1e649a770e3..7dd1bdc6ce5 100644 --- a/processor/memorylimiter/README.md +++ b/processor/memorylimiter/README.md @@ -6,13 +6,29 @@ The memory limiter processor is used to prevent out of memory situations on the collector. Given that the amount and type of data a collector processes is environment specific and resource utilization of the collector is also dependent on the configured processors, it is important to put checks in place regarding -memory usage. The memory_limiter processor offers the follow safeguards: +memory usage. 
The memory_limiter processor offers the following safeguard: - Ability to define an interval when memory usage will be checked and if memory -usage exceeds a defined limit will trigger GC to reduce memory consumption. -- Ability to define an interval when memory usage will be compared against the -previous interval's value and if the delta exceeds a defined limit will trigger -GC to reduce memory consumption. +usage exceeds defined limits will begin dropping data and forcing GC to reduce +memory consumption. + +The memory_limiter uses soft and hard memory limits. The hard limit is set higher than +the soft limit. + +When memory usage exceeds the soft limit memory_limiter will start dropping the data and +return errors to the preceding component in the pipeline (which should normally be a +receiver). + +When memory usage exceeds the hard limit memory_limiter will start to forcibly perform +garbage collection in order to try to free memory. + +When the memory usage drops below the soft limit normal operation will be resumed (data +will no longer be dropped). + +The difference between the soft and hard limits is defined via the `spike_limit_mib` +configuration option. The value should be selected in a way that ensures that between +the memory check intervals memory usage cannot increase by more than this value (otherwise +memory usage may exceed the hard limit - even if temporarily). In addition, there is a command line option (`mem-ballast-size-mib`) which can be used to define a ballast, which allocates memory and provides stability to the @@ -23,8 +39,8 @@ command line must also be defined in the memory_limiter processor. Note that while these configuration options can help mitigate out of memory situations, they are not a replacement for properly sizing and configuring the -collector. 
For example, if the soft limit is crossed, the collector will +return errors to all receive operations until enough memory is freed. This will result in dropped data. It is highly recommended to configure the ballast command line option as well as the @@ -39,13 +55,16 @@ Please refer to [config.go](./config.go) for the config spec. The following configuration options **must be changed**: - `check_interval` (default = 0s): Time between measurements of memory -usage. Values below 1 second are not recommended since it can result in -unnecessary CPU consumption. +usage. The recommended value is 1 second. +If the expected traffic to the Collector is very spiky then decrease the `check_interval` +or increase `spike_limit_mib` to avoid memory usage going over the hard limit. - `limit_mib` (default = 0): Maximum amount of memory, in MiB, targeted to be allocated by the process heap. Note that typically the total memory usage of -process will be about 50MiB higher than this value. -- `spike_limit_mib` (default = 0): Maximum spike expected between the -measurements of memory usage. The value must be less than `limit_mib`. +process will be about 50MiB higher than this value. This defines the hard limit. +- `spike_limit_mib` (default = 20% of `limit_mib`): Maximum spike expected between the +measurements of memory usage. The value must be less than `limit_mib`. The soft limit +value will be equal to (limit_mib - spike_limit_mib). +The recommended value for `spike_limit_mib` is about 20% of `limit_mib`. - `limit_percentage` (default = 0): Maximum amount of total memory targeted to be allocated by the process heap. This configuration is supported on Linux systems with cgroups and it's intended to be used in dynamic platforms like docker. 
@@ -69,16 +88,16 @@ Examples: processors: memory_limiter: ballast_size_mib: 2000 - check_interval: 5s + check_interval: 1s limit_mib: 4000 - spike_limit_mib: 500 + spike_limit_mib: 800 ``` ```yaml processors: memory_limiter: ballast_size_mib: 2000 - check_interval: 5s + check_interval: 1s limit_percentage: 50 spike_limit_percentage: 30 ``` diff --git a/processor/memorylimiter/memorylimiter.go b/processor/memorylimiter/memorylimiter.go index fcba9833b4e..68f8d04a3ef 100644 --- a/processor/memorylimiter/memorylimiter.go +++ b/processor/memorylimiter/memorylimiter.go @@ -61,7 +61,7 @@ var ( var getMemoryFn = iruntime.TotalMemory type memoryLimiter struct { - decision dropDecision + usageChecker memUsageChecker memCheckWait time.Duration ballastSize uint64 @@ -94,18 +94,18 @@ func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) { return nil, errLimitOutOfRange } - decision, err := getDecision(cfg, logger) + usageChecker, err := getMemUsageChecker(cfg, logger) if err != nil { return nil, err } logger.Info("Memory limiter configured", - zap.Uint64("limit_mib", decision.memAllocLimit), - zap.Uint64("spike_limit_mib", decision.memSpikeLimit), + zap.Uint64("limit_mib", usageChecker.memAllocLimit), + zap.Uint64("spike_limit_mib", usageChecker.memSpikeLimit), zap.Duration("check_interval", cfg.CheckInterval)) ml := &memoryLimiter{ - decision: *decision, + usageChecker: *usageChecker, memCheckWait: cfg.CheckInterval, ballastSize: ballastSize, ticker: time.NewTicker(cfg.CheckInterval), @@ -120,11 +120,11 @@ func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) { return ml, nil } -func getDecision(cfg *Config, logger *zap.Logger) (*dropDecision, error) { +func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, error) { memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes if cfg.MemoryLimitMiB != 0 { - return newFixedDecision(memAllocLimit, 
memSpikeLimit) + return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit) } totalMemory, err := getMemoryFn() if err != nil { @@ -134,7 +134,7 @@ func getDecision(cfg *Config, logger *zap.Logger) (*dropDecision, error) { zap.Int64("total_memory", totalMemory), zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage), zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage)) - return newPercentageDecision(totalMemory, int64(cfg.MemoryLimitPercentage), int64(cfg.MemorySpikePercentage)) + return newPercentageMemUsageChecker(totalMemory, int64(cfg.MemoryLimitPercentage), int64(cfg.MemorySpikePercentage)) } func (ml *memoryLimiter) shutdown(context.Context) error { @@ -220,6 +220,7 @@ func (ml *memoryLimiter) readMemStats() *runtime.MemStats { ml.logger.Warn(typeStr + " is likely incorrectly configured. " + ballastSizeMibKey + " must be set equal to --mem-ballast-size-mib command line option.") } + return ms } @@ -228,7 +229,7 @@ func (ml *memoryLimiter) readMemStats() *runtime.MemStats { func (ml *memoryLimiter) startMonitoring() { go func() { for range ml.ticker.C { - ml.memCheck() + ml.checkMemLimits() } }() } @@ -238,44 +239,89 @@ func (ml *memoryLimiter) forcingDrop() bool { return atomic.LoadInt64(&ml.forceDrop) != 0 } -func (ml *memoryLimiter) memCheck() { - ms := ml.readMemStats() - ml.memLimiting(ms) +func (ml *memoryLimiter) setForcingDrop(b bool) { + var i int64 + if b { + i = 1 + } else { + i = 0 + } + atomic.StoreInt64(&ml.forceDrop, i) } -func (ml *memoryLimiter) memLimiting(ms *runtime.MemStats) { - if !ml.decision.shouldDrop(ms) { - atomic.StoreInt64(&ml.forceDrop, 0) - } else { - atomic.StoreInt64(&ml.forceDrop, 1) - // Force a GC at this point and see if this is enough to get to - // the desired level. 
+func bytesToMib(ms *runtime.MemStats) zap.Field { + return zap.Uint64("cur_mem_mib", ms.Alloc/1024/1024) +} + +func (ml *memoryLimiter) checkMemLimits() { + ms := ml.readMemStats() + + ml.logger.Debug("Currently used memory.", bytesToMib(ms)) + + if ml.usageChecker.aboveHardLimit(ms) { + // Memory usage is above hard limit. Force GC. + ml.logger.Warn("Memory usage is above hard limit, force GC.", bytesToMib(ms)) runtime.GC() + ms = ml.readMemStats() + ml.logger.Info("Memory usage after GC.", bytesToMib(ms)) + } + + if ml.forcingDrop() { + // Currently forcing drop due to memory limit. See if we still need to do it. + + if ml.usageChecker.aboveSoftLimit(ms) { + // Continue dropping, we are still above soft limit. + return + } + + // Enough memory is available now, no need to limit. + ml.logger.Info("Memory usage back within limits, resuming normal operation.", bytesToMib(ms)) + ml.setForcingDrop(false) + + } else { + // Currently not forcing drop. Check if we are still below soft memory limit. + + if !ml.usageChecker.aboveSoftLimit(ms) { + // We are still below limits, nothing to do. + return + } + + // We are above soft limit, so start dropping. 
+ ml.setForcingDrop(true) + ml.logger.Warn("Memory usage is above soft limit, begin dropping data.", bytesToMib(ms)) } } -type dropDecision struct { +type memUsageChecker struct { memAllocLimit uint64 memSpikeLimit uint64 } -func (d dropDecision) shouldDrop(ms *runtime.MemStats) bool { - return d.memAllocLimit <= ms.Alloc || d.memAllocLimit-ms.Alloc <= d.memSpikeLimit +func (d memUsageChecker) aboveSoftLimit(ms *runtime.MemStats) bool { + return ms.Alloc >= d.memAllocLimit-d.memSpikeLimit } -func newFixedDecision(memAllocLimit, memSpikeLimit uint64) (*dropDecision, error) { +func (d memUsageChecker) aboveHardLimit(ms *runtime.MemStats) bool { + return ms.Alloc >= d.memAllocLimit +} + +func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) (*memUsageChecker, error) { if memSpikeLimit >= memAllocLimit { return nil, errMemSpikeLimitOutOfRange } - return &dropDecision{ + if memSpikeLimit == 0 { + // If spike limit is unspecified use 20% of mem limit. + memSpikeLimit = memAllocLimit / 5 + } + return &memUsageChecker{ memAllocLimit: memAllocLimit, memSpikeLimit: memSpikeLimit, }, nil } -func newPercentageDecision(totalMemory int64, percentageLimit, percentageSpike int64) (*dropDecision, error) { +func newPercentageMemUsageChecker(totalMemory int64, percentageLimit, percentageSpike int64) (*memUsageChecker, error) { if percentageLimit > 100 || percentageLimit <= 0 || percentageSpike > 100 || percentageSpike <= 0 { return nil, errPercentageLimitOutOfRange } - return newFixedDecision(uint64(percentageLimit*totalMemory)/100, uint64(percentageSpike*totalMemory)/100) + return newFixedMemUsageChecker(uint64(percentageLimit*totalMemory)/100, uint64(percentageSpike*totalMemory)/100) } diff --git a/processor/memorylimiter/memorylimiter_test.go b/processor/memorylimiter/memorylimiter_test.go index 244f4e9f67d..396038e5f9d 100644 --- a/processor/memorylimiter/memorylimiter_test.go +++ b/processor/memorylimiter/memorylimiter_test.go @@ -104,13 +104,14 @@ func TestNew(t 
*testing.T) { func TestMetricsMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 ml := &memoryLimiter{ - decision: dropDecision{ + usageChecker: memUsageChecker{ memAllocLimit: 1024, }, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, obsrep: obsreport.NewProcessorObsReport(configtelemetry.LevelNone, ""), + logger: zap.NewNop(), } mp, err := processorhelper.NewMetricsProcessor( &Config{ @@ -130,12 +131,12 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Below memAllocLimit. currentMemAlloc = 800 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, mp.ConsumeMetrics(ctx, md)) // Above memAllocLimit. currentMemAlloc = 1800 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) // Check ballast effect @@ -143,28 +144,28 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Below memAllocLimit accounting for ballast. currentMemAlloc = 800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, mp.ConsumeMetrics(ctx, md)) // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) // Restore ballast to default. ml.ballastSize = 0 // Check spike limit - ml.decision.memSpikeLimit = 512 + ml.usageChecker.memSpikeLimit = 512 // Below memSpikeLimit. currentMemAlloc = 500 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, mp.ConsumeMetrics(ctx, md)) // Above memSpikeLimit. 
currentMemAlloc = 550 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) } @@ -174,13 +175,14 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { func TestTraceMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 ml := &memoryLimiter{ - decision: dropDecision{ + usageChecker: memUsageChecker{ memAllocLimit: 1024, }, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, obsrep: obsreport.NewProcessorObsReport(configtelemetry.LevelNone, ""), + logger: zap.NewNop(), } tp, err := processorhelper.NewTraceProcessor( &Config{ @@ -200,12 +202,12 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Below memAllocLimit. currentMemAlloc = 800 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, tp.ConsumeTraces(ctx, td)) // Above memAllocLimit. currentMemAlloc = 1800 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) // Check ballast effect @@ -213,28 +215,28 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Below memAllocLimit accounting for ballast. currentMemAlloc = 800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, tp.ConsumeTraces(ctx, td)) // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) // Restore ballast to default. ml.ballastSize = 0 // Check spike limit - ml.decision.memSpikeLimit = 512 + ml.usageChecker.memSpikeLimit = 512 // Below memSpikeLimit. currentMemAlloc = 500 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, tp.ConsumeTraces(ctx, td)) // Above memSpikeLimit. 
currentMemAlloc = 550 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) } @@ -244,13 +246,14 @@ func TestTraceMemoryPressureResponse(t *testing.T) { func TestLogMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 ml := &memoryLimiter{ - decision: dropDecision{ + usageChecker: memUsageChecker{ memAllocLimit: 1024, }, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, obsrep: obsreport.NewProcessorObsReport(configtelemetry.LevelNone, ""), + logger: zap.NewNop(), } lp, err := processorhelper.NewLogsProcessor( &Config{ @@ -270,12 +273,12 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Below memAllocLimit. currentMemAlloc = 800 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, lp.ConsumeLogs(ctx, ld)) // Above memAllocLimit. currentMemAlloc = 1800 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) // Check ballast effect @@ -283,42 +286,42 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Below memAllocLimit accounting for ballast. currentMemAlloc = 800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, lp.ConsumeLogs(ctx, ld)) // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) // Restore ballast to default. ml.ballastSize = 0 // Check spike limit - ml.decision.memSpikeLimit = 512 + ml.usageChecker.memSpikeLimit = 512 // Below memSpikeLimit. currentMemAlloc = 500 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, lp.ConsumeLogs(ctx, ld)) // Above memSpikeLimit. 
currentMemAlloc = 550 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) } func TestGetDecision(t *testing.T) { t.Run("fixed_limit", func(t *testing.T) { - d, err := getDecision(&Config{MemoryLimitMiB: 100, MemorySpikeLimitMiB: 20}, zap.NewNop()) + d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 100, MemorySpikeLimitMiB: 20}, zap.NewNop()) require.NoError(t, err) - assert.Equal(t, &dropDecision{ + assert.Equal(t, &memUsageChecker{ memAllocLimit: 100 * mibBytes, memSpikeLimit: 20 * mibBytes, }, d) }) t.Run("fixed_limit_error", func(t *testing.T) { - d, err := getDecision(&Config{MemoryLimitMiB: 20, MemorySpikeLimitMiB: 100}, zap.NewNop()) + d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 20, MemorySpikeLimitMiB: 100}, zap.NewNop()) require.Error(t, err) assert.Nil(t, d) }) @@ -330,55 +333,55 @@ func TestGetDecision(t *testing.T) { return 100 * mibBytes, nil } t.Run("percentage_limit", func(t *testing.T) { - d, err := getDecision(&Config{MemoryLimitPercentage: 50, MemorySpikePercentage: 10}, zap.NewNop()) + d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 50, MemorySpikePercentage: 10}, zap.NewNop()) require.NoError(t, err) - assert.Equal(t, &dropDecision{ + assert.Equal(t, &memUsageChecker{ memAllocLimit: 50 * mibBytes, memSpikeLimit: 10 * mibBytes, }, d) }) t.Run("percentage_limit_error", func(t *testing.T) { - d, err := getDecision(&Config{MemoryLimitPercentage: 101, MemorySpikePercentage: 10}, zap.NewNop()) + d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 101, MemorySpikePercentage: 10}, zap.NewNop()) require.Error(t, err) assert.Nil(t, d) - d, err = getDecision(&Config{MemoryLimitPercentage: 99, MemorySpikePercentage: 101}, zap.NewNop()) + d, err = getMemUsageChecker(&Config{MemoryLimitPercentage: 99, MemorySpikePercentage: 101}, zap.NewNop()) require.Error(t, err) assert.Nil(t, d) }) } func TestDropDecision(t *testing.T) { - decison1000Limit30Spike30, err := newPercentageDecision(1000, 
60, 30) + decison1000Limit30Spike30, err := newPercentageMemUsageChecker(1000, 60, 30) require.NoError(t, err) - decison1000Limit60Spike50, err := newPercentageDecision(1000, 60, 50) + decison1000Limit60Spike50, err := newPercentageMemUsageChecker(1000, 60, 50) require.NoError(t, err) - decison1000Limit40Spike20, err := newPercentageDecision(1000, 40, 20) + decison1000Limit40Spike20, err := newPercentageMemUsageChecker(1000, 40, 20) require.NoError(t, err) - decison1000Limit40Spike60, err := newPercentageDecision(1000, 40, 60) + decison1000Limit40Spike60, err := newPercentageMemUsageChecker(1000, 40, 60) require.Error(t, err) assert.Nil(t, decison1000Limit40Spike60) tests := []struct { - name string - decision dropDecision - ms *runtime.MemStats - shouldDrop bool + name string + usageChecker memUsageChecker + ms *runtime.MemStats + shouldDrop bool }{ { - name: "should drop over limit", - decision: *decison1000Limit30Spike30, - ms: &runtime.MemStats{Alloc: 600}, - shouldDrop: true, + name: "should drop over limit", + usageChecker: *decison1000Limit30Spike30, + ms: &runtime.MemStats{Alloc: 600}, + shouldDrop: true, }, { - name: "should not drop", - decision: *decison1000Limit30Spike30, - ms: &runtime.MemStats{Alloc: 100}, - shouldDrop: false, + name: "should not drop", + usageChecker: *decison1000Limit30Spike30, + ms: &runtime.MemStats{Alloc: 100}, + shouldDrop: false, }, { - name: "should not drop spike, fixed decision", - decision: dropDecision{ + name: "should not drop spike, fixed usageChecker", + usageChecker: memUsageChecker{ memAllocLimit: 600, memSpikeLimit: 500, }, @@ -386,21 +389,21 @@ func TestDropDecision(t *testing.T) { shouldDrop: true, }, { - name: "should drop, spike, percentage decision", - decision: *decison1000Limit60Spike50, - ms: &runtime.MemStats{Alloc: 300}, - shouldDrop: true, + name: "should drop, spike, percentage usageChecker", + usageChecker: *decison1000Limit60Spike50, + ms: &runtime.MemStats{Alloc: 300}, + shouldDrop: true, }, { - name: 
"should drop, spike, percentage decision", - decision: *decison1000Limit40Spike20, - ms: &runtime.MemStats{Alloc: 250}, - shouldDrop: true, + name: "should drop, spike, percentage usageChecker", + usageChecker: *decison1000Limit40Spike20, + ms: &runtime.MemStats{Alloc: 250}, + shouldDrop: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - shouldDrop := test.decision.shouldDrop(test.ms) + shouldDrop := test.usageChecker.aboveSoftLimit(test.ms) assert.Equal(t, test.shouldDrop, shouldDrop) }) } diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index 7424d78dd3e..738d9abaf12 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -120,8 +120,8 @@ func TestTraceNoBackend10kSPS(t *testing.T) { limitProcessors := map[string]string{ "memory_limiter": ` memory_limiter: - check_interval: 1s - limit_mib: 10 + check_interval: 100ms + limit_mib: 20 `, } @@ -131,14 +131,14 @@ func TestTraceNoBackend10kSPS(t *testing.T) { { Name: "NoMemoryLimit", Processor: noLimitProcessors, - ExpectedMaxRAM: 200, - ExpectedMinFinalRAM: 30, + ExpectedMaxRAM: 150, + ExpectedMinFinalRAM: 100, }, { Name: "MemoryLimit", Processor: limitProcessors, - ExpectedMaxRAM: 60, - ExpectedMinFinalRAM: 10, + ExpectedMaxRAM: 70, + ExpectedMinFinalRAM: 40, }, } @@ -146,27 +146,12 @@ func TestTraceNoBackend10kSPS(t *testing.T) { name string sender testbed.DataSender receiver testbed.DataReceiver - resourceSpec testbed.ResourceSpec configuration []processorConfig }{ { - "JaegerGRPC", - testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.DefaultJaegerPort), - testbed.NewOCDataReceiver(testbed.DefaultOCPort), - testbed.ResourceSpec{ - ExpectedMaxCPU: 70, - ExpectedMaxRAM: 198, - }, - processorsConfig, - }, - { - "Zipkin", - testbed.NewZipkinDataSender(testbed.DefaultHost, testbed.DefaultZipkinAddressPort), - testbed.NewOCDataReceiver(testbed.DefaultOCPort), - testbed.ResourceSpec{ - ExpectedMaxCPU: 120, - ExpectedMaxRAM: 198, - }, + 
"OTLP", + testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), processorsConfig, }, } @@ -179,7 +164,10 @@ func TestTraceNoBackend10kSPS(t *testing.T) { t, test.sender, test.receiver, - test.resourceSpec, + testbed.ResourceSpec{ + ExpectedMaxCPU: 50, + ExpectedMaxRAM: testConf.ExpectedMaxRAM, + }, performanceResultsSummary, testConf, )