Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #5808 to 6.0: Fix isolation of modules when merging local and global field settings. #5836

Merged
merged 1 commit into from
Dec 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ https://github.com/elastic/beats/compare/v6.0.1...6.0[Check the HEAD diff]

*Affecting all Beats*

- Fix isolation of modules when merging local and global field settings. {issue}5795[5795]

*Auditbeat*

*Filebeat*
Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type pipelineProcessors struct {

processors beat.Processor

disabled bool // disabled is set if outputs have been disabled via CLI
disabled bool // disabled is set if outputs have been disabled via CLI
alwaysCopy bool
}

// Settings is used to pass additional settings to a newly created pipeline instance.
Expand Down Expand Up @@ -300,8 +301,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}
}

processors := p.newProcessorPipeline(cfg)

processors := newProcessorPipeline(p.beatInfo, p.processors, cfg)
acker := p.makeACKer(processors != nil, &cfg, waitClose)
producerCfg := queue.ProducerConfig{
// only cancel events from queue if acker is configured
Expand Down
14 changes: 7 additions & 7 deletions libbeat/publisher/pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ type processorFn struct {
// 8. (P) pipeline processors list
// 9. (P) (if publish/debug enabled) log event
// 10. (P) (if output disabled) dropEvent
func (p *Pipeline) newProcessorPipeline(
func newProcessorPipeline(
info beat.Info,
global pipelineProcessors,
config beat.ClientConfig,
) beat.Processor {
var (
Expand All @@ -49,12 +51,9 @@ func (p *Pipeline) newProcessorPipeline(
// client fields and metadata
clientMeta = config.Meta
localProcessors = makeClientProcessors(config)

// pipeline global
global = p.processors
)

needsCopy := localProcessors != nil || global.processors != nil
needsCopy := global.alwaysCopy || localProcessors != nil || global.processors != nil

// setup 1: generalize/normalize output (P)
processors.add(generalizeProcessor)
Expand All @@ -74,10 +73,11 @@ func (p *Pipeline) newProcessorPipeline(

// setup 3, 4, 5: client config fields + pipeline fields + client fields
fields := config.Fields.Clone()
fields.DeepUpdate(global.fields)
fields.DeepUpdate(global.fields.Clone())
if em := config.EventMetadata; len(em.Fields) > 0 {
common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot)
}

if len(fields) > 0 {
processors.add(makeAddFieldsProcessor("fields", fields, needsCopy))
}
Expand All @@ -95,7 +95,7 @@ func (p *Pipeline) newProcessorPipeline(

// setup 9: debug print final event (P)
if logp.IsDebug("publish") {
processors.add(debugPrintProcessor(p.beatInfo))
processors.add(debugPrintProcessor(info))
}

// setup 10: drop all events if outputs are disabled (P)
Expand Down
322 changes: 322 additions & 0 deletions libbeat/publisher/pipeline/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
package pipeline

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestProcessors(t *testing.T) {
info := beat.Info{}

type local struct {
config beat.ClientConfig
events []common.MapStr
expected []common.MapStr
}

tests := []struct {
name string
global pipelineProcessors
local []local
}{
{
"user global fields and tags",
pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
[]local{
{
config: beat.ClientConfig{},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{"value": "abc", "global": 1, "tags": []string{"tag"}},
},
},
},
},
{
"beat local fields",
pipelineProcessors{},
[]local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "local": 1}},
},
},
},
{
"beat local and user global fields",
pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
[]local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{"value": "abc", "local": 1, "global": 1, "tags": []string{"tag"}},
},
},
},
},
{
"user global fields overwrite beat local fields",
pipelineProcessors{
fields: common.MapStr{"global": 1, "shared": "global"},
tags: []string{"tag"},
},
[]local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1, "shared": "local"},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{"value": "abc", "local": 1, "global": 1, "shared": "global", "tags": []string{"tag"}},
},
},
},
},
{
"beat local fields isolated",
pipelineProcessors{},
[]local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "local": 1}},
},
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 2},
},
events: []common.MapStr{{"value": "def"}},
expected: []common.MapStr{{"value": "def", "local": 2}},
},
},
},

{
"beat local fields + user global fields isolated",
pipelineProcessors{
fields: common.MapStr{"global": 0},
},
[]local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}},
},
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 2},
},
events: []common.MapStr{{"value": "def"}},
expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}},
},
},
},
{
"user local fields and tags",
pipelineProcessors{},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1},
Tags: []string{"tag"},
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{"value": "abc", "fields": common.MapStr{"local": 1}, "tags": []string{"tag"}},
},
},
},
},
{
"user local fields (under root) and tags",
pipelineProcessors{},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1},
FieldsUnderRoot: true,
Tags: []string{"tag"},
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{"value": "abc", "local": 1, "tags": []string{"tag"}},
},
},
},
},
{
"user local fields overwrite user global fields",
pipelineProcessors{
fields: common.MapStr{"global": 0, "shared": "global"},
tags: []string{"global"},
},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1, "shared": "local"},
FieldsUnderRoot: true,
Tags: []string{"local"},
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{
{
"value": "abc",
"global": 0, "local": 1, "shared": "local",
"tags": []string{"global", "local"},
},
},
},
},
},
{
"user local fields isolated",
pipelineProcessors{},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1},
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"local": 1}}},
},
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 2},
},
},
events: []common.MapStr{{"value": "def"}},
expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"local": 2}}},
},
},
},
{
"user local + global fields isolated",
pipelineProcessors{
fields: common.MapStr{"fields": common.MapStr{"global": 0}},
},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1},
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"global": 0, "local": 1}}},
},
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 2},
},
},
events: []common.MapStr{{"value": "def"}},
expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"global": 0, "local": 2}}},
},
},
},
{
"user local + global fields isolated (fields with root)",
pipelineProcessors{
fields: common.MapStr{"global": 0},
},
[]local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 1},
FieldsUnderRoot: true,
},
},
events: []common.MapStr{{"value": "abc"}},
expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}},
},
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Fields: common.MapStr{"local": 2},
FieldsUnderRoot: true,
},
},
events: []common.MapStr{{"value": "def"}},
expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}},
},
},
},
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
// create processor pipelines
programs := make([]beat.Processor, len(test.local))
for i, local := range test.local {
programs[i] = newProcessorPipeline(info, test.global, local.config)
}

// run processor pipelines in parallel
var (
wg sync.WaitGroup
mux sync.Mutex
results = make([][]common.MapStr, len(programs))
)
for id, local := range test.local {
wg.Add(1)
id, program, local := id, programs[id], local
go func() {
defer wg.Done()

actual := make([]common.MapStr, len(local.events))
for i, event := range local.events {
out, _ := program.Run(&beat.Event{
Timestamp: time.Now(),
Fields: event,
})
actual[i] = out.Fields
}

mux.Lock()
defer mux.Unlock()
results[id] = actual
}()
}
wg.Wait()

// validate
for i, local := range test.local {
assert.Equal(t, local.expected, results[i])
}
})
}
}