diff --git a/.chloggen/service-remove-ballast-deps-2.yaml b/.chloggen/service-remove-ballast-deps-2.yaml new file mode 100644 index 00000000000..e6377b387de --- /dev/null +++ b/.chloggen/service-remove-ballast-deps-2.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processor/memorylimiter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The memory limiter processor will no longer account for ballast size. + +# One or more tracking issues or pull requests related to the change +issues: [10696] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/service-remove-ballast-deps-3.yaml b/.chloggen/service-remove-ballast-deps-3.yaml new file mode 100644 index 00000000000..c07f90439f6 --- /dev/null +++ b/.chloggen/service-remove-ballast-deps-3.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: extension/memorylimiter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The memory limiter extension will no longer account for ballast size. + +# One or more tracking issues or pull requests related to the change +issues: [10696] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/service-remove-ballast-deps.yaml b/.chloggen/service-remove-ballast-deps.yaml new file mode 100644 index 00000000000..31ffa90c3ac --- /dev/null +++ b/.chloggen/service-remove-ballast-deps.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The service will no longer be able to get a ballast size from the deprecated ballast extension. + +# One or more tracking issues or pull requests related to the change +issues: [10696] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/internal/memorylimiter/memorylimiter.go b/internal/memorylimiter/memorylimiter.go index a010cba5427..db93795127a 100644 --- a/internal/memorylimiter/memorylimiter.go +++ b/internal/memorylimiter/memorylimiter.go @@ -44,7 +44,6 @@ type MemoryLimiter struct { usageChecker memUsageChecker memCheckWait time.Duration - ballastSize uint64 // mustRefuse is used to indicate when data should be refused. mustRefuse *atomic.Bool @@ -58,8 +57,7 @@ type MemoryLimiter struct { readMemStatsFn func(m *runtime.MemStats) // Fields used for logging. - logger *zap.Logger - configMismatchedLogged bool + logger *zap.Logger refCounterLock sync.Mutex refCounter int @@ -114,14 +112,7 @@ func (ml *MemoryLimiter) startMonitoring() { } } -func (ml *MemoryLimiter) Start(_ context.Context, host component.Host) error { - extensions := host.GetExtensions() - for _, extension := range extensions { - if ext, ok := extension.(interface{ GetBallastSize() uint64 }); ok { - ml.ballastSize = ext.GetBallastSize() - break - } - } +func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error { ml.startMonitoring() return nil } @@ -168,16 +159,6 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro func (ml *MemoryLimiter) readMemStats() *runtime.MemStats { ms := &runtime.MemStats{} ml.readMemStatsFn(ms) - // If proper configured ms.Alloc should be at least ml.ballastSize but since - // a misconfiguration is possible check for that here. - if ms.Alloc >= ml.ballastSize { - ms.Alloc -= ml.ballastSize - } else if !ml.configMismatchedLogged { - // This indicates misconfiguration. Log it once. - ml.configMismatchedLogged = true - ml.logger.Warn(`"size_mib" in ballast extension is likely incorrectly configured.`) - } - return ms } diff --git a/internal/memorylimiter/memorylimiter_test.go b/internal/memorylimiter/memorylimiter_test.go index e9e92a33f70..6919ce50968 100644 --- a/internal/memorylimiter/memorylimiter_test.go +++ b/internal/memorylimiter/memorylimiter_test.go @@ -4,17 +4,14 @@ package memorylimiter import ( - "context" "runtime" "sync/atomic" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/internal/iruntime" ) @@ -43,22 +40,6 @@ func TestMemoryPressureResponse(t *testing.T) { ml.CheckMemLimits() assert.True(t, ml.MustRefuse()) - // Check ballast effect - ml.ballastSize = 1000 - - // Below memAllocLimit accounting for ballast. - currentMemAlloc = 800 + ml.ballastSize - ml.CheckMemLimits() - assert.False(t, ml.MustRefuse()) - - // Above memAllocLimit even accounting for ballast. - currentMemAlloc = 1800 + ml.ballastSize - ml.CheckMemLimits() - assert.True(t, ml.MustRefuse()) - - // Restore ballast to default. - ml.ballastSize = 0 - // Check spike limit ml.usageChecker.memSpikeLimit = 512 @@ -151,38 +132,3 @@ func TestRefuseDecision(t *testing.T) { }) } } - -func TestBallastSize(t *testing.T) { - cfg := &Config{ - CheckInterval: 10 * time.Second, - MemoryLimitMiB: 1024, - } - got, err := NewMemoryLimiter(cfg, zap.NewNop()) - require.NoError(t, err) - - got.startMonitoring() - require.NoError(t, got.Start(context.Background(), &host{ballastSize: 113})) - assert.Equal(t, uint64(113), got.ballastSize) - require.NoError(t, got.Shutdown(context.Background())) -} - -type host struct { - ballastSize uint64 - component.Host -} - -func (h *host) GetExtensions() map[component.ID]component.Component { - ret := make(map[component.ID]component.Component) - ret[component.MustNewID("ballast")] = &ballastExtension{ballastSize: h.ballastSize} - return ret -} - -type ballastExtension struct { - ballastSize uint64 - component.StartFunc - component.ShutdownFunc -} - -func (be *ballastExtension) GetBallastSize() uint64 { - return be.ballastSize -} diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 1ce63d794f3..6172fa39889 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -122,7 +122,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { tests := []struct { name string mlCfg *Config - ballastSize uint64 memAlloc uint64 expectError bool }{ @@ -133,7 +132,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -144,29 +142,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, - memAlloc: 1800, - expectError: true, - }, - { - name: "Below memAllocLimit accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, - memAlloc: 800, - expectError: false, - }, - { - name: "Above memAllocLimit even accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, memAlloc: 1800, expectError: true, }, @@ -177,7 +152,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 10, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -188,7 +162,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 11, }, - ballastSize: 0, memAlloc: 800, expectError: true, }, @@ -197,7 +170,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { memorylimiter.GetMemoryFn = totalMemory memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { - ms.Alloc = tt.memAlloc + tt.ballastSize + ms.Alloc = tt.memAlloc } ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) @@ -213,7 +186,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { processorhelper.WithShutdown(ml.shutdown)) require.NoError(t, err) - assert.NoError(t, mp.Start(ctx, &host{ballastSize: tt.ballastSize})) + assert.NoError(t, mp.Start(ctx, &host{})) ml.memlimiter.CheckMemLimits() err = mp.ConsumeMetrics(ctx, md) if tt.expectError { @@ -239,7 +212,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { tests := []struct { name string mlCfg *Config - ballastSize uint64 memAlloc uint64 expectError bool }{ @@ -250,7 +222,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -261,29 +232,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, - memAlloc: 1800, - expectError: true, - }, - { - name: "Below memAllocLimit accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, - memAlloc: 800, - expectError: false, - }, - { - name: "Above memAllocLimit even accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, memAlloc: 1800, expectError: true, }, @@ -294,7 +242,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 10, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -305,7 +252,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 11, }, - ballastSize: 0, memAlloc: 800, expectError: true, }, @@ -314,7 +260,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { memorylimiter.GetMemoryFn = totalMemory memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { - ms.Alloc = tt.memAlloc + tt.ballastSize + ms.Alloc = tt.memAlloc } ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) @@ -330,7 +276,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { processorhelper.WithShutdown(ml.shutdown)) require.NoError(t, err) - assert.NoError(t, tp.Start(ctx, &host{ballastSize: tt.ballastSize})) + assert.NoError(t, tp.Start(ctx, &host{})) ml.memlimiter.CheckMemLimits() err = tp.ConsumeTraces(ctx, td) if tt.expectError { @@ -356,7 +302,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { tests := []struct { name string mlCfg *Config - ballastSize uint64 memAlloc uint64 expectError bool }{ @@ -367,7 +312,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -378,29 +322,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, - memAlloc: 1800, - expectError: true, - }, - { - name: "Below memAllocLimit accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, - memAlloc: 800, - expectError: false, - }, - { - name: "Above memAllocLimit even accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, memAlloc: 1800, expectError: true, }, @@ -411,7 +332,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 10, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -422,7 +342,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 11, }, - ballastSize: 0, memAlloc: 800, expectError: true, }, @@ -431,7 +350,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { memorylimiter.GetMemoryFn = totalMemory memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { - ms.Alloc = tt.memAlloc + tt.ballastSize + ms.Alloc = tt.memAlloc } ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) @@ -447,7 +366,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { processorhelper.WithShutdown(ml.shutdown)) require.NoError(t, err) - assert.NoError(t, tp.Start(ctx, &host{ballastSize: tt.ballastSize})) + assert.NoError(t, tp.Start(ctx, &host{})) ml.memlimiter.CheckMemLimits() err = tp.ConsumeLogs(ctx, ld) if tt.expectError { @@ -465,26 +384,14 @@ func TestLogMemoryPressureResponse(t *testing.T) { } type host struct { - ballastSize uint64 component.Host } func (h *host) GetExtensions() map[component.ID]component.Component { ret := make(map[component.ID]component.Component) - ret[component.MustNewID("ballast")] = &ballastExtension{ballastSize: h.ballastSize} return ret } -type ballastExtension struct { - ballastSize uint64 - component.StartFunc - component.ShutdownFunc -} - -func (be *ballastExtension) GetBallastSize() uint64 { - return be.ballastSize -} - func totalMemory() (uint64, error) { return uint64(2048), nil } diff --git a/service/internal/proctelemetry/process_telemetry.go b/service/internal/proctelemetry/process_telemetry.go index 0b978758104..e7a0cc1454a 100644 --- a/service/internal/proctelemetry/process_telemetry.go +++ b/service/internal/proctelemetry/process_telemetry.go @@ -21,7 +21,6 @@ import ( // processMetrics is a struct that contains views related to process metrics (cpu, mem, etc) type processMetrics struct { startTimeUnixNano int64 - ballastSizeBytes uint64 proc *process.Process context context.Context @@ -54,7 +53,7 @@ func WithHostProc(hostProc string) RegisterOption { // RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure // basic information about this process. -func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, ballastSizeBytes uint64, opts ...RegisterOption) error { +func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, opts ...RegisterOption) error { set := registerOption{} for _, opt := range opts { opt.apply(&set) @@ -62,7 +61,6 @@ func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, ballastSizeB var err error pm := &processMetrics{ startTimeUnixNano: time.Now().UnixNano(), - ballastSizeBytes: ballastSizeBytes, ms: &runtime.MemStats{}, } @@ -139,10 +137,4 @@ func (pm *processMetrics) readMemStatsIfNeeded() { } pm.lastMsRead = now runtime.ReadMemStats(pm.ms) - if pm.ballastSizeBytes > 0 { - pm.ms.Alloc -= pm.ballastSizeBytes - pm.ms.HeapAlloc -= pm.ballastSizeBytes - pm.ms.HeapSys -= pm.ballastSizeBytes - pm.ms.HeapInuse -= pm.ballastSizeBytes - } } diff --git a/service/internal/proctelemetry/process_telemetry_linux_test.go b/service/internal/proctelemetry/process_telemetry_linux_test.go index 73605c0ae8e..99471b4eca2 100644 --- a/service/internal/proctelemetry/process_telemetry_linux_test.go +++ b/service/internal/proctelemetry/process_telemetry_linux_test.go @@ -21,7 +21,7 @@ func TestProcessTelemetryWithHostProc(t *testing.T) { // Make the sure the environment variable value is not used. t.Setenv("HOST_PROC", "foo/bar") - require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, 0, WithHostProc("/proc"))) + require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, WithHostProc("/proc"))) // Check that the metrics are actually filled. time.Sleep(200 * time.Millisecond) diff --git a/service/internal/proctelemetry/process_telemetry_test.go b/service/internal/proctelemetry/process_telemetry_test.go index cccc5ad8e30..fb750fcf664 100644 --- a/service/internal/proctelemetry/process_telemetry_test.go +++ b/service/internal/proctelemetry/process_telemetry_test.go @@ -78,7 +78,7 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli func TestProcessTelemetry(t *testing.T) { tel := setupTelemetry(t) - require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, 0)) + require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings)) mp, err := fetchPrometheusMetrics(tel.promHandler) require.NoError(t, err) diff --git a/service/service.go b/service/service.go index 99747db9ac6..8b57bd12cad 100644 --- a/service/service.go +++ b/service/service.go @@ -159,7 +159,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" { // The process telemetry initialization requires the ballast size, which is available after the extensions are initialized. - if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings, getBallastSize(srv.host)); err != nil { + if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings); err != nil { return nil, fmt.Errorf("failed to register process metrics: %w", err) } } @@ -316,15 +316,6 @@ func (srv *Service) Logger() *zap.Logger { return srv.telemetrySettings.Logger } -func getBallastSize(host component.Host) uint64 { - for _, ext := range host.GetExtensions() { - if bExt, ok := ext.(interface{ GetBallastSize() uint64 }); ok { - return bExt.GetBallastSize() - } - } - return 0 -} - func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource { // pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests. // Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only