Refactor input.Event similar to outputs.Data #3823

Merged
merged 5 commits into from
Mar 30, 2017
Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -88,6 +88,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- The `symlinks` and `harverster_limit` settings are now GA, instead of experimental. {pull}3525[3525]
- close_timeout is also applied when the output is blocking. {pull}3511[3511]
- Improve handling of different path variants on Windows. {pull}3781[3781]
- Add support for prospector level processors. {pull}3823[3823]

*Heartbeat*

14 changes: 7 additions & 7 deletions filebeat/beater/channels.go
@@ -19,12 +19,12 @@ type spoolerOutlet struct {

type publisherChannel struct {
done chan struct{}
ch chan []*input.Event
ch chan []*input.EventHolder
}

type registrarLogger struct {
done chan struct{}
ch chan<- []*input.Event
ch chan<- []*input.EventHolder
}

type finishedLogger struct {
@@ -44,7 +44,7 @@ func newSpoolerOutlet(
}
}

func (o *spoolerOutlet) OnEvent(event *input.Event) bool {
func (o *spoolerOutlet) OnEvent(event *input.EventHolder) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
@@ -69,12 +69,12 @@ func (o *spoolerOutlet) OnEvent(event *input.Event) bool {
func newPublisherChannel() *publisherChannel {
return &publisherChannel{
done: make(chan struct{}),
ch: make(chan []*input.Event, 1),
ch: make(chan []*input.EventHolder, 1),
}
}

func (c *publisherChannel) Close() { close(c.done) }
func (c *publisherChannel) Send(events []*input.Event) bool {
func (c *publisherChannel) Send(events []*input.EventHolder) bool {
select {
case <-c.done:
// set ch to nil, so no more events will be send after channel close signal
@@ -96,7 +96,7 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
}

func (l *registrarLogger) Close() { close(l.done) }
func (l *registrarLogger) Published(events []*input.Event) bool {
func (l *registrarLogger) Published(events []*input.EventHolder) bool {
select {
case <-l.done:
// set ch to nil, so no more events will be send after channel close signal
@@ -114,7 +114,7 @@ func newFinishedLogger(wg *sync.WaitGroup) *finishedLogger {
return &finishedLogger{wg}
}

func (l *finishedLogger) Published(events []*input.Event) bool {
func (l *finishedLogger) Published(events []*input.EventHolder) bool {
for range events {
l.wg.Done()
}
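
The `Send` and `Published` methods in this file pair a data channel with a `done` channel so that senders stop blocking once shutdown is signalled. The following is a minimal sketch of that pattern, not the Filebeat code itself: `event` and `closableChannel` are hypothetical stand-ins for `input.EventHolder` and `publisherChannel`, and the up-front check of `done` is an addition in this sketch to make the result after `Close` deterministic.

```go
package main

import "fmt"

// event is a hypothetical stand-in for input.EventHolder.
type event struct{ msg string }

// closableChannel has the same shape as publisherChannel in the diff:
// a buffered data channel plus a done channel that signals shutdown.
type closableChannel struct {
	done chan struct{}
	ch   chan []*event
}

func newClosableChannel() *closableChannel {
	return &closableChannel{
		done: make(chan struct{}),
		ch:   make(chan []*event, 1),
	}
}

// Close signals shutdown. It must be called at most once.
func (c *closableChannel) Close() { close(c.done) }

// Send hands a batch to the consumer and reports whether it was accepted.
func (c *closableChannel) Send(events []*event) bool {
	// Check done first so that a closed channel always wins, even when
	// the buffer still has room. This pre-check is specific to the sketch.
	select {
	case <-c.done:
		return false
	default:
	}

	// Block until either the batch is accepted or shutdown is signalled.
	select {
	case <-c.done:
		return false
	case c.ch <- events:
		return true
	}
}

func main() {
	c := newClosableChannel()
	batch := []*event{{msg: "line 1"}}

	fmt.Println("before close:", c.Send(batch))
	c.Close()
	fmt.Println("after close:", c.Send(batch))
}
```

Because only the element type of the batch changes in this PR, a wrapper of this shape should not need any change beyond the type name.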
58 changes: 43 additions & 15 deletions filebeat/input/event.go
@@ -11,24 +11,37 @@ import (

// Event is sent to the output and must contain all relevant information
type Event struct {
common.EventMetadata
ReadTime time.Time
InputType string
DocumentType string
Bytes int
Text *string
JSONConfig *reader.JSONConfig
State file.State
Data common.MapStr // Use in readers to add data to the event
EventMeta
Member:

So currently we have EventMeta twice, which means two copies of the data? What would be the effect if we did not have it in the Event? I think processors will need access to the metadata too in the future, but do we need this in the first step?

Contributor Author:

Event will be used in harvesters, and when it reaches the processor we call event.GetEventHolder(), which (you are right) copies the metadata from Event to EventHolder and generates the MapStr. After that, since the Event object goes out of scope, it would be garbage collected, if I am not wrong. Would that be an issue? Today, when we generate the common.MapStr, we copy all the log-related fields over. With this change, we additionally copy over the meta fields as well. Please correct me if I'm wrong.
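
To make the copy discussed above concrete, here is a minimal sketch with hypothetical, cut-down types (`meta`, `event`, `holder`, and `toHolder` are illustration names, not the PR's API). It assumes the metadata struct is embedded by value, as `EventMeta` is in the diff, so assigning it to the holder copies the fields and later changes to the source event are not visible through the holder.

```go
package main

import "fmt"

// meta is a hypothetical, reduced version of EventMeta.
type meta struct {
	Pipeline string
	Bytes    int
}

// event embeds meta by value, mirroring how Event embeds EventMeta.
type event struct {
	meta
	Text string
}

// holder mirrors the shape of EventHolder: a generated map plus metadata.
type holder struct {
	Event    map[string]interface{}
	Metadata meta
}

// toHolder builds the map and copies the metadata in one struct assignment.
func (e *event) toHolder() holder {
	return holder{
		Event:    map[string]interface{}{"message": e.Text},
		Metadata: e.meta, // value copy: the holder gets its own fields
	}
}

func main() {
	e := &event{meta: meta{Pipeline: "ingest-a", Bytes: 5}, Text: "hello"}
	h := e.toHolder()

	// Changing the source afterwards does not affect the holder's copy.
	e.Pipeline = "changed"
	fmt.Println(h.Metadata.Pipeline, h.Metadata.Bytes, h.Event["message"])
}
```

Two caveats apply to this sketch. The copy is shallow, so any map or slice inside the real metadata would still be shared between the two values. Copying the whole struct at once also carries every field along, whereas the field-by-field version in `GetEventHolder` in this diff does not list `InputType` or `DocumentType`; the thread does not say whether that omission is intentional.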

Text *string
JSONConfig *reader.JSONConfig
Data common.MapStr // Use in readers to add data to the event

}

type EventMeta struct {
Pipeline string
Fileset string
Module string
InputType string
DocumentType string
common.EventMetadata
Member:

Let's move common.EventMetadata to the first line of EventMeta.

Contributor Author:

Done.

ReadTime time.Time
Bytes int
State file.State
}

type EventHolder struct {
Member:

Can we rename EventHolder to Data to be consistent with https://github.com/elastic/beats/blob/master/libbeat/outputs/outputs.go#L20 ? I'm OK with keeping Metadata, since at the moment it is only about metadata here.

Event common.MapStr
Metadata EventMeta
}

func NewEvent(state file.State) *Event {
return &Event{
State: state,
EventMeta: EventMeta{
State: state,
},
}

}

func (e *Event) ToMapStr() common.MapStr {
@@ -68,21 +81,36 @@ func (e *Event) ToMapStr() common.MapStr {
return event
}

func (e *Event) GetEventHolder() EventHolder {
return EventHolder{
Event: e.ToMapStr(),
Metadata: EventMeta{
Pipeline: e.Pipeline,
Bytes: e.Bytes,
State: e.State,
Fileset: e.Fileset,
Module: e.Module,
ReadTime: e.ReadTime,
EventMetadata: e.EventMetadata,
},
}
}

// Metadata creates a common.MapStr containing the metadata to
// be associated with the event.
func (e *Event) Metadata() common.MapStr {
if e.Pipeline != "" {
func (eh *EventHolder) GetMetadata() common.MapStr {
if eh.Metadata.Pipeline != "" {
return common.MapStr{
"pipeline": e.Pipeline,
"pipeline": eh.Metadata.Pipeline,
}
}
return nil
}

// HasData returns true if the event itself contains data
// Events without data are only state updates
func (e *Event) HasData() bool {
return e.Bytes > 0
func (eh *EventHolder) HasData() bool {
return eh.Metadata.Bytes > 0
}
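
As a rough illustration of how a consumer might use the two methods above, the sketch below separates events that carry data from pure state updates. The types are reduced copies of the ones in the diff using plain maps instead of `common.MapStr`, and `splitBatch` is a hypothetical helper that does not appear in the PR.

```go
package main

import "fmt"

// eventMeta and eventHolder are reduced, hypothetical copies of
// EventMeta and EventHolder from the diff.
type eventMeta struct {
	Pipeline string
	Bytes    int
}

type eventHolder struct {
	Event    map[string]interface{}
	Metadata eventMeta
}

// GetMetadata returns the pipeline as a map, or nil when none is set.
func (eh *eventHolder) GetMetadata() map[string]interface{} {
	if eh.Metadata.Pipeline != "" {
		return map[string]interface{}{"pipeline": eh.Metadata.Pipeline}
	}
	return nil
}

// HasData reports whether the event carries data; events without data
// are only state updates.
func (eh *eventHolder) HasData() bool {
	return eh.Metadata.Bytes > 0
}

// splitBatch is a hypothetical helper: it separates events to publish
// from state-only updates.
func splitBatch(batch []*eventHolder) (withData, stateOnly []*eventHolder) {
	for _, eh := range batch {
		if eh.HasData() {
			withData = append(withData, eh)
		} else {
			stateOnly = append(stateOnly, eh)
		}
	}
	return withData, stateOnly
}

func main() {
	batch := []*eventHolder{
		{Metadata: eventMeta{Pipeline: "ingest-a", Bytes: 42}},
		{Metadata: eventMeta{}}, // state update only
	}
	withData, stateOnly := splitBatch(batch)
	fmt.Println(len(withData), len(stateOnly), withData[0].GetMetadata())
}
```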

// mergeJSONFields writes the JSON fields in the event map,
108 changes: 65 additions & 43 deletions filebeat/input/event_test.go
@@ -31,10 +31,13 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// by default, don't overwrite keys
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},

Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
@@ -44,10 +47,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// overwrite keys if asked
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test",
@@ -57,10 +62,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// without keys_under_root, put everything in a json key
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{},
},
ExpectedItems: common.MapStr{
"json": common.MapStr{"type": "test", "text": "hello"},
@@ -70,10 +77,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// when MessageKey is defined, the Text overwrites the value of that key
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hi"}},
JSONConfig: &reader.JSONConfig{MessageKey: "text"},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hi"}},
JSONConfig: &reader.JSONConfig{MessageKey: "text"},
},
ExpectedItems: common.MapStr{
"json": common.MapStr{"type": "test", "text": "hello"},
@@ -84,11 +93,13 @@ func TestEventToMapStrJSON(t *testing.T) {
// when @timestamp is in JSON and overwrite_keys is true, parse it
// in a common.Time
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.MustParseTime("2016-04-05T18:47:18.444Z"),
@@ -99,11 +110,13 @@ func TestEventToMapStrJSON(t *testing.T) {
// when the parsing on @timestamp fails, leave the existing value and add an error key
// in a common.Time
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
@@ -115,11 +128,13 @@ func TestEventToMapStrJSON(t *testing.T) {
// when the @timestamp has the wrong type, leave the existing value and add an error key
// in a common.Time
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
@@ -130,10 +145,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// if overwrite_keys is true, but the `type` key in json is not a string, ignore it
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
@@ -143,10 +160,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// if overwrite_keys is true, but the `type` key in json is empty, ignore it
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": ""}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": ""}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
@@ -156,10 +175,13 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// if overwrite_keys is true, but the `type` key in json starts with _, ignore it
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "_type"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "_type"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
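
The test cases above describe how decoded JSON keys interact with existing event keys when `KeysUnderRoot` is set. The sketch below is a simplified reading of those rules and not the real `mergeJSONFields`: `mergeUnderRoot` and `validType` are hypothetical names, plain maps replace `common.MapStr`, the `@timestamp` handling is left out, and applying the `type` guard to every write of `type` is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// validType reports whether v is acceptable as a "type" value:
// a non-empty string that does not start with an underscore.
func validType(v interface{}) bool {
	s, ok := v.(string)
	return ok && s != "" && !strings.HasPrefix(s, "_")
}

// mergeUnderRoot copies decoded JSON fields into the event root.
// Existing keys are kept unless overwrite is true.
func mergeUnderRoot(event, jsonFields map[string]interface{}, overwrite bool) {
	for k, v := range jsonFields {
		if _, exists := event[k]; exists && !overwrite {
			continue // by default, don't overwrite keys
		}
		if k == "type" && !validType(v) {
			continue // ignore non-string, empty, or "_"-prefixed types
		}
		event[k] = v
	}
}

func main() {
	event := map[string]interface{}{"type": "test_type"}
	decoded := map[string]interface{}{"type": "test", "text": "hello"}

	mergeUnderRoot(event, decoded, false)
	fmt.Println(event["type"], event["text"])

	mergeUnderRoot(event, decoded, true)
	fmt.Println(event["type"], event["text"])
}
```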
36 changes: 19 additions & 17 deletions filebeat/prospector/config.go
@@ -8,6 +8,7 @@ import (
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/processors"
)

var (
@@ -26,23 +27,24 @@ var (
)

type prospectorConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Enabled bool `config:"enabled"`
DocumentType string `config:"document_type"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
JSON *reader.JSONConfig `config:"json"`
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Enabled bool `config:"enabled"`
DocumentType string `config:"document_type"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
JSON *reader.JSONConfig `config:"json"`
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
Processors processors.PluginConfig `config:"processors"`
}

func (config *prospectorConfig) Validate() error {
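
The new `Processors` field lets each prospector carry its own processor configuration. How libbeat builds and runs processors is not shown in this part of the diff, so the following is only a conceptual sketch under stated assumptions: the `processor` interface, `dropField`, `dropIfEquals`, and `chain` are all hypothetical names, and a nil return is assumed to mean that the event is dropped.

```go
package main

import "fmt"

// processor is a hypothetical interface: Run returns the (possibly
// modified) event, or nil to drop it.
type processor interface {
	Run(event map[string]interface{}) map[string]interface{}
}

// dropField removes a single key from the event.
type dropField struct{ name string }

func (p dropField) Run(event map[string]interface{}) map[string]interface{} {
	delete(event, p.name)
	return event
}

// dropIfEquals drops the whole event when key holds the given string.
type dropIfEquals struct{ key, value string }

func (p dropIfEquals) Run(event map[string]interface{}) map[string]interface{} {
	if s, ok := event[p.key].(string); ok && s == p.value {
		return nil
	}
	return event
}

// chain runs processors in order and stops as soon as one drops the event.
type chain []processor

func (c chain) Run(event map[string]interface{}) map[string]interface{} {
	for _, p := range c {
		event = p.Run(event)
		if event == nil {
			return nil
		}
	}
	return event
}

func main() {
	// One chain per prospector, built from that prospector's config.
	perProspector := chain{
		dropIfEquals{key: "level", value: "debug"},
		dropField{name: "secret"},
	}

	kept := perProspector.Run(map[string]interface{}{
		"level": "info", "secret": "x", "message": "hello",
	})
	dropped := perProspector.Run(map[string]interface{}{"level": "debug"})
	fmt.Println(kept, dropped == nil)
}
```

Keeping one chain per prospector means that a rule written for one set of log paths cannot accidentally affect events from another.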