Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #15947 to 7.x: enrich container id from process id #17235

Merged
merged 1 commit into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `add_cloudfoundry_metadata` processor to annotate events with Cloud Foundry application data. {pull}16621[16621]
- Add `translate_sid` processor on Windows for converting Windows security identifier (SID) values to names. {issue}7451[7451] {pull}16013[16013]
- Add support for kubernetes provider to recognize namespace level defaults {pull}16321[16321]
- Add capability of enrich `container.id` with process id in `add_process_metadata` processor {pull}15947[15947]
- Update RPM packages contained in Beat Docker images. {issue}17035[17035]
- Add Kerberos support to Kafka input and output. {pull}16781[16781]

Expand Down
85 changes: 74 additions & 11 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/gosigar/cgroup"
)

const (
processorName = "add_process_metadata"
cacheExpiration = time.Second * 30
processorName = "add_process_metadata"
cacheExpiration = time.Second * 30
containerIDMapping = "container.id"
)

var (
Expand All @@ -47,14 +49,17 @@ var (

procCache = newProcessCache(cacheExpiration, gosysinfoProvider{})

processCgroupPaths = cgroup.ProcessCgroupPaths

instanceID atomic.Uint32
)

type addProcessMetadata struct {
config config
provider processMetadataProvider
log *logp.Logger
mappings common.MapStr
config config
provider processMetadataProvider
cidProvider cidProvider
log *logp.Logger
mappings common.MapStr
}

type processMetadata struct {
Expand All @@ -71,6 +76,10 @@ type processMetadataProvider interface {
GetProcessMetadata(pid int) (*processMetadata, error)
}

type cidProvider interface {
GetCid(pid int) (string, error)
}

func init() {
processors.RegisterPlugin(processorName, New)
jsprocessor.RegisterPlugin("AddProcessMetadata", New)
Expand All @@ -93,18 +102,50 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces
return nil, errors.Wrapf(err, "fail to unpack the %v configuration", processorName)
}

p := addProcessMetadata{
mappings, err := config.getMappings()

if err != nil {
return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName)
}

var p addProcessMetadata

p = addProcessMetadata{
config: config,
provider: provider,
log: log,
mappings: mappings,
}
if p.mappings, err = config.getMappings(); err != nil {
return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName)
// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if config.CgroupCacheExpireTime != 0 {
p.log.Debug("Initializing cgroup cache")
evictionListener := func(k common.Key, v common.Value) {
p.log.Debugf("Evicted cached cgroups for PID=%v", k)
}

cgroupsCache := common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener)
cgroupsCache.StartJanitor(config.CgroupCacheExpireTime)
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, cgroupsCache)
} else {
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, nil)
}

}

return &p, nil
}

// check if the value exist in mapping
func containsValue(m common.MapStr, v string) bool {
for _, x := range m {
if x == v {
return true
}
}
return false
}

// Run enriches the given event with the host meta data
func (p *addProcessMetadata) Run(event *beat.Event) (*beat.Event, error) {
for _, pidField := range p.config.MatchPIDs {
Expand Down Expand Up @@ -156,6 +197,10 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul
}
meta := metaPtr.fields

if err = p.enrichContainerID(pid, meta); err != nil {
return nil, err
}

result = event.Clone()
for dest, sourceIf := range p.mappings {
source, castOk := sourceIf.(string)
Expand All @@ -168,23 +213,41 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul
return nil, errors.Errorf("target field '%s' already exists and overwrite_keys is false", dest)
}
}

value, err := meta.GetValue(source)
if err != nil {
// Should never happen
return nil, err
}

if _, err = result.Put(dest, value); err != nil {
return nil, err
}
}

return result, nil
}

// enrichContainerID adds container.id into meta for mapping to pickup
func (p *addProcessMetadata) enrichContainerID(pid int, meta common.MapStr) error {
if p.cidProvider == nil {
return nil
}
cid, err := p.cidProvider.GetCid(pid)
if err != nil {
return err
}
if _, err = meta.Put("container", common.MapStr{"id": cid}); err != nil {
return err
}
return nil
}

// String returns the processor representation formatted as a string
func (p *addProcessMetadata) String() string {
return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v]",
return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v, host_path=%v, cgroup_prefixes=%v]",
processorName, p.config.MatchPIDs, p.mappings, p.config.IgnoreMissing,
p.config.OverwriteKeys, p.config.RestrictedFields)
p.config.OverwriteKeys, p.config.RestrictedFields, p.config.HostPath, p.config.CgroupPrefixes)
}

func (p *processMetadata) toMap() common.MapStr {
Expand Down
Loading