From 733346aa9985e58f3a87e16224a0acb2556973a3 Mon Sep 17 00:00:00 2001 From: Gil Raphaelli Date: Tue, 8 Jan 2019 11:02:55 -0500 Subject: [PATCH 1/4] move agent metadata to a processor and make it disable-able --- libbeat/beat/pipeline.go | 4 ++++ libbeat/publisher/pipeline/module.go | 11 ----------- libbeat/publisher/pipeline/processor.go | 24 +++++++++++++++++++++++- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 444daae7efa..3db7baae6d2 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -76,6 +76,10 @@ type ClientConfig struct { // if the normalization step should be skipped set this to true. SkipNormalization bool + // By default events are decorated with agent metadata. + // To skip adding that metadata set this to true. + SkipAgentMetadata bool + // ACK handler strategies. // Note: ack handlers are run in another go-routine owned by the publisher pipeline. // They should not block for to long, to not block the internal buffers for diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index cff8abdc1a6..0bb3710b9da 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -80,13 +80,6 @@ func Load( Annotations: Annotations{ Event: config.EventMetadata, Builtin: common.MapStr{ - "agent": common.MapStr{ - "type": beatInfo.Beat, - "hostname": beatInfo.Hostname, - "version": beatInfo.Version, - "id": beatInfo.ID.String(), - "ephemeral_id": beatInfo.EphemeralID.String(), - }, "host": common.MapStr{ "name": name, }, @@ -97,10 +90,6 @@ func Load( }, } - if name != beatInfo.Hostname { - settings.Annotations.Builtin.Put("agent.name", name) - } - queueBuilder, err := createQueueBuilder(config.Queue, monitors) if err != nil { return nil, err diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/pipeline/processor.go index e2dcb7c73c0..b2dccddc9d0 100644 --- a/libbeat/publisher/pipeline/processor.go +++ b/libbeat/publisher/pipeline/processor.go @@ -121,7 +121,12 @@ func newProcessorPipeline( processors.add(makeAddFieldsProcessor("beatsMeta", meta, needsCopy)) } - // setup 7: pipeline processors list + // setup 7: add agent metadata + if !config.SkipAgentMetadata { + processors.add(makeAddAgentMetadataProcessor(info)) + } + + // setup 8: pipeline processors list processors.add(global.processors) // setup 9: debug print final event (P) @@ -290,6 +295,23 @@ func makeAddDynMetaProcessor( }) } +func makeAddAgentMetadataProcessor(info beat.Info) *processorFn { + metadata := common.MapStr{ + "type": info.Beat, + "ephemeral_id": info.EphemeralID.String(), + "hostname": info.Hostname, + "id": info.ID.String(), + "version": info.Version, + } + if info.Name != info.Hostname { + metadata.Put("name", info.Name) + } + return newProcessor("add_agent_metadata", func(event *beat.Event) (*beat.Event, error) { + _, err := event.Fields.Put("agent", metadata) + return event, err + }) +} + func debugPrintProcessor(info beat.Info) *processorFn { // ensure only one go-routine is using the encoder (in case // beat.Client is shared between multiple go-routines by accident) From 1ba4454afca270c1828f2d03561bf75005bcd1e0 Mon Sep 17 00:00:00 2001 From: Gil Raphaelli Date: Tue, 8 Jan 2019 13:49:57 -0500 Subject: [PATCH 2/4] fix tests * skip agent adding agent metadata by default * test adding agent metadata --- libbeat/publisher/pipeline/processor_test.go | 158 ++++++++++++++----- 1 file changed, 115 insertions(+), 43 deletions(-) diff --git a/libbeat/publisher/pipeline/processor_test.go b/libbeat/publisher/pipeline/processor_test.go index df5f30724b4..3e49ee5eea7 100644 --- a/libbeat/publisher/pipeline/processor_test.go +++ b/libbeat/publisher/pipeline/processor_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/beat" @@ -29,26 +30,28 @@ import ( ) func TestProcessors(t *testing.T) { - info := beat.Info{} + defaultInfo := beat.Info{} type local struct { - config beat.ClientConfig - events []common.MapStr - expected []common.MapStr + config beat.ClientConfig + events []common.MapStr + expected []common.MapStr + includeAgentMetadata bool } tests := []struct { name string global pipelineProcessors local []local + info *beat.Info }{ { - "user global fields and tags", - pipelineProcessors{ + name: "user global fields and tags", + global: pipelineProcessors{ fields: common.MapStr{"global": 1}, tags: []string{"tag"}, }, - []local{ + local: []local{ { config: beat.ClientConfig{}, events: []common.MapStr{{"value": "abc", "user": nil}}, @@ -59,12 +62,12 @@ func TestProcessors(t *testing.T) { }, }, { - "no normalization", - pipelineProcessors{ + name: "no normalization", + global: pipelineProcessors{ fields: common.MapStr{"global": 1}, tags: []string{"tag"}, }, - []local{ + local: []local{ { config: beat.ClientConfig{SkipNormalization: true}, events: []common.MapStr{{"value": "abc", "user": nil}}, @@ -75,9 +78,77 @@ func TestProcessors(t *testing.T) { }, }, { - "beat local fields", - pipelineProcessors{}, - []local{ + name: "add agent metadata", + global: pipelineProcessors{ + fields: common.MapStr{"global": 1}, + tags: []string{"tag"}, + }, + info: &beat.Info{ + Beat: "test", + EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), + Hostname: "test.host.name", + ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), + Name: "test.host.name", + Version: "0.1", + }, + local: []local{ + { + config: beat.ClientConfig{}, + events: []common.MapStr{{"value": "abc", "user": nil}}, + expected: []common.MapStr{ + { + "agent": common.MapStr{ + "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", + "hostname": "test.host.name", + "id": "123e4567-e89b-12d3-a456-426655440001", + "type": "test", + "version": "0.1", + }, + "value": "abc", "global": 1, "tags": []string{"tag"}, + }, + }, + includeAgentMetadata: true, + }, + }, + }, + { + name: "add agent metadata with custom host.name", + global: pipelineProcessors{ + fields: common.MapStr{"global": 1}, + tags: []string{"tag"}, + }, + info: &beat.Info{ + Beat: "test", + EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), + Hostname: "test.host.name", + ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), + Name: "other.test.host.name", + Version: "0.1", + }, + local: []local{ + { + config: beat.ClientConfig{}, + events: []common.MapStr{{"value": "abc", "user": nil}}, + expected: []common.MapStr{ + { + "agent": common.MapStr{ + "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", + "hostname": "test.host.name", + "id": "123e4567-e89b-12d3-a456-426655440001", + "name": "other.test.host.name", + "type": "test", + "version": "0.1", + }, + "value": "abc", "global": 1, "tags": []string{"tag"}, + }, + }, + includeAgentMetadata: true, + }, + }, + }, + { + name: "beat local fields", + local: []local{ { config: beat.ClientConfig{ Fields: common.MapStr{"local": 1}, @@ -88,12 +159,12 @@ func TestProcessors(t *testing.T) { }, }, { - "beat local and user global fields", - pipelineProcessors{ + name: "beat local and user global fields", + global: pipelineProcessors{ fields: common.MapStr{"global": 1}, tags: []string{"tag"}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ Fields: common.MapStr{"local": 1}, @@ -106,12 +177,12 @@ func TestProcessors(t *testing.T) { }, }, { - "user global fields overwrite beat local fields", - pipelineProcessors{ + name: "user global fields overwrite beat local fields", + global: pipelineProcessors{ fields: common.MapStr{"global": 1, "shared": "global"}, tags: []string{"tag"}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ Fields: common.MapStr{"local": 1, "shared": "local"}, @@ -124,9 +195,8 @@ func TestProcessors(t *testing.T) { }, }, { - "beat local fields isolated", - pipelineProcessors{}, - []local{ + name: "beat local fields isolated", + local: []local{ { config: beat.ClientConfig{ Fields: common.MapStr{"local": 1}, @@ -145,11 +215,11 @@ func TestProcessors(t *testing.T) { }, { - "beat local fields + user global fields isolated", - pipelineProcessors{ + name: "beat local fields + user global fields isolated", + global: pipelineProcessors{ fields: common.MapStr{"global": 0}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ Fields: common.MapStr{"local": 1}, @@ -167,9 +237,8 @@ func TestProcessors(t *testing.T) { }, }, { - "user local fields and tags", - pipelineProcessors{}, - []local{ + name: "user local fields and tags", + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -185,9 +254,8 @@ func TestProcessors(t *testing.T) { }, }, { - "user local fields (under root) and tags", - pipelineProcessors{}, - []local{ + name: "user local fields (under root) and tags", + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -204,12 +272,12 @@ func TestProcessors(t *testing.T) { }, }, { - "user local fields overwrite user global fields", - pipelineProcessors{ + name: "user local fields overwrite user global fields", + global: pipelineProcessors{ fields: common.MapStr{"global": 0, "shared": "global"}, tags: []string{"global"}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -230,9 +298,8 @@ func TestProcessors(t *testing.T) { }, }, { - "user local fields isolated", - pipelineProcessors{}, - []local{ + name: "user local fields isolated", + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -254,11 +321,11 @@ func TestProcessors(t *testing.T) { }, }, { - "user local + global fields isolated", - pipelineProcessors{ + name: "user local + global fields isolated", + global: pipelineProcessors{ fields: common.MapStr{"fields": common.MapStr{"global": 0}}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -280,11 +347,11 @@ func TestProcessors(t *testing.T) { }, }, { - "user local + global fields isolated (fields with root)", - pipelineProcessors{ + name: "user local + global fields isolated (fields with root)", + global: pipelineProcessors{ fields: common.MapStr{"global": 0}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -314,7 +381,12 @@ func TestProcessors(t *testing.T) { t.Run(test.name, func(t *testing.T) { // create processor pipelines programs := make([]beat.Processor, len(test.local)) + info := defaultInfo + if test.info != nil { + info = *test.info + } for i, local := range test.local { + local.config.SkipAgentMetadata = !local.includeAgentMetadata programs[i] = newProcessorPipeline(info, test.global, local.config) } From 6f8ec6aa93e91b69f0135d929605611a00d299ea Mon Sep 17 00:00:00 2001 From: Gil Raphaelli Date: Tue, 8 Jan 2019 17:43:40 -0500 Subject: [PATCH 3/4] retain previous agent metadata merge behavior and address race condition per @urso --- libbeat/publisher/pipeline/processor.go | 5 +---- libbeat/publisher/pipeline/processor_test.go | 3 ++- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/pipeline/processor.go index b2dccddc9d0..5387d0b8e92 100644 --- a/libbeat/publisher/pipeline/processor.go +++ b/libbeat/publisher/pipeline/processor.go @@ -306,10 +306,7 @@ func makeAddAgentMetadataProcessor(info beat.Info) *processorFn { if info.Name != info.Hostname { metadata.Put("name", info.Name) } - return newProcessor("add_agent_metadata", func(event *beat.Event) (*beat.Event, error) { - _, err := event.Fields.Put("agent", metadata) - return event, err - }) + return makeAddFieldsProcessor("add_agent_metadata", common.MapStr{"agent": metadata}, true) } func debugPrintProcessor(info beat.Info) *processorFn { diff --git a/libbeat/publisher/pipeline/processor_test.go b/libbeat/publisher/pipeline/processor_test.go index 3e49ee5eea7..a2e84da4371 100644 --- a/libbeat/publisher/pipeline/processor_test.go +++ b/libbeat/publisher/pipeline/processor_test.go @@ -80,7 +80,7 @@ func TestProcessors(t *testing.T) { { name: "add agent metadata", global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, + fields: common.MapStr{"global": 1, "agent": common.MapStr{"foo": "bar"}}, tags: []string{"tag"}, }, info: &beat.Info{ @@ -103,6 +103,7 @@ func TestProcessors(t *testing.T) { "id": "123e4567-e89b-12d3-a456-426655440001", "type": "test", "version": "0.1", + "foo": "bar", }, "value": "abc", "global": 1, "tags": []string{"tag"}, }, From e6f631e237189e2fd4ee357ecb03fb285f107d67 Mon Sep 17 00:00:00 2001 From: Gil Raphaelli Date: Wed, 9 Jan 2019 11:18:24 -0500 Subject: [PATCH 4/4] update dev changelog --- CHANGELOG-developer.next.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 4cacdc3a60b..edeb4ef22a2 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -23,3 +23,5 @@ The list below covers the major changes between 7.0.0-alpha2 and master only. ==== Bugfixes ==== Added + +- Move agent metadata addition to a processor {pull}9952[9952]