Skip to content

Commit

Permalink
Fix race on shared maps in global fields (#6947) (#7010)
Browse files Browse the repository at this point in the history
* Fix race on shared maps in global fields

On publish fields are added to an event in this order:
- local/global configured fields
- dynamic fields
- "beat" metadata

When merging the fields, shared structures must not be overwritten or
updated concurrently. This is enforced by cloning the original fields
structure before applying updates.

This adds missing Clone operations if configured fields add new
fields to the `beat` namespace or if dynamic fields are enabled.

* Remove hard coded bool from testing

(cherry picked from commit ffa812b)
  • Loading branch information
Steffen Siering authored and ruflin committed May 4, 2018
1 parent a3163f7 commit e3248ad
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ https://github.com/elastic/beats/compare/v6.2.4...6.2[Check the HEAD diff]

*Affecting all Beats*

- Fix map overwrite panics by cloning shared structs before doing the update. {pull}6947[6947]

*Auditbeat*

*Filebeat*
Expand Down
35 changes: 27 additions & 8 deletions libbeat/publisher/pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,19 @@ func newProcessorPipeline(
}

if len(fields) > 0 {
processors.add(makeAddFieldsProcessor("fields", fields, needsCopy))
// Enforce a copy of fields if dynamic fields are configured or beats
// metadata will be merged into the fields.
// With dynamic fields potentially changing at any time, we need to copy,
// so we do not change shared structures be accident.
fieldsNeedsCopy := needsCopy || config.DynamicFields != nil || fields["beat"] != nil
processors.add(makeAddFieldsProcessor("fields", fields, fieldsNeedsCopy))
}

if config.DynamicFields != nil {
processors.add(makeAddDynMetaProcessor("dynamicFields", config.DynamicFields, needsCopy))
checkCopy := func(m common.MapStr) bool {
return needsCopy || hasKey(m, "beat")
}
processors.add(makeAddDynMetaProcessor("dynamicFields", config.DynamicFields, checkCopy))
}

// setup 5: client processor list
Expand Down Expand Up @@ -248,13 +256,19 @@ func makeAddFieldsProcessor(name string, fields common.MapStr, copy bool) *proce
return newAnnotateProcessor(name, fn)
}

func makeAddDynMetaProcessor(name string, meta *common.MapStrPointer, copy bool) *processorFn {
fn := func(event *beat.Event) { event.Fields.DeepUpdate(meta.Get()) }
if copy {
fn = func(event *beat.Event) { event.Fields.DeepUpdate(meta.Get().Clone()) }
}
func makeAddDynMetaProcessor(
name string,
meta *common.MapStrPointer,
checkCopy func(m common.MapStr) bool,
) *processorFn {
return newAnnotateProcessor(name, func(event *beat.Event) {
dynFields := meta.Get()
if checkCopy(dynFields) {
dynFields = dynFields.Clone()
}

return newAnnotateProcessor(name, fn)
event.Fields.DeepUpdate(dynFields)
})
}

func debugPrintProcessor(info beat.Info) *processorFn {
Expand Down Expand Up @@ -288,3 +302,8 @@ func makeClientProcessors(config beat.ClientConfig) processors.Processor {
list: procs.All(),
}
}

func hasKey(m common.MapStr, key string) bool {
_, exists := m[key]
return exists
}

0 comments on commit e3248ad

Please sign in to comment.