From 60ec3a690d71d987032d7fe0686418f5a0583d8c Mon Sep 17 00:00:00 2001 From: Monica Sarbu Date: Thu, 30 Jun 2016 17:18:04 +0200 Subject: [PATCH] Rename the filters section with processors --- CHANGELOG.asciidoc | 1 + filebeat/filebeat.full.yml | 15 +-- filebeat/tests/system/config/filebeat.yml.j2 | 2 +- .../{test_filtering.py => test_processors.py} | 0 libbeat/beat/beat.go | 22 ++--- libbeat/docs/filtering.asciidoc | 9 +- libbeat/docs/filteringconfig.asciidoc | 24 ++--- libbeat/filter/filter.go | 87 ----------------- libbeat/filter/registry.go | 28 ------ .../actions}/drop_event.go | 18 ++-- .../actions}/drop_fields.go | 22 ++--- .../actions}/include_fields.go | 22 ++--- libbeat/{filter => processors}/condition.go | 10 +- .../{filter => processors}/condition_test.go | 34 +++---- libbeat/{filter => processors}/config.go | 16 ++-- libbeat/processors/processor.go | 85 +++++++++++++++++ .../processor_test.go} | 94 +++++++++---------- libbeat/processors/registry.go | 28 ++++++ libbeat/publisher/client.go | 4 +- libbeat/publisher/publish.go | 8 +- metricbeat/beater/event.go | 6 +- metricbeat/beater/event_test.go | 2 +- metricbeat/beater/module.go | 6 +- metricbeat/mb/mb.go | 16 ++-- .../tests/system/config/metricbeat.yml.j2 | 4 +- packetbeat/packetbeat.full.yml | 15 +-- .../tests/system/config/packetbeat.yml.j2 | 4 +- ...0_filtering.py => test_0060_processors.py} | 0 winlogbeat/winlogbeat.full.yml | 7 +- 29 files changed, 297 insertions(+), 292 deletions(-) rename filebeat/tests/system/{test_filtering.py => test_processors.py} (100%) delete mode 100644 libbeat/filter/filter.go delete mode 100644 libbeat/filter/registry.go rename libbeat/{filter/rules => processors/actions}/drop_event.go (67%) rename libbeat/{filter/rules => processors/actions}/drop_fields.go (73%) rename libbeat/{filter/rules => processors/actions}/include_fields.go (76%) rename libbeat/{filter => processors}/condition.go (96%) rename libbeat/{filter => processors}/condition_test.go (85%) rename 
libbeat/{filter => processors}/config.go (87%) create mode 100644 libbeat/processors/processor.go rename libbeat/{filter/filter_test.go => processors/processor_test.go} (82%) create mode 100644 libbeat/processors/registry.go rename packetbeat/tests/system/{test_0060_filtering.py => test_0060_processors.py} (100%) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b30d180a019..b41690fb77b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -13,6 +13,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d ==== Breaking changes *Affecting all Beats* +- Rename the `filters` section to `processors`. {pull}1944[1944] *Metricbeat* diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 6773eebec37..8f619b4b27d 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -257,20 +257,21 @@ filebeat.prospectors: # default is the number of logical CPUs available in the system. #max_procs: -#================================ Filters ===================================== +#================================ Processors ===================================== -# This section defines a list of filtering rules that are applied one by one -# starting with the exported event: +# Processors are used to reduce the number of fields in the exported event or to +# enhance the event with external meta data. This section defines a list of processors +# that are applied one by one and the first one receives the initial event: # # event -> filter1 -> event1 -> filter2 ->event2 ... 
# -# Supported actions: drop_fields, drop_event, include_fields +# Supported processors: drop_fields, drop_event, include_fields # -# For example, the following filter configuration uses multiple actions to keep +# For example, you can use the following processors to keep # the fields that contain CPU load percentages, but remove the fields that # contain CPU ticks values: # -#filters: +#processors: #- include_fields: # fields: ["cpu"] #- drop_fields: @@ -278,7 +279,7 @@ filebeat.prospectors: # # The following example drops the events that have the HTTP response code 200: # -#filters: +#processors: #- drop_event: # equals: # http.code: 200 diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index c8b6b05209f..608a4585555 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -110,7 +110,7 @@ geoip: {%- if drop_fields or drop_event or include_fields %} #================================ Filters ===================================== -filters: +processors: {%- if include_fields %} - include_fields: diff --git a/filebeat/tests/system/test_filtering.py b/filebeat/tests/system/test_processors.py similarity index 100% rename from filebeat/tests/system/test_filtering.py rename to filebeat/tests/system/test_processors.py diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index cf3ab04c7f4..8fd6e45dc4c 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -43,10 +43,10 @@ import ( "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" - _ "github.com/elastic/beats/libbeat/filter/rules" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/paths" + "github.com/elastic/beats/libbeat/processors" + _ "github.com/elastic/beats/libbeat/processors/actions" "github.com/elastic/beats/libbeat/publisher" svc "github.com/elastic/beats/libbeat/service" 
"github.com/satori/go.uuid" @@ -107,16 +107,16 @@ type Beat struct { Config BeatConfig // Common Beat configuration data. Publisher *publisher.Publisher // Publisher - filters *filter.Filters // Filters + processors *processors.Processors // Processors } // BeatConfig struct contains the basic configuration of every beat type BeatConfig struct { - Shipper publisher.ShipperConfig `config:",inline"` - Output map[string]*common.Config `config:"output"` - Logging logp.Logging `config:"logging"` - Filters filter.FilterPluginConfig `config:"filters"` - Path paths.Path `config:"path"` + Shipper publisher.ShipperConfig `config:",inline"` + Output map[string]*common.Config `config:"output"` + Logging logp.Logging `config:"logging"` + Processors processors.PluginConfig `config:"processors"` + Path paths.Path `config:"path"` } // Run initializes and runs a Beater implementation. name is the name of the @@ -223,9 +223,9 @@ func (bc *instance) config() error { // log paths values to help with troubleshooting logp.Info(paths.Paths.String()) - bc.data.filters, err = filter.New(bc.data.Config.Filters) + bc.data.processors, err = processors.New(bc.data.Config.Processors) if err != nil { - return fmt.Errorf("error initializing filters: %v", err) + return fmt.Errorf("error initializing processors: %v", err) } if bc.data.Config.Shipper.MaxProcs != nil { @@ -251,7 +251,7 @@ func (bc *instance) setup() error { return fmt.Errorf("error initializing publisher: %v", err) } - bc.data.Publisher.RegisterFilter(bc.data.filters) + bc.data.Publisher.RegisterProcessors(bc.data.processors) err = bc.beater.Setup(bc.data) if err != nil { return err diff --git a/libbeat/docs/filtering.asciidoc b/libbeat/docs/filtering.asciidoc index 8a36958f47a..24cb7f72c58 100644 --- a/libbeat/docs/filtering.asciidoc +++ b/libbeat/docs/filtering.asciidoc @@ -9,8 +9,9 @@ //// include::../../libbeat/docs/filtering.asciidoc[] ////////////////////////////////////////////////////////////////////////// -Generic filtering 
is available to all Beats through libbeat. With generic filtering, you can reduce the number of -fields that are exported by the Beat by defining a list of filter actions that are applied to each event before it's -sent to the defined output. The filter actions are executed in the order that they are defined in the config file. +You can define processors in any Beat, because they are part of libbeat. Processors are used to reduce the number of +exported fields, but also to enhance them with additional metadata. +Each processor receives an event, applies a defined action to the event and returns it. If you define a list of +processors, they are executed in the order they are defined in the configuration file. -You can define the filter actions under the `filters` section of the {beatname_uc} configuration file. +The processors are defined in the {beatname_uc} configuration file. diff --git a/libbeat/docs/filteringconfig.asciidoc b/libbeat/docs/filteringconfig.asciidoc index 54516ad997c..df747dbace5 100644 --- a/libbeat/docs/filteringconfig.asciidoc +++ b/libbeat/docs/filteringconfig.asciidoc @@ -11,26 +11,28 @@ ////////////////////////////////////////////////////////////////////////// [[configuration-filter]] -=== Filters Configuration +=== Processors -You can set a list of filter actions in the `filters` section of the +{beatname_lc}.yml+ config file to reduce the number +You can define a set of `processors` in the +{beatname_lc}.yml+ config file to reduce the number of fields that are exported by the Beat. -If more filtering rules are defined, then they are executed in the order they are defined. The initial event is passed to the first filtering rule and what results from it is passed to the second filtering rule until all the filtering rules are applied. The condition that is used in the following filtering rules is running against the event that is received as input and it might defer from the original event. 
+If multiple processors are defined, they are executed in the order they are defined. The initial event is passed to the +first processor and what results from it is passed to the second processor until all processors are applied. The +condition is checked against the event that is received as input, which might differ from the original event. [source,yaml] ------- -event -> filter action 1 -> event1 -> filter action 2 -> event2 ... +event -> processor 1 -> event1 -> processor 2 -> event2 ... ------- See <> for the full list of possible fields. -Each filter action receives a condition and optionally a set of arguments. The action is executed only if the condition +Each processor receives a condition and optionally a set of arguments. The action is executed only if the condition is fulfilled. [source,yaml] ------ -filters: +processors: - action1: condition1 [arguments] @@ -44,7 +46,7 @@ filters: See <> for specific {beatname_uc} examples. [[filtering-condition]] -==== Filtering condition +==== Condition Each condition receives a field to compare or multiple fields under the same condition and then `AND` is used between them. You can see a list of the <>. @@ -138,7 +140,7 @@ range: cpu.user_p: 0.8 ------ -==== Filtering Actions +==== Actions The supported filter actions are: @@ -156,7 +158,7 @@ optional and if it's missing then the defined fields are always exported. The `@ [source,yaml] ------- -filters: +processors: - include_fields: [condition] fields: ["field1", "field2", ...] @@ -178,7 +180,7 @@ even if they show up in the `drop_fields` list. [source,yaml] ----------------------------------------------------- -filters: +processors: - drop_fields: [condition] fields: ["field1", "field2", ...] @@ -195,7 +197,7 @@ without one all the events are dropped. 
[source,yaml] ------ -filters: +processors: - drop_event: condition ------ diff --git a/libbeat/filter/filter.go b/libbeat/filter/filter.go deleted file mode 100644 index 1c6ebcc91d7..00000000000 --- a/libbeat/filter/filter.go +++ /dev/null @@ -1,87 +0,0 @@ -package filter - -import ( - "fmt" - "strings" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" -) - -type Filters struct { - list []FilterRule -} - -func New(config FilterPluginConfig) (*Filters, error) { - - filters := Filters{} - - for _, filter := range config { - - if len(filter) != 1 { - return nil, fmt.Errorf("each filtering rule needs to have exactly one action, but found %d actions.", len(filter)) - } - - for filterName, cfg := range filter { - - constructor, exists := filterConstructors[filterName] - if !exists { - return nil, fmt.Errorf("the filtering rule %s doesn't exist", filterName) - } - - plugin, err := constructor(cfg) - if err != nil { - return nil, err - } - - filters.addRule(plugin) - } - } - - logp.Debug("filter", "filters: %v", filters) - return &filters, nil -} - -func (filters *Filters) addRule(filter FilterRule) { - - if filters.list == nil { - filters.list = []FilterRule{} - } - filters.list = append(filters.list, filter) -} - -// Applies a sequence of filtering rules and returns the filtered event -func (filters *Filters) Filter(event common.MapStr) common.MapStr { - - // Check if filters are set, just return event if not - if len(filters.list) == 0 { - return event - } - - // clone the event at first, before starting filtering - filtered := event.Clone() - var err error - - for _, filter := range filters.list { - filtered, err = filter.Filter(filtered) - if err != nil { - logp.Debug("filter", "fail to apply filtering rule %s: %s", filter, err) - } - if filtered == nil { - // drop event - return nil - } - } - - return filtered -} - -func (filters Filters) String() string { - s := []string{} - - for _, filter := range filters.list { - - s = 
append(s, filter.String()) - } - return strings.Join(s, ", ") -} diff --git a/libbeat/filter/registry.go b/libbeat/filter/registry.go deleted file mode 100644 index 8d281f4f711..00000000000 --- a/libbeat/filter/registry.go +++ /dev/null @@ -1,28 +0,0 @@ -package filter - -import ( - "fmt" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" -) - -type FilterRule interface { - Filter(event common.MapStr) (common.MapStr, error) - String() string -} - -type FilterConstructor func(config common.Config) (FilterRule, error) - -var filterConstructors = map[string]FilterConstructor{} - -func RegisterPlugin(name string, constructor FilterConstructor) error { - - logp.Debug("filter", "Register plugin %s", name) - - if _, exists := filterConstructors[name]; exists { - return fmt.Errorf("plugin %s already registered", name) - } - filterConstructors[name] = constructor - return nil -} diff --git a/libbeat/filter/rules/drop_event.go b/libbeat/processors/actions/drop_event.go similarity index 67% rename from libbeat/filter/rules/drop_event.go rename to libbeat/processors/actions/drop_event.go index 7b9e60d6053..cc98b2cb6dc 100644 --- a/libbeat/filter/rules/drop_event.go +++ b/libbeat/processors/actions/drop_event.go @@ -1,27 +1,27 @@ -package rules +package actions import ( "fmt" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" + "github.com/elastic/beats/libbeat/processors" ) type DropEvent struct { - Cond *filter.Condition + Cond *processors.Condition } type DropEventConfig struct { - filter.ConditionConfig `config:",inline"` + processors.ConditionConfig `config:",inline"` } func init() { - if err := filter.RegisterPlugin("drop_event", newDropEvent); err != nil { + if err := processors.RegisterPlugin("drop_event", newDropEvent); err != nil { panic(err) } } -func newDropEvent(c common.Config) (filter.FilterRule, error) { +func newDropEvent(c common.Config) (processors.Processor, error) { f := DropEvent{} @@ 
-36,7 +36,7 @@ func newDropEvent(c common.Config) (filter.FilterRule, error) { return nil, fmt.Errorf("fail to unpack the drop_event configuration: %s", err) } - cond, err := filter.NewCondition(config.ConditionConfig) + cond, err := processors.NewCondition(config.ConditionConfig) if err != nil { return nil, err } @@ -48,14 +48,14 @@ func newDropEvent(c common.Config) (filter.FilterRule, error) { func (f *DropEvent) CheckConfig(c common.Config) error { for _, field := range c.GetFields() { - if !filter.AvailableCondition(field) { + if !processors.AvailableCondition(field) { return fmt.Errorf("unexpected %s option in the drop_event configuration", field) } } return nil } -func (f *DropEvent) Filter(event common.MapStr) (common.MapStr, error) { +func (f *DropEvent) Run(event common.MapStr) (common.MapStr, error) { if f.Cond != nil && !f.Cond.Check(event) { return event, nil diff --git a/libbeat/filter/rules/drop_fields.go b/libbeat/processors/actions/drop_fields.go similarity index 73% rename from libbeat/filter/rules/drop_fields.go rename to libbeat/processors/actions/drop_fields.go index 37e74dd74f7..5db3ca35bbe 100644 --- a/libbeat/filter/rules/drop_fields.go +++ b/libbeat/processors/actions/drop_fields.go @@ -1,31 +1,31 @@ -package rules +package actions import ( "fmt" "strings" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" + "github.com/elastic/beats/libbeat/processors" ) type DropFields struct { Fields []string // condition - Cond *filter.Condition + Cond *processors.Condition } type DropFieldsConfig struct { - Fields []string `config:"fields"` - filter.ConditionConfig `config:",inline"` + Fields []string `config:"fields"` + processors.ConditionConfig `config:",inline"` } func init() { - if err := filter.RegisterPlugin("drop_fields", newDropFields); err != nil { + if err := processors.RegisterPlugin("drop_fields", newDropFields); err != nil { panic(err) } } -func newDropFields(c common.Config) (filter.FilterRule, error) { 
+func newDropFields(c common.Config) (processors.Processor, error) { f := DropFields{} @@ -41,7 +41,7 @@ func newDropFields(c common.Config) (filter.FilterRule, error) { } /* remove read only fields */ - for _, readOnly := range filter.MandatoryExportedFields { + for _, readOnly := range processors.MandatoryExportedFields { for i, field := range config.Fields { if readOnly == field { config.Fields = append(config.Fields[:i], config.Fields[i+1:]...) @@ -50,7 +50,7 @@ func newDropFields(c common.Config) (filter.FilterRule, error) { } f.Fields = config.Fields - cond, err := filter.NewCondition(config.ConditionConfig) + cond, err := processors.NewCondition(config.ConditionConfig) if err != nil { return nil, err } @@ -64,7 +64,7 @@ func (f *DropFields) CheckConfig(c common.Config) error { complete := false for _, field := range c.GetFields() { - if !filter.AvailableCondition(field) { + if !processors.AvailableCondition(field) { if field != "fields" { return fmt.Errorf("unexpected %s option in the drop_fields configuration", field) } @@ -80,7 +80,7 @@ func (f *DropFields) CheckConfig(c common.Config) error { return nil } -func (f *DropFields) Filter(event common.MapStr) (common.MapStr, error) { +func (f *DropFields) Run(event common.MapStr) (common.MapStr, error) { if f.Cond != nil && !f.Cond.Check(event) { return event, nil diff --git a/libbeat/filter/rules/include_fields.go b/libbeat/processors/actions/include_fields.go similarity index 76% rename from libbeat/filter/rules/include_fields.go rename to libbeat/processors/actions/include_fields.go index 31fcd8751fb..741c4c34cf9 100644 --- a/libbeat/filter/rules/include_fields.go +++ b/libbeat/processors/actions/include_fields.go @@ -1,31 +1,31 @@ -package rules +package actions import ( "fmt" "strings" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" + "github.com/elastic/beats/libbeat/processors" ) type IncludeFields struct { Fields []string // condition - Cond *filter.Condition + 
Cond *processors.Condition } type IncludeFieldsConfig struct { - Fields []string `config:"fields"` - filter.ConditionConfig `config:",inline"` + Fields []string `config:"fields"` + processors.ConditionConfig `config:",inline"` } func init() { - if err := filter.RegisterPlugin("include_fields", newIncludeFields); err != nil { + if err := processors.RegisterPlugin("include_fields", newIncludeFields); err != nil { panic(err) } } -func newIncludeFields(c common.Config) (filter.FilterRule, error) { +func newIncludeFields(c common.Config) (processors.Processor, error) { f := IncludeFields{} @@ -41,7 +41,7 @@ func newIncludeFields(c common.Config) (filter.FilterRule, error) { } /* add read only fields if they are not yet */ - for _, readOnly := range filter.MandatoryExportedFields { + for _, readOnly := range processors.MandatoryExportedFields { found := false for _, field := range config.Fields { if readOnly == field { @@ -54,7 +54,7 @@ func newIncludeFields(c common.Config) (filter.FilterRule, error) { } f.Fields = config.Fields - cond, err := filter.NewCondition(config.ConditionConfig) + cond, err := processors.NewCondition(config.ConditionConfig) if err != nil { return nil, err } @@ -68,7 +68,7 @@ func (f *IncludeFields) CheckConfig(c common.Config) error { complete := false for _, field := range c.GetFields() { - if !filter.AvailableCondition(field) { + if !processors.AvailableCondition(field) { if field != "fields" { return fmt.Errorf("unexpected %s option in the include_fields configuration", field) } @@ -84,7 +84,7 @@ func (f *IncludeFields) CheckConfig(c common.Config) error { return nil } -func (f *IncludeFields) Filter(event common.MapStr) (common.MapStr, error) { +func (f *IncludeFields) Run(event common.MapStr) (common.MapStr, error) { if f.Cond != nil && !f.Cond.Check(event) { return event, nil diff --git a/libbeat/filter/condition.go b/libbeat/processors/condition.go similarity index 96% rename from libbeat/filter/condition.go rename to 
libbeat/processors/condition.go index c4f44dc5c8d..2efa08634f5 100644 --- a/libbeat/filter/condition.go +++ b/libbeat/processors/condition.go @@ -1,4 +1,4 @@ -package filter +package processors import ( "fmt" @@ -68,7 +68,7 @@ func NewCondition(config ConditionConfig) (*Condition, error) { return &c, nil } -func (c *Condition) setEquals(cfg *ConditionFilter) error { +func (c *Condition) setEquals(cfg *ConditionFields) error { c.equals = map[string]EqualsValue{} @@ -88,7 +88,7 @@ func (c *Condition) setEquals(cfg *ConditionFilter) error { return nil } -func (c *Condition) setContains(cfg *ConditionFilter) error { +func (c *Condition) setContains(cfg *ConditionFields) error { c.contains = map[string]string{} @@ -104,7 +104,7 @@ func (c *Condition) setContains(cfg *ConditionFilter) error { return nil } -func (c *Condition) setRegexp(cfg *ConditionFilter) error { +func (c *Condition) setRegexp(cfg *ConditionFields) error { var err error @@ -124,7 +124,7 @@ func (c *Condition) setRegexp(cfg *ConditionFilter) error { return nil } -func (c *Condition) setRange(cfg *ConditionFilter) error { +func (c *Condition) setRange(cfg *ConditionFields) error { c.rangexp = map[string]RangeValue{} diff --git a/libbeat/filter/condition_test.go b/libbeat/processors/condition_test.go similarity index 85% rename from libbeat/filter/condition_test.go rename to libbeat/processors/condition_test.go index 31abc7dda79..dbd94b0ed6f 100644 --- a/libbeat/filter/condition_test.go +++ b/libbeat/processors/condition_test.go @@ -1,4 +1,4 @@ -package filter +package processors import ( "testing" @@ -16,24 +16,24 @@ func TestBadCondition(t *testing.T) { configs := []ConditionConfig{ ConditionConfig{ - Equals: &ConditionFilter{fields: map[string]interface{}{ + Equals: &ConditionFields{fields: map[string]interface{}{ "proc.pid": 0.08, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "gtr": 0.3, }}, }, 
ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "gt": "fdfdd", }}, }, ConditionConfig{ - Regexp: &ConditionFilter{fields: map[string]interface{}{ + Regexp: &ConditionFields{fields: map[string]interface{}{ "proc.name": "58gdhsga-=kw++w00", }}, }, @@ -67,20 +67,20 @@ func TestEqualsCondition(t *testing.T) { configs := []ConditionConfig{ ConditionConfig{ - Equals: &ConditionFilter{fields: map[string]interface{}{ + Equals: &ConditionFields{fields: map[string]interface{}{ "type": "process", }}, }, ConditionConfig{ - Equals: &ConditionFilter{fields: map[string]interface{}{ + Equals: &ConditionFields{fields: map[string]interface{}{ "type": "process", "proc.pid": 305, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "proc.cpu.total_p.gt": 0.5, }}, }, @@ -121,14 +121,14 @@ func TestContainsCondition(t *testing.T) { configs := []ConditionConfig{ ConditionConfig{ - Contains: &ConditionFilter{fields: map[string]interface{}{ + Contains: &ConditionFields{fields: map[string]interface{}{ "proc.name": "sec", "proc.username": "monica", }}, }, ConditionConfig{ - Contains: &ConditionFilter{fields: map[string]interface{}{ + Contains: &ConditionFields{fields: map[string]interface{}{ "type": "process", "proc.name": "secddd", }}, @@ -169,19 +169,19 @@ func TestRegexpCondition(t *testing.T) { configs := []ConditionConfig{ ConditionConfig{ - Regexp: &ConditionFilter{fields: map[string]interface{}{ + Regexp: &ConditionFields{fields: map[string]interface{}{ "source": "apache2/error.*", }}, }, ConditionConfig{ - Regexp: &ConditionFilter{fields: map[string]interface{}{ + Regexp: &ConditionFields{fields: map[string]interface{}{ "source": "apache2/access.*", }}, }, ConditionConfig{ - Regexp: &ConditionFilter{fields: map[string]interface{}{ + Regexp: &ConditionFields{fields: map[string]interface{}{ "source": 
"apache2/error.*", "message": "[client 1.2.3.4]", }}, @@ -224,27 +224,27 @@ func TestRangeCondition(t *testing.T) { configs := []ConditionConfig{ ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "http.code.gte": 400, "http.code.lt": 500, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "bytes_out.gte": 2800, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "bytes_out.gte": 2800, "responsetime.gt": 30, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "proc.cpu.total_p.gte": 0.5, }}, }, diff --git a/libbeat/filter/config.go b/libbeat/processors/config.go similarity index 87% rename from libbeat/filter/config.go rename to libbeat/processors/config.go index dc2d2f568b5..550203fe41b 100644 --- a/libbeat/filter/config.go +++ b/libbeat/processors/config.go @@ -1,4 +1,4 @@ -package filter +package processors import ( "fmt" @@ -9,22 +9,22 @@ import ( ) type ConditionConfig struct { - Equals *ConditionFilter `config:"equals"` - Contains *ConditionFilter `config:"contains"` - Regexp *ConditionFilter `config:"regexp"` - Range *ConditionFilter `config:"range"` + Equals *ConditionFields `config:"equals"` + Contains *ConditionFields `config:"contains"` + Regexp *ConditionFields `config:"regexp"` + Range *ConditionFields `config:"range"` } -type ConditionFilter struct { +type ConditionFields struct { fields map[string]interface{} } -type FilterPluginConfig []map[string]common.Config +type PluginConfig []map[string]common.Config // fields that should be always exported var MandatoryExportedFields = []string{"@timestamp", "type"} -func (f *ConditionFilter) Unpack(to interface{}) error { +func (f *ConditionFields) Unpack(to 
interface{}) error { m, ok := to.(map[string]interface{}) if !ok { return fmt.Errorf("wrong type, expect map") diff --git a/libbeat/processors/processor.go b/libbeat/processors/processor.go new file mode 100644 index 00000000000..5d15c467977 --- /dev/null +++ b/libbeat/processors/processor.go @@ -0,0 +1,85 @@ +package processors + +import ( + "fmt" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type Processors struct { + list []Processor +} + +func New(config PluginConfig) (*Processors, error) { + + processors := Processors{} + + for _, processor := range config { + + if len(processor) != 1 { + return nil, fmt.Errorf("each processor needs to have exactly one action, but found %d actions.", + len(processor)) + } + + for processorName, cfg := range processor { + + constructor, exists := constructors[processorName] + if !exists { + return nil, fmt.Errorf("the processor %s doesn't exist", processorName) + } + + plugin, err := constructor(cfg) + if err != nil { + return nil, err + } + + processors.addProcessor(plugin) + } + } + + logp.Debug("processors", "Processors: %v", processors) + return &processors, nil +} + +func (processors *Processors) addProcessor(p Processor) { + + processors.list = append(processors.list, p) +} + +// Applies a sequence of processing rules and returns the filtered event +func (processors *Processors) Run(event common.MapStr) common.MapStr { + + // Check if processors are set, just return event if not + if len(processors.list) == 0 { + return event + } + + // clone the event at first, before starting filtering + filtered := event.Clone() + var err error + + for _, p := range processors.list { + filtered, err = p.Run(filtered) + if err != nil { + logp.Debug("filter", "fail to apply processor %s: %s", p, err) + } + if filtered == nil { + // drop event + return nil + } + } + + return filtered +} + +func (processors Processors) String() string { + s := []string{} + + for _, p := range 
processors.list { + + s = append(s, p.String()) + } + return strings.Join(s, ", ") +} diff --git a/libbeat/filter/filter_test.go b/libbeat/processors/processor_test.go similarity index 82% rename from libbeat/filter/filter_test.go rename to libbeat/processors/processor_test.go index 3fceeb91208..a25ed78e5f2 100644 --- a/libbeat/filter/filter_test.go +++ b/libbeat/processors/processor_test.go @@ -1,36 +1,36 @@ -package filter_test +package processors_test import ( "testing" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" - _ "github.com/elastic/beats/libbeat/filter/rules" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" + _ "github.com/elastic/beats/libbeat/processors/actions" "github.com/stretchr/testify/assert" ) -func GetFilters(t *testing.T, yml []map[string]interface{}) *filter.Filters { +func GetProcessors(t *testing.T, yml []map[string]interface{}) *processors.Processors { - config := filter.FilterPluginConfig{} + config := processors.PluginConfig{} - for _, rule := range yml { + for _, action := range yml { c := map[string]common.Config{} - for name, ruleYml := range rule { - ruleConfig, err := common.NewConfigFrom(ruleYml) + for name, actionYml := range action { + actionConfig, err := common.NewConfigFrom(actionYml) assert.Nil(t, err) - c[name] = *ruleConfig + c[name] = *actionConfig } config = append(config, c) } - filters, err := filter.New(config) + list, err := processors.New(config) assert.Nil(t, err) - return filters + return list } @@ -54,21 +54,21 @@ func TestBadConfig(t *testing.T) { }, } - config := filter.FilterPluginConfig{} + config := processors.PluginConfig{} - for _, rule := range yml { + for _, action := range yml { c := map[string]common.Config{} - for name, ruleYml := range rule { - ruleConfig, err := common.NewConfigFrom(ruleYml) + for name, actionYml := range action { + actionConfig, err := common.NewConfigFrom(actionYml) assert.Nil(t, err) - c[name] = 
*ruleConfig + c[name] = *actionConfig } config = append(config, c) } - _, err := filter.New(config) + _, err := processors.New(config) assert.NotNil(t, err) } @@ -90,7 +90,7 @@ func TestIncludeFields(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -118,7 +118,7 @@ func TestIncludeFields(t *testing.T) { "type": "process", } - filteredEvent := filters.Filter(event) + processedEvent := processors.Run(event) expectedEvent := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -136,7 +136,7 @@ func TestIncludeFields(t *testing.T) { "type": "process", } - assert.Equal(t, expectedEvent, filteredEvent) + assert.Equal(t, expectedEvent, processedEvent) } func TestIncludeFields1(t *testing.T) { @@ -156,7 +156,7 @@ func TestIncludeFields1(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -184,14 +184,14 @@ func TestIncludeFields1(t *testing.T) { "type": "process", } - filteredEvent := filters.Filter(event) + processedEvent := processors.Run(event) expectedEvent := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", "type": "process", } - assert.Equal(t, expectedEvent, filteredEvent) + assert.Equal(t, expectedEvent, processedEvent) } func TestDropFields(t *testing.T) { @@ -207,7 +207,7 @@ func TestDropFields(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -235,7 +235,7 @@ func TestDropFields(t *testing.T) { "type": "process", } - filteredEvent := filters.Filter(event) + processedEvent := processors.Run(event) expectedEvent := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -250,7 +250,7 @@ func TestDropFields(t *testing.T) { "type": "process", } - assert.Equal(t, expectedEvent, filteredEvent) + assert.Equal(t, expectedEvent, 
processedEvent) } func TestMultipleIncludeFields(t *testing.T) { @@ -275,7 +275,7 @@ func TestMultipleIncludeFields(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event1 := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -341,8 +341,8 @@ func TestMultipleIncludeFields(t *testing.T) { "type": "process", } - actual1 := filters.Filter(event1) - actual2 := filters.Filter(event2) + actual1 := processors.Run(event1) + actual2 := processors.Run(event2) assert.Equal(t, expected1, actual1) assert.Equal(t, expected2, actual2) @@ -366,7 +366,7 @@ func TestDropEvent(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -394,9 +394,9 @@ func TestDropEvent(t *testing.T) { "type": "process", } - filteredEvent := filters.Filter(event) + processedEvent := processors.Run(event) - assert.Nil(t, filteredEvent) + assert.Nil(t, processedEvent) } func TestEmptyCondition(t *testing.T) { @@ -411,7 +411,7 @@ func TestEmptyCondition(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -439,9 +439,9 @@ func TestEmptyCondition(t *testing.T) { "type": "process", } - filteredEvent := filters.Filter(event) + processedEvent := processors.Run(event) - assert.Nil(t, filteredEvent) + assert.Nil(t, processedEvent) } func TestBadCondition(t *testing.T) { @@ -460,21 +460,21 @@ func TestBadCondition(t *testing.T) { }, } - config := filter.FilterPluginConfig{} + config := processors.PluginConfig{} - for _, rule := range yml { + for _, action := range yml { c := map[string]common.Config{} - for name, ruleYml := range rule { - ruleConfig, err := common.NewConfigFrom(ruleYml) + for name, actionYml := range action { + actionConfig, err := common.NewConfigFrom(actionYml) assert.Nil(t, err) - c[name] = *ruleConfig + c[name] = *actionConfig } 
config = append(config, c) } - _, err := filter.New(config) + _, err := processors.New(config) assert.NotNil(t, err) } @@ -495,21 +495,21 @@ func TestMissingFields(t *testing.T) { }, } - config := filter.FilterPluginConfig{} + config := processors.PluginConfig{} - for _, rule := range yml { + for _, action := range yml { c := map[string]common.Config{} - for name, ruleYml := range rule { - ruleConfig, err := common.NewConfigFrom(ruleYml) + for name, actionYml := range action { + actionConfig, err := common.NewConfigFrom(actionYml) assert.Nil(t, err) - c[name] = *ruleConfig + c[name] = *actionConfig } config = append(config, c) } - _, err := filter.New(config) + _, err := processors.New(config) assert.NotNil(t, err) } diff --git a/libbeat/processors/registry.go b/libbeat/processors/registry.go new file mode 100644 index 00000000000..b2413331147 --- /dev/null +++ b/libbeat/processors/registry.go @@ -0,0 +1,28 @@ +package processors + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type Processor interface { + Run(event common.MapStr) (common.MapStr, error) + String() string +} + +type Constructor func(config common.Config) (Processor, error) + +var constructors = map[string]Constructor{} + +func RegisterPlugin(name string, constructor Constructor) error { + + logp.Debug("processors", "Register plugin %s", name) + + if _, exists := constructors[name]; exists { + return fmt.Errorf("plugin %s already registered", name) + } + constructors[name] = constructor + return nil +} diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go index b5bed1904ff..2940d994708 100644 --- a/libbeat/publisher/client.go +++ b/libbeat/publisher/client.go @@ -165,8 +165,8 @@ func (c *client) filterEvent(event common.MapStr) *common.MapStr { } - // filter the event by applying the configured rules - publishEvent := c.publisher.Filters.Filter(event) + // process the event by applying the configured actions + 
publishEvent := c.publisher.Processors.Run(event) if publishEvent == nil { // the event is dropped logp.Debug("publish", "Drop event %s", event.StringToPrint()) diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go index ca31b93737e..dc99e1bc909 100644 --- a/libbeat/publisher/publish.go +++ b/libbeat/publisher/publish.go @@ -9,9 +9,9 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/op" - "github.com/elastic/beats/libbeat/filter" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" + "github.com/elastic/beats/libbeat/processors" "github.com/nranchev/go-libGeoIP" // load supported output plugins @@ -57,7 +57,7 @@ type Publisher struct { TopologyOutput outputs.TopologyOutputer IgnoreOutgoing bool GeoLite *libgeo.GeoIP - Filters *filter.Filters + Processors *processors.Processors globalEventMetadata common.EventMetadata // Fields and tags to add to each event. @@ -172,9 +172,9 @@ func (publisher *Publisher) PublishTopology(params ...string) error { return nil } -func (publisher *Publisher) RegisterFilter(filters *filter.Filters) error { +func (publisher *Publisher) RegisterProcessors(list *processors.Processors) error { - publisher.Filters = filters + publisher.Processors = list return nil } diff --git a/metricbeat/beater/event.go b/metricbeat/beater/event.go index f87a73a7d9e..a88a832e1bc 100644 --- a/metricbeat/beater/event.go +++ b/metricbeat/beater/event.go @@ -4,8 +4,8 @@ import ( "time" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" ) const ( @@ -23,7 +23,7 @@ type eventBuilder struct { fetchDuration time.Duration event common.MapStr fetchErr error - filters *filter.Filters + filters *processors.Processors metadata common.EventMetadata } @@ -43,7 +43,7 @@ func (b eventBuilder) build() (common.MapStr, error) { // Apply filters. 
if b.filters != nil { - event = b.filters.Filter(event) + event = b.filters.Run(event) } event = common.MapStr{ diff --git a/metricbeat/beater/event_test.go b/metricbeat/beater/event_test.go index 11b02eb6b6c..94f0efe2294 100644 --- a/metricbeat/beater/event_test.go +++ b/metricbeat/beater/event_test.go @@ -33,7 +33,7 @@ var builder = eventBuilder{ fetchDuration: elapsed, // event // fetchErr - // filters + // processors // metadata } diff --git a/metricbeat/beater/module.go b/metricbeat/beater/module.go index 90aa6443759..d2e74b45187 100644 --- a/metricbeat/beater/module.go +++ b/metricbeat/beater/module.go @@ -7,8 +7,8 @@ import ( "time" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/metricbeat/mb" "github.com/joeshaw/multierror" @@ -34,7 +34,7 @@ var ( // Use NewModuleWrapper or NewModuleWrappers to construct new ModuleWrappers. type ModuleWrapper struct { mb.Module - filters *filter.Filters + filters *processors.Processors metricSets []*metricSetWrapper // List of pointers to its associated MetricSets. 
} @@ -76,7 +76,7 @@ func NewModuleWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Modul var errs multierror.Errors for k, v := range modules { debugf("Initializing Module type '%s': %T=%+v", k.Name(), k, k) - f, err := filter.New(k.Config().Filters) + f, err := processors.New(k.Config().Filters) if err != nil { errs = append(errs, errors.Wrapf(err, "module %s", k.Name())) continue diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 8b0f3a9da65..5a552e6767d 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -8,7 +8,7 @@ import ( "time" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" + "github.com/elastic/beats/libbeat/processors" ) // Module interfaces @@ -100,13 +100,13 @@ func (b *BaseMetricSet) Host() string { // ModuleConfig is the base configuration data for all Modules. type ModuleConfig struct { - Hosts []string `config:"hosts"` - Period time.Duration `config:"period" validate:"positive"` - Timeout time.Duration `config:"timeout" validate:"positive"` - Module string `config:"module" validate:"required"` - MetricSets []string `config:"metricsets" validate:"required"` - Enabled bool `config:"enabled"` - Filters filter.FilterPluginConfig `config:"filters"` + Hosts []string `config:"hosts"` + Period time.Duration `config:"period" validate:"positive"` + Timeout time.Duration `config:"timeout" validate:"positive"` + Module string `config:"module" validate:"required"` + MetricSets []string `config:"metricsets" validate:"required"` + Enabled bool `config:"enabled"` + Filters processors.PluginConfig `config:"filters"` common.EventMetadata `config:",inline"` // Fields and tags to add to events. 
} diff --git a/metricbeat/tests/system/config/metricbeat.yml.j2 b/metricbeat/tests/system/config/metricbeat.yml.j2 index b90bb07c50f..98acbd96611 100644 --- a/metricbeat/tests/system/config/metricbeat.yml.j2 +++ b/metricbeat/tests/system/config/metricbeat.yml.j2 @@ -104,8 +104,8 @@ geoip: {%- if drop_fields or drop_event or include_fields %} -#================================ Filters ===================================== -filters: +#================================ Processors ===================================== +processors: {%- if include_fields %} - include_fields: diff --git a/packetbeat/packetbeat.full.yml b/packetbeat/packetbeat.full.yml index b52ade68353..a244da5742d 100644 --- a/packetbeat/packetbeat.full.yml +++ b/packetbeat/packetbeat.full.yml @@ -429,20 +429,21 @@ packetbeat.protocols.nfs: # default is the number of logical CPUs available in the system. #max_procs: -#================================ Filters ===================================== +#================================ Processors ===================================== -# This section defines a list of filtering rules that are applied one by one -# starting with the exported event: +# Processors are used to reduce the number of fields in the exported event or to +# enhance the event with external metadata. This section defines a list of processors +# that are applied one by one and the first one receives the initial event: # # event -> filter1 -> event1 -> filter2 ->event2 ... 
# -# Supported actions: drop_fields, drop_event, include_fields +# Supported processors: drop_fields, drop_event, include_fields # -# For example, the following filter configuration uses multiple actions to keep +# For example, you can use the following processors to keep # the fields that contain CPU load percentages, but remove the fields that # contain CPU ticks values: # -#filters: +#processors: #- include_fields: # fields: ["cpu"] #- drop_fields: @@ -450,7 +451,7 @@ packetbeat.protocols.nfs: # # The following example drops the events that have the HTTP response code 200: # -#filters: +#processors: #- drop_event: # equals: # http.code: 200 diff --git a/packetbeat/tests/system/config/packetbeat.yml.j2 b/packetbeat/tests/system/config/packetbeat.yml.j2 index 1b028cecb8f..07e9eaaa7ec 100644 --- a/packetbeat/tests/system/config/packetbeat.yml.j2 +++ b/packetbeat/tests/system/config/packetbeat.yml.j2 @@ -176,8 +176,8 @@ geoip: {%- if drop_fields or drop_event or include_fields %} -#================================ Filters ===================================== -filters: +#================================ Processors ===================================== +processors: {%- if include_fields %} - include_fields: diff --git a/packetbeat/tests/system/test_0060_filtering.py b/packetbeat/tests/system/test_0060_processors.py similarity index 100% rename from packetbeat/tests/system/test_0060_filtering.py rename to packetbeat/tests/system/test_0060_processors.py diff --git a/winlogbeat/winlogbeat.full.yml b/winlogbeat/winlogbeat.full.yml index ee03acfc820..2cf65a8e93e 100644 --- a/winlogbeat/winlogbeat.full.yml +++ b/winlogbeat/winlogbeat.full.yml @@ -80,10 +80,11 @@ winlogbeat.event_logs: # default is the number of logical CPUs available in the system. 
#max_procs: -#================================ Filters ===================================== +#================================ Processors ===================================== -# This section defines a list of filtering rules that are applied one by one -# starting with the exported event: +# Processors are used to reduce the number of fields in the exported event or to +# enhance the event with external metadata. This section defines a list of processors +# that are applied one by one and the first one receives the initial event: # # event -> filter1 -> event1 -> filter2 ->event2 ... #