Skip to content

Commit

Permalink
Fix isolation of modules when merging local and global field settings. (
Browse files Browse the repository at this point in the history
elastic#5808)

The user global and local fields configurations must be merged
on pipeline setup. The DeepUpdate method did only keep references to the
globally shared fields, such that local fields settings do overwrite
the shared global state. If more then one beat.Client instance did set
local fields, the global state (and all fields added to an event) will
be given by the last client being instantiated.

The fix Clones the global fields before merging, such that the
beat.Client fields operate fully in isolation.
  • Loading branch information
Steffen Siering authored and ruflin committed Dec 6, 2017
1 parent 192b565 commit 08eb2f5
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Fix console color output for Windows. {issue}5611[5611]
- Fix documentation links in README.md files. {pull}5710[5710]
- Fix logstash output debug message. {pull}5799{5799]
- Fix isolation of modules when merging local and global field settings. {issue}5795[5795]

*Auditbeat*

Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type pipelineProcessors struct {

processors beat.Processor

disabled bool // disabled is set if outputs have been disabled via CLI
disabled bool // disabled is set if outputs have been disabled via CLI
alwaysCopy bool
}

// Settings is used to pass additional settings to a newly created pipeline instance.
Expand Down Expand Up @@ -304,8 +305,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}
}

processors := p.newProcessorPipeline(cfg)

processors := newProcessorPipeline(p.beatInfo, p.processors, cfg)
acker := p.makeACKer(processors != nil, &cfg, waitClose)
producerCfg := queue.ProducerConfig{
// Cancel events from queue if acker is configured
Expand Down
14 changes: 7 additions & 7 deletions libbeat/publisher/pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ type processorFn struct {
// 8. (P) pipeline processors list
// 9. (P) (if publish/debug enabled) log event
// 10. (P) (if output disabled) dropEvent
func (p *Pipeline) newProcessorPipeline(
func newProcessorPipeline(
info beat.Info,
global pipelineProcessors,
config beat.ClientConfig,
) beat.Processor {
var (
Expand All @@ -49,12 +51,9 @@ func (p *Pipeline) newProcessorPipeline(
// client fields and metadata
clientMeta = config.Meta
localProcessors = makeClientProcessors(config)

// pipeline global
global = p.processors
)

needsCopy := localProcessors != nil || global.processors != nil
needsCopy := global.alwaysCopy || localProcessors != nil || global.processors != nil

// setup 1: generalize/normalize output (P)
processors.add(generalizeProcessor)
Expand All @@ -74,10 +73,11 @@ func (p *Pipeline) newProcessorPipeline(

// setup 3, 4, 5: client config fields + pipeline fields + client fields + dyn metadata
fields := config.Fields.Clone()
fields.DeepUpdate(global.fields)
fields.DeepUpdate(global.fields.Clone())
if em := config.EventMetadata; len(em.Fields) > 0 {
common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot)
}

if len(fields) > 0 {
processors.add(makeAddFieldsProcessor("fields", fields, needsCopy))
}
Expand All @@ -99,7 +99,7 @@ func (p *Pipeline) newProcessorPipeline(

// setup 9: debug print final event (P)
if logp.IsDebug("publish") {
processors.add(debugPrintProcessor(p.beatInfo))
processors.add(debugPrintProcessor(info))
}

// setup 10: drop all events if outputs are disabled (P)
Expand Down
322 changes: 322 additions & 0 deletions libbeat/publisher/pipeline/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
package pipeline

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestProcessors(t *testing.T) {
info := beat.Info{}

type local struct {
config beat.ClientConfig
events []common.MapStr
expected []common.MapStr
}

tests := []struct {
name string
global pipelineProcessors
local []local
}{
{
"user global fields and tags",
pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
[]local{
{
config: beat.ClientConfig{},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{"value": "abc", "global": 1, "tags": []string{"tag"}},
},
},
},
},
{
"beat local fields",
pipelineProcessors{},
[]local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "local": 1}},
},
},
},
{
"beat local and user global fields",
pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
[]local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{"value": "abc", "local": 1, "global": 1, "tags": []string{"tag"}},
},
},
},
},
{
"user global fields overwrite beat local fields",
pipelineProcessors{
fields: common.MapStr{"global": 1, "shared": "global"},
tags: []string{"tag"},
},
[]local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1, "shared": "local"},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{"value": "abc", "local": 1, "global": 1, "shared": "global", "tags": []string{"tag"}},
},
},
},
},
{
"beat local fields isolated",
pipelineProcessors{},
[]local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "local": 1}},
},
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 2},
},
events: []common.MapStr{{"value": "def"}},
expected: []common.MapStr{{"value": "def", "local": 2}},
},
},
},

{
"beat local fields + user global fields isolated",
pipelineProcessors{
fields: common.MapStr{"global": 0},
},
[]local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}},
},
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 2},
},
events: []common.MapStr{{"value": "def"}},
expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}},
},
},
},
{
"user local fields and tags",
pipelineProcessors{},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1},
Tags: []string{"tag"},
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{"value": "abc", "fields": common.MapStr{"local": 1}, "tags": []string{"tag"}},
},
},
},
},
{
"user local fields (under root) and tags",
pipelineProcessors{},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1},
FieldsUnderRoot: true,
Tags: []string{"tag"},
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{"value": "abc", "local": 1, "tags": []string{"tag"}},
},
},
},
},
{
"user local fields overwrite user global fields",
pipelineProcessors{
fields: common.MapStr{"global": 0, "shared": "global"},
tags: []string{"global"},
},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1, "shared": "local"},
FieldsUnderRoot: true,
Tags: []string{"local"},
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{
"value": "abc",
"global": 0, "local": 1, "shared": "local",
"tags": []string{"global", "local"},
},
},
},
},
},
{
"user local fields isolated",
pipelineProcessors{},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1},
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"local": 1}}},
},
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 2},
},
},
events: []common.MapStr{{"value": "def"}},
expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"local": 2}}},
},
},
},
{
"user local + global fields isolated",
pipelineProcessors{
fields: common.MapStr{"fields": common.MapStr{"global": 0}},
},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1},
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"global": 0, "local": 1}}},
},
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 2},
},
},
events: []common.MapStr{{"value": "def"}},
expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"global": 0, "local": 2}}},
},
},
},
{
"user local + global fields isolated (fields with root)",
pipelineProcessors{
fields: common.MapStr{"global": 0},
},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1},
FieldsUnderRoot: true,
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}},
},
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 2},
FieldsUnderRoot: true,
},
},
events: []common.MapStr{{"value": "def"}},
expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}},
},
},
},
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
// create processor pipelines
programs := make([]beat.Processor, len(test.local))
for i, local := range test.local {
programs[i] = newProcessorPipeline(info, test.global, local.config)
}

// run processor pipelines in parallel
var (
wg sync.WaitGroup
mux sync.Mutex
results = make([][]common.MapStr, len(programs))
)
for id, local := range test.local {
wg.Add(1)
id, program, local := id, programs[id], local
go func() {
defer wg.Done()

actual := make([]common.MapStr, len(local.events))
for i, event := range local.events {
out, _ := program.Run(&beat.Event{
Timestamp: time.Now(),
Fields: event,
})
actual[i] = out.Fields
}

mux.Lock()
defer mux.Unlock()
results[id] = actual
}()
}
wg.Wait()

// validate
for i, local := range test.local {
assert.Equal(t, local.expected, results[i])
}
})
}
}

0 comments on commit 08eb2f5

Please sign in to comment.