Skip to content

Commit

Permalink
Fix cgroup helper init in add_processor_metadata and `add_docker_me…
Browse files Browse the repository at this point in the history
…tadata` processors (#41108)

* only initialize cgroup reader once

* docs

* remove old struct fields

---------

Co-authored-by: Pierre HILBERT <pierre.hilbert@elastic.co>
  • Loading branch information
fearful-symmetry and pierrehilbert authored Oct 9, 2024
1 parent 77aa604 commit 13d71a6
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 44 deletions.
25 changes: 16 additions & 9 deletions libbeat/processors/add_docker_metadata/add_docker_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ const (
cgroupCacheExpiration = 5 * time.Minute
)

// processGroupPaths returns the cgroups associated with a process. This enables
// initCgroupPaths initializes a new cgroup reader. This enables
// unit testing by allowing us to stub the OS interface.
var processCgroupPaths = cgroup.ProcessCgroupPaths
var initCgroupPaths processors.InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) {
return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups)
}

func init() {
processors.RegisterPlugin(processorName, New)
Expand All @@ -61,11 +63,11 @@ type addDockerMetadata struct {
fields []string
sourceProcessor beat.Processor

pidFields []string // Field names that contain PIDs.
cgroups *common.Cache // Cache of PID (int) to cgropus (map[string]string).
hostFS resolve.Resolver // Directory where /proc is found
dedot bool // If set to true, replace dots in labels with `_`.
dockerAvailable bool // If Docker exists in env, then it is set to true
pidFields []string // Field names that contain PIDs.
cgroups *common.Cache // Cache of PID (int) to cgropus (map[string]string).
dedot bool // If set to true, replace dots in labels with `_`.
dockerAvailable bool // If Docker exists in env, then it is set to true
cgreader processors.CGReader
}

const selector = "add_docker_metadata"
Expand Down Expand Up @@ -110,15 +112,20 @@ func buildDockerMetadataProcessor(log *logp.Logger, cfg *conf.C, watcherConstruc
}
}

reader, err := initCgroupPaths(resolve.NewTestResolver(config.HostFS), false)
if err != nil {
return nil, fmt.Errorf("error creating cgroup reader: %w", err)
}

return &addDockerMetadata{
log: log,
watcher: watcher,
fields: config.Fields,
sourceProcessor: sourceProcessor,
pidFields: config.MatchPIDs,
hostFS: resolve.NewTestResolver(config.HostFS),
dedot: config.DeDot,
dockerAvailable: dockerAvailable,
cgreader: reader,
}, nil
}

Expand Down Expand Up @@ -277,7 +284,7 @@ func (d *addDockerMetadata) getProcessCgroups(pid int) (cgroup.PathList, error)
return cgroups, nil
}

cgroups, err := processCgroupPaths(d.hostFS, pid)
cgroups, err := d.cgreader.ProcessCgroupPaths(pid)
if err != nil {
return cgroups, fmt.Errorf("failed to read cgroups for pid=%v: %w", pid, err)
}
Expand Down
49 changes: 28 additions & 21 deletions libbeat/processors/add_docker_metadata/add_docker_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/elastic-agent-autodiscover/bus"
"github.com/elastic/elastic-agent-autodiscover/docker"
"github.com/elastic/elastic-agent-libs/config"
Expand All @@ -37,29 +38,35 @@ import (
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

type testCGReader struct {
}

func (r testCGReader) ProcessCgroupPaths(pid int) (cgroup.PathList, error) {
switch pid {
case 1000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/docker/8c147fdfab5a2608fe513d10294bf77cb502a231da9725093a155bd25cd1f14b", IsV2: false},
},
}, nil
case 2000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"memory": {ControllerPath: "/user.slice", IsV2: false},
},
}, nil
case 3000:
// Parser error (hopefully this never happens).
return cgroup.PathList{}, fmt.Errorf("cgroup parse failure")
default:
return cgroup.PathList{}, os.ErrNotExist
}
}

