Skip to content

Commit

Permalink
Refactor input.Event similar to outputs.Data (#3823)
Browse files Browse the repository at this point in the history
  • Loading branch information
vjsamuel authored and ruflin committed Mar 30, 2017
1 parent e41868b commit 1c2d335
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 103 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- The `symlinks` and `harverster_limit` settings are now GA, instead of experimental. {pull}3525[3525]
- close_timeout is also applied when the output is blocking. {pull}3511[3511]
- Improve handling of different path variants on Windows. {pull}3781[3781]
- Restructure input.Event to be inline with outputs.Data {pull}3823[3823]

*Heartbeat*

Expand Down
14 changes: 7 additions & 7 deletions filebeat/beater/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ type spoolerOutlet struct {

type publisherChannel struct {
done chan struct{}
ch chan []*input.Event
ch chan []*input.Data
}

type registrarLogger struct {
done chan struct{}
ch chan<- []*input.Event
ch chan<- []*input.Data
}

type finishedLogger struct {
Expand All @@ -44,7 +44,7 @@ func newSpoolerOutlet(
}
}

func (o *spoolerOutlet) OnEvent(event *input.Event) bool {
func (o *spoolerOutlet) OnEvent(event *input.Data) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
Expand All @@ -69,12 +69,12 @@ func (o *spoolerOutlet) OnEvent(event *input.Event) bool {
func newPublisherChannel() *publisherChannel {
return &publisherChannel{
done: make(chan struct{}),
ch: make(chan []*input.Event, 1),
ch: make(chan []*input.Data, 1),
}
}

func (c *publisherChannel) Close() { close(c.done) }
func (c *publisherChannel) Send(events []*input.Event) bool {
func (c *publisherChannel) Send(events []*input.Data) bool {
select {
case <-c.done:
// set ch to nil, so no more events will be send after channel close signal
Expand All @@ -96,7 +96,7 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
}

func (l *registrarLogger) Close() { close(l.done) }
func (l *registrarLogger) Published(events []*input.Event) bool {
func (l *registrarLogger) Published(events []*input.Data) bool {
select {
case <-l.done:
// set ch to nil, so no more events will be send after channel close signal
Expand All @@ -114,7 +114,7 @@ func newFinishedLogger(wg *sync.WaitGroup) *finishedLogger {
return &finishedLogger{wg}
}

func (l *finishedLogger) Published(events []*input.Event) bool {
func (l *finishedLogger) Published(events []*input.Data) bool {
for range events {
l.wg.Done()
}
Expand Down
54 changes: 41 additions & 13 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,37 @@ import (

// Event is sent to the output and must contain all relevant information
type Event struct {
EventMeta
Text *string
JSONConfig *reader.JSONConfig
Data common.MapStr // Use in readers to add data to the event

}

type EventMeta struct {
common.EventMetadata
ReadTime time.Time
Pipeline string
Fileset string
Module string
InputType string
DocumentType string
ReadTime time.Time
Bytes int
Text *string
JSONConfig *reader.JSONConfig
State file.State
Data common.MapStr // Use in readers to add data to the event
Pipeline string
Fileset string
Module string
}

type Data struct {
Event common.MapStr
Metadata EventMeta
}

func NewEvent(state file.State) *Event {
return &Event{
State: state,
EventMeta: EventMeta{
State: state,
},
}

}

func (e *Event) ToMapStr() common.MapStr {
Expand Down Expand Up @@ -68,21 +81,36 @@ func (e *Event) ToMapStr() common.MapStr {
return event
}

func (e *Event) GetData() Data {
return Data{
Event: e.ToMapStr(),
Metadata: EventMeta{
Pipeline: e.Pipeline,
Bytes: e.Bytes,
State: e.State,
Fileset: e.Fileset,
Module: e.Module,
ReadTime: e.ReadTime,
EventMetadata: e.EventMetadata,
},
}
}

// Metadata creates a common.MapStr containing the metadata to
// be associated with the event.
func (e *Event) Metadata() common.MapStr {
if e.Pipeline != "" {
func (eh *Data) GetMetadata() common.MapStr {
if eh.Metadata.Pipeline != "" {
return common.MapStr{
"pipeline": e.Pipeline,
"pipeline": eh.Metadata.Pipeline,
}
}
return nil
}

// HasData returns true if the event itself contains data
// Events without data are only state updates
func (e *Event) HasData() bool {
return e.Bytes > 0
func (eh *Data) HasData() bool {
return eh.Metadata.Bytes > 0
}

// mergeJSONFields writes the JSON fields in the event map,
Expand Down
108 changes: 65 additions & 43 deletions filebeat/input/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// by default, don't overwrite keys
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},

Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
Expand All @@ -44,10 +47,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// overwrite keys if asked
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test",
Expand All @@ -57,10 +62,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// without keys_under_root, put everything in a json key
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{},
},
ExpectedItems: common.MapStr{
"json": common.MapStr{"type": "test", "text": "hello"},
Expand All @@ -70,10 +77,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// when MessageKey is defined, the Text overwrites the value of that key
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hi"}},
JSONConfig: &reader.JSONConfig{MessageKey: "text"},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hi"}},
JSONConfig: &reader.JSONConfig{MessageKey: "text"},
},
ExpectedItems: common.MapStr{
"json": common.MapStr{"type": "test", "text": "hello"},
Expand All @@ -84,11 +93,13 @@ func TestEventToMapStrJSON(t *testing.T) {
// when @timestamp is in JSON and overwrite_keys is true, parse it
// in a common.Time
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.MustParseTime("2016-04-05T18:47:18.444Z"),
Expand All @@ -99,11 +110,13 @@ func TestEventToMapStrJSON(t *testing.T) {
// when the parsing on @timestamp fails, leave the existing value and add an error key
// in a common.Time
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
Expand All @@ -115,11 +128,13 @@ func TestEventToMapStrJSON(t *testing.T) {
// when the @timestamp has the wrong type, leave the existing value and add an error key
// in a common.Time
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
Expand All @@ -130,10 +145,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// if overwrite_keys is true, but the `type` key in json is not a string, ignore it
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
Expand All @@ -143,10 +160,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// if overwrite_keys is true, but the `type` key in json is empty, ignore it
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": ""}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": ""}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
Expand All @@ -156,10 +175,13 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// if overwrite_keys is true, but the `type` key in json starts with _, ignore it
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "_type"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "_type"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
Expand Down
6 changes: 4 additions & 2 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Prospectorer interface {

// Outlet is the outlet for a prospector
type Outlet interface {
OnEvent(event *input.Event) bool
OnEvent(event *input.Data) bool
}

// NewProspector instantiates a new prospector
Expand Down Expand Up @@ -214,7 +214,9 @@ func (p *Prospector) updateState(event *input.Event) error {
event.Module = p.config.Module
event.Fileset = p.config.Fileset

ok := p.outlet.OnEvent(event)
eventHolder := event.GetData()
ok := p.outlet.OnEvent(&eventHolder)

if !ok {
logp.Info("Prospector outlet closed")
return errors.New("prospector outlet closed")
Expand Down
2 changes: 1 addition & 1 deletion filebeat/prospector/prospector_log_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,4 @@ func TestInit(t *testing.T) {
// TestOutlet is an empty outlet for testing
type TestOutlet struct{}

func (o TestOutlet) OnEvent(event *input.Event) bool { return true }
func (o TestOutlet) OnEvent(event *input.Data) bool { return true }
6 changes: 3 additions & 3 deletions filebeat/publisher/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type asyncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in chan []*input.Event
in chan []*input.Data
out SuccessLogger

// list of in-flight batches
Expand All @@ -29,7 +29,7 @@ type asyncLogPublisher struct {
type eventsBatch struct {
next *eventsBatch
flag int32
events []*input.Event
events []*input.Data
}

type batchList struct {
Expand All @@ -50,7 +50,7 @@ const (
)

func newAsyncLogPublisher(
in chan []*input.Event,
in chan []*input.Data,
out SuccessLogger,
pub publisher.Publisher,
) *asyncLogPublisher {
Expand Down
Loading

0 comments on commit 1c2d335

Please sign in to comment.