diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index bca8a50833c4..3a6793fdbc6b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -40,7 +40,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Deregister pipeline loader callback when inputsRunner is stopped. {pull}[7893][7893] - Add backoff support to x-pack monitoring outputs. {issue}7966[7966] - Removed execute permissions systemd unit file. {pull}7873[7873] -- Fix a race condition with the `add_host_metadata` and the event serialization. {pull}8223[8223] +- Fix a race condition with the `add_host_metadata` and the event serialization. {pull}8223[8223] {pull}8653[8653] - Enforce that data used by k8s or docker doesn't use any reference. {pull}8240[8240] - Switch to different UUID lib due to to non-random generated UUIDs. {pull}8485[8485] - Fix race condition when publishing monitoring data. {pull}8646[8646] diff --git a/libbeat/processors/add_host_metadata/add_host_metadata.go b/libbeat/processors/add_host_metadata/add_host_metadata.go index f9597cb79f2d..5127d8b7e531 100644 --- a/libbeat/processors/add_host_metadata/add_host_metadata.go +++ b/libbeat/processors/add_host_metadata/add_host_metadata.go @@ -20,6 +20,7 @@ package add_host_metadata import ( "fmt" "net" + "sync" "time" "github.com/joeshaw/multierror" @@ -40,9 +41,12 @@ func init() { type addHostMetadata struct { info types.HostInfo - lastUpdate time.Time - data common.MapStr - config Config + lastUpdate struct { + time.Time + sync.Mutex + } + data common.MapStrPointer + config Config } const ( @@ -63,42 +67,55 @@ func newHostMetadataProcessor(cfg *common.Config) (processors.Processor, error) p := &addHostMetadata{ info: h.Info(), config: config, + data: common.NewMapStrPointer(nil), } + p.loadData() return p, nil } // Run enriches the given event with the host meta data func (p *addHostMetadata) Run(event *beat.Event) (*beat.Event, error) { p.loadData() - event.Fields.DeepUpdate(p.data.Clone()) + event.Fields.DeepUpdate(p.data.Get().Clone()) return event, nil } -func (p *addHostMetadata) loadData() { +func (p *addHostMetadata) expired() bool { + p.lastUpdate.Lock() + defer p.lastUpdate.Unlock() - // Check if cache is expired - if p.lastUpdate.Add(cacheExpiration).Before(time.Now()) { - p.data = host.MapHostInfo(p.info) + if p.lastUpdate.Add(cacheExpiration).After(time.Now()) { + return false + } + p.lastUpdate.Time = time.Now() + return true +} - if p.config.NetInfoEnabled { - // IP-address and MAC-address - var ipList, hwList, err = p.getNetInfo() - if err != nil { - logp.Info("Error when getting network information %v", err) - } +func (p *addHostMetadata) loadData() { + if !p.expired() { + return + } - if len(ipList) > 0 { - p.data.Put("host.ip", ipList) - } - if len(hwList) > 0 { - p.data.Put("host.mac", hwList) - } + data := host.MapHostInfo(p.info) + if p.config.NetInfoEnabled { + // IP-address and MAC-address + var ipList, hwList, err = p.getNetInfo() + if err != nil { + logp.Info("Error when getting network information %v", err) + } + + if len(ipList) > 0 { + data.Put("host.ip", ipList) + } + if len(hwList) > 0 { + data.Put("host.mac", hwList) } - p.lastUpdate = time.Now() } + + p.data.Set(data) } -func (p addHostMetadata) getNetInfo() ([]string, []string, error) { +func (p *addHostMetadata) getNetInfo() ([]string, []string, error) { var ipList []string var hwList []string @@ -143,7 +160,7 @@ func (p addHostMetadata) getNetInfo() ([]string, []string, error) { return ipList, hwList, errs.Err() } -func (p addHostMetadata) String() string { +func (p *addHostMetadata) String() string { return fmt.Sprintf("%v=[netinfo.enabled=[%v]]", processorName, p.config.NetInfoEnabled) }