func init() {
// Stub out the procfs.
processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) {

switch pid {
case 1000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/docker/8c147fdfab5a2608fe513d10294bf77cb502a231da9725093a155bd25cd1f14b", IsV2: false},
},
}, nil
case 2000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"memory": {ControllerPath: "/user.slice", IsV2: false},
},
}, nil
case 3000:
// Parser error (hopefully this never happens).
return cgroup.PathList{}, fmt.Errorf("cgroup parse failure")
default:
return cgroup.PathList{}, os.ErrNotExist
}
initCgroupPaths = func(_ resolve.Resolver, _ bool) (processors.CGReader, error) {
return testCGReader{}, nil
}
}

Expand Down
14 changes: 11 additions & 3 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ var (

procCache = newProcessCache(cacheExpiration, cacheCapacity, cacheEvictionEffort, gosysinfoProvider{})

processCgroupPaths = cgroup.ProcessCgroupPaths
// cgroups resolver, turned to a stub function to make testing easier.
initCgroupPaths processors.InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) {
return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups)
}

instanceID atomic.Uint32
)
Expand Down Expand Up @@ -160,6 +163,11 @@ func newProcessMetadataProcessorWithProvider(config config, provider processMeta
}
}

reader, err := initCgroupPaths(resolve.NewTestResolver(config.HostPath), false)
if err != nil {
return nil, fmt.Errorf("error creating cgroup reader: %w", err)
}

// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if withCache && config.CgroupCacheExpireTime != 0 {
Expand All @@ -170,9 +178,9 @@ func newProcessMetadataProcessorWithProvider(config config, provider processMeta

p.cgroupsCache = common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener)
p.cgroupsCache.StartJanitor(config.CgroupCacheExpireTime)
p.cidProvider = newCidProvider(resolve.NewTestResolver(config.HostPath), config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, p.cgroupsCache)
p.cidProvider = newCidProvider(config.CgroupPrefixes, config.CgroupRegex, reader, p.cgroupsCache)
} else {
p.cidProvider = newCidProvider(resolve.NewTestResolver(config.HostPath), config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, nil)
p.cidProvider = newCidProvider(config.CgroupPrefixes, config.CgroupRegex, reader, nil)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,28 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/capabilities"
"github.com/elastic/beats/v7/libbeat/processors"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

type testCGRsolver struct {
res func(pid int) (cgroup.PathList, error)
}

func (t testCGRsolver) ProcessCgroupPaths(pid int) (cgroup.PathList, error) {
return t.res(pid)
}

func newCGHandlerBuilder(handler testCGRsolver) processors.InitCgroupHandler {
return func(_ resolve.Resolver, _ bool) (processors.CGReader, error) {
return handler, nil
}
}

func TestAddProcessMetadata(t *testing.T) {
logp.TestingSetup(logp.WithSelectors(processorName))

Expand Down Expand Up @@ -90,7 +105,7 @@ func TestAddProcessMetadata(t *testing.T) {
}

// mock of the cgroup processCgroupPaths
processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) {
processCgroupPaths := func(pid int) (cgroup.PathList, error) {
testMap := map[int]cgroup.PathList{
1: {
V1: map[string]cgroup.ControllerPath{
Expand Down Expand Up @@ -135,6 +150,7 @@ func TestAddProcessMetadata(t *testing.T) {

return testMap[pid], nil
}
initCgroupPaths = newCGHandlerBuilder(testCGRsolver{res: processCgroupPaths})

for _, test := range []struct {
description string
Expand Down Expand Up @@ -884,7 +900,7 @@ func TestUsingCache(t *testing.T) {
selfPID := os.Getpid()

// mock of the cgroup processCgroupPaths
processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) {
processCgroupPaths := func(pid int) (cgroup.PathList, error) {
testStruct := cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1"},
Expand All @@ -909,7 +925,7 @@ func TestUsingCache(t *testing.T) {
// testMap :=
return testMap[pid], nil
}

initCgroupPaths = newCGHandlerBuilder(testCGRsolver{res: processCgroupPaths})
config, err := conf.NewConfigFrom(mapstr.M{
"match_pids": []string{"system.process.ppid"},
"include_fields": []string{"container.id", "process.env"},
Expand Down Expand Up @@ -1202,15 +1218,17 @@ func TestPIDToInt(t *testing.T) {
}

func TestV2CID(t *testing.T) {
processCgroupPaths = func(_ resolve.Resolver, _ int) (cgroup.PathList, error) {
processCgroupPaths := func(_ int) (cgroup.PathList, error) {
testMap := cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {IsV2: true, ControllerPath: "system.slice/docker-2dcbab615aebfa9313feffc5cfdacd381543cfa04c6be3f39ac656e55ef34805.scope"},
},
}
return testMap, nil
}
provider := newCidProvider(resolve.NewTestResolver(""), nil, defaultCgroupRegex, processCgroupPaths, nil)
resolver := testCGRsolver{res: processCgroupPaths}
initCgroupPaths = newCGHandlerBuilder(resolver)
provider := newCidProvider(nil, defaultCgroupRegex, resolver, nil)
result, err := provider.GetCid(1)
assert.NoError(t, err)
assert.Equal(t, "2dcbab615aebfa9313feffc5cfdacd381543cfa04c6be3f39ac656e55ef34805", result)
Expand Down
10 changes: 4 additions & 6 deletions libbeat/processors/add_process_metadata/gosigar_cid_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"strings"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

const (
Expand All @@ -37,10 +37,9 @@ const (

type gosigarCidProvider struct {
log *logp.Logger
hostPath resolve.Resolver
cgroupPrefixes []string
cgroupRegex *regexp.Regexp
processCgroupPaths func(resolve.Resolver, int) (cgroup.PathList, error)
processCgroupPaths processors.CGReader
pidCidCache *common.Cache
}

Expand Down Expand Up @@ -70,10 +69,9 @@ func (p gosigarCidProvider) GetCid(pid int) (result string, err error) {
return cid, nil
}

func newCidProvider(hostPath resolve.Resolver, cgroupPrefixes []string, cgroupRegex *regexp.Regexp, processCgroupPaths func(resolve.Resolver, int) (cgroup.PathList, error), pidCidCache *common.Cache) gosigarCidProvider {
func newCidProvider(cgroupPrefixes []string, cgroupRegex *regexp.Regexp, processCgroupPaths processors.CGReader, pidCidCache *common.Cache) gosigarCidProvider {
return gosigarCidProvider{
log: logp.NewLogger(providerName),
hostPath: hostPath,
cgroupPrefixes: cgroupPrefixes,
cgroupRegex: cgroupRegex,
processCgroupPaths: processCgroupPaths,
Expand All @@ -84,7 +82,7 @@ func newCidProvider(hostPath resolve.Resolver, cgroupPrefixes []string, cgroupRe
// getProcessCgroups returns a mapping of cgroup subsystem name to path. It
// returns an error if it failed to retrieve the cgroup info.
func (p gosigarCidProvider) getProcessCgroups(pid int) (cgroup.PathList, error) {
pathList, err := p.processCgroupPaths(p.hostPath, pid)
pathList, err := p.processCgroupPaths.ProcessCgroupPaths(pid)
if err != nil {
var pathError *fs.PathError
if errors.As(err, &pathError) {
Expand Down
32 changes: 32 additions & 0 deletions libbeat/processors/cgroups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package processors

import (
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

// InitCgroupHandler is a type for creating stubs for the cgroup resolver. Used primarily for testing.
type InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (CGReader, error)

// CGReader wraps the group Reader.ProcessCgroupPaths() call, this allows us to
// set different cgroups readers for testing.
type CGReader interface {
ProcessCgroupPaths(pid int) (cgroup.PathList, error)
}

0 comments on commit 13d71a6

Please sign in to comment.