From a6f43f88e3f00fdba68a180ba2ecf551f177c851 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Thu, 20 Jul 2017 01:15:22 +0200 Subject: [PATCH] Update metricbeat to use new publisher API (#4699) * update metricbeat to use new publisher API * update libbeat/publisher/testing to support new API only * Update metricbeat unit tests * Update libbeat/publisher/testing unit test * Update filters->processors in system tests * Update redis test to change semantics on processor * review fixes --- CHANGELOG.asciidoc | 2 + libbeat/publisher/testing/testing.go | 59 ++----- libbeat/publisher/testing/testing_test.go | 16 +- metricbeat/beater/metricbeat.go | 64 ++++++-- metricbeat/mb/builders.go | 145 +++++++---------- metricbeat/mb/mb.go | 23 ++- metricbeat/mb/mb_test.go | 153 +++++------------- metricbeat/mb/module/connector.go | 46 ++++++ metricbeat/mb/module/event.go | 49 +++--- metricbeat/mb/module/event_test.go | 16 +- metricbeat/mb/module/example_test.go | 50 +++++- metricbeat/mb/module/factory.go | 19 ++- metricbeat/mb/module/publish.go | 9 +- metricbeat/mb/module/runner.go | 8 +- metricbeat/mb/module/runner_test.go | 8 +- metricbeat/mb/module/wrapper.go | 85 +++------- metricbeat/mb/testing/data_generator.go | 14 +- metricbeat/mb/testing/modules.go | 25 ++- metricbeat/module/mysql/status/status_test.go | 2 +- metricbeat/module/system/cpu/cpu_test.go | 4 +- .../tests/system/config/metricbeat.yml.j2 | 8 +- metricbeat/tests/system/test_redis.py | 10 +- 22 files changed, 377 insertions(+), 438 deletions(-) create mode 100644 metricbeat/mb/module/connector.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index d2d4973a0c9e..41bfcc711ddc 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -34,6 +34,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d This will make the CPU usage percentages from the system cpu metricset consistent with the system process metricset. The documentation for these metrics already stated that on multi-core systems the percentages could be greater than 100%. {pull}4544[4544] +- Remove filters setting from metricbeat modules. {pull}4699[4699] *Packetbeat* @@ -105,6 +106,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d - Add the ability to configure kernel's audit failure mode. {pull}4516[4516] - Add experimental Aerospike module. {pull}4560[4560] - Vsphere module: collect custom fields from virtual machines. {issue}4464[4464] +- Add `processors` setting to metricbeat modules. {pull}4699[4699] *Packetbeat* diff --git a/libbeat/publisher/testing/testing.go b/libbeat/publisher/testing/testing.go index 5f14a4c3c95a..38c808cff394 100644 --- a/libbeat/publisher/testing/testing.go +++ b/libbeat/publisher/testing/testing.go @@ -2,38 +2,30 @@ package testing // ChanClient implements Client interface, forwarding published events to some import ( - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/publisher/bc/publisher" "github.com/elastic/beats/libbeat/publisher/beat" ) type TestPublisher struct { - client publisher.Client + client beat.Client } // given channel only. type ChanClient struct { done chan struct{} - Channel chan PublishMessage - - recvBuf []common.MapStr -} - -type PublishMessage struct { - Context publisher.Context - Events []common.MapStr + Channel chan beat.Event } -func PublisherWithClient(client publisher.Client) publisher.Publisher { +func PublisherWithClient(client beat.Client) publisher.Publisher { return &TestPublisher{client} } func (pub *TestPublisher) Connect() publisher.Client { - return pub.client + panic("Not supported") } func (pub *TestPublisher) ConnectX(_ beat.ClientConfig) (beat.Client, error) { - panic("Not supported") + return pub.client, nil } func (pub *TestPublisher) SetACKHandler(_ beat.PipelineACKHandler) error { @@ -41,12 +33,12 @@ func (pub *TestPublisher) SetACKHandler(_ beat.PipelineACKHandler) error { } func NewChanClient(bufSize int) *ChanClient { - return NewChanClientWith(make(chan PublishMessage, bufSize)) + return NewChanClientWith(make(chan beat.Event, bufSize)) } -func NewChanClientWith(ch chan PublishMessage) *ChanClient { +func NewChanClientWith(ch chan beat.Event) *ChanClient { if ch == nil { - ch = make(chan PublishMessage, 1) + ch = make(chan beat.Event, 1) } c := &ChanClient{ done: make(chan struct{}), @@ -62,40 +54,19 @@ func (c *ChanClient) Close() error { // PublishEvent will publish the event on the channel. Options will be ignored. // Always returns true. -func (c *ChanClient) PublishEvent(event common.MapStr, opts ...publisher.ClientOption) bool { - return c.PublishEvents([]common.MapStr{event}, opts...) -} - -// PublishEvents publishes all event on the configured channel. Options will be ignored. -// Always returns true. -func (c *ChanClient) PublishEvents(events []common.MapStr, opts ...publisher.ClientOption) bool { - _, ctx := publisher.MakeContext(opts) - msg := PublishMessage{ctx, events} +func (c *ChanClient) Publish(event beat.Event) { select { case <-c.done: - return false - case c.Channel <- msg: - return true + case c.Channel <- event: } } -func (c *ChanClient) ReceiveEvent() common.MapStr { - if len(c.recvBuf) > 0 { - evt := c.recvBuf[0] - c.recvBuf = c.recvBuf[1:] - return evt +func (c *ChanClient) PublishAll(event []beat.Event) { + for _, e := range event { + c.Publish(e) } - - msg := <-c.Channel - c.recvBuf = msg.Events - return c.ReceiveEvent() } -func (c *ChanClient) ReceiveEvents() []common.MapStr { - if len(c.recvBuf) > 0 { - return c.recvBuf - } - - msg := <-c.Channel - return msg.Events +func (c *ChanClient) ReceiveEvent() beat.Event { + return <-c.Channel } diff --git a/libbeat/publisher/testing/testing_test.go b/libbeat/publisher/testing/testing_test.go index bea675b095bd..1b6810d5aafc 100644 --- a/libbeat/publisher/testing/testing_test.go +++ b/libbeat/publisher/testing/testing_test.go @@ -4,15 +4,19 @@ import ( "testing" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/publisher/beat" "github.com/stretchr/testify/assert" ) var cnt = 0 -func testEvent() common.MapStr { - event := common.MapStr{} - event["message"] = "test" - event["idx"] = cnt +func testEvent() beat.Event { + event := beat.Event{ + Fields: common.MapStr{ + "message": "test", + "idx": cnt, + }, + } cnt++ return event } @@ -21,7 +25,7 @@ func testEvent() common.MapStr { func TestChanClientPublishEvent(t *testing.T) { cc := NewChanClient(1) e1 := testEvent() - cc.PublishEvent(e1) + cc.Publish(e1) assert.Equal(t, e1, cc.ReceiveEvent()) } @@ -30,7 +34,7 @@ func TestChanClientPublishEvents(t *testing.T) { cc := NewChanClient(1) e1, e2 := testEvent(), testEvent() - cc.PublishEvents([]common.MapStr{e1, e2}) + go cc.PublishAll([]beat.Event{e1, e2}) assert.Equal(t, e1, cc.ReceiveEvent()) assert.Equal(t, e2, cc.ReceiveEvent()) } diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 4140f8cb04e6..ba0810fa48f2 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -6,9 +6,9 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" + "github.com/joeshaw/multierror" "github.com/elastic/beats/libbeat/cfgfile" "github.com/pkg/errors" @@ -19,12 +19,16 @@ import ( // Metricbeat implements the Beater interface for metricbeat. type Metricbeat struct { - done chan struct{} // Channel used to initiate shutdown. - modules []*module.Wrapper // Active list of modules. - client publisher.Client // Publisher client. + done chan struct{} // Channel used to initiate shutdown. + modules []staticModule // Active list of modules. config Config } +type staticModule struct { + connector *module.Connector + module *module.Wrapper +} + // New creates and returns a new Metricbeat instance. func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { // List all registered modules and metricsets. @@ -35,14 +39,45 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { return nil, errors.Wrap(err, "error reading configuration file") } - modules, err := module.NewWrappers(config.MaxStartDelay, config.Modules, mb.Registry) - if err != nil { - // Empty config is fine if dynamic config is enabled - if !config.ConfigModules.Enabled() { - return nil, err - } else if err != mb.ErrEmptyConfig && err != mb.ErrAllModulesDisabled { - return nil, err + dynamicCfgEnabled := config.ConfigModules.Enabled() + if !dynamicCfgEnabled && len(config.Modules) == 0 { + return nil, mb.ErrEmptyConfig + } + + var errs multierror.Errors + var modules []staticModule + for _, moduleCfg := range config.Modules { + if !moduleCfg.Enabled() { + continue + } + + failed := false + connector, err := module.NewConnector(b.Publisher, moduleCfg) + if err != nil { + errs = append(errs, err) + failed = true + } + + module, err := module.NewWrapper(config.MaxStartDelay, moduleCfg, mb.Registry) + if err != nil { + errs = append(errs, err) + failed = true } + + if failed { + continue + } + modules = append(modules, staticModule{ + connector: connector, + module: module, + }) + } + + if err := errs.Err(); err != nil { + return nil, err + } + if len(modules) == 0 && !dynamicCfgEnabled { + return nil, mb.ErrAllModulesDisabled } mb := &Metricbeat{ @@ -62,7 +97,12 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { var wg sync.WaitGroup for _, m := range bt.modules { - r := module.NewRunner(b.Publisher.Connect, m) + client, err := m.connector.Connect() + if err != nil { + return err + } + + r := module.NewRunner(client, m.module) r.Start() wg.Add(1) go func() { diff --git a/metricbeat/mb/builders.go b/metricbeat/mb/builders.go index 844884a8978a..4837caf6799f 100644 --- a/metricbeat/mb/builders.go +++ b/metricbeat/mb/builders.go @@ -20,41 +20,37 @@ var ( // ErrAllModulesDisabled indicates that all modules are disabled. At least // one module must be enabled. ErrAllModulesDisabled = errors.New("all modules are disabled") + + // ErrModuleDisabled indicates a disabled module has been tried to instantiate. + ErrModuleDisabled = errors.New("disabled module") ) -// NewModules builds new Modules and their associated MetricSets based on the -// provided configuration data. config is a list module config data (the data +// NewModule builds a new Module and its associated MetricSets based on the +// provided configuration data. config contains config data (the data // will be unpacked into ModuleConfig structs). r is the Register where the // ModuleFactory's and MetricSetFactory's will be obtained from. This method -// returns a mapping of Modules to MetricSets or an error. -func NewModules(config []*common.Config, r *Register) (map[Module][]MetricSet, error) { - if config == nil || len(config) == 0 { - return nil, ErrEmptyConfig +// returns a Module and its configured MetricSets or an error. +func NewModule(config *common.Config, r *Register) (Module, []MetricSet, error) { + if !config.Enabled() { + return nil, nil, ErrModuleDisabled } - baseModules, err := newBaseModulesFromConfig(config) + bm, err := newBaseModuleFromConfig(config) if err != nil { - return nil, err + return nil, nil, err } - // Create new Modules using the registered ModuleFactory's - modules, err := createModules(r, baseModules) + module, err := createModule(r, bm) if err != nil { - return nil, err + return nil, nil, err } - // Create new MetricSets for each Module using the registered MetricSetFactory's - modToMetricSets, err := initMetricSets(r, modules) + metricsets, err := initMetricSets(r, module) if err != nil { - return nil, err - } - - if len(modToMetricSets) == 0 { - return nil, ErrAllModulesDisabled + return nil, nil, err } - debugf("mb.NewModules() is returning %s", modToMetricSets) - return modToMetricSets, nil + return module, metricsets, nil } // newBaseModulesFromConfig creates new BaseModules from a list of configs @@ -104,104 +100,77 @@ func newBaseModuleFromConfig(rawConfig *common.Config) (BaseModule, error) { return baseModule, nil } -func createModules(r *Register, baseModules []BaseModule) ([]Module, error) { - modules := make([]Module, 0, len(baseModules)) - var errs multierror.Errors - for _, bm := range baseModules { - f := r.moduleFactory(bm.Name()) - if f == nil { - f = DefaultModuleFactory - } - - module, err := f(bm) - if err != nil { - errs = append(errs, err) - continue - } - modules = append(modules, module) +func createModule(r *Register, bm BaseModule) (Module, error) { + f := r.moduleFactory(bm.Name()) + if f == nil { + f = DefaultModuleFactory } - err := errs.Err() - if err != nil { - return nil, err - } - return modules, nil + return f(bm) } -func initMetricSets(r *Register, modules []Module) (map[Module][]MetricSet, error) { - active := map[Module][]MetricSet{} - var errs multierror.Errors - for _, bms := range newBaseMetricSets(modules) { - f, hostParser, err := r.metricSetFactory(bms.Module().Name(), bms.Name()) +func initMetricSets(r *Register, m Module) ([]MetricSet, error) { + var ( + errs multierror.Errors + metricsets []MetricSet + ) + + for _, bm := range newBaseMetricSets(m) { + f, hostParser, err := r.metricSetFactory(bm.Module().Name(), bm.Name()) if err != nil { errs = append(errs, err) continue } - // Parse the 'host' field using the HostParser registered with the MetricSet. + bm.hostData = HostData{URI: bm.host} if hostParser != nil { - bms.hostData, err = hostParser(bms.Module(), bms.host) + bm.hostData, err = hostParser(bm.Module(), bm.host) if err != nil { errs = append(errs, errors.Wrapf(err, "host parsing failed for %v-%v", - bms.Module().Name(), bms.Name())) + bm.Module().Name(), bm.Name())) continue } - bms.host = bms.hostData.Host - } else { - // The MetricSet was registered without a HostParser so provide a - // default HostData value. - bms.hostData = HostData{URI: bms.host} + bm.host = bm.hostData.Host } - metricSet, err := f(bms) - if err != nil { - errs = append(errs, err) - continue - } - err = mustImplementFetcher(metricSet) - if err != nil { - errs = append(errs, err) - continue + metricSet, err := f(bm) + if err == nil { + err = mustHaveModule(metricSet, bm) + if err == nil { + err = mustImplementFetcher(metricSet) + } } - err = mustHaveModule(metricSet, bms) if err != nil { errs = append(errs, err) continue } - module := metricSet.Module() - active[module] = append(active[module], metricSet) - } - - err := errs.Err() - if err != nil { - return nil, err + metricsets = append(metricsets, metricSet) } - return active, nil + return metricsets, errs.Err() } // newBaseMetricSets creates a new BaseMetricSet for all MetricSets defined // in the modules' config. -func newBaseMetricSets(modules []Module) []BaseMetricSet { - baseMetricSets := make([]BaseMetricSet, 0, len(modules)) - for _, m := range modules { - hosts := []string{""} - if len(m.Config().Hosts) > 0 { - hosts = m.Config().Hosts - } - - for _, name := range m.Config().MetricSets { - for _, host := range hosts { - baseMetricSets = append(baseMetricSets, BaseMetricSet{ - name: strings.ToLower(name), - module: m, - host: host, - }) - } +func newBaseMetricSets(m Module) []BaseMetricSet { + hosts := []string{""} + if l := m.Config().Hosts; len(l) > 0 { + hosts = l + } + + var metricsets []BaseMetricSet + for _, name := range m.Config().MetricSets { + name = strings.ToLower(name) + for _, host := range hosts { + metricsets = append(metricsets, BaseMetricSet{ + name: name, + module: m, + host: host, + }) } } - return baseMetricSets + return metricsets } // mustHaveModule returns an error if the given MetricSet's Module() method diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 61534d02d239..d8dc2ed39524 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -9,7 +9,6 @@ import ( "time" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/processors" ) const ( @@ -206,24 +205,20 @@ func (b *BaseMetricSet) HostData() HostData { // the metricset fetches not only the predefined fields but add alls raw data under // the raw namespace to the event. type ModuleConfig struct { - Hosts []string `config:"hosts"` - Period time.Duration `config:"period" validate:"positive"` - Timeout time.Duration `config:"timeout" validate:"positive"` - Module string `config:"module" validate:"required"` - MetricSets []string `config:"metricsets" validate:"required"` - Enabled bool `config:"enabled"` - Filters processors.PluginConfig `config:"filters"` - Raw bool `config:"raw"` - - common.EventMetadata `config:",inline"` // Fields and tags to add to events. + Hosts []string `config:"hosts"` + Period time.Duration `config:"period" validate:"positive"` + Timeout time.Duration `config:"timeout" validate:"positive"` + Module string `config:"module" validate:"required"` + MetricSets []string `config:"metricsets" validate:"required"` + Enabled bool `config:"enabled"` + Raw bool `config:"raw"` } func (c ModuleConfig) String() string { return fmt.Sprintf(`{Module:"%v", MetricSets:%v, Enabled:%v, `+ - `Hosts:[%v hosts], Period:"%v", Timeout:"%v", Raw:%v, Fields:%v, `+ - `FieldsUnderRoot:%v, Tags:%v}`, + `Hosts:[%v hosts], Period:"%v", Timeout:"%v", Raw:%v}`, c.Module, c.MetricSets, c.Enabled, len(c.Hosts), c.Period, c.Timeout, - c.Raw, c.Fields, c.FieldsUnderRoot, c.Tags) + c.Raw) } func (c ModuleConfig) GoString() string { return c.String() } diff --git a/metricbeat/mb/mb_test.go b/metricbeat/mb/mb_test.go index 1f2c2fb7e745..926e26521fd6 100644 --- a/metricbeat/mb/mb_test.go +++ b/metricbeat/mb/mb_test.go @@ -159,29 +159,6 @@ func TestModuleConfigDefaults(t *testing.T) { assert.Empty(t, mc.Hosts) } -// TestNewModulesWithEmptyModulesConfig verifies that an error is returned if -// the modules configuration list is empty. -func TestNewModulesWithEmptyModulesConfig(t *testing.T) { - r := newTestRegistry(t) - _, err := NewModules(nil, r) - assert.Equal(t, ErrEmptyConfig, err) -} - -// TestNewModulesWithAllDisabled verifies that an error is returned if all -// modules defined in the config are disabled. -func TestNewModulesWithAllDisabled(t *testing.T) { - r := newTestRegistry(t) - - c := newConfig(t, map[string]interface{}{ - "module": moduleName, - "metricsets": []string{metricSetName}, - "enabled": false, - }) - - _, err := NewModules(c, r) - assert.Equal(t, ErrAllModulesDisabled, err) -} - // TestNewModulesDuplicateHosts verifies that an error is returned by // NewModules if any module configuration contains duplicate hosts. func TestNewModulesDuplicateHosts(t *testing.T) { @@ -193,7 +170,7 @@ func TestNewModulesDuplicateHosts(t *testing.T) { "hosts": []string{"a", "b", "a"}, }) - _, err := NewModules(c, r) + _, _, err := NewModule(c, r) assert.Error(t, err) } @@ -219,49 +196,27 @@ func TestNewModulesHostParser(t *testing.T) { } t.Run("MetricSet without HostParser", func(t *testing.T) { - c := newConfig(t, map[string]interface{}{ + ms := newTestMetricSet(t, r, map[string]interface{}{ "module": moduleName, "metricsets": []string{metricSetName}, "hosts": []string{uri}, }) - modules, err := NewModules(c, r) - if err != nil { - t.Fatal(err) - } - - for _, metricSets := range modules { - metricSet := metricSets[0] - - // The URI is passed through in the Host() and HostData().URI. - assert.Equal(t, uri, metricSet.Host()) - assert.Equal(t, HostData{URI: uri}, metricSet.HostData()) - return - } - assert.FailNow(t, "no modules found") + // The URI is passed through in the Host() and HostData().URI. + assert.Equal(t, uri, ms.Host()) + assert.Equal(t, HostData{URI: uri}, ms.HostData()) }) t.Run("MetricSet with HostParser", func(t *testing.T) { - c := newConfig(t, map[string]interface{}{ + ms := newTestMetricSet(t, r, map[string]interface{}{ "module": moduleName, "metricsets": []string{name}, "hosts": []string{uri}, }) - modules, err := NewModules(c, r) - if err != nil { - t.Fatal(err) - } - - for _, metricSets := range modules { - metricSet := metricSets[0] - - // The URI is passed through in the Host() and HostData().URI. - assert.Equal(t, host, metricSet.Host()) - assert.Equal(t, HostData{URI: uri, Host: host}, metricSet.HostData()) - return - } - assert.FailNow(t, "no modules found") + // The URI is passed through in the Host() and HostData().URI. + assert.Equal(t, host, ms.Host()) + assert.Equal(t, HostData{URI: uri, Host: host}, ms.HostData()) }) } @@ -279,24 +234,12 @@ func TestNewModulesMetricSetTypes(t *testing.T) { } t.Run(name+" MetricSet", func(t *testing.T) { - c := newConfig(t, map[string]interface{}{ + ms := newTestMetricSet(t, r, map[string]interface{}{ "module": moduleName, "metricsets": []string{name}, }) - - modules, err := NewModules(c, r) - if err != nil { - t.Fatal(err) - } - assert.Len(t, modules, 1) - - for _, metricSets := range modules { - if assert.Len(t, metricSets, 1) { - metricSet := metricSets[0] - _, ok := metricSet.(EventFetcher) - assert.True(t, ok, name+" not implemented") - } - } + _, ok := ms.(EventFetcher) + assert.True(t, ok, name+" not implemented") }) factory = func(base BaseMetricSet) (MetricSet, error) { @@ -309,24 +252,12 @@ func TestNewModulesMetricSetTypes(t *testing.T) { } t.Run(name+" MetricSet", func(t *testing.T) { - c := newConfig(t, map[string]interface{}{ + ms := newTestMetricSet(t, r, map[string]interface{}{ "module": moduleName, "metricsets": []string{name}, }) - - modules, err := NewModules(c, r) - if err != nil { - t.Fatal(err) - } - assert.Len(t, modules, 1) - - for _, metricSets := range modules { - if assert.Len(t, metricSets, 1) { - metricSet := metricSets[0] - _, ok := metricSet.(EventsFetcher) - assert.True(t, ok, name+" not implemented") - } - } + _, ok := ms.(EventsFetcher) + assert.True(t, ok, name+" not implemented") }) factory = func(base BaseMetricSet) (MetricSet, error) { @@ -339,24 +270,13 @@ func TestNewModulesMetricSetTypes(t *testing.T) { } t.Run(name+" MetricSet", func(t *testing.T) { - c := newConfig(t, map[string]interface{}{ + ms := newTestMetricSet(t, r, map[string]interface{}{ "module": moduleName, "metricsets": []string{name}, }) - modules, err := NewModules(c, r) - if err != nil { - t.Fatal(err) - } - assert.Len(t, modules, 1) - - for _, metricSets := range modules { - if assert.Len(t, metricSets, 1) { - metricSet := metricSets[0] - _, ok := metricSet.(ReportingMetricSet) - assert.True(t, ok, name+" not implemented") - } - } + _, ok := ms.(ReportingMetricSet) + assert.True(t, ok, name+" not implemented") }) factory = func(base BaseMetricSet) (MetricSet, error) { @@ -369,24 +289,12 @@ func TestNewModulesMetricSetTypes(t *testing.T) { } t.Run(name+" MetricSet", func(t *testing.T) { - c := newConfig(t, map[string]interface{}{ + ms := newTestMetricSet(t, r, map[string]interface{}{ "module": moduleName, "metricsets": []string{name}, }) - - modules, err := NewModules(c, r) - if err != nil { - t.Fatal(err) - } - assert.Len(t, modules, 1) - - for _, metricSets := range modules { - if assert.Len(t, metricSets, 1) { - metricSet := metricSets[0] - _, ok := metricSet.(PushMetricSet) - assert.True(t, ok, name+" not implemented") - } - } + _, ok := ms.(PushMetricSet) + assert.True(t, ok, name+" not implemented") }) } @@ -398,7 +306,7 @@ func TestNewBaseModuleFromModuleConfigStruct(t *testing.T) { c := newConfig(t, moduleConf) - baseModule, err := newBaseModuleFromConfig(c[0]) + baseModule, err := newBaseModuleFromConfig(c) assert.NoError(t, err) assert.Equal(t, moduleName, baseModule.Name()) @@ -427,11 +335,22 @@ func newTestRegistry(t testing.TB) *Register { return r } -func newConfig(t testing.TB, moduleConfig interface{}) []*common.Config { - config, err := common.NewConfigFrom(moduleConfig) +func newTestMetricSet(t testing.TB, r *Register, config map[string]interface{}) MetricSet { + _, metricsets, err := NewModule(newConfig(t, config), r) if err != nil { t.Fatal(err) } + if !assert.Len(t, metricsets, 1) { + assert.FailNow(t, "invalid number of metricsets") + } + + return metricsets[0] +} - return []*common.Config{config} +func newConfig(t testing.TB, moduleConfig interface{}) *common.Config { + config, err := common.NewConfigFrom(moduleConfig) + if err != nil { + t.Fatal(err) + } + return config } diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go new file mode 100644 index 000000000000..e9de37243b1e --- /dev/null +++ b/metricbeat/mb/module/connector.go @@ -0,0 +1,46 @@ +package module + +import ( + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/publisher/bc/publisher" + "github.com/elastic/beats/libbeat/publisher/beat" +) + +// Connector configures ann establishes a beat.Client for publishing events +// to the publisher pipeline. +type Connector struct { + pipeline publisher.Publisher + processors *processors.Processors + eventMeta common.EventMetadata +} + +type connectorConfig struct { + Processors processors.PluginConfig `config:"processors"` + common.EventMetadata `config:",inline"` // Fields and tags to add to events. +} + +func NewConnector(pipeline publisher.Publisher, c *common.Config) (*Connector, error) { + config := connectorConfig{} + if err := c.Unpack(&config); err != nil { + return nil, err + } + + processors, err := processors.New(config.Processors) + if err != nil { + return nil, err + } + + return &Connector{ + pipeline: pipeline, + processors: processors, + eventMeta: config.EventMetadata, + }, nil +} + +func (c *Connector) Connect() (beat.Client, error) { + return c.pipeline.ConnectX(beat.ClientConfig{ + EventMetadata: c.eventMeta, + Processor: c.processors, + }) +} diff --git a/metricbeat/mb/module/event.go b/metricbeat/mb/module/event.go index 341b70937fda..174e9caf570a 100644 --- a/metricbeat/mb/module/event.go +++ b/metricbeat/mb/module/event.go @@ -5,7 +5,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/publisher/beat" "github.com/elastic/beats/metricbeat/mb" ) @@ -20,28 +20,19 @@ type EventBuilder struct { FetchDuration time.Duration Event common.MapStr fetchErr error - filters *processors.Processors - metadata common.EventMetadata } // Build builds an event from MetricSet data and applies the Module-level // filters. -func (b EventBuilder) Build() (common.MapStr, error) { +func (b EventBuilder) Build() (beat.Event, error) { // event may be nil when there was an error fetching. - event := b.Event - if event == nil { - event = common.MapStr{} // TODO (akroh): do we want to send an empty event field? + fields := b.Event + if fields == nil { + fields = common.MapStr{} } // Get and remove meta fields from the event created by the MetricSet. - timestamp := getTimestamp(event, common.Time(b.StartTime)) - - // Apply filters. - if b.filters != nil { - if event = b.filters.RunBC(event); event == nil { - return nil, nil - } - } + timestamp := time.Time(getTimestamp(fields, common.Time(b.StartTime))) metricsetData := common.MapStr{ "module": b.ModuleName, @@ -56,8 +47,8 @@ func (b EventBuilder) Build() (common.MapStr, error) { } namespace := b.MetricSetName - if n, ok := event["_namespace"]; ok { - delete(event, "_namespace") + if n, ok := fields["_namespace"]; ok { + delete(fields, "_namespace") if ns, ok := n.(string); ok { namespace = ns } @@ -67,13 +58,13 @@ func (b EventBuilder) Build() (common.MapStr, error) { // Checks if additional meta information is provided by the MetricSet under the key ModuleData // This is based on the convention that each MetricSet can provide module data under the key ModuleData - moduleData, moudleDataExists := event[mb.ModuleDataKey] + moduleData, moudleDataExists := fields[mb.ModuleDataKey] if moudleDataExists { - delete(event, mb.ModuleDataKey) + delete(fields, mb.ModuleDataKey) } moduleEvent := common.MapStr{} - moduleEvent.Put(namespace, event) + moduleEvent.Put(namespace, fields) // In case meta data exists, it is added on the module level // This is mostly used for shared fields across multiple metricsets in one module @@ -83,16 +74,18 @@ func (b EventBuilder) Build() (common.MapStr, error) { } } - event = common.MapStr{ - "@timestamp": timestamp, - common.EventMetadataKey: b.metadata, - b.ModuleName: moduleEvent, - "metricset": metricsetData, + event := beat.Event{ + Timestamp: time.Time(timestamp), + Fields: common.MapStr{ + // common.EventMetadataKey: b.metadata, + b.ModuleName: moduleEvent, + "metricset": metricsetData, + }, } // Adds error to event in case error happened if b.fetchErr != nil { - event["error"] = common.MapStr{ + event.Fields["error"] = common.MapStr{ "message": b.fetchErr.Error(), } } @@ -127,7 +120,7 @@ func createEvent( fetchErr error, start time.Time, elapsed time.Duration, -) (common.MapStr, error) { +) (beat.Event, error) { return EventBuilder{ ModuleName: msw.Module().Name(), MetricSetName: msw.Name(), @@ -136,7 +129,5 @@ func createEvent( FetchDuration: elapsed, Event: event, fetchErr: fetchErr, - filters: msw.module.filters, - metadata: msw.module.Config().EventMetadata, }.Build() } diff --git a/metricbeat/mb/module/event_test.go b/metricbeat/mb/module/event_test.go index e4059e18f0b7..1f10d514ba67 100644 --- a/metricbeat/mb/module/event_test.go +++ b/metricbeat/mb/module/event_test.go @@ -45,15 +45,16 @@ func TestEventBuilder(t *testing.T) { t.Fatal(err) } - assert.Equal(t, common.Time(startTime), event["@timestamp"]) + assert.Equal(t, startTime, event.Timestamp) - metricset := event["metricset"].(common.MapStr) + module := event.Fields[moduleName].(common.MapStr) + metricset := event.Fields["metricset"].(common.MapStr) assert.Equal(t, moduleName, metricset["module"]) assert.Equal(t, metricSetName, metricset["name"]) assert.Equal(t, int64(500000), metricset["rtt"]) assert.Equal(t, host, metricset["host"]) - assert.Equal(t, common.MapStr{}, event[moduleName].(common.MapStr)[metricSetName]) - assert.Nil(t, event["error"]) + assert.Equal(t, common.MapStr{}, module[metricSetName]) + assert.Nil(t, event.Fields["error"]) } func TestEventBuilderError(t *testing.T) { @@ -64,7 +65,8 @@ func TestEventBuilderError(t *testing.T) { t.Fatal(err) } - assert.Equal(t, errFetch.Error(), event["error"].(common.MapStr)["message"]) + errDoc := event.Fields["error"].(common.MapStr) + assert.Equal(t, errFetch.Error(), errDoc["message"]) } func TestEventBuilderNoHost(t *testing.T) { @@ -74,7 +76,7 @@ func TestEventBuilderNoHost(t *testing.T) { t.Fatal(err) } - _, found := event["metricset-host"] + _, found := event.Fields["metricset-host"] assert.False(t, found) } @@ -87,7 +89,7 @@ func TestEventBuildNoRTT(t *testing.T) { t.Fatal(err) } - metricset := event["metricset"].(common.MapStr) + metricset := event.Fields["metricset"].(common.MapStr) _, found := metricset["rtt"] assert.False(t, found, "found rtt") } diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go index e4b59f80b0ff..d205a75b2dc4 100644 --- a/metricbeat/mb/module/example_test.go +++ b/metricbeat/mb/module/example_test.go @@ -3,12 +3,15 @@ package module_test import ( + stdjson "encoding/json" "fmt" "sync" "time" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs/codec/json" + pub "github.com/elastic/beats/libbeat/publisher/beat" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" ) @@ -45,8 +48,12 @@ func ExampleWrapper() { defer wg.Done() for event := range output { // Make rtt a constant so that the output is constant. - event["metricset"].(common.MapStr)["rtt"] = 111 - fmt.Println(event.StringToPrint()) + event.Fields["metricset"].(common.MapStr)["rtt"] = 111 + + output, err := encodeEvent(event) + if err == nil { + fmt.Println(output) + } } }() @@ -61,12 +68,11 @@ func ExampleWrapper() { // Output: // { - // "@timestamp": "2016-05-10T23:27:58.485Z", - // "_event_metadata": { - // "Fields": null, - // "FieldsUnderRoot": false, - // "Tags": null + // "@metadata": { + // "beat": "noindex", + // "type": "doc" // }, + // "@timestamp": "2016-05-10T23:27:58.485Z", // "fake": { // "eventfetcher": { // "metric": 1 @@ -102,8 +108,18 @@ func ExampleRunner() { return } + connector, err := module.NewConnector(b.Publisher, config) + if err != nil { + return + } + + client, err := connector.Connect() + if err != nil { + return + } + // Create the Runner facade. - runner := module.NewRunner(b.Publisher.Connect, m) + runner := module.NewRunner(client, m) // Start the module and have it publish to a new publisher.Client. runner.Start() @@ -112,3 +128,21 @@ func ExampleRunner() { // stopped and the publisher.Client is closed. runner.Stop() } + +func encodeEvent(event pub.Event) (string, error) { + output, err := json.New(false).Encode("noindex", &event) + if err != nil { + return "", nil + } + + // FIX: need to parse and re-encode, so fields ordering in final json document + // keeps stable. + + var tmp interface{} + if err := stdjson.Unmarshal(output, &tmp); err != nil { + panic(err) + } + + output, err = stdjson.MarshalIndent(tmp, "", " ") + return string(output), err +} diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index d281d214adff..cf641407f3ac 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -9,26 +9,37 @@ import ( "github.com/elastic/beats/metricbeat/mb" ) -// Factory is used to register and reload modules +// Factory creates new Runner instances from configuration objects. +// It is used to register and reload modules. type Factory struct { - client func() publisher.Client + pipeline publisher.Publisher maxStartDelay time.Duration } // NewFactory creates new Reloader instance for the given config func NewFactory(maxStartDelay time.Duration, p publisher.Publisher) *Factory { return &Factory{ - client: p.Connect, + pipeline: p, maxStartDelay: maxStartDelay, } } func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) { + connector, err := NewConnector(r.pipeline, c) + if err != nil { + return nil, err + } + w, err := NewWrapper(r.maxStartDelay, c, mb.Registry) if err != nil { return nil, err } - mr := NewRunner(r.client, w) + client, err := connector.Connect() + if err != nil { + return nil, err + } + + mr := NewRunner(client, w) return mr, nil } diff --git a/metricbeat/mb/module/publish.go b/metricbeat/mb/module/publish.go index 531648922932..dabddb7255cd 100644 --- a/metricbeat/mb/module/publish.go +++ b/metricbeat/mb/module/publish.go @@ -3,8 +3,7 @@ package module import ( "sync" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" + "github.com/elastic/beats/libbeat/publisher/beat" ) // PublishChannels publishes the events read from each channel to the given @@ -15,14 +14,14 @@ import ( // and are fully read. To stop the method immediately, close the channels and // close the publisher client to ensure that publishing does not block. This // may result is some events being discarded. -func PublishChannels(client publisher.Client, cs ...<-chan common.MapStr) { +func PublishChannels(client beat.Client, cs ...<-chan beat.Event) { var wg sync.WaitGroup // output publishes values from c until c is closed, then calls wg.Done. - output := func(c <-chan common.MapStr) { + output := func(c <-chan beat.Event) { defer wg.Done() for event := range c { - client.PublishEvent(event) + client.Publish(event) } } diff --git a/metricbeat/mb/module/runner.go b/metricbeat/mb/module/runner.go index 142aa8415e9a..62e0e4327025 100644 --- a/metricbeat/mb/module/runner.go +++ b/metricbeat/mb/module/runner.go @@ -3,7 +3,7 @@ package module import ( "sync" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" + "github.com/elastic/beats/libbeat/publisher/beat" ) // Runner is a facade for a Wrapper that provides a simple interface @@ -25,11 +25,11 @@ type Runner interface { // NewRunner returns a Runner facade. The events generated by // the Module will be published to a new publisher.Client generated from the // pubClientFactory. -func NewRunner(pubClientFactory func() publisher.Client, mod *Wrapper) Runner { +func NewRunner(client beat.Client, mod *Wrapper) Runner { return &runner{ done: make(chan struct{}), mod: mod, - client: pubClientFactory(), + client: client, } } @@ -39,7 +39,7 @@ type runner struct { startOnce sync.Once stopOnce sync.Once mod *Wrapper - client publisher.Client + client beat.Client } func (mr *runner) Start() { diff --git a/metricbeat/mb/module/runner_test.go b/metricbeat/mb/module/runner_test.go index e8aa4c9d95b7..e7a6ca5b86b0 100644 --- a/metricbeat/mb/module/runner_test.go +++ b/metricbeat/mb/module/runner_test.go @@ -6,7 +6,7 @@ import ( "testing" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" + "github.com/elastic/beats/libbeat/publisher/beat" pubtest "github.com/elastic/beats/libbeat/publisher/testing" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" @@ -32,7 +32,7 @@ func TestRunner(t *testing.T) { } // Create the Runner facade. - runner := module.NewRunner(factory, m) + runner := module.NewRunner(factory(), m) // Start the module and have it publish to a new publisher.Client. runner.Start() @@ -47,7 +47,7 @@ func TestRunner(t *testing.T) { // newPubClientFactory returns a new ChanClient and a function that returns // the same Client when invoked. This simulates the return value of // Publisher.Connect. -func newPubClientFactory() (*pubtest.ChanClient, func() publisher.Client) { +func newPubClientFactory() (*pubtest.ChanClient, func() beat.Client) { client := pubtest.NewChanClient(10) - return client, func() publisher.Client { return client } + return client, func() beat.Client { return client } } diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 750976f4de40..542fb2cd2f15 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -9,12 +9,10 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" - "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/publisher/beat" "github.com/elastic/beats/metricbeat/mb" - "github.com/joeshaw/multierror" "github.com/mitchellh/hashstructure" - "github.com/pkg/errors" ) // Expvar metric names. @@ -37,7 +35,6 @@ var ( // Use NewWrapper or NewWrappers to construct new Wrappers. type Wrapper struct { mb.Module - filters *processors.Processors metricSets []*metricSetWrapper // List of pointers to its associated MetricSets. configHash uint64 maxStartDelay time.Duration @@ -61,64 +58,28 @@ type stats struct { } // NewWrapper create a new Module and its associated MetricSets based -// on the given configuration. It constructs the supporting filters and stores -// them in the Wrapper. -func NewWrapper(maxStartDelay time.Duration, moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) { - mws, err := NewWrappers(maxStartDelay, []*common.Config{moduleConfig}, r) +// on the given configuration. +func NewWrapper(maxStartDelay time.Duration, config *common.Config, r *mb.Register) (*Wrapper, error) { + module, metricsets, err := mb.NewModule(config, r) if err != nil { return nil, err } - if len(mws) == 0 { - return nil, fmt.Errorf("module not created") + wrapper := &Wrapper{ + Module: module, + maxStartDelay: maxStartDelay, + metricSets: make([]*metricSetWrapper, len(metricsets)), } - return mws[0], nil -} - -// NewWrappers creates new Modules and their associated MetricSets based -// on the given configuration. It constructs the supporting filters and stores -// them all in a Wrapper. -func NewWrappers(maxStartDelay time.Duration, modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, error) { - modules, err := mb.NewModules(modulesConfig, r) - if err != nil { - return nil, err - } - - // Wrap the Modules and MetricSet's. - var wrappers []*Wrapper - var errs multierror.Errors - for k, v := range modules { - debugf("Initializing Module type '%s': %T=%+v", k.Name(), k, k) - f, err := processors.New(k.Config().Filters) - if err != nil { - errs = append(errs, errors.Wrapf(err, "module %s", k.Name())) - continue + for i, ms := range metricsets { + wrapper.metricSets[i] = &metricSetWrapper{ + MetricSet: ms, + module: wrapper, + stats: getMetricSetStats(wrapper.Name(), ms.Name()), } - - mw := &Wrapper{ - Module: k, - filters: f, - maxStartDelay: maxStartDelay, - } - wrappers = append(wrappers, mw) - - msws := make([]*metricSetWrapper, 0, len(v)) - for _, ms := range v { - debugf("Initializing MetricSet type '%s/%s' for host '%s': %T=%+v", - ms.Module().Name(), ms.Name(), ms.Host(), ms, ms) - - msw := &metricSetWrapper{ - MetricSet: ms, - module: mw, - stats: getMetricSetStats(mw.Name(), ms.Name()), - } - msws = append(msws, msw) - } - mw.metricSets = msws } - return wrappers, errs.Err() + return wrapper, nil } // Wrapper methods @@ -132,10 +93,10 @@ func NewWrappers(maxStartDelay time.Duration, modulesConfig []*common.Config, r // prevent blocking the operation of the MetricSets. // // Start should be called only once in the life of a Wrapper. -func (mw *Wrapper) Start(done <-chan struct{}) <-chan common.MapStr { +func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event { debugf("Starting %s", mw) - out := make(chan common.MapStr, 1) + out := make(chan beat.Event, 1) // Start one worker per MetricSet + host combination. var wg sync.WaitGroup @@ -187,7 +148,7 @@ func (mw *Wrapper) Hash() uint64 { // metricSetWrapper methods -func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- common.MapStr) { +func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { defer logp.Recover(fmt.Sprintf("recovered from panic while fetching "+ "'%s/%s' for host '%s'", msw.module.Name(), msw.Name(), msw.Host())) @@ -306,7 +267,7 @@ func (msw *metricSetWrapper) String() string { type eventReporter struct { msw *metricSetWrapper done <-chan struct{} - out chan<- common.MapStr + out chan<- beat.Event start time.Time // Start time of the current fetch (or zero for push sources). } @@ -354,19 +315,17 @@ func (r *eventReporter) ErrorWith(err error, meta common.MapStr) bool { return false } - if event != nil { // event can be nil if it was dropped by processors - if !writeEvent(r.done, r.out, event) { - return false - } - r.msw.stats.events.Add(1) + if !writeEvent(r.done, r.out, event) { + return false } + r.msw.stats.events.Add(1) return true } // other utility functions -func writeEvent(done <-chan struct{}, out chan<- common.MapStr, event common.MapStr) bool { +func writeEvent(done <-chan struct{}, out chan<- beat.Event, event beat.Event) bool { select { case <-done: return false diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go index 38c875597902..729c521a109e 100644 --- a/metricbeat/mb/testing/data_generator.go +++ b/metricbeat/mb/testing/data_generator.go @@ -1,7 +1,6 @@ package testing import ( - "encoding/json" "flag" "fmt" "io/ioutil" @@ -10,6 +9,8 @@ import ( "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs/codec/json" + "github.com/elastic/beats/libbeat/publisher/beat" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" ) @@ -60,7 +61,7 @@ func WriteEvents(f mb.EventsFetcher, t testing.TB) error { // CreateFullEvent builds a full event given the data generated by a MetricSet. // This simulates the output of Metricbeat as if it were // 2016-05-23T08:05:34.853Z and the hostname is host.example.com. -func CreateFullEvent(ms mb.MetricSet, metricSetData common.MapStr) common.MapStr { +func CreateFullEvent(ms mb.MetricSet, metricSetData common.MapStr) beat.Event { startTime, err := time.Parse(time.RFC3339Nano, "2016-05-23T08:05:34.853Z") if err != nil { panic(err) @@ -79,13 +80,13 @@ func CreateFullEvent(ms mb.MetricSet, metricSetData common.MapStr) common.MapStr if err != nil { panic(err) } - fullEvent["beat"] = common.MapStr{ + fullEvent.Fields["beat"] = common.MapStr{ "name": "host.example.com", "hostname": "host.example.com", } // Delete meta data as not needed for the event output here. - delete(fullEvent, common.EventMetadataKey) + delete(fullEvent.Fields, common.EventMetadataKey) return fullEvent } @@ -93,7 +94,7 @@ func CreateFullEvent(ms mb.MetricSet, metricSetData common.MapStr) common.MapStr // WriteEventToDataJSON writes the given event as "pretty" JSON to // a ./_meta/data.json file. If the -data CLI flag is unset or false then the // method is a no-op. -func WriteEventToDataJSON(t testing.TB, fullEvent common.MapStr) { +func WriteEventToDataJSON(t testing.TB, fullEvent beat.Event) { if !*dataFlag { return } @@ -103,7 +104,8 @@ func WriteEventToDataJSON(t testing.TB, fullEvent common.MapStr) { t.Fatal(err) } - output, err := json.MarshalIndent(fullEvent, "", " ") + // use json output codec to encode event to json + output, err := json.New(true).Encode("noindex", &fullEvent) if err != nil { t.Fatal(err) } diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 370133062673..9209dcd9df9d 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -43,8 +43,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" - - "github.com/stretchr/testify/assert" ) type TestModule struct { @@ -74,28 +72,23 @@ func newMetricSet(t testing.TB, config interface{}) mb.MetricSet { if err != nil { t.Fatal(err) } - m, err := mb.NewModules([]*common.Config{c}, mb.Registry) + m, metricsets, err := mb.NewModule(c, mb.Registry) if err != nil { t.Fatal(err) } - if !assert.Len(t, m, 1) { - t.FailNow() + if m == nil { + t.Fatal("no module instantiated") } - var metricSet mb.MetricSet - for _, v := range m { - if !assert.Len(t, v, 1) { - t.FailNow() - } - - metricSet = v[0] - break + if len(metricsets) != 1 { + t.Fatal("invalid number of metricsets instantiated") } - if !assert.NotNil(t, metricSet) { - t.FailNow() + metricset := metricsets[0] + if metricset == nil { + t.Fatal("metricset is nil") } - return metricSet + return metricset } // NewEventFetcher instantiates a new EventFetcher using the given diff --git a/metricbeat/module/mysql/status/status_test.go b/metricbeat/module/mysql/status/status_test.go index 9d550f7d92fb..0e275c841bd5 100644 --- a/metricbeat/module/mysql/status/status_test.go +++ b/metricbeat/module/mysql/status/status_test.go @@ -67,7 +67,7 @@ func TestConfigValidation(t *testing.T) { t.Fatal(err) } - _, err = mb.NewModules([]*common.Config{c}, mb.Registry) + _, _, err = mb.NewModule(c, mb.Registry) if err != nil && test.err == "" { t.Errorf("unexpected error in testcase %d: %v", i, err) continue diff --git a/metricbeat/module/system/cpu/cpu_test.go b/metricbeat/module/system/cpu/cpu_test.go index f1331c4a46ae..66efa36d6abc 100644 --- a/metricbeat/module/system/cpu/cpu_test.go +++ b/metricbeat/module/system/cpu/cpu_test.go @@ -17,12 +17,12 @@ func TestData(t *testing.T) { } time.Sleep(1 * time.Second) - event, err := f.Fetch() + fields, err := f.Fetch() if err != nil { t.Fatal(err) } - event = mbtest.CreateFullEvent(f, event) + event := mbtest.CreateFullEvent(f, fields) mbtest.WriteEventToDataJSON(t, event) } diff --git a/metricbeat/tests/system/config/metricbeat.yml.j2 b/metricbeat/tests/system/config/metricbeat.yml.j2 index 4ca002b67e22..227a00eae4ab 100644 --- a/metricbeat/tests/system/config/metricbeat.yml.j2 +++ b/metricbeat/tests/system/config/metricbeat.yml.j2 @@ -53,10 +53,10 @@ metricbeat.modules: processes: {{ m.processes }} {% endif -%} - {% if m.filters is defined -%} - filters: - {% for f in m.filters -%} - {% for k, v in f.items() -%} + {% if m.processors is defined -%} + processors: + {% for p in m.processors -%} + {% for k, v in p.items() -%} - {{ k }}.fields: [ {%- for field in v -%} '{{ field }}' {%- if not loop.last %}, {% endif -%} diff --git a/metricbeat/tests/system/test_redis.py b/metricbeat/tests/system/test_redis.py index 913c29f2b352..dfd0d80f9e82 100644 --- a/metricbeat/tests/system/test_redis.py +++ b/metricbeat/tests/system/test_redis.py @@ -81,18 +81,20 @@ def test_keysace(self): @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") @attr('integration') - def test_filters(self): + def test_module_processors(self): """ - Test filters for Redis info event. + Test local processors for Redis info event. """ fields = ["clients", "cpu"] + eventFields = ['beat', 'metricset'] + eventFields += ['redis.info.' + f for f in fields] self.render_config_template(modules=[{ "name": "redis", "metricsets": ["info"], "hosts": self.get_hosts(), "period": "5s", - "filters": [{ - "include_fields": fields, + "processors": [{ + "include_fields": eventFields, }], }]) proc = self.start_beat()