diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 12a7cc44ad7..5383c43e26a 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -88,6 +88,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] - The `symlinks` and `harverster_limit` settings are now GA, instead of experimental. {pull}3525[3525] - close_timeout is also applied when the output is blocking. {pull}3511[3511] - Improve handling of different path variants on Windows. {pull}3781[3781] +- Restructure input.Event to be inline with outputs.Data {pull}3823[3823] *Heartbeat* diff --git a/filebeat/beater/channels.go b/filebeat/beater/channels.go index bf0f318d622..13c4edc5aae 100644 --- a/filebeat/beater/channels.go +++ b/filebeat/beater/channels.go @@ -19,12 +19,12 @@ type spoolerOutlet struct { type publisherChannel struct { done chan struct{} - ch chan []*input.Event + ch chan []*input.Data } type registrarLogger struct { done chan struct{} - ch chan<- []*input.Event + ch chan<- []*input.Data } type finishedLogger struct { @@ -44,7 +44,7 @@ func newSpoolerOutlet( } } -func (o *spoolerOutlet) OnEvent(event *input.Event) bool { +func (o *spoolerOutlet) OnEvent(event *input.Data) bool { open := atomic.LoadInt32(&o.isOpen) == 1 if !open { return false @@ -69,12 +69,12 @@ func (o *spoolerOutlet) OnEvent(event *input.Event) bool { func newPublisherChannel() *publisherChannel { return &publisherChannel{ done: make(chan struct{}), - ch: make(chan []*input.Event, 1), + ch: make(chan []*input.Data, 1), } } func (c *publisherChannel) Close() { close(c.done) } -func (c *publisherChannel) Send(events []*input.Event) bool { +func (c *publisherChannel) Send(events []*input.Data) bool { select { case <-c.done: // set ch to nil, so no more events will be send after channel close signal @@ -96,7 +96,7 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger { } func (l *registrarLogger) Close() { close(l.done) } -func (l *registrarLogger) Published(events []*input.Event) bool { +func (l *registrarLogger) Published(events []*input.Data) bool { select { case <-l.done: // set ch to nil, so no more events will be send after channel close signal @@ -114,7 +114,7 @@ func newFinishedLogger(wg *sync.WaitGroup) *finishedLogger { return &finishedLogger{wg} } -func (l *finishedLogger) Published(events []*input.Event) bool { +func (l *finishedLogger) Published(events []*input.Data) bool { for range events { l.wg.Done() } diff --git a/filebeat/input/event.go b/filebeat/input/event.go index 51267dba763..b0ba0b5f0ba 100644 --- a/filebeat/input/event.go +++ b/filebeat/input/event.go @@ -11,24 +11,37 @@ import ( // Event is sent to the output and must contain all relevant information type Event struct { + EventMeta + Text *string + JSONConfig *reader.JSONConfig + Data common.MapStr // Use in readers to add data to the event + +} + +type EventMeta struct { common.EventMetadata - ReadTime time.Time + Pipeline string + Fileset string + Module string InputType string DocumentType string + ReadTime time.Time Bytes int - Text *string - JSONConfig *reader.JSONConfig State file.State - Data common.MapStr // Use in readers to add data to the event - Pipeline string - Fileset string - Module string +} + +type Data struct { + Event common.MapStr + Metadata EventMeta } func NewEvent(state file.State) *Event { return &Event{ - State: state, + EventMeta: EventMeta{ + State: state, + }, } + } func (e *Event) ToMapStr() common.MapStr { @@ -68,12 +81,27 @@ func (e *Event) ToMapStr() common.MapStr { return event } +func (e *Event) GetData() Data { + return Data{ + Event: e.ToMapStr(), + Metadata: EventMeta{ + Pipeline: e.Pipeline, + Bytes: e.Bytes, + State: e.State, + Fileset: e.Fileset, + Module: e.Module, + ReadTime: e.ReadTime, + EventMetadata: e.EventMetadata, + }, + } +} + // Metadata creates a common.MapStr containing the metadata to // be associated with the event. -func (e *Event) Metadata() common.MapStr { - if e.Pipeline != "" { +func (eh *Data) GetMetadata() common.MapStr { + if eh.Metadata.Pipeline != "" { return common.MapStr{ - "pipeline": e.Pipeline, + "pipeline": eh.Metadata.Pipeline, } } return nil @@ -81,8 +109,8 @@ func (e *Event) Metadata() common.MapStr { // HasData returns true if the event itself contains data // Events without data are only state updates -func (e *Event) HasData() bool { - return e.Bytes > 0 +func (eh *Data) HasData() bool { + return eh.Metadata.Bytes > 0 } // mergeJSONFields writes the JSON fields in the event map, diff --git a/filebeat/input/event_test.go b/filebeat/input/event_test.go index c678d262139..e41fa679da8 100644 --- a/filebeat/input/event_test.go +++ b/filebeat/input/event_test.go @@ -31,10 +31,13 @@ func TestEventToMapStrJSON(t *testing.T) { { // by default, don't overwrite keys Event: Event{ - DocumentType: "test_type", - Text: &text, - Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}}, - JSONConfig: &reader.JSONConfig{KeysUnderRoot: true}, + EventMeta: EventMeta{ + DocumentType: "test_type", + }, + + Text: &text, + Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}}, + JSONConfig: &reader.JSONConfig{KeysUnderRoot: true}, }, ExpectedItems: common.MapStr{ "type": "test_type", @@ -44,10 +47,12 @@ func TestEventToMapStrJSON(t *testing.T) { { // overwrite keys if asked Event: Event{ - DocumentType: "test_type", - Text: &text, - Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}}, - JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + EventMeta: EventMeta{ + DocumentType: "test_type", + }, + Text: &text, + Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}}, + JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "type": "test", @@ -57,10 +62,12 @@ func TestEventToMapStrJSON(t *testing.T) { { // without keys_under_root, put everything in a json key Event: Event{ - DocumentType: "test_type", - Text: &text, - Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}}, - JSONConfig: &reader.JSONConfig{}, + EventMeta: EventMeta{ + DocumentType: "test_type", + }, + Text: &text, + Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}}, + JSONConfig: &reader.JSONConfig{}, }, ExpectedItems: common.MapStr{ "json": common.MapStr{"type": "test", "text": "hello"}, @@ -70,10 +77,12 @@ func TestEventToMapStrJSON(t *testing.T) { { // when MessageKey is defined, the Text overwrites the value of that key Event: Event{ - DocumentType: "test_type", - Text: &text, - Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hi"}}, - JSONConfig: &reader.JSONConfig{MessageKey: "text"}, + EventMeta: EventMeta{ + DocumentType: "test_type", + }, + Text: &text, + Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hi"}}, + JSONConfig: &reader.JSONConfig{MessageKey: "text"}, }, ExpectedItems: common.MapStr{ "json": common.MapStr{"type": "test", "text": "hello"}, @@ -84,11 +93,13 @@ func TestEventToMapStrJSON(t *testing.T) { // when @timestamp is in JSON and overwrite_keys is true, parse it // in a common.Time Event: Event{ - ReadTime: now, - DocumentType: "test_type", - Text: &text, - Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}}, - JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + EventMeta: EventMeta{ + DocumentType: "test_type", + ReadTime: now, + }, + Text: &text, + Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}}, + JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "@timestamp": common.MustParseTime("2016-04-05T18:47:18.444Z"), @@ -99,11 +110,13 @@ func TestEventToMapStrJSON(t *testing.T) { // when the parsing on @timestamp fails, leave the existing value and add an error key // in a common.Time Event: Event{ - ReadTime: now, - DocumentType: "test_type", - Text: &text, - Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}}, - JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + EventMeta: EventMeta{ + DocumentType: "test_type", + ReadTime: now, + }, + Text: &text, + Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}}, + JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "@timestamp": common.Time(now), @@ -115,11 +128,13 @@ func TestEventToMapStrJSON(t *testing.T) { // when the @timestamp has the wrong type, leave the existing value and add an error key // in a common.Time Event: Event{ - ReadTime: now, - DocumentType: "test_type", - Text: &text, - Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": 42}}, - JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + EventMeta: EventMeta{ + DocumentType: "test_type", + ReadTime: now, + }, + Text: &text, + Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": 42}}, + JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "@timestamp": common.Time(now), @@ -130,10 +145,12 @@ func TestEventToMapStrJSON(t *testing.T) { { // if overwrite_keys is true, but the `type` key in json is not a string, ignore it Event: Event{ - DocumentType: "test_type", - Text: &text, - Data: common.MapStr{"json": common.MapStr{"type": 42}}, - JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + EventMeta: EventMeta{ + DocumentType: "test_type", + }, + Text: &text, + Data: common.MapStr{"json": common.MapStr{"type": 42}}, + JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "type": "test_type", @@ -143,10 +160,12 @@ func TestEventToMapStrJSON(t *testing.T) { { // if overwrite_keys is true, but the `type` key in json is empty, ignore it Event: Event{ - DocumentType: "test_type", - Text: &text, - Data: common.MapStr{"json": common.MapStr{"type": ""}}, - JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + EventMeta: EventMeta{ + DocumentType: "test_type", + }, + Text: &text, + Data: common.MapStr{"json": common.MapStr{"type": ""}}, + JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "type": "test_type", @@ -156,10 +175,13 @@ func TestEventToMapStrJSON(t *testing.T) { { // if overwrite_keys is true, but the `type` key in json starts with _, ignore it Event: Event{ - DocumentType: "test_type", - Text: &text, - Data: common.MapStr{"json": common.MapStr{"type": "_type"}}, - JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + EventMeta: EventMeta{ + DocumentType: "test_type", + ReadTime: now, + }, + Text: &text, + Data: common.MapStr{"json": common.MapStr{"type": "_type"}}, + JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "type": "test_type", diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 2bbdccc90c5..b29c41ea382 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -50,7 +50,7 @@ type Prospectorer interface { // Outlet is the outlet for a prospector type Outlet interface { - OnEvent(event *input.Event) bool + OnEvent(event *input.Data) bool } // NewProspector instantiates a new prospector @@ -214,7 +214,9 @@ func (p *Prospector) updateState(event *input.Event) error { event.Module = p.config.Module event.Fileset = p.config.Fileset - ok := p.outlet.OnEvent(event) + eventHolder := event.GetData() + ok := p.outlet.OnEvent(&eventHolder) + if !ok { logp.Info("Prospector outlet closed") return errors.New("prospector outlet closed") diff --git a/filebeat/prospector/prospector_log_other_test.go b/filebeat/prospector/prospector_log_other_test.go index b09982bb858..da0afe50a5b 100644 --- a/filebeat/prospector/prospector_log_other_test.go +++ b/filebeat/prospector/prospector_log_other_test.go @@ -155,4 +155,4 @@ func TestInit(t *testing.T) { // TestOutlet is an empty outlet for testing type TestOutlet struct{} -func (o TestOutlet) OnEvent(event *input.Event) bool { return true } +func (o TestOutlet) OnEvent(event *input.Data) bool { return true } diff --git a/filebeat/publisher/async.go b/filebeat/publisher/async.go index 38ef9af77ba..35b304429ba 100644 --- a/filebeat/publisher/async.go +++ b/filebeat/publisher/async.go @@ -13,7 +13,7 @@ import ( type asyncLogPublisher struct { pub publisher.Publisher client publisher.Client - in chan []*input.Event + in chan []*input.Data out SuccessLogger // list of in-flight batches @@ -29,7 +29,7 @@ type asyncLogPublisher struct { type eventsBatch struct { next *eventsBatch flag int32 - events []*input.Event + events []*input.Data } type batchList struct { @@ -50,7 +50,7 @@ const ( ) func newAsyncLogPublisher( - in chan []*input.Event, + in chan []*input.Data, out SuccessLogger, pub publisher.Publisher, ) *asyncLogPublisher { diff --git a/filebeat/publisher/publisher.go b/filebeat/publisher/publisher.go index 6f78fa48071..0d982f2f4e0 100644 --- a/filebeat/publisher/publisher.go +++ b/filebeat/publisher/publisher.go @@ -24,12 +24,12 @@ type LogPublisher interface { type SuccessLogger interface { // Published will be run after events have been acknowledged by the outputs. - Published(events []*input.Event) bool + Published(events []*input.Data) bool } func New( async bool, - in chan []*input.Event, + in chan []*input.Data, out SuccessLogger, pub publisher.Publisher, ) LogPublisher { @@ -46,13 +46,13 @@ var ( // getDataEvents returns all events which contain data (not only state updates) // together with their associated metadata -func getDataEvents(events []*input.Event) (dataEvents []common.MapStr, meta []common.MapStr) { +func getDataEvents(events []*input.Data) (dataEvents []common.MapStr, meta []common.MapStr) { dataEvents = make([]common.MapStr, 0, len(events)) meta = make([]common.MapStr, 0, len(events)) for _, event := range events { if event.HasData() { - dataEvents = append(dataEvents, event.ToMapStr()) - meta = append(meta, event.Metadata()) + dataEvents = append(dataEvents, event.Event) + meta = append(meta, event.GetMetadata()) } } return dataEvents, meta diff --git a/filebeat/publisher/publisher_test.go b/filebeat/publisher/publisher_test.go index 28039a5210d..927b1636a8b 100644 --- a/filebeat/publisher/publisher_test.go +++ b/filebeat/publisher/publisher_test.go @@ -16,25 +16,29 @@ import ( type collectLogger struct { wg *sync.WaitGroup - events [][]*input.Event + events [][]*input.Data } -func (l *collectLogger) Published(events []*input.Event) bool { +func (l *collectLogger) Published(events []*input.Data) bool { l.wg.Done() l.events = append(l.events, events) return true } -func makeEvents(name string, n int) []*input.Event { - var events []*input.Event +func makeEvents(name string, n int) []*input.Data { + var events []*input.Data for i := 0; i < n; i++ { event := &input.Event{ - ReadTime: time.Now(), - InputType: "log", - DocumentType: "log", - Bytes: 100, + EventMeta: input.EventMeta{ + ReadTime: time.Now(), + InputType: "log", + DocumentType: "log", + Bytes: 100, + }, } - events = append(events, event) + + eventHolder := event.GetData() + events = append(events, &eventHolder) } return events } @@ -55,7 +59,7 @@ func TestPublisherModes(t *testing.T) { wg := sync.WaitGroup{} - pubChan := make(chan []*input.Event, len(test.order)+1) + pubChan := make(chan []*input.Data, len(test.order)+1) collector := &collectLogger{&wg, nil} client := pubtest.NewChanClient(0) @@ -63,7 +67,7 @@ func TestPublisherModes(t *testing.T) { pubtest.PublisherWithClient(client)) pub.Start() - var events [][]*input.Event + var events [][]*input.Data for i := range test.order { tmp := makeEvents(fmt.Sprintf("msg: %v", i), 1) wg.Add(1) diff --git a/filebeat/publisher/sync.go b/filebeat/publisher/sync.go index 5b4885959dc..2fbe2e2b3ab 100644 --- a/filebeat/publisher/sync.go +++ b/filebeat/publisher/sync.go @@ -11,7 +11,7 @@ import ( type syncLogPublisher struct { pub publisher.Publisher client publisher.Client - in chan []*input.Event + in chan []*input.Data out SuccessLogger done chan struct{} @@ -19,7 +19,7 @@ type syncLogPublisher struct { } func newSyncLogPublisher( - in chan []*input.Event, + in chan []*input.Data, out SuccessLogger, pub publisher.Publisher, ) *syncLogPublisher { @@ -51,7 +51,7 @@ func (p *syncLogPublisher) Start() { } func (p *syncLogPublisher) Publish() error { - var events []*input.Event + var events []*input.Data select { case <-p.done: return sigPublisherStop diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 8e30ac7b61f..e08a741d368 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -17,7 +17,7 @@ import ( ) type Registrar struct { - Channel chan []*input.Event + Channel chan []*input.Data out publisher.SuccessLogger done chan struct{} registryFile string // Path to the Registry File @@ -38,7 +38,7 @@ func New(registryFile string, out publisher.SuccessLogger) (*Registrar, error) { registryFile: registryFile, done: make(chan struct{}), states: file.NewStates(), - Channel: make(chan []*input.Event, 1), + Channel: make(chan []*input.Data, 1), out: out, wg: sync.WaitGroup{}, } @@ -153,7 +153,7 @@ func (r *Registrar) Run() { }() for { - var events []*input.Event + var events []*input.Data select { case <-r.done: @@ -183,17 +183,17 @@ func (r *Registrar) Run() { } // processEventStates gets the states from the events and writes them to the registrar state -func (r *Registrar) processEventStates(events []*input.Event) { +func (r *Registrar) processEventStates(events []*input.Data) { logp.Debug("registrar", "Processing %d events", len(events)) - // Take the last event found for each file source + // skip stdin for _, event := range events { // skip stdin - if event.InputType == cfg.StdinInputType { + if event.Metadata.InputType == cfg.StdinInputType { continue } - r.states.Update(event.State) + r.states.Update(event.Metadata.State) statesUpdate.Add(1) } } diff --git a/filebeat/spooler/spooler.go b/filebeat/spooler/spooler.go index a5d3c4e3cea..5279581292d 100644 --- a/filebeat/spooler/spooler.go +++ b/filebeat/spooler/spooler.go @@ -16,16 +16,16 @@ const channelSize = 16 // Spooler aggregates the events and sends the aggregated data to the publisher. type Spooler struct { - Channel chan *input.Event // Channel is the input to the Spooler. + Channel chan *input.Data // Channel is the input to the Spooler. config spoolerConfig output Output // batch event output on flush - spool []*input.Event // Events being held by the Spooler. + spool []*input.Data // Events being held by the Spooler. wg sync.WaitGroup // WaitGroup used to control the shutdown. } // Output spooler sends event to through Send method type Output interface { - Send(events []*input.Event) bool + Send(events []*input.Data) bool } type spoolerConfig struct { @@ -40,13 +40,13 @@ func New( out Output, ) (*Spooler, error) { return &Spooler{ - Channel: make(chan *input.Event, channelSize), + Channel: make(chan *input.Data, channelSize), config: spoolerConfig{ idleTimeout: config.IdleTimeout, spoolSize: config.SpoolSize, }, output: out, - spool: make([]*input.Event, 0, config.SpoolSize), + spool: make([]*input.Data, 0, config.SpoolSize), }, nil } @@ -74,7 +74,6 @@ func (s *Spooler) run() { if !ok { return } - if event != nil { flushed := s.queue(event) if flushed { @@ -112,7 +111,7 @@ func (s *Spooler) Stop() { // queue queues a single event to be spooled. If the queue reaches spoolSize // while calling this method then all events in the queue will be flushed to // the publisher. -func (s *Spooler) queue(event *input.Event) bool { +func (s *Spooler) queue(event *input.Data) bool { flushed := false s.spool = append(s.spool, event) if len(s.spool) == cap(s.spool) { @@ -132,7 +131,7 @@ func (s *Spooler) flush() int { } // copy buffer - tmpCopy := make([]*input.Event, count) + tmpCopy := make([]*input.Data, count) copy(tmpCopy, s.spool) // clear buffer