Reorder processors in publisher pipeline (elastic#5149) (elastic#5165)
* Reorder processor pipeline order

- reorder processors such that all client processors run before the
  global processors
- remove support for EventMetadataKey

* Combine fields and tags if possible

- combine client internal field updates and configured field updates
  into one dictionary
- combine processors for adding client and global fields+tags if no
  client processors are configured (see the sketch below)
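
To make the second point concrete, here is a minimal, self-contained Go sketch of folding the field sources into one dictionary at connect time; the plain maps and the `deepUpdate` helper are simplified stand-ins for `common.MapStr` and its `DeepUpdate`, not the libbeat implementation.

```go
package main

import "fmt"

// deepUpdate merges src into dst, recursing into nested maps so that
// keys from src overwrite matching keys in dst (a simplified stand-in
// for common.MapStr.DeepUpdate).
func deepUpdate(dst, src map[string]interface{}) {
	for k, v := range src {
		srcSub, srcIsMap := v.(map[string]interface{})
		dstSub, dstIsMap := dst[k].(map[string]interface{})
		if srcIsMap && dstIsMap {
			deepUpdate(dstSub, srcSub)
			continue
		}
		dst[k] = v
	}
}

func main() {
	// Field sources that used to be applied by separate processors.
	clientFields := map[string]interface{}{
		"log": map[string]interface{}{"source": "stdin"},
	}
	globalFields := map[string]interface{}{"env": "prod"}

	// Combine once when the client connects, so publishing an event
	// only needs a single "add fields" step instead of several.
	combined := map[string]interface{}{}
	deepUpdate(combined, clientFields)
	deepUpdate(combined, globalFields)

	fmt.Println(combined) // map[env:prod log:map[source:stdin]]
}
```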

* Do not copy if no processors are defined

Do not Clone (deep copy) fields being added in the pipeline if no
processors are configured.

As processors might add or remove fields and potentially modify shared
field objects, these must be copied whenever there is a chance of global
shared structures being overwritten by processors, especially if
processors are guarded by conditions.
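
The aliasing problem can be shown with a short, self-contained sketch (plain maps stand in for common.MapStr; this illustrates the reasoning, not the pipeline code):

```go
package main

import "fmt"

func main() {
	// Fields configured once and shared by every published event.
	sharedFields := map[string]interface{}{"env": "prod"}

	// If no processors are configured, nothing mutates the event later,
	// so attaching the shared map without a deep copy is safe and cheap.
	// With processors configured, the event needs its own copy; here we
	// skip the copy on purpose to show what goes wrong.
	eventFields := sharedFields // both names alias the same map

	// A (possibly condition-guarded) processor rewrites a field in place.
	eventFields["env"] = "test"

	// The globally shared value was silently overwritten, so every
	// later event would now see "test" instead of "prod".
	fmt.Println(sharedFields["env"]) // test
}
```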

* Fix processors order once again

The problem is that local fields+tags must be applied after the globally
configured fields and tags, while client processors must run before the
global ones.
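
A rough sketch of these two ordering constraints, with plain maps and closures standing in for libbeat processors (an illustration under those assumptions, not the actual pipeline):

```go
package main

import "fmt"

func main() {
	event := map[string]interface{}{}
	addFields := func(fields map[string]interface{}) {
		for k, v := range fields {
			event[k] = v // later writers win
		}
	}

	globalFields := map[string]interface{}{"env": "prod", "dc": "us-east"}
	clientFields := map[string]interface{}{"env": "staging"}

	// Globally configured fields are applied first and the client's local
	// fields afterwards, so per-client values override global defaults ...
	addFields(globalFields)
	addFields(clientFields)
	fmt.Println(event["env"], event["dc"]) // staging us-east

	// ... while the processor lists run in the opposite relative order:
	// client processors execute before the global processors, so they see
	// the event before any global drop/rename rules are applied.
	clientProcessors := func() { fmt.Println("client processors run first") }
	globalProcessors := func() { fmt.Println("global processors run second") }
	clientProcessors()
	globalProcessors()
}
```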

* review

- use 'disabled' flag
- simplify pipeline fields init

(cherry picked from commit b66abfe)
Steffen Siering authored and andrewkroh committed Sep 12, 2017
1 parent 4956e31 commit e7bfe98
Showing 5 changed files with 134 additions and 134 deletions.
5 changes: 2 additions & 3 deletions libbeat/common/mapstr.go
@@ -11,9 +11,8 @@ import (
// Event metadata constants. These keys are used within libbeat to identify
// metadata stored in an event.
const (
EventMetadataKey = "_event_metadata"
FieldsKey = "fields"
TagsKey = "tags"
FieldsKey = "fields"
TagsKey = "tags"
)

var (
76 changes: 46 additions & 30 deletions libbeat/publisher/pipeline/pipeline.go
@@ -60,10 +60,13 @@ type pipelineProcessors struct {
// The pipeline's processor settings, used for
// constructing the client's complete processor
// pipeline on connect.
beatMetaProcessor beat.Processor
eventMetaProcessor beat.Processor
processors beat.Processor
disabled bool // disabled is set if outputs have been disabled via CLI
beatsMeta common.MapStr
fields common.MapStr
tags []string

processors beat.Processor

disabled bool // disabled is set if outputs have been disabled via CLI
}

// Settings is used to pass additional settings to a newly created pipeline instance.
@@ -132,39 +135,17 @@ func New(
out outputs.Group,
settings Settings,
) (*Pipeline, error) {
annotations := settings.Annotations
var err error

var beatMeta beat.Processor
if meta := annotations.Beat; meta != nil {
beatMeta = beatAnnotateProcessor(meta)
}

var eventMeta beat.Processor
if em := annotations.Event; len(em.Fields) > 0 || len(em.Tags) > 0 {
eventMeta = eventAnnotateProcessor(em)
}

var prog beat.Processor
if ps := settings.Processors; ps != nil && len(ps.List) > 0 {
tmp := &program{title: "global"}
for _, p := range ps.List {
tmp.add(p)
}
prog = tmp
}

log := defaultLogger
annotations := settings.Annotations
processors := settings.Processors
disabledOutput := settings.Disabled
p := &Pipeline{
logger: log,
waitCloseMode: settings.WaitCloseMode,
waitCloseTimeout: settings.WaitClose,
processors: pipelineProcessors{
beatMetaProcessor: beatMeta,
eventMetaProcessor: eventMeta,
processors: prog,
disabled: settings.Disabled,
},
processors: makePipelineProcessors(annotations, processors, disabledOutput),
}
p.ackBuilder = &pipelineEmptyACK{p}
p.ackActive = atomic.MakeBool(true)
@@ -382,3 +363,38 @@ func (e *waitCloser) dec(n int) {
func (e *waitCloser) wait() {
e.events.Wait()
}

func makePipelineProcessors(
annotations Annotations,
processors *processors.Processors,
disabled bool,
) pipelineProcessors {
p := pipelineProcessors{
disabled: disabled,
}

hasProcessors := processors != nil && len(processors.List) > 0
if hasProcessors {
tmp := &program{title: "global"}
for _, p := range processors.List {
tmp.add(p)
}
p.processors = tmp
}

if meta := annotations.Beat; meta != nil {
p.beatsMeta = common.MapStr{"beat": meta}
}

if em := annotations.Event; len(em.Fields) > 0 {
fields := common.MapStr{}
common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot)
p.fields = fields
}

if t := annotations.Event.Tags; len(t) > 0 {
p.tags = t
}

return p
}
183 changes: 86 additions & 97 deletions libbeat/publisher/pipeline/processor.go
@@ -29,73 +29,76 @@ type processorFn struct {
//
// Pipeline (C=client, P=pipeline)
//
// 1. (P) extract EventMetadataKey fields + tags (to be removed in favor of 4)
// 2. (P) generalize/normalize event
// 3. (P) add beats metadata (name, hostname, version)
// 4. (C) add Meta from client Config to event.Meta
// 5. (C) add Fields from client config to event.Fields
// 6. (P) add pipeline fields + tags
// 7. (C) add client fields + tags
// 8. (P/C) apply EventMetadataKey fields + tags (to be removed in favor of 4)
// 9. (C) client processors list
// 10. (P) pipeline processors list
// 11. (P) (if publish/debug enabled) log event
// 12. (P) (if output disabled) dropEvent
// 1. (P) generalize/normalize event
// 2. (C) add Meta from client Config to event.Meta
// 3. (C) add Fields from client config to event.Fields
// 4. (P) add pipeline fields + tags
// 5. (C) add client fields + tags
// 6. (C) client processors list
// 7. (P) add beats metadata
// 8. (P) pipeline processors list
// 9. (P) (if publish/debug enabled) log event
// 10. (P) (if output disabled) dropEvent
func (p *Pipeline) newProcessorPipeline(
config beat.ClientConfig,
) beat.Processor {
processors := &program{title: "processPipeline"}
var (
// pipeline processors
processors = &program{title: "processPipeline"}

global := p.processors
// client fields and metadata
clientMeta = config.Meta
localProcessors = makeClientProcessors(config)

// setup 1: extract EventMetadataKey fields + tags
processors.add(preEventUserAnnotateProcessor)
// pipeline global
global = p.processors
)

// setup 2 and 3: generalize/normalize output (P)
needsCopy := localProcessors != nil || global.processors != nil

// setup 1: generalize/normalize output (P)
processors.add(generalizeProcessor)
processors.add(global.beatMetaProcessor)

// setup 4: add Meta from client config
if m := config.Meta; len(m) > 0 {
processors.add(clientEventMeta(m))
// setup 2: add Meta from client config (C)
if m := clientMeta; len(m) > 0 {
processors.add(clientEventMeta(m, needsCopy))
}

// setup 5: add Fields from client config
if m := config.Fields; len(m) > 0 {
processors.add(clientEventFields(m))
// setup 4, 5: pipeline tags + client tags
var tags []string
tags = append(tags, global.tags...)
tags = append(tags, config.EventMetadata.Tags...)
if len(tags) > 0 {
processors.add(makeAddTagsProcessor("tags", tags))
}

// setup 6: add event fields + tags (P)
processors.add(global.eventMetaProcessor)

// setup 7: add fields + tags (C)
if em := config.EventMetadata; len(em.Fields) > 0 || len(em.Tags) > 0 {
processors.add(eventAnnotateProcessor(em))
// setup 3, 4, 5: client config fields + pipeline fields + client fields
fields := config.Fields.Clone()
fields.DeepUpdate(global.fields)
if em := config.EventMetadata; len(em.Fields) > 0 {
common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot)
}
if len(fields) > 0 {
processors.add(makeAddFieldsProcessor("fields", fields, needsCopy))
}

// setup 8: apply EventMetadata fields + tags
processors.add(eventUserAnnotateProcessor)

// setup 9: client processors (C)
if procs := config.Processor; procs != nil {
if lst := procs.All(); len(lst) > 0 {
// setup 6: client processor list
processors.add(localProcessors)

processors.add(&program{
title: "client",
list: lst,
})
}
// setup 7: add beats metadata
if meta := global.beatsMeta; len(meta) > 0 {
processors.add(makeAddFieldsProcessor("beatsMeta", meta, needsCopy))
}

// setup 10: pipeline processors (P)
// setup 8: pipeline processors list
processors.add(global.processors)

// setup 11: debug print final event (P)
// setup 9: debug print final event (P)
if logp.IsDebug("publish") {
processors.add(debugPrintProcessor())
}

// setup 12: drop all events if outputs are disabled
// setup 10: drop all events if outputs are disabled (P)
if global.disabled {
processors.add(dropDisabledProcessor)
}
@@ -205,67 +208,41 @@ func eventAnnotateProcessor(eventMeta common.EventMetadata) *processorFn {
})
}

func clientEventMeta(meta common.MapStr) *processorFn {
return newAnnotateProcessor("@metadata", func(event *beat.Event) {
if event.Meta == nil {
event.Meta = meta.Clone()
} else {
event.Meta = event.Meta.Clone()
event.Meta.DeepUpdate(meta.Clone())
}
})
func clientEventMeta(meta common.MapStr, needsCopy bool) *processorFn {
fn := func(event *beat.Event) { addMeta(event, meta) }
if needsCopy {
fn = func(event *beat.Event) { addMeta(event, meta.Clone()) }
}
return newAnnotateProcessor("@metadata", fn)
}

func clientEventFields(fields common.MapStr) *processorFn {
return newAnnotateProcessor("globalFields", func(event *beat.Event) {
event.Fields.DeepUpdate(fields.Clone())
})
func addMeta(event *beat.Event, meta common.MapStr) {
if event.Meta == nil {
event.Meta = meta
} else {
event.Meta = event.Meta.Clone()
event.Meta.DeepUpdate(meta)
}
}

// TODO: remove var-section. Keep for backwards compatibility with old publisher API.
// Remove after updating all beats to new publisher API.
// Note: this functionality is used by filebeat/winlogbeat, so prospector/harvesters
// can apply fields to events after generating the event type.
// This functionality will be removed, in favor of harvesters publishing
// event to a beat.Client with properly setup processor
var (
preEventUserAnnotateProcessor = newAnnotateProcessor("annotateEventUserPre", func(event *beat.Event) {
const key = common.EventMetadataKey
val, exists := event.Fields[key]
if !exists {
return
}

delete(event.Fields, key)
func pipelineEventFields(fields common.MapStr, copy bool) *processorFn {
return makeAddFieldsProcessor("pipelineFields", fields, copy)
}

if _, ok := val.(common.EventMetadata); ok {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta[key] = val
}
func makeAddTagsProcessor(name string, tags []string) *processorFn {
return newAnnotateProcessor(name, func(event *beat.Event) {
common.AddTags(event.Fields, tags)
})
}

eventUserAnnotateProcessor = newAnnotateProcessor("annotateEventUser", func(event *beat.Event) {
const key = common.EventMetadataKey

tmp, ok := event.Meta[key]
if !ok {
return
}

delete(event.Meta, key)
if len(event.Meta) == 0 {
event.Meta = nil
}
func makeAddFieldsProcessor(name string, fields common.MapStr, copy bool) *processorFn {
fn := func(event *beat.Event) { event.Fields.DeepUpdate(fields) }
if copy {
fn = func(event *beat.Event) { event.Fields.DeepUpdate(fields.Clone()) }
}

eventMeta := tmp.(common.EventMetadata)
common.AddTags(event.Fields, eventMeta.Tags)
if fields := eventMeta.Fields; len(fields) > 0 {
common.MergeFields(event.Fields, fields.Clone(), eventMeta.FieldsUnderRoot)
}
})
)
return newAnnotateProcessor(name, fn)
}

func debugPrintProcessor() *processorFn {
// ensure only one go-routine is using the encoder (in case
@@ -286,3 +263,15 @@ func debugPrintProcessor() *processorFn {
return event, nil
})
}

func makeClientProcessors(config beat.ClientConfig) processors.Processor {
procs := config.Processor
if procs == nil || len(procs.All()) == 0 {
return nil
}

return &program{
title: "client",
list: procs.All(),
}
}
1 change: 0 additions & 1 deletion metricbeat/mb/module/event.go
@@ -78,7 +78,6 @@ func (b EventBuilder) Build() (beat.Event, error) {
event := beat.Event{
Timestamp: time.Time(timestamp),
Fields: common.MapStr{
// common.EventMetadataKey: b.metadata,
b.ModuleName: moduleEvent,
"metricset": metricsetData,
},
3 changes: 0 additions & 3 deletions metricbeat/mb/testing/data_generator.go
@@ -85,9 +85,6 @@ func CreateFullEvent(ms mb.MetricSet, metricSetData common.MapStr) beat.Event {
"hostname": "host.example.com",
}

// Delete meta data as not needed for the event output here.
delete(fullEvent.Fields, common.EventMetadataKey)

return fullEvent
}

