Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #41108) Fix cgroup helper init in add_processor_metadata and add_docker_metadata processors #41185

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions libbeat/processors/add_docker_metadata/add_docker_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ const (
cgroupCacheExpiration = 5 * time.Minute
)

// processGroupPaths returns the cgroups associated with a process. This enables
// initCgroupPaths initializes a new cgroup reader. This enables
// unit testing by allowing us to stub the OS interface.
var processCgroupPaths = cgroup.ProcessCgroupPaths
var initCgroupPaths processors.InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) {
return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups)
}

func init() {
processors.RegisterPlugin(processorName, New)
Expand All @@ -61,11 +63,11 @@ type addDockerMetadata struct {
fields []string
sourceProcessor beat.Processor

pidFields []string // Field names that contain PIDs.
cgroups *common.Cache // Cache of PID (int) to cgropus (map[string]string).
hostFS resolve.Resolver // Directory where /proc is found
dedot bool // If set to true, replace dots in labels with `_`.
dockerAvailable bool // If Docker exists in env, then it is set to true
pidFields []string // Field names that contain PIDs.
cgroups *common.Cache // Cache of PID (int) to cgropus (map[string]string).
dedot bool // If set to true, replace dots in labels with `_`.
dockerAvailable bool // If Docker exists in env, then it is set to true
cgreader processors.CGReader
}

const selector = "add_docker_metadata"
Expand Down Expand Up @@ -110,15 +112,20 @@ func buildDockerMetadataProcessor(log *logp.Logger, cfg *conf.C, watcherConstruc
}
}

reader, err := initCgroupPaths(resolve.NewTestResolver(config.HostFS), false)
if err != nil && !errors.Is(err, cgroup.ErrCgroupsMissing) {
return nil, fmt.Errorf("error creating cgroup reader: %w", err)
}

return &addDockerMetadata{
log: log,
watcher: watcher,
fields: config.Fields,
sourceProcessor: sourceProcessor,
pidFields: config.MatchPIDs,
hostFS: resolve.NewTestResolver(config.HostFS),
dedot: config.DeDot,
dockerAvailable: dockerAvailable,
cgreader: reader,
}, nil
}

Expand Down Expand Up @@ -277,7 +284,10 @@ func (d *addDockerMetadata) getProcessCgroups(pid int) (cgroup.PathList, error)
return cgroups, nil
}

cgroups, err := processCgroupPaths(d.hostFS, pid)
if d.cgreader == nil {
return cgroups, fs.ErrNotExist
}
cgroups, err := d.cgreader.ProcessCgroupPaths(pid)
if err != nil {
return cgroups, fmt.Errorf("failed to read cgroups for pid=%v: %w", pid, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/elastic-agent-autodiscover/bus"
"github.com/elastic/elastic-agent-autodiscover/docker"
"github.com/elastic/elastic-agent-libs/config"
Expand All @@ -37,30 +39,62 @@ import (
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

type testCGReader struct {
}

func (r testCGReader) ProcessCgroupPaths(pid int) (cgroup.PathList, error) {
switch pid {
case 1000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/docker/8c147fdfab5a2608fe513d10294bf77cb502a231da9725093a155bd25cd1f14b", IsV2: false},
},
}, nil
case 2000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"memory": {ControllerPath: "/user.slice", IsV2: false},
},
}, nil
case 3000:
// Parser error (hopefully this never happens).
return cgroup.PathList{}, fmt.Errorf("cgroup parse failure")
default:
return cgroup.PathList{}, os.ErrNotExist
}
}

func init() {
// Stub out the procfs.
processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) {
initCgroupPaths = func(_ resolve.Resolver, _ bool) (processors.CGReader, error) {
return testCGReader{}, nil
}
}

switch pid {
case 1000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/docker/8c147fdfab5a2608fe513d10294bf77cb502a231da9725093a155bd25cd1f14b", IsV2: false},
},
}, nil
case 2000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"memory": {ControllerPath: "/user.slice", IsV2: false},
},
}, nil
case 3000:
// Parser error (hopefully this never happens).
return cgroup.PathList{}, fmt.Errorf("cgroup parse failure")
default:
return cgroup.PathList{}, os.ErrNotExist
}
func TestDefaultProcessorStartup(t *testing.T) {
// set initCgroupPaths to system non-test defaults
initCgroupPaths = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) {
return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups)
}

defer func() {
initCgroupPaths = func(_ resolve.Resolver, _ bool) (processors.CGReader, error) {
return testCGReader{}, nil
}
}()

