From ab064172d71a07792bc4ffaf06a284afb5e17006 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Wed, 6 Dec 2017 04:27:24 +0100 Subject: [PATCH] Fix isolation of modules when merging local and global field settings. (#5808) The user global and local fields configurations must be merged on pipeline setup. The DeepUpdate method did only keep references to the globally shared fields, such that local fields settings do overwrite the shared global state. If more then one beat.Client instance did set local fields, the global state (and all fields added to an event) will be given by the last client being instantiated. The fix Clones the global fields before merging, such that the beat.Client fields operate fully in isolation. (cherry picked from commit 08eb2f5af63625dee4650370537d86e3fb004f32) --- CHANGELOG.asciidoc | 2 + libbeat/publisher/pipeline/pipeline.go | 6 +- libbeat/publisher/pipeline/processor.go | 14 +- libbeat/publisher/pipeline/processor_test.go | 322 +++++++++++++++++++ 4 files changed, 334 insertions(+), 10 deletions(-) create mode 100644 libbeat/publisher/pipeline/processor_test.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 36fa17e509bb..511d5baa8d31 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -30,6 +30,8 @@ https://github.com/elastic/beats/compare/v6.0.1...6.0[Check the HEAD diff] *Affecting all Beats* +- Fix isolation of modules when merging local and global field settings. {issue}5795[5795] + *Auditbeat* *Filebeat* diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 18cafa356cff..10bf800c8897 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -68,7 +68,8 @@ type pipelineProcessors struct { processors beat.Processor - disabled bool // disabled is set if outputs have been disabled via CLI + disabled bool // disabled is set if outputs have been disabled via CLI + alwaysCopy bool } // Settings is used to pass additional settings to a newly created pipeline instance. @@ -300,8 +301,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } } - processors := p.newProcessorPipeline(cfg) - + processors := newProcessorPipeline(p.beatInfo, p.processors, cfg) acker := p.makeACKer(processors != nil, &cfg, waitClose) producerCfg := queue.ProducerConfig{ // only cancel events from queue if acker is configured diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/pipeline/processor.go index 2c8d3f245fa1..3b8f9cd8a051 100644 --- a/libbeat/publisher/pipeline/processor.go +++ b/libbeat/publisher/pipeline/processor.go @@ -39,7 +39,9 @@ type processorFn struct { // 8. (P) pipeline processors list // 9. (P) (if publish/debug enabled) log event // 10. (P) (if output disabled) dropEvent -func (p *Pipeline) newProcessorPipeline( +func newProcessorPipeline( + info beat.Info, + global pipelineProcessors, config beat.ClientConfig, ) beat.Processor { var ( @@ -49,12 +51,9 @@ func (p *Pipeline) newProcessorPipeline( // client fields and metadata clientMeta = config.Meta localProcessors = makeClientProcessors(config) - - // pipeline global - global = p.processors ) - needsCopy := localProcessors != nil || global.processors != nil + needsCopy := global.alwaysCopy || localProcessors != nil || global.processors != nil // setup 1: generalize/normalize output (P) processors.add(generalizeProcessor) @@ -74,10 +73,11 @@ func (p *Pipeline) newProcessorPipeline( // setup 3, 4, 5: client config fields + pipeline fields + client fields fields := config.Fields.Clone() - fields.DeepUpdate(global.fields) + fields.DeepUpdate(global.fields.Clone()) if em := config.EventMetadata; len(em.Fields) > 0 { common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) } + if len(fields) > 0 { processors.add(makeAddFieldsProcessor("fields", fields, needsCopy)) } @@ -95,7 +95,7 @@ func (p *Pipeline) newProcessorPipeline( // setup 9: debug print final event (P) if logp.IsDebug("publish") { - processors.add(debugPrintProcessor(p.beatInfo)) + processors.add(debugPrintProcessor(info)) } // setup 10: drop all events if outputs are disabled (P) diff --git a/libbeat/publisher/pipeline/processor_test.go b/libbeat/publisher/pipeline/processor_test.go new file mode 100644 index 000000000000..d35aadf7ce97 --- /dev/null +++ b/libbeat/publisher/pipeline/processor_test.go @@ -0,0 +1,322 @@ +package pipeline + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestProcessors(t *testing.T) { + info := beat.Info{} + + type local struct { + config beat.ClientConfig + events []common.MapStr + expected []common.MapStr + } + + tests := []struct { + name string + global pipelineProcessors + local []local + }{ + { + "user global fields and tags", + pipelineProcessors{ + fields: common.MapStr{"global": 1}, + tags: []string{"tag"}, + }, + []local{ + { + config: beat.ClientConfig{}, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + {"value": "abc", "global": 1, "tags": []string{"tag"}}, + }, + }, + }, + }, + { + "beat local fields", + pipelineProcessors{}, + []local{ + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 1}, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "local": 1}}, + }, + }, + }, + { + "beat local and user global fields", + pipelineProcessors{ + fields: common.MapStr{"global": 1}, + tags: []string{"tag"}, + }, + []local{ + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 1}, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + {"value": "abc", "local": 1, "global": 1, "tags": []string{"tag"}}, + }, + }, + }, + }, + { + "user global fields overwrite beat local fields", + pipelineProcessors{ + fields: common.MapStr{"global": 1, "shared": "global"}, + tags: []string{"tag"}, + }, + []local{ + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 1, "shared": "local"}, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + {"value": "abc", "local": 1, "global": 1, "shared": "global", "tags": []string{"tag"}}, + }, + }, + }, + }, + { + "beat local fields isolated", + pipelineProcessors{}, + []local{ + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 1}, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "local": 1}}, + }, + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 2}, + }, + events: []common.MapStr{{"value": "def"}}, + expected: []common.MapStr{{"value": "def", "local": 2}}, + }, + }, + }, + + { + "beat local fields + user global fields isolated", + pipelineProcessors{ + fields: common.MapStr{"global": 0}, + }, + []local{ + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 1}, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}}, + }, + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 2}, + }, + events: []common.MapStr{{"value": "def"}}, + expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}}, + }, + }, + }, + { + "user local fields and tags", + pipelineProcessors{}, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1}, + Tags: []string{"tag"}, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + {"value": "abc", "fields": common.MapStr{"local": 1}, "tags": []string{"tag"}}, + }, + }, + }, + }, + { + "user local fields (under root) and tags", + pipelineProcessors{}, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1}, + FieldsUnderRoot: true, + Tags: []string{"tag"}, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + {"value": "abc", "local": 1, "tags": []string{"tag"}}, + }, + }, + }, + }, + { + "user local fields overwrite user global fields", + pipelineProcessors{ + fields: common.MapStr{"global": 0, "shared": "global"}, + tags: []string{"global"}, + }, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1, "shared": "local"}, + FieldsUnderRoot: true, + Tags: []string{"local"}, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + { + "value": "abc", + "global": 0, "local": 1, "shared": "local", + "tags": []string{"global", "local"}, + }, + }, + }, + }, + }, + { + "user local fields isolated", + pipelineProcessors{}, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1}, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"local": 1}}}, + }, + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 2}, + }, + }, + events: []common.MapStr{{"value": "def"}}, + expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"local": 2}}}, + }, + }, + }, + { + "user local + global fields isolated", + pipelineProcessors{ + fields: common.MapStr{"fields": common.MapStr{"global": 0}}, + }, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1}, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"global": 0, "local": 1}}}, + }, + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 2}, + }, + }, + events: []common.MapStr{{"value": "def"}}, + expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"global": 0, "local": 2}}}, + }, + }, + }, + { + "user local + global fields isolated (fields with root)", + pipelineProcessors{ + fields: common.MapStr{"global": 0}, + }, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1}, + FieldsUnderRoot: true, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}}, + }, + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 2}, + FieldsUnderRoot: true, + }, + }, + events: []common.MapStr{{"value": "def"}}, + expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}}, + }, + }, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + // create processor pipelines + programs := make([]beat.Processor, len(test.local)) + for i, local := range test.local { + programs[i] = newProcessorPipeline(info, test.global, local.config) + } + + // run processor pipelines in parallel + var ( + wg sync.WaitGroup + mux sync.Mutex + results = make([][]common.MapStr, len(programs)) + ) + for id, local := range test.local { + wg.Add(1) + id, program, local := id, programs[id], local + go func() { + defer wg.Done() + + actual := make([]common.MapStr, len(local.events)) + for i, event := range local.events { + out, _ := program.Run(&beat.Event{ + Timestamp: time.Now(), + Fields: event, + }) + actual[i] = out.Fields + } + + mux.Lock() + defer mux.Unlock() + results[id] = actual + }() + } + wg.Wait() + + // validate + for i, local := range test.local { + assert.Equal(t, local.expected, results[i]) + } + }) + } +}