Skip to content

Commit

Permalink
Add processors namespacing (#2076)
Browse files Browse the repository at this point in the history
Add support for namespacing processors on Register by using dots on namespace
names.

When constructing a processor from config, the namespace will check, only one
valid plugin is requested.

With multi-level namespaces, the `when`-conditional setting can be applied at
any level (even level 0).

That is the filter

```
processors:
- drop_fields:
    fields: ['field']
    when: ...
```

Can now be written as:

```
processors:
- drop_fields:
    fields: ['field']
  when: ...
```

The exactly same processor structure will be created in either case.

Alternatively these 3 `drop_event` processor configurations are all equivalent:

```
processors:
- drop_event.when: ...
```

```
processors:
- drop_event:
    when: ...
```

and

```
processors:
- drop_event:
  when: ...
```

Namespaces come in handy, if some filter supports multiple backends. e.g.:

```
processors:
- lookup:
    exec:
      ...
      when: ...
```

or

```
processors:
- lookup:
    file:
      ...
      when: ...
```

Using namespaces, the `when`-clause can be used on any level, making all these
configurations equivalent:

```
processors:
- lookup:
    exec:
      ...
      when: ...
```

```
processors:
- lookup:
    exec:
      ...
    when: ...
```

and

```
processors:
- lookup:
    exec:
      ...
  when: ...
```

This minimizes risk of configuration errors when copying conditions or
indentation is of (a little). Plus the aforementioned filter can be written more
conveniently:

```
processors:
- lookup.exec:
    ...
    when: ...
```
  • Loading branch information
Steffen Siering authored and tsg committed Aug 1, 2016
1 parent 763ab78 commit 1efe230
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 27 deletions.
6 changes: 2 additions & 4 deletions libbeat/processors/actions/drop_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (
type dropEvent struct{}

func init() {
constructor := configChecked(newDropEvent, allowedFields("when"))
if err := processors.RegisterPlugin("drop_event", constructor); err != nil {
panic(err)
}
processors.RegisterPlugin("drop_event",
configChecked(newDropEvent, allowedFields("when")))
}

func newDropEvent(c common.Config) (processors.Processor, error) {
Expand Down
9 changes: 4 additions & 5 deletions libbeat/processors/actions/drop_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ type dropFields struct {
}

func init() {
constructor := configChecked(newDropFields,
requireFields("fields"), allowedFields("fields", "when"))
if err := processors.RegisterPlugin("drop_fields", constructor); err != nil {
panic(err)
}
processors.RegisterPlugin("drop_fields",
configChecked(newDropFields,
requireFields("fields"),
allowedFields("fields", "when")))
}

func newDropFields(c common.Config) (processors.Processor, error) {
Expand Down
9 changes: 4 additions & 5 deletions libbeat/processors/actions/include_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ type includeFields struct {
}

func init() {
constructor := configChecked(newIncludeFields,
requireFields("fields"), allowedFields("fields", "when"))
if err := processors.RegisterPlugin("include_fields", constructor); err != nil {
panic(err)
}
processors.RegisterPlugin("include_fields",
configChecked(newIncludeFields,
requireFields("fields"),
allowedFields("fields", "when")))
}

func newIncludeFields(c common.Config) (processors.Processor, error) {
Expand Down
106 changes: 106 additions & 0 deletions libbeat/processors/namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package processors

import (
"errors"
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
)

type Namespace struct {
reg map[string]pluginer
}

type plugin struct {
c Constructor
}

type pluginer interface {
Plugin() Constructor
}

func NewNamespace() *Namespace {
return &Namespace{
reg: map[string]pluginer{},
}
}

func (ns *Namespace) Register(name string, factory Constructor) error {
p := plugin{NewConditional(factory)}
names := strings.Split(name, ".")
if err := ns.add(names, p); err != nil {
return fmt.Errorf("plugin %s registration fail %v", name, err)
}
return nil
}

func (ns *Namespace) add(names []string, p pluginer) error {
name := names[0]

// register plugin if intermediate node in path being processed
if len(names) == 1 {
if _, found := ns.reg[name]; found {
return errors.New("exists already")
}

ns.reg[name] = p
return nil
}

// check if namespace path already exists
tmp, found := ns.reg[name]
if found {
ns, ok := tmp.(*Namespace)
if !ok {
return errors.New("non-namespace plugin already registered")
}
return ns.add(names[1:], p)
}

// register new namespace
sub := NewNamespace()
err := sub.add(names[1:], p)
if err != nil {
return err
}
ns.reg[name] = sub
return nil
}

func (ns *Namespace) Plugin() Constructor {
return NewConditional(func(cfg common.Config) (Processor, error) {
var section string
for _, name := range cfg.GetFields() {
if name == "when" { // TODO: remove check for "when" once fields are filtered
continue
}

if section != "" {
return nil, fmt.Errorf("Too many lookup modules configured (%v, %v)",
section, name)
}

section = name
}

if section == "" {
return nil, errors.New("No lookup module configured")
}

backend, found := ns.reg[section]
if !found {
return nil, fmt.Errorf("Unknown lookup module: %v", section)
}

config, err := cfg.Child(section, -1)
if err != nil {
return nil, err
}

constructor := backend.Plugin()
return constructor(*config)
})
}

func (p plugin) Plugin() Constructor { return p.c }
127 changes: 127 additions & 0 deletions libbeat/processors/namespace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package processors

import (
"errors"
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"
)

type testFilterRule struct {
str func() string
run func(common.MapStr) (common.MapStr, error)
}

func TestNamespace(t *testing.T) {
tests := []struct {
name string
}{
{"test"},
{"test.test"},
{"abc.def.test"},
}

for i, test := range tests {
t.Logf("run (%v): %v", i, test.name)

ns := NewNamespace()
err := ns.Register(test.name, newTestFilterRule)
fatalError(t, err)

cfg, _ := common.NewConfigFrom(map[string]interface{}{
test.name: nil,
})

filter, err := ns.Plugin()(*cfg)

assert.NoError(t, err)
assert.NotNil(t, filter)
}
}

func TestNamespaceRegisterFail(t *testing.T) {
ns := NewNamespace()
err := ns.Register("test", newTestFilterRule)
fatalError(t, err)

err = ns.Register("test", newTestFilterRule)
assert.Error(t, err)
}

func TestNamespaceError(t *testing.T) {
tests := []struct {
title string
factory Constructor
config interface{}
}{
{
"no module configured",
newTestFilterRule,
map[string]interface{}{},
},
{
"unknown module configured",
newTestFilterRule,
map[string]interface{}{
"notTest": nil,
},
},
{
"too many modules",
newTestFilterRule,
map[string]interface{}{
"a": nil,
"b": nil,
"test": nil,
},
},
{
"filter init fail",
func(_ common.Config) (Processor, error) {
return nil, errors.New("test")
},
map[string]interface{}{
"test": nil,
},
},
}

for i, test := range tests {
t.Logf("run (%v): %v", i, test.title)

ns := NewNamespace()
err := ns.Register("test", test.factory)
fatalError(t, err)

config, err := common.NewConfigFrom(test.config)
fatalError(t, err)

_, err = ns.Plugin()(*config)
assert.Error(t, err)
}
}

func newTestFilterRule(_ common.Config) (Processor, error) {
return &testFilterRule{}, nil
}

func (r *testFilterRule) String() string {
if r.str == nil {
return "test"
}
return r.str()
}

func (r *testFilterRule) Run(evt common.MapStr) (common.MapStr, error) {
if r.run == nil {
return evt, nil
}
return r.Run(evt)
}

func fatalError(t *testing.T, err error) {
if err != nil {
t.Fatal(err)
}
}
8 changes: 4 additions & 4 deletions libbeat/processors/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,26 @@ func New(config PluginConfig) (*Processors, error) {

for processorName, cfg := range processor {

constructor, exists := constructors[processorName]
gen, exists := registry.reg[processorName]
if !exists {
return nil, fmt.Errorf("the processor %s doesn't exist", processorName)
}

constructor := gen.Plugin()
plugin, err := constructor(cfg)
if err != nil {
return nil, err
}

procs.addProcessor(plugin)
procs.add(plugin)
}
}

logp.Debug("processors", "Processors: %v", procs)
return &procs, nil
}

func (procs *Processors) addProcessor(p Processor) {

func (procs *Processors) add(p Processor) {
procs.list = append(procs.list, p)
}

Expand Down
14 changes: 5 additions & 9 deletions libbeat/processors/registry.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package processors

import (
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
Expand All @@ -14,15 +12,13 @@ type Processor interface {

type Constructor func(config common.Config) (Processor, error)

var constructors = map[string]Constructor{}

func RegisterPlugin(name string, constructor Constructor) error {
var registry = NewNamespace()

func RegisterPlugin(name string, constructor Constructor) {
logp.Debug("processors", "Register plugin %s", name)

if _, exists := constructors[name]; exists {
return fmt.Errorf("plugin %s already registered", name)
err := registry.Register(name, constructor)
if err != nil {
panic(err)
}
constructors[name] = NewConditional(constructor)
return nil
}

0 comments on commit 1efe230

Please sign in to comment.