From 13d71a6a2d0ad0217a8aae6ae0ff20a68fef34f4 Mon Sep 17 00:00:00 2001 From: "Alex K." <8418476+fearful-symmetry@users.noreply.github.com> Date: Wed, 9 Oct 2024 07:27:39 -0700 Subject: [PATCH] Fix cgroup helper init in `add_processor_metadata` and `add_docker_metadata` processors (#41108) * only initialize cgroup reader once * docs * remove old struct fields --------- Co-authored-by: Pierre HILBERT --- .../add_docker_metadata.go | 25 ++++++---- .../add_docker_metadata_test.go | 49 +++++++++++-------- .../add_process_metadata.go | 14 ++++-- .../add_process_metadata_test.go | 28 +++++++++-- .../gosigar_cid_provider.go | 10 ++-- libbeat/processors/cgroups.go | 32 ++++++++++++ 6 files changed, 114 insertions(+), 44 deletions(-) create mode 100644 libbeat/processors/cgroups.go diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata.go b/libbeat/processors/add_docker_metadata/add_docker_metadata.go index d670713894d..8c6b9d146b1 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata.go @@ -47,9 +47,11 @@ const ( cgroupCacheExpiration = 5 * time.Minute ) -// processGroupPaths returns the cgroups associated with a process. This enables +// initCgroupPaths initializes a new cgroup reader. This enables // unit testing by allowing us to stub the OS interface. -var processCgroupPaths = cgroup.ProcessCgroupPaths +var initCgroupPaths processors.InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) { + return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups) +} func init() { processors.RegisterPlugin(processorName, New) @@ -61,11 +63,11 @@ type addDockerMetadata struct { fields []string sourceProcessor beat.Processor - pidFields []string // Field names that contain PIDs. - cgroups *common.Cache // Cache of PID (int) to cgropus (map[string]string). - hostFS resolve.Resolver // Directory where /proc is found - dedot bool // If set to true, replace dots in labels with `_`. - dockerAvailable bool // If Docker exists in env, then it is set to true + pidFields []string // Field names that contain PIDs. + cgroups *common.Cache // Cache of PID (int) to cgropus (map[string]string). + dedot bool // If set to true, replace dots in labels with `_`. + dockerAvailable bool // If Docker exists in env, then it is set to true + cgreader processors.CGReader } const selector = "add_docker_metadata" @@ -110,15 +112,20 @@ func buildDockerMetadataProcessor(log *logp.Logger, cfg *conf.C, watcherConstruc } } + reader, err := initCgroupPaths(resolve.NewTestResolver(config.HostFS), false) + if err != nil { + return nil, fmt.Errorf("error creating cgroup reader: %w", err) + } + return &addDockerMetadata{ log: log, watcher: watcher, fields: config.Fields, sourceProcessor: sourceProcessor, pidFields: config.MatchPIDs, - hostFS: resolve.NewTestResolver(config.HostFS), dedot: config.DeDot, dockerAvailable: dockerAvailable, + cgreader: reader, }, nil } @@ -277,7 +284,7 @@ func (d *addDockerMetadata) getProcessCgroups(pid int) (cgroup.PathList, error) return cgroups, nil } - cgroups, err := processCgroupPaths(d.hostFS, pid) + cgroups, err := d.cgreader.ProcessCgroupPaths(pid) if err != nil { return cgroups, fmt.Errorf("failed to read cgroups for pid=%v: %w", pid, err) } diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go index 2b6663f71dc..dc3d5e3003c 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/elastic-agent-autodiscover/bus" "github.com/elastic/elastic-agent-autodiscover/docker" "github.com/elastic/elastic-agent-libs/config" @@ -37,29 +38,35 @@ import ( "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" ) +type testCGReader struct { +} + +func (r testCGReader) ProcessCgroupPaths(pid int) (cgroup.PathList, error) { + switch pid { + case 1000: + return cgroup.PathList{ + V1: map[string]cgroup.ControllerPath{ + "cpu": {ControllerPath: "/docker/8c147fdfab5a2608fe513d10294bf77cb502a231da9725093a155bd25cd1f14b", IsV2: false}, + }, + }, nil + case 2000: + return cgroup.PathList{ + V1: map[string]cgroup.ControllerPath{ + "memory": {ControllerPath: "/user.slice", IsV2: false}, + }, + }, nil + case 3000: + // Parser error (hopefully this never happens). + return cgroup.PathList{}, fmt.Errorf("cgroup parse failure") + default: + return cgroup.PathList{}, os.ErrNotExist + } +} + func init() { // Stub out the procfs. - processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) { - - switch pid { - case 1000: - return cgroup.PathList{ - V1: map[string]cgroup.ControllerPath{ - "cpu": {ControllerPath: "/docker/8c147fdfab5a2608fe513d10294bf77cb502a231da9725093a155bd25cd1f14b", IsV2: false}, - }, - }, nil - case 2000: - return cgroup.PathList{ - V1: map[string]cgroup.ControllerPath{ - "memory": {ControllerPath: "/user.slice", IsV2: false}, - }, - }, nil - case 3000: - // Parser error (hopefully this never happens). - return cgroup.PathList{}, fmt.Errorf("cgroup parse failure") - default: - return cgroup.PathList{}, os.ErrNotExist - } + initCgroupPaths = func(_ resolve.Resolver, _ bool) (processors.CGReader, error) { + return testCGReader{}, nil } } diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 8bb8ecea5a9..6bbd1c00897 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -54,7 +54,10 @@ var ( procCache = newProcessCache(cacheExpiration, cacheCapacity, cacheEvictionEffort, gosysinfoProvider{}) - processCgroupPaths = cgroup.ProcessCgroupPaths + // cgroups resolver, turned to a stub function to make testing easier. + initCgroupPaths processors.InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) { + return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups) + } instanceID atomic.Uint32 ) @@ -160,6 +163,11 @@ func newProcessMetadataProcessorWithProvider(config config, provider processMeta } } + reader, err := initCgroupPaths(resolve.NewTestResolver(config.HostPath), false) + if err != nil { + return nil, fmt.Errorf("error creating cgroup reader: %w", err) + } + // don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled if ok := containsValue(mappings, "container.id"); ok { if withCache && config.CgroupCacheExpireTime != 0 { @@ -170,9 +178,9 @@ func newProcessMetadataProcessorWithProvider(config config, provider processMeta p.cgroupsCache = common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener) p.cgroupsCache.StartJanitor(config.CgroupCacheExpireTime) - p.cidProvider = newCidProvider(resolve.NewTestResolver(config.HostPath), config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, p.cgroupsCache) + p.cidProvider = newCidProvider(config.CgroupPrefixes, config.CgroupRegex, reader, p.cgroupsCache) } else { - p.cidProvider = newCidProvider(resolve.NewTestResolver(config.HostPath), config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, nil) + p.cidProvider = newCidProvider(config.CgroupPrefixes, config.CgroupRegex, reader, nil) } } diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index 977a554f320..bd761b5e3a5 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/capabilities" + "github.com/elastic/beats/v7/libbeat/processors" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -38,6 +39,20 @@ import ( "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" ) +type testCGRsolver struct { + res func(pid int) (cgroup.PathList, error) +} + +func (t testCGRsolver) ProcessCgroupPaths(pid int) (cgroup.PathList, error) { + return t.res(pid) +} + +func newCGHandlerBuilder(handler testCGRsolver) processors.InitCgroupHandler { + return func(_ resolve.Resolver, _ bool) (processors.CGReader, error) { + return handler, nil + } +} + func TestAddProcessMetadata(t *testing.T) { logp.TestingSetup(logp.WithSelectors(processorName)) @@ -90,7 +105,7 @@ func TestAddProcessMetadata(t *testing.T) { } // mock of the cgroup processCgroupPaths - processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) { + processCgroupPaths := func(pid int) (cgroup.PathList, error) { testMap := map[int]cgroup.PathList{ 1: { V1: map[string]cgroup.ControllerPath{ @@ -135,6 +150,7 @@ func TestAddProcessMetadata(t *testing.T) { return testMap[pid], nil } + initCgroupPaths = newCGHandlerBuilder(testCGRsolver{res: processCgroupPaths}) for _, test := range []struct { description string @@ -884,7 +900,7 @@ func TestUsingCache(t *testing.T) { selfPID := os.Getpid() // mock of the cgroup processCgroupPaths - processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) { + processCgroupPaths := func(pid int) (cgroup.PathList, error) { testStruct := cgroup.PathList{ V1: map[string]cgroup.ControllerPath{ "cpu": {ControllerPath: "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1"}, @@ -909,7 +925,7 @@ func TestUsingCache(t *testing.T) { // testMap := return testMap[pid], nil } - + initCgroupPaths = newCGHandlerBuilder(testCGRsolver{res: processCgroupPaths}) config, err := conf.NewConfigFrom(mapstr.M{ "match_pids": []string{"system.process.ppid"}, "include_fields": []string{"container.id", "process.env"}, @@ -1202,7 +1218,7 @@ func TestPIDToInt(t *testing.T) { } func TestV2CID(t *testing.T) { - processCgroupPaths = func(_ resolve.Resolver, _ int) (cgroup.PathList, error) { + processCgroupPaths := func(_ int) (cgroup.PathList, error) { testMap := cgroup.PathList{ V1: map[string]cgroup.ControllerPath{ "cpu": {IsV2: true, ControllerPath: "system.slice/docker-2dcbab615aebfa9313feffc5cfdacd381543cfa04c6be3f39ac656e55ef34805.scope"}, @@ -1210,7 +1226,9 @@ func TestV2CID(t *testing.T) { } return testMap, nil } - provider := newCidProvider(resolve.NewTestResolver(""), nil, defaultCgroupRegex, processCgroupPaths, nil) + resolver := testCGRsolver{res: processCgroupPaths} + initCgroupPaths = newCGHandlerBuilder(resolver) + provider := newCidProvider(nil, defaultCgroupRegex, resolver, nil) result, err := provider.GetCid(1) assert.NoError(t, err) assert.Equal(t, "2dcbab615aebfa9313feffc5cfdacd381543cfa04c6be3f39ac656e55ef34805", result) diff --git a/libbeat/processors/add_process_metadata/gosigar_cid_provider.go b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go index 00c46f2b8bf..d01e620c7c5 100644 --- a/libbeat/processors/add_process_metadata/gosigar_cid_provider.go +++ b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go @@ -26,9 +26,9 @@ import ( "strings" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" - "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" ) const ( @@ -37,10 +37,9 @@ const ( type gosigarCidProvider struct { log *logp.Logger - hostPath resolve.Resolver cgroupPrefixes []string cgroupRegex *regexp.Regexp - processCgroupPaths func(resolve.Resolver, int) (cgroup.PathList, error) + processCgroupPaths processors.CGReader pidCidCache *common.Cache } @@ -70,10 +69,9 @@ func (p gosigarCidProvider) GetCid(pid int) (result string, err error) { return cid, nil } -func newCidProvider(hostPath resolve.Resolver, cgroupPrefixes []string, cgroupRegex *regexp.Regexp, processCgroupPaths func(resolve.Resolver, int) (cgroup.PathList, error), pidCidCache *common.Cache) gosigarCidProvider { +func newCidProvider(cgroupPrefixes []string, cgroupRegex *regexp.Regexp, processCgroupPaths processors.CGReader, pidCidCache *common.Cache) gosigarCidProvider { return gosigarCidProvider{ log: logp.NewLogger(providerName), - hostPath: hostPath, cgroupPrefixes: cgroupPrefixes, cgroupRegex: cgroupRegex, processCgroupPaths: processCgroupPaths, @@ -84,7 +82,7 @@ func newCidProvider(hostPath resolve.Resolver, cgroupPrefixes []string, cgroupRe // getProcessCgroups returns a mapping of cgroup subsystem name to path. It // returns an error if it failed to retrieve the cgroup info. func (p gosigarCidProvider) getProcessCgroups(pid int) (cgroup.PathList, error) { - pathList, err := p.processCgroupPaths(p.hostPath, pid) + pathList, err := p.processCgroupPaths.ProcessCgroupPaths(pid) if err != nil { var pathError *fs.PathError if errors.As(err, &pathError) { diff --git a/libbeat/processors/cgroups.go b/libbeat/processors/cgroups.go new file mode 100644 index 00000000000..8e54ae5535b --- /dev/null +++ b/libbeat/processors/cgroups.go @@ -0,0 +1,32 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package processors + +import ( + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" + "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" +) + +// InitCgroupHandler is a type for creating stubs for the cgroup resolver. Used primarily for testing. +type InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (CGReader, error) + +// CGReader wraps the group Reader.ProcessCgroupPaths() call, this allows us to +// set different cgroups readers for testing. +type CGReader interface { + ProcessCgroupPaths(pid int) (cgroup.PathList, error) +}