rawCfg := defaultConfig()
cfg, err := config.NewConfigFrom(rawCfg)
require.NoError(t, err)

proc, err := buildDockerMetadataProcessor(logp.L(), cfg, docker.NewWatcher)
require.NoError(t, err)

unwrapped, _ := proc.(*addDockerMetadata)

// make sure pid readers have been initialized properly
_, err = unwrapped.getProcessCgroups(os.Getpid())
require.NoError(t, err)
}

func TestInitializationNoDocker(t *testing.T) {
Expand Down
14 changes: 11 additions & 3 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ var (

procCache = newProcessCache(cacheExpiration, cacheCapacity, cacheEvictionEffort, gosysinfoProvider{})

processCgroupPaths = cgroup.ProcessCgroupPaths
// cgroups resolver, turned to a stub function to make testing easier.
initCgroupPaths processors.InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) {
return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups)
}

instanceID atomic.Uint32
)
Expand Down Expand Up @@ -160,6 +163,11 @@ func newProcessMetadataProcessorWithProvider(config config, provider processMeta
}
}

reader, err := initCgroupPaths(resolve.NewTestResolver(config.HostPath), false)
if err != nil && !errors.Is(err, cgroup.ErrCgroupsMissing) {
return nil, fmt.Errorf("error creating cgroup reader: %w", err)
}

// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if withCache && config.CgroupCacheExpireTime != 0 {
Expand All @@ -170,9 +178,9 @@ func newProcessMetadataProcessorWithProvider(config config, provider processMeta

p.cgroupsCache = common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener)
p.cgroupsCache.StartJanitor(config.CgroupCacheExpireTime)
p.cidProvider = newCidProvider(resolve.NewTestResolver(config.HostPath), config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, p.cgroupsCache)
p.cidProvider = newCidProvider(config.CgroupPrefixes, config.CgroupRegex, reader, p.cgroupsCache)
} else {
p.cidProvider = newCidProvider(resolve.NewTestResolver(config.HostPath), config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, nil)
p.cidProvider = newCidProvider(config.CgroupPrefixes, config.CgroupRegex, reader, nil)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,48 @@ import (
"unsafe"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/capabilities"
"github.com/elastic/beats/v7/libbeat/processors"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

type testCGRsolver struct {
res func(pid int) (cgroup.PathList, error)
}

func (t testCGRsolver) ProcessCgroupPaths(pid int) (cgroup.PathList, error) {
return t.res(pid)
}

func newCGHandlerBuilder(handler testCGRsolver) processors.InitCgroupHandler {
return func(_ resolve.Resolver, _ bool) (processors.CGReader, error) {
return handler, nil
}
}

func TestDefaultProcessorStartup(t *testing.T) {
// set initCgroupPaths to system non-test defaults
initCgroupPaths = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) {
return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups)
}

proc, err := newProcessMetadataProcessorWithProvider(defaultConfig(), &procCache, false)
require.NoError(t, err)

// ensure the underlying provider has been initialized properly
unwrapped, _ := proc.(*addProcessMetadata)
metadata, err := unwrapped.provider.GetProcessMetadata(os.Getpid())
require.NoError(t, err)
require.NotNil(t, metadata)
}

