From f3079d957f308136ee82d39001a71c022060c6f3 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 7 Apr 2017 03:11:54 -0400 Subject: [PATCH] Add new MetricSet interfaces for Module Developers (#3908) This PR adds new interfaces for MetricSet developers. These interfaces provided additional flexibility for MetricSet implementations. The new interfaces are: - `Closer` - With Metricbeat gaining the ability to dynamically load and unload modules there became a need to properly release resources opened by a MetricSet. If a MetricSet implements the `Closer` interface then the `Close() error` method will be invoked when the MetricSet is unloaded. - `ReportingFetcher` - Some MetricSets needed to report multiple errors, but the existing `Fetch` methods only allowed a single error to be returned. `ReportingFetcher` passes a callback interface to the MetricSet that allows it to report any combination of events, errors, or errors with metadata. - `PushMetricSet` - This is for push event sources. All the current MetricSet implementations are driven by periodic `Fetch` callbacks triggered by timer. A `PushMetricSet` does not receive periodic callbacks, but instead has a `Run` method. Implementations can forward events as they are received from the underlying source. --- CHANGELOG.asciidoc | 1 + metricbeat/mb/builders.go | 16 +- metricbeat/mb/example_metricset_test.go | 31 ++-- metricbeat/mb/mb.go | 45 ++++- metricbeat/mb/mb_test.go | 155 ++++++++++++++++- metricbeat/mb/module/event.go | 4 +- metricbeat/mb/module/event_test.go | 14 ++ metricbeat/mb/module/example_test.go | 8 +- metricbeat/mb/module/runner_test.go | 2 +- metricbeat/mb/module/wrapper.go | 211 ++++++++++++++---------- metricbeat/mb/module/wrapper_test.go | 137 +++++++++++++-- 11 files changed, 504 insertions(+), 120 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5dc1102a834..1e6e669f8ed 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -116,6 +116,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] - Add experimental metricset `perfmon` to Windows module. {pull}3758[3758] - Add memcached module with stats metricset. {pull}3693[3693] - Add the `process.cmdline.cache.enabled` config option to the System Process Metricset. {pull}3891[3891] +- Add new MetricSet interfaces for developers (`Closer`, `ReportingFetcher`, and `PushMetricSet`). {pull}3908[3908] *Packetbeat* - Add `fields` and `fields_under_root` to packetbeat protocols configurations. {pull}3518[3518] diff --git a/metricbeat/mb/builders.go b/metricbeat/mb/builders.go index 90213315f3d..844884a8978 100644 --- a/metricbeat/mb/builders.go +++ b/metricbeat/mb/builders.go @@ -227,15 +227,25 @@ func mustImplementFetcher(ms MetricSet) error { ifcs = append(ifcs, "EventsFetcher") } + if _, ok := ms.(ReportingMetricSet); ok { + ifcs = append(ifcs, "ReportingMetricSet") + } + + if _, ok := ms.(PushMetricSet); ok { + ifcs = append(ifcs, "PushMetricSet") + } + switch len(ifcs) { case 0: - return fmt.Errorf("MetricSet '%s/%s' does not implement a Fetcher "+ - "interface", ms.Module().Name(), ms.Name()) + return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+ + "producing interface (EventFetcher, EventsFetcher, "+ + "ReportingMetricSet, or PushMetricSet)", + ms.Module().Name(), ms.Name()) case 1: return nil default: return fmt.Errorf("MetricSet '%s/%s' can only implement a single "+ - "Fetcher interface, but implements %v", ms.Module().Name(), + "event producing interface, but implements %v", ms.Module().Name(), ms.Name(), ifcs) } } diff --git a/metricbeat/mb/example_metricset_test.go b/metricbeat/mb/example_metricset_test.go index f8dffc69eae..184db2f3f1f 100644 --- a/metricbeat/mb/example_metricset_test.go +++ b/metricbeat/mb/example_metricset_test.go @@ -8,7 +8,9 @@ import ( "github.com/elastic/beats/metricbeat/mb/parse" ) -var hostParser = parse.URLHostParserBuilder{DefaultScheme: "http"}.Build() +var hostParser = parse.URLHostParserBuilder{ + DefaultScheme: "http", +}.Build() func init() { // Register the MetricSetFactory function for the "status" MetricSet. @@ -26,14 +28,23 @@ func NewMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{BaseMetricSet: base}, nil } -func (ms *MetricSet) Fetch() (common.MapStr, error) { - // Fetch data from the host (using ms.HostData().URI) and return the data. - return common.MapStr{ - "someParam": "value", - "otherParam": 42, - }, nil +// Fetch will be called periodically by the framework. +func (ms *MetricSet) Fetch(report mb.Reporter) { + // Fetch data from the host at ms.HostData().URI and return the data. + data, err := common.MapStr{ + "some_metric": 18.0, + "answer_to_everything": 42, + }, error(nil) + if err != nil { + // Report an error if it occurs. + report.Error(err) + return + } + + // Otherwise report the collected data. + report.Event(data) } -// ExampleMetricSetFactory demonstrates how to register a MetricSetFactory -// and unpack additional configuration data. -func ExampleMetricSetFactory() {} +// ExampleReportingMetricSet demonstrates how to register a MetricSetFactory +// and implement a ReportingMetricSet. +func ExampleReportingMetricSet() {} diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index ce6c4abf9e9..c7326fee0b9 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -71,19 +71,62 @@ type MetricSet interface { HostData() HostData // HostData returns the parsed host data. } +// Closer is an optional interface that a MetricSet can implement in order to +// cleanup any resources it has open at shutdown. +type Closer interface { + Close() error +} + // EventFetcher is a MetricSet that returns a single event when collecting data. +// Use ReportingMetricSet for new MetricSet implementations. type EventFetcher interface { MetricSet Fetch() (common.MapStr, error) } // EventsFetcher is a MetricSet that returns a multiple events when collecting -// data. +// data. Use ReportingMetricSet for new MetricSet implementations. type EventsFetcher interface { MetricSet Fetch() ([]common.MapStr, error) } +// Reporter is used by a MetricSet to report events, errors, or errors with +// metadata. The methods return false if and only if publishing failed because +// the MetricSet is being closed. +type Reporter interface { + Event(event common.MapStr) bool // Event reports a single successful event. + ErrorWith(err error, meta common.MapStr) bool // ErrorWith reports a single error event with the additional metadata. + Error(err error) bool // Error reports a single error event. +} + +// ReportingMetricSet is a MetricSet that reports events or errors through the +// Reporter interface. Fetch is called periodically to collect events. +type ReportingMetricSet interface { + MetricSet + Fetch(r Reporter) +} + +// PushReporter is used by a MetricSet to report events, errors, or errors with +// metadata. It provides a done channel used to signal that reporter should +// stop. +type PushReporter interface { + Reporter + + // Done returns a channel that's closed when work done on behalf of this + // reporter should be canceled. + Done() <-chan struct{} +} + +// PushMetricSet is a MetricSet that pushes events (rather than pulling them +// periodically via a Fetch callback). Run is invoked to start the event +// subscription and it should block until the MetricSet is ready to stop or +// the PushReporter's done channel is closed. +type PushMetricSet interface { + MetricSet + Run(r PushReporter) +} + // HostData contains values parsed from the 'host' configuration. Other // configuration data like protocols, usernames, and passwords may also be // used to construct this HostData data. diff --git a/metricbeat/mb/mb_test.go b/metricbeat/mb/mb_test.go index 498c6bdc64d..1f2c2fb7e74 100644 --- a/metricbeat/mb/mb_test.go +++ b/metricbeat/mb/mb_test.go @@ -20,6 +20,8 @@ func (m testModule) ParseHost(host string) (HostData, error) { return m.hostParser(host) } +// EventFetcher + type testMetricSet struct { BaseMetricSet } @@ -28,6 +30,32 @@ func (m *testMetricSet) Fetch() (common.MapStr, error) { return nil, nil } +// EventsFetcher + +type testMetricSetEventsFetcher struct { + BaseMetricSet +} + +func (m *testMetricSetEventsFetcher) Fetch() ([]common.MapStr, error) { + return nil, nil +} + +// ReportingFetcher + +type testMetricSetReportingFetcher struct { + BaseMetricSet +} + +func (m *testMetricSetReportingFetcher) Fetch(r Reporter) {} + +// PushMetricSet + +type testPushMetricSet struct { + BaseMetricSet +} + +func (m *testPushMetricSet) Run(r PushReporter) {} + func TestModuleConfig(t *testing.T) { tests := []struct { in interface{} @@ -169,7 +197,7 @@ func TestNewModulesDuplicateHosts(t *testing.T) { assert.Error(t, err) } -func TestNewModules(t *testing.T) { +func TestNewModulesHostParser(t *testing.T) { const ( name = "HostParser" host = "example.com" @@ -235,6 +263,131 @@ func TestNewModules(t *testing.T) { } assert.FailNow(t, "no modules found") }) + +} + +func TestNewModulesMetricSetTypes(t *testing.T) { + r := newTestRegistry(t) + + factory := func(base BaseMetricSet) (MetricSet, error) { + return &testMetricSet{base}, nil + } + + name := "EventFetcher" + if err := r.AddMetricSet(moduleName, name, factory); err != nil { + t.Fatal(err) + } + + t.Run(name+" MetricSet", func(t *testing.T) { + c := newConfig(t, map[string]interface{}{ + "module": moduleName, + "metricsets": []string{name}, + }) + + modules, err := NewModules(c, r) + if err != nil { + t.Fatal(err) + } + assert.Len(t, modules, 1) + + for _, metricSets := range modules { + if assert.Len(t, metricSets, 1) { + metricSet := metricSets[0] + _, ok := metricSet.(EventFetcher) + assert.True(t, ok, name+" not implemented") + } + } + }) + + factory = func(base BaseMetricSet) (MetricSet, error) { + return &testMetricSetEventsFetcher{base}, nil + } + + name = "EventsFetcher" + if err := r.AddMetricSet(moduleName, name, factory); err != nil { + t.Fatal(err) + } + + t.Run(name+" MetricSet", func(t *testing.T) { + c := newConfig(t, map[string]interface{}{ + "module": moduleName, + "metricsets": []string{name}, + }) + + modules, err := NewModules(c, r) + if err != nil { + t.Fatal(err) + } + assert.Len(t, modules, 1) + + for _, metricSets := range modules { + if assert.Len(t, metricSets, 1) { + metricSet := metricSets[0] + _, ok := metricSet.(EventsFetcher) + assert.True(t, ok, name+" not implemented") + } + } + }) + + factory = func(base BaseMetricSet) (MetricSet, error) { + return &testMetricSetReportingFetcher{base}, nil + } + + name = "ReportingFetcher" + if err := r.AddMetricSet(moduleName, name, factory); err != nil { + t.Fatal(err) + } + + t.Run(name+" MetricSet", func(t *testing.T) { + c := newConfig(t, map[string]interface{}{ + "module": moduleName, + "metricsets": []string{name}, + }) + + modules, err := NewModules(c, r) + if err != nil { + t.Fatal(err) + } + assert.Len(t, modules, 1) + + for _, metricSets := range modules { + if assert.Len(t, metricSets, 1) { + metricSet := metricSets[0] + _, ok := metricSet.(ReportingMetricSet) + assert.True(t, ok, name+" not implemented") + } + } + }) + + factory = func(base BaseMetricSet) (MetricSet, error) { + return &testPushMetricSet{base}, nil + } + + name = "Push" + if err := r.AddMetricSet(moduleName, name, factory); err != nil { + t.Fatal(err) + } + + t.Run(name+" MetricSet", func(t *testing.T) { + c := newConfig(t, map[string]interface{}{ + "module": moduleName, + "metricsets": []string{name}, + }) + + modules, err := NewModules(c, r) + if err != nil { + t.Fatal(err) + } + assert.Len(t, modules, 1) + + for _, metricSets := range modules { + if assert.Len(t, metricSets, 1) { + metricSet := metricSets[0] + _, ok := metricSet.(PushMetricSet) + assert.True(t, ok, name+" not implemented") + } + } + }) } // TestNewBaseModuleFromModuleConfigStruct tests the creation a new BaseModule. diff --git a/metricbeat/mb/module/event.go b/metricbeat/mb/module/event.go index e360b064ec6..d2d28be43d7 100644 --- a/metricbeat/mb/module/event.go +++ b/metricbeat/mb/module/event.go @@ -59,7 +59,9 @@ func (b EventBuilder) Build() (common.MapStr, error) { metricsetData := common.MapStr{ "module": b.ModuleName, "name": b.MetricSetName, - "rtt": b.FetchDuration.Nanoseconds() / int64(time.Microsecond), + } + if b.FetchDuration != 0 { + metricsetData["rtt"] = b.FetchDuration.Nanoseconds() / int64(time.Microsecond) } namespace := b.MetricSetName diff --git a/metricbeat/mb/module/event_test.go b/metricbeat/mb/module/event_test.go index bc24aabd979..20f77148044 100644 --- a/metricbeat/mb/module/event_test.go +++ b/metricbeat/mb/module/event_test.go @@ -78,3 +78,17 @@ func TestEventBuilderNoHost(t *testing.T) { _, found := event["metricset-host"] assert.False(t, found) } + +func TestEventBuildNoRTT(t *testing.T) { + b := builder + b.FetchDuration = 0 + + event, err := b.Build() + if err != nil { + t.Fatal(err) + } + + metricset := event["metricset"].(common.MapStr) + _, found := metricset["rtt"] + assert.False(t, found, "found rtt") +} diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go index 91cb007958a..53b3e983a89 100644 --- a/metricbeat/mb/module/example_test.go +++ b/metricbeat/mb/module/example_test.go @@ -20,7 +20,7 @@ func ExampleWrapper() { // Build a configuration object. config, err := common.NewConfigFrom(map[string]interface{}{ "module": moduleName, - "metricsets": []string{metricSetName}, + "metricsets": []string{eventFetcherName}, }) if err != nil { fmt.Println("Error:", err) @@ -68,13 +68,13 @@ func ExampleWrapper() { // "Tags": null // }, // "fake": { - // "status": { + // "eventfetcher": { // "metric": 1 // } // }, // "metricset": { // "module": "fake", - // "name": "status", + // "name": "eventfetcher", // "rtt": 111 // }, // "type": "metricsets" @@ -91,7 +91,7 @@ func ExampleRunner() { config, err := common.NewConfigFrom(map[string]interface{}{ "module": moduleName, - "metricsets": []string{metricSetName}, + "metricsets": []string{eventFetcherName}, }) if err != nil { return diff --git a/metricbeat/mb/module/runner_test.go b/metricbeat/mb/module/runner_test.go index 7ad6dc6645a..888ba006484 100644 --- a/metricbeat/mb/module/runner_test.go +++ b/metricbeat/mb/module/runner_test.go @@ -19,7 +19,7 @@ func TestRunner(t *testing.T) { config, err := common.NewConfigFrom(map[string]interface{}{ "module": moduleName, - "metricsets": []string{metricSetName}, + "metricsets": []string{eventFetcherName}, }) if err != nil { t.Fatal(err) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 803a8c92a98..ab71232f0c5 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -49,13 +49,13 @@ type metricSetWrapper struct { stats *stats // stats for this MetricSet. } -// stats bundles common metricset stats +// stats bundles common metricset stats. type stats struct { - key string // full stats key - ref uint32 // number of modules/metricsets reusing stats instance - success *monitoring.Int - failures *monitoring.Int - events *monitoring.Int + key string // full stats key + ref uint32 // number of modules/metricsets reusing stats instance + success *monitoring.Int // Total success events. + failures *monitoring.Int // Total error events. + events *monitoring.Int // Total events published. } // NewWrapper create a new Module and its associated MetricSets based @@ -142,7 +142,8 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan common.MapStr { go func(msw *metricSetWrapper) { defer releaseStats(msw.stats) defer wg.Done() - msw.startFetching(done, out) + defer msw.close() + msw.run(done, out) }(msw) } @@ -170,7 +171,8 @@ func (mw *Wrapper) Hash() uint64 { } var err error - // Config is unpacked into map[string]interface{} to also take metricset configs into account for the hash + // Config is unpacked into map[string]interface{} to also take metricset + // configs into account for the hash. var c map[string]interface{} mw.UnpackConfig(&c) mw.configHash, err = hashstructure.Hash(c, nil) @@ -182,34 +184,48 @@ func (mw *Wrapper) Hash() uint64 { // metricSetWrapper methods -// startFetching performs an immediate fetch for the MetricSet then it -// begins a continuous timer scheduled loop to fetch data. To stop the loop the -// done channel should be closed. -func (msw *metricSetWrapper) startFetching( - done <-chan struct{}, - out chan<- common.MapStr, -) { +func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- common.MapStr) { + defer logp.Recover(fmt.Sprintf("recovered from panic while fetching "+ + "'%s/%s' for host '%s'", msw.module.Name(), msw.Name(), msw.Host())) + debugf("Starting %s", msw) defer debugf("Stopped %s", msw) - // Fetch immediately. - err := msw.fetch(done, out) - if err != nil { - logp.Err("%v", err) + // Events and errors are reported through this. + reporter := &eventReporter{ + msw: msw, + out: out, + done: done, + } + + switch ms := msw.MetricSet.(type) { + case mb.PushMetricSet: + ms.Run(reporter) + case mb.EventFetcher, mb.EventsFetcher, mb.ReportingMetricSet: + msw.startPeriodicFetching(reporter) + default: + // Earlier startup stages prevent this from happening. + logp.Err("MetricSet '%s/%s' does not implement an event producing interface", + msw.Module().Name(), msw.Name()) } +} + +// startPeriodicFetching performs an immediate fetch for the MetricSet then it +// begins a continuous timer scheduled loop to fetch data. To stop the loop the +// done channel should be closed. +func (msw *metricSetWrapper) startPeriodicFetching(reporter *eventReporter) { + // Fetch immediately. + msw.fetch(reporter) // Start timer for future fetches. t := time.NewTicker(msw.Module().Config().Period) defer t.Stop() for { select { - case <-done: + case <-reporter.done: return case <-t.C: - err := msw.fetch(done, out) - if err != nil { - logp.Err("%v", err) - } + msw.fetch(reporter) } } } @@ -217,93 +233,110 @@ func (msw *metricSetWrapper) startFetching( // fetch invokes the appropriate Fetch method for the MetricSet and publishes // the result using the publisher client. This method will recover from panics // and log a stack track if one occurs. -func (msw *metricSetWrapper) fetch(done <-chan struct{}, out chan<- common.MapStr) error { - defer logp.Recover(fmt.Sprintf("recovered from panic while fetching "+ - "'%s/%s' for host '%s'", msw.module.Name(), msw.Name(), msw.Host())) - +func (msw *metricSetWrapper) fetch(reporter *eventReporter) { switch fetcher := msw.MetricSet.(type) { case mb.EventFetcher: - event, err := msw.singleEventFetch(fetcher) - if err != nil { - return err - } - if event != nil { - msw.stats.events.Add(1) - writeEvent(done, out, event) - } + msw.singleEventFetch(fetcher, reporter) case mb.EventsFetcher: - events, err := msw.multiEventFetch(fetcher) - if err != nil { - return err - } - for _, event := range events { - msw.stats.events.Add(1) - if !writeEvent(done, out, event) { - break - } - } + msw.multiEventFetch(fetcher, reporter) + case mb.ReportingMetricSet: + msw.reportingFetch(fetcher, reporter) default: - return fmt.Errorf("MetricSet '%s/%s' does not implement a Fetcher "+ - "interface", msw.Module().Name(), msw.Name()) + panic(fmt.Sprintf("unexpected fetcher type for %v", msw)) } - - return nil } -func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher) (common.MapStr, error) { - start := time.Now() +func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher, reporter *eventReporter) { + reporter.startFetchTimer() event, err := fetcher.Fetch() - elapsed := time.Since(start) + reporter.ErrorWith(err, event) +} - if err == nil { - msw.stats.success.Add(1) - } else { - msw.stats.failures.Add(1) +func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter *eventReporter) { + reporter.startFetchTimer() + events, err := fetcher.Fetch() + for _, event := range events { + reporter.ErrorWith(err, event) } +} + +func (msw *metricSetWrapper) reportingFetch(fetcher mb.ReportingMetricSet, reporter *eventReporter) { + reporter.startFetchTimer() + fetcher.Fetch(reporter) +} - if event, err = createEvent(msw, event, err, start, elapsed); err != nil { - return nil, errors.Wrap(err, "createEvent failed") +// close closes the underlying MetricSet if it implements the mb.Closer +// interface. +func (msw *metricSetWrapper) close() error { + if closer, ok := msw.MetricSet.(mb.Closer); ok { + return closer.Close() } + return nil +} - return event, nil +// String returns a string representation of metricSetWrapper. +func (msw *metricSetWrapper) String() string { + return fmt.Sprintf("metricSetWrapper[module=%s, name=%s, host=%s]", + msw.module.Name(), msw.Name(), msw.Host()) } -func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher) ([]common.MapStr, error) { - start := time.Now() - events, err := fetcher.Fetch() - elapsed := time.Since(start) +// Reporter implementation - var rtnEvents []common.MapStr - if err == nil { - msw.stats.success.Add(1) +// eventReporter implements the Reporter interface which is a callback interface +// used by MetricSet implementations to report an event(s), an error, or an error +// with some additional metadata. +type eventReporter struct { + msw *metricSetWrapper + done <-chan struct{} + out chan<- common.MapStr + start time.Time // Start time of the current fetch (or zero for push sources). +} - for _, event := range events { - if event, err = createEvent(msw, event, nil, start, elapsed); err != nil { - return nil, errors.Wrap(err, "createEvent failed") - } - if event != nil { - rtnEvents = append(rtnEvents, event) - } - } +// startFetchTimer demarcates the start of a new fetch. The elapsed time of a +// fetch is computed based on the time of this call. +func (r *eventReporter) startFetchTimer() { + r.start = time.Now() +} + +func (r *eventReporter) Done() <-chan struct{} { + return r.done +} + +func (r *eventReporter) Event(event common.MapStr) bool { + return r.ErrorWith(nil, event) +} + +func (r *eventReporter) Error(err error) bool { + return r.ErrorWith(err, nil) +} + +func (r *eventReporter) ErrorWith(err error, meta common.MapStr) bool { + timestamp := r.start + elapsed := time.Duration(0) + + if !timestamp.IsZero() { + elapsed = time.Since(timestamp) } else { - msw.stats.failures.Add(1) + timestamp = time.Now() + } - event, err := createEvent(msw, nil, err, start, elapsed) - if err != nil { - return nil, errors.Wrap(err, "createEvent failed") - } - if event != nil { - rtnEvents = append(rtnEvents, event) - } + if err == nil { + r.msw.stats.success.Add(1) + } else { + r.msw.stats.failures.Add(1) } - return rtnEvents, nil -} + event, err := createEvent(r.msw, meta, err, timestamp, elapsed) + if err != nil { + logp.Err("createEvent failed: %v", err) + return false + } -// String returns a string representation of metricSetWrapper. -func (msw *metricSetWrapper) String() string { - return fmt.Sprintf("metricSetWrapper[module=%s, name=%s, host=%s]", - msw.module.Name(), msw.Name(), msw.Host()) + if !writeEvent(r.done, r.out, event) { + return false + } + r.msw.stats.events.Add(1) + return true } // other utility functions diff --git a/metricbeat/mb/module/wrapper_test.go b/metricbeat/mb/module/wrapper_test.go index f04924dfb93..cc9326bc67c 100644 --- a/metricbeat/mb/module/wrapper_test.go +++ b/metricbeat/mb/module/wrapper_test.go @@ -14,29 +14,75 @@ import ( ) const ( - moduleName = "fake" - metricSetName = "status" + moduleName = "fake" + eventFetcherName = "EventFetcher" + reportingFetcherName = "ReportingFetcher" + pushMetricSetName = "PushMetricSet" ) // fakeMetricSet func init() { - if err := mb.Registry.AddMetricSet(moduleName, metricSetName, newFakeMetricSet); err != nil { + if err := mb.Registry.AddMetricSet(moduleName, eventFetcherName, newFakeEventFetcher); err != nil { + panic(err) + } + if err := mb.Registry.AddMetricSet(moduleName, reportingFetcherName, newFakeReportingFetcher); err != nil { + panic(err) + } + if err := mb.Registry.AddMetricSet(moduleName, pushMetricSetName, newFakePushMetricSet); err != nil { panic(err) } } -type fakeMetricSet struct { +// EventFetcher + +type fakeEventFetcher struct { mb.BaseMetricSet } -func (ms *fakeMetricSet) Fetch() (common.MapStr, error) { +func (ms *fakeEventFetcher) Fetch() (common.MapStr, error) { t, _ := time.Parse(time.RFC3339, "2016-05-10T23:27:58.485Z") return common.MapStr{"@timestamp": common.Time(t), "metric": 1}, nil } -func newFakeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { - return &fakeMetricSet{BaseMetricSet: base}, nil +func (ms *fakeEventFetcher) Close() error { + return nil +} + +func newFakeEventFetcher(base mb.BaseMetricSet) (mb.MetricSet, error) { + return &fakeEventFetcher{BaseMetricSet: base}, nil +} + +// ReportingFetcher + +type fakeReportingFetcher struct { + mb.BaseMetricSet +} + +func (ms *fakeReportingFetcher) Fetch(r mb.Reporter) { + t, _ := time.Parse(time.RFC3339, "2016-05-10T23:27:58.485Z") + r.Event(common.MapStr{"@timestamp": common.Time(t), "metric": 1}) +} + +func newFakeReportingFetcher(base mb.BaseMetricSet) (mb.MetricSet, error) { + return &fakeReportingFetcher{BaseMetricSet: base}, nil +} + +// PushMetricSet + +type fakePushMetricSet struct { + mb.BaseMetricSet +} + +func (ms *fakePushMetricSet) Run(r mb.PushReporter) { + t, _ := time.Parse(time.RFC3339, "2016-05-10T23:27:58.485Z") + event := common.MapStr{"@timestamp": common.Time(t), "metric": 1} + r.Event(event) + <-r.Done() +} + +func newFakePushMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { + return &fakePushMetricSet{BaseMetricSet: base}, nil } // test utilities @@ -44,7 +90,13 @@ func newFakeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { func newTestRegistry(t testing.TB) *mb.Register { r := mb.NewRegister() - if err := r.AddMetricSet(moduleName, metricSetName, newFakeMetricSet); err != nil { + if err := r.AddMetricSet(moduleName, eventFetcherName, newFakeEventFetcher); err != nil { + t.Fatal(err) + } + if err := r.AddMetricSet(moduleName, reportingFetcherName, newFakeReportingFetcher); err != nil { + t.Fatal(err) + } + if err := r.AddMetricSet(moduleName, pushMetricSetName, newFakePushMetricSet); err != nil { t.Fatal(err) } @@ -61,11 +113,11 @@ func newConfig(t testing.TB, moduleConfig interface{}) *common.Config { // test cases -func TestWrapper(t *testing.T) { +func TestWrapperOfEventFetcher(t *testing.T) { hosts := []string{"alpha", "beta"} c := newConfig(t, map[string]interface{}{ "module": moduleName, - "metricsets": []string{metricSetName}, + "metricsets": []string{eventFetcherName}, "hosts": hosts, }) @@ -93,3 +145,68 @@ func TestWrapper(t *testing.T) { } } } + +func TestWrapperOfReportingFetcher(t *testing.T) { + hosts := []string{"alpha", "beta"} + c := newConfig(t, map[string]interface{}{ + "module": moduleName, + "metricsets": []string{reportingFetcherName}, + "hosts": hosts, + }) + + m, err := module.NewWrapper(c, newTestRegistry(t)) + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + output := m.Start(done) + + <-output + <-output + close(done) + + // Validate that the channel is closed after receiving the two + // initial events. + select { + case _, ok := <-output: + if !ok { + // Channel is closed. + return + } else { + assert.Fail(t, "received unexpected event") + } + } +} + +func TestWrapperOfPushMetricSet(t *testing.T) { + hosts := []string{"alpha"} + c := newConfig(t, map[string]interface{}{ + "module": moduleName, + "metricsets": []string{pushMetricSetName}, + "hosts": hosts, + }) + + m, err := module.NewWrapper(c, newTestRegistry(t)) + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + output := m.Start(done) + + <-output + close(done) + + // Validate that the channel is closed after receiving the two + // initial events. + select { + case _, ok := <-output: + if !ok { + // Channel is closed. + return + } else { + assert.Fail(t, "received unexpected event") + } + } +}