diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 36fa17e509b..511d5baa8d3 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -30,6 +30,8 @@ https://github.com/elastic/beats/compare/v6.0.1...6.0[Check the HEAD diff] *Affecting all Beats* +- Fix isolation of modules when merging local and global field settings. {issue}5795[5795] + *Auditbeat* *Filebeat* diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 18cafa356cf..10bf800c889 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -68,7 +68,8 @@ type pipelineProcessors struct { processors beat.Processor - disabled bool // disabled is set if outputs have been disabled via CLI + disabled bool // disabled is set if outputs have been disabled via CLI + alwaysCopy bool } // Settings is used to pass additional settings to a newly created pipeline instance. @@ -300,8 +301,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } } - processors := p.newProcessorPipeline(cfg) - + processors := newProcessorPipeline(p.beatInfo, p.processors, cfg) acker := p.makeACKer(processors != nil, &cfg, waitClose) producerCfg := queue.ProducerConfig{ // only cancel events from queue if acker is configured diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/pipeline/processor.go index 2c8d3f245fa..3b8f9cd8a05 100644 --- a/libbeat/publisher/pipeline/processor.go +++ b/libbeat/publisher/pipeline/processor.go @@ -39,7 +39,9 @@ type processorFn struct { // 8. (P) pipeline processors list // 9. (P) (if publish/debug enabled) log event // 10. (P) (if output disabled) dropEvent -func (p *Pipeline) newProcessorPipeline( +func newProcessorPipeline( + info beat.Info, + global pipelineProcessors, config beat.ClientConfig, ) beat.Processor { var ( @@ -49,12 +51,9 @@ func (p *Pipeline) newProcessorPipeline( // client fields and metadata clientMeta = config.Meta localProcessors = makeClientProcessors(config) - - // pipeline global - global = p.processors ) - needsCopy := localProcessors != nil || global.processors != nil + needsCopy := global.alwaysCopy || localProcessors != nil || global.processors != nil // setup 1: generalize/normalize output (P) processors.add(generalizeProcessor) @@ -74,10 +73,11 @@ func (p *Pipeline) newProcessorPipeline( // setup 3, 4, 5: client config fields + pipeline fields + client fields fields := config.Fields.Clone() - fields.DeepUpdate(global.fields) + fields.DeepUpdate(global.fields.Clone()) if em := config.EventMetadata; len(em.Fields) > 0 { common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) } + if len(fields) > 0 { processors.add(makeAddFieldsProcessor("fields", fields, needsCopy)) } @@ -95,7 +95,7 @@ func (p *Pipeline) newProcessorPipeline( // setup 9: debug print final event (P) if logp.IsDebug("publish") { - processors.add(debugPrintProcessor(p.beatInfo)) + processors.add(debugPrintProcessor(info)) } // setup 10: drop all events if outputs are disabled (P) diff --git a/libbeat/publisher/pipeline/processor_test.go b/libbeat/publisher/pipeline/processor_test.go new file mode 100644 index 00000000000..d35aadf7ce9 --- /dev/null +++ b/libbeat/publisher/pipeline/processor_test.go @@ -0,0 +1,322 @@ +package pipeline + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestProcessors(t *testing.T) { + info := beat.Info{} + + type local struct { + config beat.ClientConfig + events []common.MapStr + expected []common.MapStr + } + + tests := []struct { + name string + global pipelineProcessors + local []local + }{ + { + "user global fields and tags", + pipelineProcessors{ + fields: common.MapStr{"global": 1}, + tags: []string{"tag"}, + }, + []local{ + { + config: beat.ClientConfig{}, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + {"value": "abc", "global": 1, "tags": []string{"tag"}}, + }, + }, + }, + }, + { + "beat local fields", + pipelineProcessors{}, + []local{ + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 1}, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "local": 1}}, + }, + }, + }, + { + "beat local and user global fields", + pipelineProcessors{ + fields: common.MapStr{"global": 1}, + tags: []string{"tag"}, + }, + []local{ + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 1}, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + {"value": "abc", "local": 1, "global": 1, "tags": []string{"tag"}}, + }, + }, + }, + }, + { + "user global fields overwrite beat local fields", + pipelineProcessors{ + fields: common.MapStr{"global": 1, "shared": "global"}, + tags: []string{"tag"}, + }, + []local{ + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 1, "shared": "local"}, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + {"value": "abc", "local": 1, "global": 1, "shared": "global", "tags": []string{"tag"}}, + }, + }, + }, + }, + { + "beat local fields isolated", + pipelineProcessors{}, + []local{ + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 1}, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "local": 1}}, + }, + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 2}, + }, + events: []common.MapStr{{"value": "def"}}, + expected: []common.MapStr{{"value": "def", "local": 2}}, + }, + }, + }, + + { + "beat local fields + user global fields isolated", + pipelineProcessors{ + fields: common.MapStr{"global": 0}, + }, + []local{ + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 1}, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}}, + }, + { + config: beat.ClientConfig{ + Fields: common.MapStr{"local": 2}, + }, + events: []common.MapStr{{"value": "def"}}, + expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}}, + }, + }, + }, + { + "user local fields and tags", + pipelineProcessors{}, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1}, + Tags: []string{"tag"}, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + {"value": "abc", "fields": common.MapStr{"local": 1}, "tags": []string{"tag"}}, + }, + }, + }, + }, + { + "user local fields (under root) and tags", + pipelineProcessors{}, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1}, + FieldsUnderRoot: true, + Tags: []string{"tag"}, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + {"value": "abc", "local": 1, "tags": []string{"tag"}}, + }, + }, + }, + }, + { + "user local fields overwrite user global fields", + pipelineProcessors{ + fields: common.MapStr{"global": 0, "shared": "global"}, + tags: []string{"global"}, + }, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1, "shared": "local"}, + FieldsUnderRoot: true, + Tags: []string{"local"}, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{ + { + "value": "abc", + "global": 0, "local": 1, "shared": "local", + "tags": []string{"global", "local"}, + }, + }, + }, + }, + }, + { + "user local fields isolated", + pipelineProcessors{}, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1}, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"local": 1}}}, + }, + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 2}, + }, + }, + events: []common.MapStr{{"value": "def"}}, + expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"local": 2}}}, + }, + }, + }, + { + "user local + global fields isolated", + pipelineProcessors{ + fields: common.MapStr{"fields": common.MapStr{"global": 0}}, + }, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1}, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"global": 0, "local": 1}}}, + }, + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 2}, + }, + }, + events: []common.MapStr{{"value": "def"}}, + expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"global": 0, "local": 2}}}, + }, + }, + }, + { + "user local + global fields isolated (fields with root)", + pipelineProcessors{ + fields: common.MapStr{"global": 0}, + }, + []local{ + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 1}, + FieldsUnderRoot: true, + }, + }, + events: []common.MapStr{{"value": "abc"}}, + expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}}, + }, + { + config: beat.ClientConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": 2}, + FieldsUnderRoot: true, + }, + }, + events: []common.MapStr{{"value": "def"}}, + expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}}, + }, + }, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + // create processor pipelines + programs := make([]beat.Processor, len(test.local)) + for i, local := range test.local { + programs[i] = newProcessorPipeline(info, test.global, local.config) + } + + // run processor pipelines in parallel + var ( + wg sync.WaitGroup + mux sync.Mutex + results = make([][]common.MapStr, len(programs)) + ) + for id, local := range test.local { + wg.Add(1) + id, program, local := id, programs[id], local + go func() { + defer wg.Done() + + actual := make([]common.MapStr, len(local.events)) + for i, event := range local.events { + out, _ := program.Run(&beat.Event{ + Timestamp: time.Now(), + Fields: event, + }) + actual[i] = out.Fields + } + + mux.Lock() + defer mux.Unlock() + results[id] = actual + }() + } + wg.Wait() + + // validate + for i, local := range test.local { + assert.Equal(t, local.expected, results[i]) + } + }) + } +}