func TestAddProcessMetadata(t *testing.T) {
logp.TestingSetup(logp.WithSelectors(processorName))

Expand Down Expand Up @@ -90,7 +122,7 @@ func TestAddProcessMetadata(t *testing.T) {
}

// mock of the cgroup processCgroupPaths
processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) {
processCgroupPaths := func(pid int) (cgroup.PathList, error) {
testMap := map[int]cgroup.PathList{
1: {
V1: map[string]cgroup.ControllerPath{
Expand Down Expand Up @@ -135,6 +167,7 @@ func TestAddProcessMetadata(t *testing.T) {

return testMap[pid], nil
}
initCgroupPaths = newCGHandlerBuilder(testCGRsolver{res: processCgroupPaths})

for _, test := range []struct {
description string
Expand Down Expand Up @@ -884,7 +917,7 @@ func TestUsingCache(t *testing.T) {
selfPID := os.Getpid()

// mock of the cgroup processCgroupPaths
processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) {
processCgroupPaths := func(pid int) (cgroup.PathList, error) {
testStruct := cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1"},
Expand All @@ -909,7 +942,7 @@ func TestUsingCache(t *testing.T) {
// testMap :=
return testMap[pid], nil
}

initCgroupPaths = newCGHandlerBuilder(testCGRsolver{res: processCgroupPaths})
config, err := conf.NewConfigFrom(mapstr.M{
"match_pids": []string{"system.process.ppid"},
"include_fields": []string{"container.id", "process.env"},
Expand Down Expand Up @@ -1202,15 +1235,17 @@ func TestPIDToInt(t *testing.T) {
}

func TestV2CID(t *testing.T) {
processCgroupPaths = func(_ resolve.Resolver, _ int) (cgroup.PathList, error) {
processCgroupPaths := func(_ int) (cgroup.PathList, error) {
testMap := cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {IsV2: true, ControllerPath: "system.slice/docker-2dcbab615aebfa9313feffc5cfdacd381543cfa04c6be3f39ac656e55ef34805.scope"},
},
}
return testMap, nil
}
provider := newCidProvider(resolve.NewTestResolver(""), nil, defaultCgroupRegex, processCgroupPaths, nil)
resolver := testCGRsolver{res: processCgroupPaths}
initCgroupPaths = newCGHandlerBuilder(resolver)
provider := newCidProvider(nil, defaultCgroupRegex, resolver, nil)
result, err := provider.GetCid(1)
assert.NoError(t, err)
assert.Equal(t, "2dcbab615aebfa9313feffc5cfdacd381543cfa04c6be3f39ac656e55ef34805", result)
Expand Down
23 changes: 8 additions & 15 deletions libbeat/processors/add_process_metadata/gosigar_cid_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@
package add_process_metadata

import (
"errors"
"fmt"
"io/fs"
"path/filepath"
"regexp"
"strings"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

const (
Expand All @@ -37,10 +35,9 @@ const (

type gosigarCidProvider struct {
log *logp.Logger
hostPath resolve.Resolver
cgroupPrefixes []string
cgroupRegex *regexp.Regexp
processCgroupPaths func(resolve.Resolver, int) (cgroup.PathList, error)
processCgroupPaths processors.CGReader
pidCidCache *common.Cache
}

Expand Down Expand Up @@ -70,10 +67,9 @@ func (p gosigarCidProvider) GetCid(pid int) (result string, err error) {
return cid, nil
}

func newCidProvider(hostPath resolve.Resolver, cgroupPrefixes []string, cgroupRegex *regexp.Regexp, processCgroupPaths func(resolve.Resolver, int) (cgroup.PathList, error), pidCidCache *common.Cache) gosigarCidProvider {
func newCidProvider(cgroupPrefixes []string, cgroupRegex *regexp.Regexp, processCgroupPaths processors.CGReader, pidCidCache *common.Cache) gosigarCidProvider {
return gosigarCidProvider{
log: logp.NewLogger(providerName),
hostPath: hostPath,
cgroupPrefixes: cgroupPrefixes,
cgroupRegex: cgroupRegex,
processCgroupPaths: processCgroupPaths,
Expand All @@ -84,15 +80,12 @@ func newCidProvider(hostPath resolve.Resolver, cgroupPrefixes []string, cgroupRe
// getProcessCgroups returns a mapping of cgroup subsystem name to path. It
// returns an error if it failed to retrieve the cgroup info.
func (p gosigarCidProvider) getProcessCgroups(pid int) (cgroup.PathList, error) {
pathList, err := p.processCgroupPaths(p.hostPath, pid)
//return nil if we aren't supporting cgroups
if p.processCgroupPaths == nil {
return cgroup.PathList{}, nil
}
pathList, err := p.processCgroupPaths.ProcessCgroupPaths(pid)
if err != nil {
var pathError *fs.PathError
if errors.As(err, &pathError) {
// do no thing when err is nil or when os.PathError happens because the process don't exist,
// or not running in linux system
return cgroup.PathList{}, nil
}
// should never happen
return cgroup.PathList{}, fmt.Errorf("failed to read cgroups for pid=%v: %w", pid, err)
}

Expand Down
32 changes: 32 additions & 0 deletions libbeat/processors/cgroups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package processors

import (
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

// InitCgroupHandler is a type for creating stubs for the cgroup resolver. Used primarily for testing.
type InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (CGReader, error)

// CGReader wraps the group Reader.ProcessCgroupPaths() call, this allows us to
// set different cgroups readers for testing.
type CGReader interface {
ProcessCgroupPaths(pid int) (cgroup.PathList, error)
}
Loading