Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix config appender registration #9873

Merged
merged 2 commits into from
Jan 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 33 additions & 46 deletions libbeat/autodiscover/appenders/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package config
import (
"fmt"

"github.com/pkg/errors"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean "errors" or "github.com/pkg/errors". Just asking as I'm seeing this wrong import a lot recently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/pkg/errors is the one that gives me errors.Wrap, I think this is correct?


"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -38,51 +40,38 @@ type config struct {
Config *common.Config `config:"config"`
}

type configs []config

type configMap struct {
type configAppender struct {
condition conditions.Condition
config common.MapStr
}

type configAppender struct {
configMaps []configMap
}

// NewConfigAppender creates a configAppender that can append templatized configs into built configs
func NewConfigAppender(cfg *common.Config) (autodiscover.Appender, error) {
cfgwarn.Beta("The config appender is beta")

confs := configs{}
err := cfg.Unpack(&confs)
config := config{}
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("unable to unpack config appender due to error: %+v", err)
}

var configMaps []configMap
for _, conf := range confs {
var cond conditions.Condition

if conf.ConditionConfig != nil {
cond, err = conditions.NewCondition(conf.ConditionConfig)
if err != nil {
logp.Warn("unable to create condition due to error: %+v", err)
continue
}
}
cm := configMap{condition: cond}
var cond conditions.Condition

// Unpack the config
cf := common.MapStr{}
err = conf.Config.Unpack(&cf)
if config.ConditionConfig != nil {
cond, err = conditions.NewCondition(config.ConditionConfig)
if err != nil {
logp.Warn("unable to unpack config due to error: %+v", err)
continue
return nil, errors.Wrap(err, "unable to create condition due to error")
}
cm.config = cf
configMaps = append(configMaps, cm)
}
return &configAppender{configMaps: configMaps}, nil

// Unpack the config
cf := common.MapStr{}
err = config.Config.Unpack(&cf)
if err != nil {
return nil, errors.Wrap(err, "unable to unpack config due to error")
}

return &configAppender{condition: cond, config: cf}, nil
}

// Append adds configuration into configs built by builds/templates. It applies conditions to filter out
Expand All @@ -99,25 +88,23 @@ func (c *configAppender) Append(event bus.Event) {
if !ok {
return
}
for _, configMap := range c.configMaps {
if configMap.condition == nil || configMap.condition.Check(common.MapStr(event)) == true {
// Merge the template with all the configs
for _, cfg := range cfgs {
cf := common.MapStr{}
err := cfg.Unpack(&cf)
if err != nil {
logp.Debug("config", "unable to unpack config due to error: %v", err)
continue
}
err = cfg.Merge(&configMap.config)
if err != nil {
logp.Debug("config", "unable to merge configs due to error: %v", err)
}
if c.condition == nil || c.condition.Check(common.MapStr(event)) == true {
// Merge the template with all the configs
for _, cfg := range cfgs {
cf := common.MapStr{}
err := cfg.Unpack(&cf)
if err != nil {
logp.Debug("config", "unable to unpack config due to error: %v", err)
continue
}
err = cfg.Merge(&c.config)
if err != nil {
logp.Debug("config", "unable to merge configs due to error: %v", err)
}

// Apply the template
template.ApplyConfigTemplate(event, cfgs)
}

// Apply the template
template.ApplyConfigTemplate(event, cfgs)
}

// Replace old config with newly appended configs
Expand Down
79 changes: 35 additions & 44 deletions libbeat/autodiscover/appenders/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,96 +28,87 @@ import (

func TestGenerateAppender(t *testing.T) {
tests := []struct {
name string
eventConfig common.MapStr
event bus.Event
result common.MapStr
config string
}{
// Appender without a condition should apply the config regardless
{
name: "Appender without a condition should apply the config regardless",
event: bus.Event{},
result: common.MapStr{
"test": "bar",
"test1": "foo",
"test2": "foo",
},
eventConfig: common.MapStr{
"test": "bar",
},
config: `
- config:
"test1": foo
- config:
"test2": foo
`,
config:
test1: foo`,
},
// Appender with a condition check that fails. Only appender with no condition should pass
{
name: "Appender with a condition check that fails",
event: bus.Event{
"foo": "bar",
"field": "notbar",
},
result: common.MapStr{
"test": "bar",
"test1": "foo",
"test": "bar",
},
eventConfig: common.MapStr{
"test": "bar",
},
config: `
- config:
"test1": foo
- config:
"test2": foo
condition.equals:
"foo": "bar1"
`,
config:
test2: foo
condition.equals:
field: bar`,
},
// Appender with a condition check that passes. It should get appended
{
name: "Appender with a condition check that passes. It should get appended",
event: bus.Event{
"foo": "bar",
"field": "bar",
},
result: common.MapStr{
"test": "bar",
"test1": "foo",
"test2": "foo",
},
eventConfig: common.MapStr{
"test": "bar",
},
config: `
- config:
"test1": foo
- config:
"test2": foo
condition.equals:
"foo": "bar"
`,
config:
test2: foo
condition.equals:
field: bar`,
},
}
for _, test := range tests {
config, err := common.NewConfigWithYAML([]byte(test.config), "")
if err != nil {
t.Fatal(err)
}
t.Run(test.name, func(t *testing.T) {
config, err := common.NewConfigWithYAML([]byte(test.config), "")
if err != nil {
t.Fatal(err)
}

appender, err := NewConfigAppender(config)
assert.Nil(t, err)
assert.NotNil(t, appender)
appender, err := NewConfigAppender(config)
assert.Nil(t, err)
assert.NotNil(t, appender)

eveConfig, err := common.NewConfigFrom(&test.eventConfig)
assert.Nil(t, err)
eveConfig, err := common.NewConfigFrom(&test.eventConfig)
assert.Nil(t, err)

test.event["config"] = []*common.Config{eveConfig}
appender.Append(test.event)
test.event["config"] = []*common.Config{eveConfig}
appender.Append(test.event)

cfgs, _ := test.event["config"].([]*common.Config)
assert.Equal(t, len(cfgs), 1)
cfgs, _ := test.event["config"].([]*common.Config)
assert.Equal(t, len(cfgs), 1)

out := common.MapStr{}
cfgs[0].Unpack(&out)
out := common.MapStr{}
cfgs[0].Unpack(&out)

assert.Equal(t, out, test.result)
assert.Equal(t, out, test.result)
})

}
}
4 changes: 1 addition & 3 deletions libbeat/autodiscover/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package autodiscover

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/conditions"
)

// Config settings for Autodiscover
Expand All @@ -39,6 +38,5 @@ type BuilderConfig struct {

// AppenderConfig settings
type AppenderConfig struct {
Type string `config:"type"`
ConditionConfig *conditions.Config `config:"condition"`
Type string `config:"type"`
}
1 change: 1 addition & 0 deletions libbeat/cmd/instance/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package instance

import (
_ "github.com/elastic/beats/libbeat/autodiscover/appenders/config" // Register autodiscover appenders
_ "github.com/elastic/beats/libbeat/autodiscover/providers/docker" // Register autodiscover providers
_ "github.com/elastic/beats/libbeat/autodiscover/providers/jolokia"
_ "github.com/elastic/beats/libbeat/autodiscover/providers/kubernetes"
Expand Down
48 changes: 48 additions & 0 deletions metricbeat/tests/system/test_autodiscover.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,51 @@ def test_docker_labels(self):
# Check metadata is added
assert output[0]['docker']['container']['image'] == 'memcached:latest'
assert 'name' in output[0]['docker']['container']

@unittest.skipIf(not INTEGRATION_TESTS or
os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_config_appender(self):
"""
Test config appenders correctly updates configs
"""
import docker
docker_client = docker.from_env()

self.render_config_template(
autodiscover={
'docker': {
'hints.enabled': 'true',
'appenders': '''
- type: config
condition:
equals.docker.container.image: memcached:latest
config:
fields:
foo: bar
''',
},
},
)

proc = self.start_beat()
docker_client.images.pull('memcached:latest')
labels = {
'co.elastic.metrics/module': 'memcached',
'co.elastic.metrics/period': '1s',
'co.elastic.metrics/hosts': "'${data.host}:11211'",
}
container = docker_client.containers.run('memcached:latest', labels=labels, detach=True)

self.wait_until(lambda: self.log_contains('Starting runner: memcached'))

self.wait_until(lambda: self.output_count(lambda x: x >= 1))
container.stop()

self.wait_until(lambda: self.log_contains('Stopping runner: memcached'))

output = self.read_output_json()
proc.check_kill_and_wait()

# Check field is added
assert output[0]['fields']['foo'] == 'bar'