diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index fa40448891c..d94ab8555d4 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -88,7 +88,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] - The `symlinks` and `harverster_limit` settings are now GA, instead of experimental. {pull}3525[3525] - close_timeout is also applied when the output is blocking. {pull}3511[3511] - Improve handling of different path variants on Windows. {pull}3781[3781] -- Add support for prospector level processors. {pull}3823[3823] +- Restructure input.Event to be inline with outputs.Data {pull}3823[3823] *Heartbeat* diff --git a/filebeat/beater/channels.go b/filebeat/beater/channels.go index 328e880b7e5..13c4edc5aae 100644 --- a/filebeat/beater/channels.go +++ b/filebeat/beater/channels.go @@ -19,12 +19,12 @@ type spoolerOutlet struct { type publisherChannel struct { done chan struct{} - ch chan []*input.EventHolder + ch chan []*input.Data } type registrarLogger struct { done chan struct{} - ch chan<- []*input.EventHolder + ch chan<- []*input.Data } type finishedLogger struct { @@ -44,7 +44,7 @@ func newSpoolerOutlet( } } -func (o *spoolerOutlet) OnEvent(event *input.EventHolder) bool { +func (o *spoolerOutlet) OnEvent(event *input.Data) bool { open := atomic.LoadInt32(&o.isOpen) == 1 if !open { return false @@ -69,12 +69,12 @@ func (o *spoolerOutlet) OnEvent(event *input.EventHolder) bool { func newPublisherChannel() *publisherChannel { return &publisherChannel{ done: make(chan struct{}), - ch: make(chan []*input.EventHolder, 1), + ch: make(chan []*input.Data, 1), } } func (c *publisherChannel) Close() { close(c.done) } -func (c *publisherChannel) Send(events []*input.EventHolder) bool { +func (c *publisherChannel) Send(events []*input.Data) bool { select { case <-c.done: // set ch to nil, so no more events will be send after channel close signal @@ -96,7 +96,7 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger { } func (l *registrarLogger) Close() { close(l.done) } -func (l *registrarLogger) Published(events []*input.EventHolder) bool { +func (l *registrarLogger) Published(events []*input.Data) bool { select { case <-l.done: // set ch to nil, so no more events will be send after channel close signal @@ -114,7 +114,7 @@ func newFinishedLogger(wg *sync.WaitGroup) *finishedLogger { return &finishedLogger{wg} } -func (l *finishedLogger) Published(events []*input.EventHolder) bool { +func (l *finishedLogger) Published(events []*input.Data) bool { for range events { l.wg.Done() } diff --git a/filebeat/input/event.go b/filebeat/input/event.go index 2e797d0b552..f2ae013729d 100644 --- a/filebeat/input/event.go +++ b/filebeat/input/event.go @@ -19,18 +19,18 @@ type Event struct { } type EventMeta struct { + common.EventMetadata Pipeline string Fileset string Module string InputType string DocumentType string - common.EventMetadata - ReadTime time.Time - Bytes int - State file.State + ReadTime time.Time + Bytes int + State file.State } -type EventHolder struct { +type Data struct { Event common.MapStr Metadata EventMeta } @@ -81,8 +81,8 @@ func (e *Event) ToMapStr() common.MapStr { return event } -func (e *Event) GetEventHolder() EventHolder { - return EventHolder{ +func (e *Event) GetEventHolder() Data { + return Data{ Event: e.ToMapStr(), Metadata: EventMeta{ Pipeline: e.Pipeline, @@ -98,7 +98,7 @@ func (e *Event) GetEventHolder() EventHolder { // Metadata creates a common.MapStr containing the metadata to // be associated with the event. -func (eh *EventHolder) GetMetadata() common.MapStr { +func (eh *Data) GetMetadata() common.MapStr { if eh.Metadata.Pipeline != "" { return common.MapStr{ "pipeline": eh.Metadata.Pipeline, @@ -109,7 +109,7 @@ func (eh *EventHolder) GetMetadata() common.MapStr { // HasData returns true if the event itself contains data // Events without data are only state updates -func (eh *EventHolder) HasData() bool { +func (eh *Data) HasData() bool { return eh.Metadata.Bytes > 0 } diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go index 839fa2e7932..a5fd6b90b47 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/prospector/config.go @@ -8,7 +8,6 @@ import ( "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/match" - "github.com/elastic/beats/libbeat/processors" ) var ( @@ -27,24 +26,23 @@ var ( ) type prospectorConfig struct { - common.EventMetadata `config:",inline"` // Fields and tags to add to events. - Enabled bool `config:"enabled"` - DocumentType string `config:"document_type"` - ExcludeFiles []match.Matcher `config:"exclude_files"` - IgnoreOlder time.Duration `config:"ignore_older"` - Paths []string `config:"paths"` - ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` - InputType string `config:"input_type"` - CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` - CleanRemoved bool `config:"clean_removed"` - HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` - Symlinks bool `config:"symlinks"` - TailFiles bool `config:"tail_files"` - JSON *reader.JSONConfig `config:"json"` - Pipeline string `config:"pipeline"` - Module string `config:"_module_name"` // hidden option to set the module name - Fileset string `config:"_fileset_name"` // hidden option to set the fileset name - Processors processors.PluginConfig `config:"processors"` + common.EventMetadata `config:",inline"` // Fields and tags to add to events. + Enabled bool `config:"enabled"` + DocumentType string `config:"document_type"` + ExcludeFiles []match.Matcher `config:"exclude_files"` + IgnoreOlder time.Duration `config:"ignore_older"` + Paths []string `config:"paths"` + ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` + InputType string `config:"input_type"` + CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` + CleanRemoved bool `config:"clean_removed"` + HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` + Symlinks bool `config:"symlinks"` + TailFiles bool `config:"tail_files"` + JSON *reader.JSONConfig `config:"json"` + Pipeline string `config:"pipeline"` + Module string `config:"_module_name"` // hidden option to set the module name + Fileset string `config:"_fileset_name"` // hidden option to set the fileset name } func (config *prospectorConfig) Validate() error { diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index ba4a8d97884..638eaf68fc2 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -16,7 +16,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" - "github.com/elastic/beats/libbeat/processors" ) var ( @@ -41,7 +40,6 @@ type Prospector struct { registry *harvesterRegistry beatDone chan struct{} eventCounter *sync.WaitGroup - processors *processors.Processors } // Prospectorer is the interface common to all prospectors @@ -52,7 +50,7 @@ type Prospectorer interface { // Outlet is the outlet for a prospector type Outlet interface { - OnEvent(event *input.EventHolder) bool + OnEvent(event *input.Data) bool } // NewProspector instantiates a new prospector @@ -86,13 +84,6 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (* return nil, err } - f, err := processors.New(prospector.config.Processors) - if err != nil { - return nil, err - } - - prospector.processors = f - logp.Debug("prospector", "File Configs: %v", prospector.config.Paths) return prospector, nil @@ -224,20 +215,7 @@ func (p *Prospector) updateState(event *input.Event) error { event.Fileset = p.config.Fileset eventHolder := event.GetEventHolder() - //run the filters before sending to - if event.Bytes > 0 { - eventHolder.Event = p.processors.Run(eventHolder.Event) - } - - var ok bool - if eventHolder.Event != nil { - //processor might decide to drop the event - ok = p.outlet.OnEvent(&eventHolder) - - } else { - eventHolder.Metadata.Bytes = 0 - ok = p.outlet.OnEvent(&eventHolder) - } + ok := p.outlet.OnEvent(&eventHolder) if !ok { logp.Info("Prospector outlet closed") diff --git a/filebeat/prospector/prospector_log_other_test.go b/filebeat/prospector/prospector_log_other_test.go index e237302ef5a..da0afe50a5b 100644 --- a/filebeat/prospector/prospector_log_other_test.go +++ b/filebeat/prospector/prospector_log_other_test.go @@ -155,4 +155,4 @@ func TestInit(t *testing.T) { // TestOutlet is an empty outlet for testing type TestOutlet struct{} -func (o TestOutlet) OnEvent(event *input.EventHolder) bool { return true } +func (o TestOutlet) OnEvent(event *input.Data) bool { return true } diff --git a/filebeat/publisher/async.go b/filebeat/publisher/async.go index 39d8852e60a..35b304429ba 100644 --- a/filebeat/publisher/async.go +++ b/filebeat/publisher/async.go @@ -13,7 +13,7 @@ import ( type asyncLogPublisher struct { pub publisher.Publisher client publisher.Client - in chan []*input.EventHolder + in chan []*input.Data out SuccessLogger // list of in-flight batches @@ -29,7 +29,7 @@ type asyncLogPublisher struct { type eventsBatch struct { next *eventsBatch flag int32 - events []*input.EventHolder + events []*input.Data } type batchList struct { @@ -50,7 +50,7 @@ const ( ) func newAsyncLogPublisher( - in chan []*input.EventHolder, + in chan []*input.Data, out SuccessLogger, pub publisher.Publisher, ) *asyncLogPublisher { diff --git a/filebeat/publisher/publisher.go b/filebeat/publisher/publisher.go index 41a6b960d6b..0d982f2f4e0 100644 --- a/filebeat/publisher/publisher.go +++ b/filebeat/publisher/publisher.go @@ -24,12 +24,12 @@ type LogPublisher interface { type SuccessLogger interface { // Published will be run after events have been acknowledged by the outputs. - Published(events []*input.EventHolder) bool + Published(events []*input.Data) bool } func New( async bool, - in chan []*input.EventHolder, + in chan []*input.Data, out SuccessLogger, pub publisher.Publisher, ) LogPublisher { @@ -46,7 +46,7 @@ var ( // getDataEvents returns all events which contain data (not only state updates) // together with their associated metadata -func getDataEvents(events []*input.EventHolder) (dataEvents []common.MapStr, meta []common.MapStr) { +func getDataEvents(events []*input.Data) (dataEvents []common.MapStr, meta []common.MapStr) { dataEvents = make([]common.MapStr, 0, len(events)) meta = make([]common.MapStr, 0, len(events)) for _, event := range events { diff --git a/filebeat/publisher/publisher_test.go b/filebeat/publisher/publisher_test.go index 2ef423bae40..b1f3892d696 100644 --- a/filebeat/publisher/publisher_test.go +++ b/filebeat/publisher/publisher_test.go @@ -16,17 +16,17 @@ import ( type collectLogger struct { wg *sync.WaitGroup - events [][]*input.EventHolder + events [][]*input.Data } -func (l *collectLogger) Published(events []*input.EventHolder) bool { +func (l *collectLogger) Published(events []*input.Data) bool { l.wg.Done() l.events = append(l.events, events) return true } -func makeEvents(name string, n int) []*input.EventHolder { - var events []*input.EventHolder +func makeEvents(name string, n int) []*input.Data { + var events []*input.Data for i := 0; i < n; i++ { event := &input.Event{ EventMeta: input.EventMeta{ @@ -59,7 +59,7 @@ func TestPublisherModes(t *testing.T) { wg := sync.WaitGroup{} - pubChan := make(chan []*input.EventHolder, len(test.order)+1) + pubChan := make(chan []*input.Data, len(test.order)+1) collector := &collectLogger{&wg, nil} client := pubtest.NewChanClient(0) @@ -67,7 +67,7 @@ func TestPublisherModes(t *testing.T) { pubtest.PublisherWithClient(client)) pub.Start() - var events [][]*input.EventHolder + var events [][]*input.Data for i := range test.order { tmp := makeEvents(fmt.Sprintf("msg: %v", i), 1) wg.Add(1) diff --git a/filebeat/publisher/sync.go b/filebeat/publisher/sync.go index dd333414118..2fbe2e2b3ab 100644 --- a/filebeat/publisher/sync.go +++ b/filebeat/publisher/sync.go @@ -11,7 +11,7 @@ import ( type syncLogPublisher struct { pub publisher.Publisher client publisher.Client - in chan []*input.EventHolder + in chan []*input.Data out SuccessLogger done chan struct{} @@ -19,7 +19,7 @@ type syncLogPublisher struct { } func newSyncLogPublisher( - in chan []*input.EventHolder, + in chan []*input.Data, out SuccessLogger, pub publisher.Publisher, ) *syncLogPublisher { @@ -51,7 +51,7 @@ func (p *syncLogPublisher) Start() { } func (p *syncLogPublisher) Publish() error { - var events []*input.EventHolder + var events []*input.Data select { case <-p.done: return sigPublisherStop diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 6e2293f70b4..e08a741d368 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -17,7 +17,7 @@ import ( ) type Registrar struct { - Channel chan []*input.EventHolder + Channel chan []*input.Data out publisher.SuccessLogger done chan struct{} registryFile string // Path to the Registry File @@ -38,7 +38,7 @@ func New(registryFile string, out publisher.SuccessLogger) (*Registrar, error) { registryFile: registryFile, done: make(chan struct{}), states: file.NewStates(), - Channel: make(chan []*input.EventHolder, 1), + Channel: make(chan []*input.Data, 1), out: out, wg: sync.WaitGroup{}, } @@ -153,7 +153,7 @@ func (r *Registrar) Run() { }() for { - var events []*input.EventHolder + var events []*input.Data select { case <-r.done: @@ -183,7 +183,7 @@ func (r *Registrar) Run() { } // processEventStates gets the states from the events and writes them to the registrar state -func (r *Registrar) processEventStates(events []*input.EventHolder) { +func (r *Registrar) processEventStates(events []*input.Data) { logp.Debug("registrar", "Processing %d events", len(events)) // skip stdin diff --git a/filebeat/spooler/spooler.go b/filebeat/spooler/spooler.go index f97c9b7e2fe..5279581292d 100644 --- a/filebeat/spooler/spooler.go +++ b/filebeat/spooler/spooler.go @@ -16,16 +16,16 @@ const channelSize = 16 // Spooler aggregates the events and sends the aggregated data to the publisher. type Spooler struct { - Channel chan *input.EventHolder // Channel is the input to the Spooler. + Channel chan *input.Data // Channel is the input to the Spooler. config spoolerConfig - output Output // batch event output on flush - spool []*input.EventHolder // Events being held by the Spooler. - wg sync.WaitGroup // WaitGroup used to control the shutdown. + output Output // batch event output on flush + spool []*input.Data // Events being held by the Spooler. + wg sync.WaitGroup // WaitGroup used to control the shutdown. } // Output spooler sends event to through Send method type Output interface { - Send(events []*input.EventHolder) bool + Send(events []*input.Data) bool } type spoolerConfig struct { @@ -40,13 +40,13 @@ func New( out Output, ) (*Spooler, error) { return &Spooler{ - Channel: make(chan *input.EventHolder, channelSize), + Channel: make(chan *input.Data, channelSize), config: spoolerConfig{ idleTimeout: config.IdleTimeout, spoolSize: config.SpoolSize, }, output: out, - spool: make([]*input.EventHolder, 0, config.SpoolSize), + spool: make([]*input.Data, 0, config.SpoolSize), }, nil } @@ -111,7 +111,7 @@ func (s *Spooler) Stop() { // queue queues a single event to be spooled. If the queue reaches spoolSize // while calling this method then all events in the queue will be flushed to // the publisher. -func (s *Spooler) queue(event *input.EventHolder) bool { +func (s *Spooler) queue(event *input.Data) bool { flushed := false s.spool = append(s.spool, event) if len(s.spool) == cap(s.spool) { @@ -131,7 +131,7 @@ func (s *Spooler) flush() int { } // copy buffer - tmpCopy := make([]*input.EventHolder, count) + tmpCopy := make([]*input.Data, count) copy(tmpCopy, s.spool) // clear buffer