From ef06eb0d3eec3ee9fe7c7306fca2080fb66938a0 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 14 Sep 2018 11:33:20 -0500 Subject: [PATCH] Heartbeat Automatic Reload (#8023) Add automatic reloading for heartbeat config files. This deprecates the `watch.poll_file` options. This patch also fixes a potential source of races in code using `cfgfile/Runner` by making that interface implement `Stringer`, the reason being that by default `cfgfile/Runner` can recursively print the backing structure, which can trigger a race. (cherry picked from commit 037a4f2dd08f43afb1ad4f1c3d7b56d5f1f14e96) --- CHANGELOG.asciidoc | 4 + heartbeat/_meta/beat.reference.yml | 45 ++- heartbeat/beater/heartbeat.go | 85 +++-- heartbeat/beater/manager.go | 328 ------------------ heartbeat/config/config.go | 12 +- heartbeat/heartbeat.reference.yml | 45 ++- heartbeat/monitors/active/http/http.go | 2 +- heartbeat/monitors/active/http/http_test.go | 3 +- heartbeat/monitors/active/icmp/icmp.go | 2 +- heartbeat/monitors/active/tcp/tcp.go | 2 +- heartbeat/monitors/active/tcp/tcp_test.go | 5 +- heartbeat/monitors/factory.go | 48 +++ heartbeat/monitors/job.go | 30 ++ heartbeat/monitors/monitor.go | 242 +++++++++++++ heartbeat/monitors/monitors.go | 131 ------- heartbeat/monitors/plugin.go | 107 +++++- heartbeat/monitors/plugin_test.go | 249 +++++++++++++ heartbeat/monitors/pluginconf.go | 51 +++ heartbeat/monitors/task.go | 150 ++++++++ heartbeat/monitors/util.go | 21 +- .../tests/system/config/heartbeat.yml.j2 | 7 + heartbeat/tests/system/heartbeat.py | 23 +- heartbeat/tests/system/test_monitor.py | 17 - heartbeat/tests/system/test_reload.py | 121 +++++++ heartbeat/watcher/watch_config.go | 32 ++ libbeat/cfgfile/list_test.go | 4 + libbeat/cfgfile/reload.go | 8 + metricbeat/mb/module/runner.go | 5 + 28 files changed, 1223 insertions(+), 556 deletions(-) delete mode 100644 heartbeat/beater/manager.go create mode 100644 heartbeat/monitors/factory.go create mode 100644 heartbeat/monitors/job.go create mode 100644 heartbeat/monitors/monitor.go delete mode 100644 heartbeat/monitors/monitors.go create mode 100644 heartbeat/monitors/plugin_test.go create mode 100644 heartbeat/monitors/pluginconf.go create mode 100644 heartbeat/monitors/task.go create mode 100644 heartbeat/tests/system/test_reload.py create mode 100644 heartbeat/watcher/watch_config.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5b85861614f..cc4e4c7f29e 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -116,6 +116,9 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] *Winlogbeat* +*Heartbeat* + +- Add automatic config file reloading. {pull}8023[8023] ==== Deprecated @@ -124,6 +127,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] *Filebeat* *Heartbeat* +- watch.poll_file is now deprecated and superceded by automatic config file reloading. *Metricbeat* - Redis `info` `replication.master_offset` has been deprecated in favor of `replication.master.offset`.{pull}7695[7695] diff --git a/heartbeat/_meta/beat.reference.yml b/heartbeat/_meta/beat.reference.yml index 513cda15c53..80a1027601f 100644 --- a/heartbeat/_meta/beat.reference.yml +++ b/heartbeat/_meta/beat.reference.yml @@ -32,14 +32,6 @@ heartbeat.monitors: ipv6: true mode: any - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - # Total running time per ping test. timeout: 16s @@ -62,6 +54,15 @@ heartbeat.monitors: # sub-dictionary. Default is false. #fields_under_root: false + # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE + # Configure file json file to be watched for changes to the monitor: + #watch.poll_file: + # Path to check for updates. + #path: + + # Interval between file file changed checks. + #interval: 5s + - type: tcp # monitor type `tcp`. Connect via TCP and optionally verify endpoint # by sending/receiving a custom payload @@ -98,14 +99,6 @@ heartbeat.monitors: ipv6: true mode: any - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - # List of ports to ping if host does not contain a port number # ports: [80, 9200, 5044] @@ -135,6 +128,16 @@ heartbeat.monitors: # Required TLS protocols #supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"] + # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE + # Configure file json file to be watched for changes to the monitor: + #watch.poll_file: + # Path to check for updates. + #path: + + # Interval between file file changed checks. + #interval: 5s + + - type: http # monitor type `http`. Connect via HTTP an optionally verify response # Monitor name used for job name and document type @@ -205,6 +208,16 @@ heartbeat.monitors: # Required response contents. #body: + # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE + # Configure file json file to be watched for changes to the monitor: + #watch.poll_file: + # Path to check for updates. + #path: + + # Interval between file file changed checks. + #interval: 5s + + heartbeat.scheduler: # Limit number of concurrent tasks executed by heartbeat. The task limit if # disabled if set to 0. The default is 0. diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 036f48db598..fc6780a5984 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -21,33 +21,38 @@ import ( "fmt" "time" - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" + "github.com/pkg/errors" "github.com/elastic/beats/heartbeat/config" "github.com/elastic/beats/heartbeat/monitors" "github.com/elastic/beats/heartbeat/scheduler" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/cfgfile" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" ) +// Heartbeat represents the root datastructure of this beat. type Heartbeat struct { done chan struct{} - - scheduler *scheduler.Scheduler - manager *monitorManager + // config is used for iterating over elements of the config. + config config.Config + scheduler *scheduler.Scheduler + monitorReloader *cfgfile.Reloader } -func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { +// New creates a new heartbeat. +func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { cfgwarn.Beta("Heartbeat is beta software") - config := config.DefaultConfig - if err := cfg.Unpack(&config); err != nil { + parsedConfig := config.DefaultConfig + if err := rawConfig.Unpack(&parsedConfig); err != nil { return nil, fmt.Errorf("Error reading config file: %v", err) } - limit := config.Scheduler.Limit - locationName := config.Scheduler.Location + limit := parsedConfig.Scheduler.Limit + locationName := parsedConfig.Scheduler.Location if locationName == "" { locationName = "Local" } @@ -56,23 +61,35 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { return nil, err } - sched := scheduler.NewWithLocation(limit, location) - manager, err := newMonitorManager(b.Publisher, sched, monitors.Registry, config.Monitors) - if err != nil { - return nil, err - } + scheduler := scheduler.NewWithLocation(limit, location) bt := &Heartbeat{ done: make(chan struct{}), - scheduler: sched, - manager: manager, + config: parsedConfig, + scheduler: scheduler, } return bt, nil } +// Run executes the beat. func (bt *Heartbeat) Run(b *beat.Beat) error { logp.Info("heartbeat is running! Hit CTRL-C to stop it.") + err := bt.RunStaticMonitors(b) + if err != nil { + return err + } + + if bt.config.ConfigMonitors.Enabled() { + bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors) + defer bt.monitorReloader.Stop() + + err := bt.RunDynamicMonitors(b) + if err != nil { + return err + } + } + if err := bt.scheduler.Start(); err != nil { return err } @@ -80,12 +97,40 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { <-bt.done - bt.manager.Stop() - logp.Info("Shutting down.") return nil } +// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present. +func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error { + factory := monitors.NewFactory(bt.scheduler, true) + + for _, cfg := range bt.config.Monitors { + created, err := factory.Create(b.Publisher, cfg, nil) + if err != nil { + return errors.Wrap(err, "could not create monitor") + } + created.Start() + } + return nil +} + +// RunDynamicMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present. +func (bt *Heartbeat) RunDynamicMonitors(b *beat.Beat) (err error) { + factory := monitors.NewFactory(bt.scheduler, false) + + // Check monitor configs + if err := bt.monitorReloader.Check(factory); err != nil { + return err + } + + // Execute the monitor + go bt.monitorReloader.Run(factory) + + return nil +} + +// Stop stops the beat. func (bt *Heartbeat) Stop() { close(bt.done) } diff --git a/heartbeat/beater/manager.go b/heartbeat/beater/manager.go deleted file mode 100644 index ece16499f1b..00000000000 --- a/heartbeat/beater/manager.go +++ /dev/null @@ -1,328 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package beater - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "time" - - "github.com/elastic/beats/heartbeat/monitors" - "github.com/elastic/beats/heartbeat/scheduler" - "github.com/elastic/beats/heartbeat/scheduler/schedule" - "github.com/elastic/beats/heartbeat/watcher" - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/processors" -) - -type monitorManager struct { - monitors []monitor - jobControl jobControl -} - -type monitor struct { - manager *monitorManager - watcher watcher.Watch - - name string - uniqueName string - factory monitors.Factory - config *common.Config - - active map[string]monitorTask - - pipeline beat.Pipeline -} - -type monitorTask struct { - job monitors.Job - cancel jobCanceller - - config monitorTaskConfig -} - -type monitorTaskConfig struct { - Name string `config:"name"` - Type string `config:"type"` - Schedule *schedule.Schedule `config:"schedule" validate:"required"` - - // Fields and tags to add to monitor. - EventMetadata common.EventMetadata `config:",inline"` - Processors processors.PluginConfig `config:"processors"` -} - -type jobControl interface { - Add(sched scheduler.Schedule, name string, f scheduler.TaskFunc) (stopFn func() error, err error) -} - -type jobCanceller func() error - -var defaultFilePollInterval = 5 * time.Second - -func newMonitorManager( - pipeline beat.Pipeline, - jobControl jobControl, - registry *monitors.Registrar, - configs []*common.Config, -) (*monitorManager, error) { - type watchConfig struct { - Path string `config:"watch.poll_file.path"` - Poll time.Duration `config:"watch.poll_file.interval" validate:"min=1"` - } - defaultWatchConfig := watchConfig{ - Poll: defaultFilePollInterval, - } - - m := &monitorManager{ - jobControl: jobControl, - } - - if len(configs) == 0 { - return nil, errors.New("no monitor configured") - } - - // check monitors exist - for _, config := range configs { - plugin := struct { - Type string `config:"type" validate:"required"` - Enabled bool `config:"enabled"` - }{ - Enabled: true, - } - - if err := config.Unpack(&plugin); err != nil { - return nil, err - } - - if !plugin.Enabled { - continue - } - - info, found := registry.Query(plugin.Type) - if !found { - return nil, fmt.Errorf("Monitor type '%v' does not exist", plugin.Type) - } - logp.Info("Select (%v) monitor %v", info.Type, info.Name) - - factory := registry.GetFactory(plugin.Type) - if factory == nil { - return nil, fmt.Errorf("Found non-runnable monitor %v", plugin.Type) - } - - m.monitors = append(m.monitors, monitor{ - manager: m, - name: info.Name, - factory: factory, - config: config, - active: map[string]monitorTask{}, - pipeline: pipeline, - }) - } - - // load watcher configs - watchConfigs := make([]watchConfig, len(m.monitors)) - for i, monitor := range m.monitors { - watchConfigs[i] = defaultWatchConfig - if err := monitor.config.Unpack(&watchConfigs[i]); err != nil { - return nil, err - } - } - - // load initial monitors - for _, monitor := range m.monitors { - err := monitor.Update([]*common.Config{monitor.config}) - if err != nil { - logp.Err("failed to load monitor tasks: %v", err) - } - } - - // start monitor resource watchers if configured (will drop registered monitoring tasks and install new one if resource is available) - for i := range m.monitors { - monitor := &m.monitors[i] - path := watchConfigs[i].Path - if path == "" { - continue - } - - poll := watchConfigs[i].Poll - monitor.watcher, _ = watcher.NewFilePoller(path, poll, createWatchUpdater(monitor)) - } - - return m, nil -} - -func (m *monitorManager) Stop() { - for _, m := range m.monitors { - m.Close() - } -} - -func (m *monitor) Update(configs []*common.Config) error { - all := map[string]monitorTask{} - for i, upd := range configs { - config, err := common.MergeConfigs(m.config, upd) - if err != nil { - logp.Err("Failed merging monitor config with updates: %v", err) - return err - } - - shared := monitorTaskConfig{} - if err := config.Unpack(&shared); err != nil { - logp.Err("Failed parsing job schedule: ", err) - return err - } - - jobs, err := m.factory(config) - if err != nil { - err = fmt.Errorf("%v when initializing monitor %v(%v)", err, m.name, i) - return err - } - - for _, job := range jobs { - all[job.Name()] = monitorTask{ - job: job, - config: shared, - } - } - } - - // stop all active jobs - for _, job := range m.active { - job.cancel() - } - m.active = map[string]monitorTask{} - - // start new and reconfigured tasks - for id, t := range all { - processors, err := processors.New(t.config.Processors) - if err != nil { - logp.Critical("Fail to load monitor processors: %v", err) - continue - } - - // create connection per monitorTask - client, err := m.pipeline.ConnectWith(beat.ClientConfig{ - EventMetadata: t.config.EventMetadata, - Processor: processors, - }) - if err != nil { - logp.Critical("Fail to connect job '%v' to publisher pipeline: %v", id, err) - continue - } - - job := t.createJob(client) - jobCancel, err := m.manager.jobControl.Add(t.config.Schedule, id, job) - if err == scheduler.ErrAlreadyStopped { - logp.Info("Will not add new task to scheduler. Heartbeat is stopping.") - } else if err != nil { - logp.Err("an unexpected error occurred adding a task to the scheduler: %v", err) - continue - } - - t.cancel = func() error { - client.Close() - return jobCancel() - } - - m.active[id] = t - } - - return nil -} - -func (m *monitor) Close() { - for _, mt := range m.active { - mt.cancel() - } -} - -func createWatchUpdater(monitor *monitor) func(content []byte) { - return func(content []byte) { - defer logp.Recover("Failed applying monitor watch") - - // read multiple json objects from content - dec := json.NewDecoder(bytes.NewBuffer(content)) - var configs []*common.Config - for dec.More() { - var obj map[string]interface{} - err := dec.Decode(&obj) - if err != nil { - logp.Err("Failed parsing json object: %v", err) - return - } - - logp.Info("load watch object: %v", obj) - - cfg, err := common.NewConfigFrom(obj) - if err != nil { - logp.Err("Failed normalizing json input: %v", err) - return - } - - configs = append(configs, cfg) - } - - // apply read configurations - if err := monitor.Update(configs); err != nil { - logp.Err("Failed applying configuration: %v", err) - } - } -} - -func (m *monitorTask) createJob(client beat.Client) scheduler.TaskFunc { - name := m.config.Name - if name == "" { - name = m.config.Type - } - - meta := common.MapStr{ - "monitor": common.MapStr{ - "name": name, - "type": m.config.Type, - }, - } - return m.prepareSchedulerJob(client, meta, m.job.Run) -} - -func (m *monitorTask) prepareSchedulerJob(client beat.Client, meta common.MapStr, run monitors.JobRunner) scheduler.TaskFunc { - return func() []scheduler.TaskFunc { - event, next, err := run() - if err != nil { - logp.Err("Job %v failed with: ", err) - } - - if event.Fields != nil { - event.Fields.DeepUpdate(meta) - client.Publish(event) - } - - if len(next) == 0 { - return nil - } - - cont := make([]scheduler.TaskFunc, len(next)) - for i, n := range next { - cont[i] = m.prepareSchedulerJob(client, meta, n) - } - return cont - } -} diff --git a/heartbeat/config/config.go b/heartbeat/config/config.go index 935de5a3ddc..620f1fd62c3 100644 --- a/heartbeat/config/config.go +++ b/heartbeat/config/config.go @@ -20,17 +20,23 @@ package config -import "github.com/elastic/beats/libbeat/common" +import ( + "github.com/elastic/beats/libbeat/common" +) +// Config defines the structure of heartbeat.yml. type Config struct { // Modules is a list of module specific configuration data. - Monitors []*common.Config `config:"monitors" validate:"required"` - Scheduler Scheduler `config:"scheduler"` + Monitors []*common.Config `config:"monitors"` + ConfigMonitors *common.Config `config:"config.monitors"` + Scheduler Scheduler `config:"scheduler"` } +// Scheduler defines the syntax of a heartbeat.yml scheduler block. type Scheduler struct { Limit uint `config:"limit" validate:"min=0"` Location string `config:"location"` } +// DefaultConfig is the canonical instantiation of Config. var DefaultConfig = Config{} diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 99393893e6a..d94143f4284 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -32,14 +32,6 @@ heartbeat.monitors: ipv6: true mode: any - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - # Total running time per ping test. timeout: 16s @@ -62,6 +54,15 @@ heartbeat.monitors: # sub-dictionary. Default is false. #fields_under_root: false + # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE + # Configure file json file to be watched for changes to the monitor: + #watch.poll_file: + # Path to check for updates. + #path: + + # Interval between file file changed checks. + #interval: 5s + - type: tcp # monitor type `tcp`. Connect via TCP and optionally verify endpoint # by sending/receiving a custom payload @@ -98,14 +99,6 @@ heartbeat.monitors: ipv6: true mode: any - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - # List of ports to ping if host does not contain a port number # ports: [80, 9200, 5044] @@ -135,6 +128,16 @@ heartbeat.monitors: # Required TLS protocols #supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"] + # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE + # Configure file json file to be watched for changes to the monitor: + #watch.poll_file: + # Path to check for updates. + #path: + + # Interval between file file changed checks. + #interval: 5s + + - type: http # monitor type `http`. Connect via HTTP an optionally verify response # Monitor name used for job name and document type @@ -205,6 +208,16 @@ heartbeat.monitors: # Required response contents. #body: + # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE + # Configure file json file to be watched for changes to the monitor: + #watch.poll_file: + # Path to check for updates. + #path: + + # Interval between file file changed checks. + #interval: 5s + + heartbeat.scheduler: # Limit number of concurrent tasks executed by heartbeat. The task limit if # disabled if set to 0. The default is 0. diff --git a/heartbeat/monitors/active/http/http.go b/heartbeat/monitors/active/http/http.go index c93e2139b80..34ba5011367 100644 --- a/heartbeat/monitors/active/http/http.go +++ b/heartbeat/monitors/active/http/http.go @@ -37,7 +37,7 @@ func init() { var debugf = logp.MakeDebug("http") func create( - info monitors.Info, + name string, cfg *common.Config, ) ([]monitors.Job, error) { config := defaultConfig diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index 07645cb6461..8899cdf0fc5 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -28,7 +28,6 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/heartbeat/hbtest" - "github.com/elastic/beats/heartbeat/monitors" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/mapval" @@ -55,7 +54,7 @@ func testTLSRequest(t *testing.T, testURL string, certPath string) beat.Event { config, err := common.NewConfigFrom(configSrc) require.NoError(t, err) - jobs, err := create(monitors.Info{}, config) + jobs, err := create("tls", config) require.NoError(t, err) job := jobs[0] diff --git a/heartbeat/monitors/active/icmp/icmp.go b/heartbeat/monitors/active/icmp/icmp.go index 0c445eab68b..aa7747f0a7d 100644 --- a/heartbeat/monitors/active/icmp/icmp.go +++ b/heartbeat/monitors/active/icmp/icmp.go @@ -35,7 +35,7 @@ func init() { var debugf = logp.MakeDebug("icmp") func create( - info monitors.Info, + name string, cfg *common.Config, ) ([]monitors.Job, error) { config := DefaultConfig diff --git a/heartbeat/monitors/active/tcp/tcp.go b/heartbeat/monitors/active/tcp/tcp.go index 4e37cd0ad30..6bcf785b70b 100644 --- a/heartbeat/monitors/active/tcp/tcp.go +++ b/heartbeat/monitors/active/tcp/tcp.go @@ -45,7 +45,7 @@ type connURL struct { } func create( - info monitors.Info, + name string, cfg *common.Config, ) ([]monitors.Job, error) { config := DefaultConfig diff --git a/heartbeat/monitors/active/tcp/tcp_test.go b/heartbeat/monitors/active/tcp/tcp_test.go index 3a7df243798..9752157b2ff 100644 --- a/heartbeat/monitors/active/tcp/tcp_test.go +++ b/heartbeat/monitors/active/tcp/tcp_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/heartbeat/hbtest" - "github.com/elastic/beats/heartbeat/monitors" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/mapval" @@ -46,7 +45,7 @@ func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event { }) require.NoError(t, err) - jobs, err := create(monitors.Info{}, config) + jobs, err := create("tcp", config) require.NoError(t, err) job := jobs[0] @@ -66,7 +65,7 @@ func testTLSTCPCheck(t *testing.T, host string, port uint16, certFileName string }) require.NoError(t, err) - jobs, err := create(monitors.Info{}, config) + jobs, err := create("tcp", config) require.NoError(t, err) job := jobs[0] diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go new file mode 100644 index 00000000000..c2ce985fcb5 --- /dev/null +++ b/heartbeat/monitors/factory.go @@ -0,0 +1,48 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "github.com/elastic/beats/heartbeat/scheduler" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/cfgfile" + "github.com/elastic/beats/libbeat/common" +) + +// RunnerFactory that can be used to create cfg.Runner cast versions of Monitor +// suitable for config reloading. +type RunnerFactory struct { + sched *scheduler.Scheduler + allowWatches bool +} + +// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects. +func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory { + return &RunnerFactory{sched, allowWatches} +} + +// Create makes a new Runner for a new monitor with the given Config. +func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { + monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches) + return monitor, err +} + +// CheckConfig checks to see if the given monitor config is valid. +func (f *RunnerFactory) CheckConfig(config *common.Config) error { + return checkMonitorConfig(config, globalPluginsReg, f.allowWatches) +} diff --git a/heartbeat/monitors/job.go b/heartbeat/monitors/job.go new file mode 100644 index 00000000000..b5bfb8586c6 --- /dev/null +++ b/heartbeat/monitors/job.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "github.com/elastic/beats/libbeat/beat" +) + +// Job represents the work done by a single check by a given Monitor. +type Job interface { + Name() string + Run() (beat.Event, []jobRunner, error) +} + +type jobRunner func() (beat.Event, []jobRunner, error) diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go new file mode 100644 index 00000000000..5e08bb77e1f --- /dev/null +++ b/heartbeat/monitors/monitor.go @@ -0,0 +1,242 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "bytes" + "encoding/json" + "fmt" + "sync" + + "github.com/pkg/errors" + + "github.com/elastic/beats/heartbeat/scheduler" + "github.com/elastic/beats/heartbeat/watcher" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +// Monitor represents a configured recurring monitoring task loaded from a config file. Starting it +// will cause it to run with the given scheduler until Stop() is called. +type Monitor struct { + name string + config *common.Config + registrar *pluginsReg + uniqueName string + scheduler *scheduler.Scheduler + jobTasks []*task + enabled bool + // internalsMtx is used to synchronize access to critical + // internal datastructures + internalsMtx sync.Mutex + + // Watch related fields + watchPollTasks []*task + watch watcher.Watch + + pipeline beat.Pipeline +} + +// String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe +// values because it may be invoked from another thread in cfgfile/runner. +func (m *Monitor) String() string { + return fmt.Sprintf("Monitor", m.name, m.enabled) +} + +func checkMonitorConfig(config *common.Config, registrar *pluginsReg, allowWatches bool) error { + _, err := newMonitor(config, registrar, nil, nil, allowWatches) + return err +} + +// ErrWatchesDisabled is returned when the user attempts to declare a watch poll file in a +var ErrWatchesDisabled = errors.New("watch poll files are only allowed in heartbeat.yml, not dynamic configs") + +func newMonitor( + config *common.Config, + registrar *pluginsReg, + pipeline beat.Pipeline, + scheduler *scheduler.Scheduler, + allowWatches bool, +) (*Monitor, error) { + // Extract just the Type and Enabled fields from the config + // We'll parse things more precisely later once we know what exact type of + // monitor we have + mpi, err := pluginInfo(config) + if err != nil { + return nil, err + } + + monitorPlugin, found := registrar.get(mpi.Type) + if !found { + return nil, fmt.Errorf("monitor type %v does not exist", mpi.Type) + } + + m := &Monitor{ + name: monitorPlugin.name, + scheduler: scheduler, + jobTasks: []*task{}, + pipeline: pipeline, + watchPollTasks: []*task{}, + internalsMtx: sync.Mutex{}, + config: config, + } + + jobs, err := monitorPlugin.create(config) + if err != nil { + return nil, fmt.Errorf("job err %v", err) + } + + m.jobTasks, err = m.makeTasks(config, jobs) + if err != nil { + return nil, err + } + + err = m.makeWatchTasks(monitorPlugin) + if err != nil { + return nil, err + } + + if len(m.watchPollTasks) > 0 { + if !allowWatches { + return nil, ErrWatchesDisabled + } + + logp.Info(`Obsolete option 'watch.poll_file' declared. This will be removed in a future release. +See https://www.elastic.co/guide/en/beats/heartbeat/current/configuration-heartbeat-options.html for more info`) + } + + return m, nil +} + +func (m *Monitor) makeTasks(config *common.Config, jobs []Job) ([]*task, error) { + mtConf := taskConfig{} + if err := config.Unpack(&mtConf); err != nil { + return nil, errors.Wrap(err, "invalid config, could not unpack monitor config") + } + + var mTasks []*task + for _, job := range jobs { + t, err := newTask(job, mtConf, m) + if err != nil { + // Failure to compile monitor processors should not crash hb or prevent progress + if _, ok := err.(InvalidMonitorProcessorsError); ok { + logp.Critical("Failed to load monitor processors: %v", err) + continue + } + + return nil, err + } + + mTasks = append(mTasks, t) + } + + return mTasks, nil +} + +func (m *Monitor) makeWatchTasks(monitorPlugin pluginBuilder) error { + watchCfg := watcher.DefaultWatchConfig + err := m.config.Unpack(&watchCfg) + if err != nil { + return err + } + + if len(watchCfg.Path) > 0 { + m.watch, err = watcher.NewFilePoller(watchCfg.Path, watchCfg.Poll, func(content []byte) { + var newTasks []*task + + dec := json.NewDecoder(bytes.NewBuffer(content)) + for dec.More() { + var obj map[string]interface{} + err = dec.Decode(&obj) + if err != nil { + logp.Err("Failed parsing JSON object: %v", err) + return + } + + cfg, err := common.NewConfigFrom(obj) + if err != nil { + logp.Err("Failed normalizing JSON input: %v", err) + return + } + + merged, err := common.MergeConfigs(m.config, cfg) + if err != nil { + logp.Err("Could not merge config: %v", err) + return + } + + watchJobs, err := monitorPlugin.create(merged) + if err != nil { + logp.Err("Could not create job from watch file: %v", err) + } + + watchTasks, err := m.makeTasks(merged, watchJobs) + if err != nil { + logp.Err("Could not make task for config: %v", err) + return + } + + newTasks = append(newTasks, watchTasks...) + } + + m.internalsMtx.Lock() + defer m.internalsMtx.Unlock() + + for _, t := range m.watchPollTasks { + t.Stop() + } + m.watchPollTasks = newTasks + for _, t := range m.watchPollTasks { + t.Start() + } + }) + + if err != nil { + return err + } + } + + return nil +} + +func (m *Monitor) Start() { + m.internalsMtx.Lock() + defer m.internalsMtx.Unlock() + + for _, t := range m.jobTasks { + t.Start() + } + + for _, t := range m.watchPollTasks { + t.Start() + } +} + +func (m *Monitor) Stop() { + m.internalsMtx.Lock() + defer m.internalsMtx.Unlock() + + for _, t := range m.jobTasks { + t.Stop() + } + + for _, t := range m.watchPollTasks { + t.Stop() + } +} diff --git a/heartbeat/monitors/monitors.go b/heartbeat/monitors/monitors.go deleted file mode 100644 index be1c897af26..00000000000 --- a/heartbeat/monitors/monitors.go +++ /dev/null @@ -1,131 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package monitors - -import ( - "fmt" - "sort" - "strings" - - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" -) - -type Factory func(*common.Config) ([]Job, error) - -type ActiveBuilder func(Info, *common.Config) ([]Job, error) - -type Job interface { - Name() string - Run() (beat.Event, []JobRunner, error) -} - -type JobRunner func() (beat.Event, []JobRunner, error) - -type TaskRunner interface { - Run() (common.MapStr, []TaskRunner, error) -} - -type Type uint8 - -type Info struct { - Name string - Type Type -} - -const ( - ActiveMonitor Type = iota + 1 - PassiveMonitor -) - -var Registry = newRegistrar() - -type Registrar struct { - modules map[string]entry -} - -type entry struct { - info Info - builder ActiveBuilder -} - -func newRegistrar() *Registrar { - return &Registrar{ - modules: map[string]entry{}, - } -} - -func RegisterActive(name string, builder ActiveBuilder) { - if err := Registry.AddActive(name, builder); err != nil { - panic(err) - } -} - -func (r *Registrar) Register(name string, t Type, builder ActiveBuilder) error { - if _, found := r.modules[name]; found { - return fmt.Errorf("monitor type %v already exists", name) - } - - info := Info{Name: name, Type: t} - r.modules[name] = entry{info: info, builder: builder} - - return nil -} - -func (r *Registrar) Query(name string) (Info, bool) { - e, found := r.modules[name] - return e.info, found -} - -func (r *Registrar) GetFactory(name string) Factory { - e, found := r.modules[name] - if !found { - return nil - } - return e.Create -} - -func (r *Registrar) AddActive(name string, builder ActiveBuilder) error { - return r.Register(name, ActiveMonitor, builder) -} - -func (r *Registrar) String() string { - var monitors []string - for m := range r.modules { - monitors = append(monitors, m) - } - sort.Strings(monitors) - - return fmt.Sprintf("Registry, monitors: %v", - strings.Join(monitors, ", ")) -} - -func (e *entry) Create(cfg *common.Config) ([]Job, error) { - return e.builder(e.info, cfg) -} - -func (t Type) String() string { - switch t { - case ActiveMonitor: - return "active" - case PassiveMonitor: - return "passive" - default: - return "unknown type" - } -} diff --git a/heartbeat/monitors/plugin.go b/heartbeat/monitors/plugin.go index b6e3f9eb2dd..8a3286287d0 100644 --- a/heartbeat/monitors/plugin.go +++ b/heartbeat/monitors/plugin.go @@ -19,29 +19,120 @@ package monitors import ( "errors" + "fmt" + "sort" + "strings" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/plugin" ) -type monitorPlugin struct { +type pluginBuilder struct { name string typ Type - builder ActiveBuilder + builder PluginBuilder } var pluginKey = "heartbeat.monitor" -func ActivePlugin(name string, b ActiveBuilder) map[string][]interface{} { - return plugin.MakePlugin(pluginKey, monitorPlugin{name, ActiveMonitor, b}) -} - func init() { plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) error { - p, ok := ifc.(monitorPlugin) + p, ok := ifc.(pluginBuilder) if !ok { return errors.New("plugin does not match monitor plugin type") } - return Registry.Register(p.name, p.typ, p.builder) + return globalPluginsReg.register(pluginBuilder{p.name, p.typ, p.builder}) }) } + +// PluginBuilder is the signature of functions used to build active +// monitors +type PluginBuilder func(string, *common.Config) ([]Job, error) + +// Type represents whether a plugin is active or passive. +type Type uint8 + +const ( + // ActiveMonitor represents monitors that reach across the network to do things. + ActiveMonitor Type = iota + 1 + // PassiveMonitor represents monitors that receive inbound data. + PassiveMonitor +) + +// globalPluginsReg maintains the canonical list of valid Heartbeat monitors at runtime. +var globalPluginsReg = newPluginsReg() + +type pluginsReg struct { + monitors map[string]pluginBuilder +} + +func newPluginsReg() *pluginsReg { + return &pluginsReg{ + monitors: map[string]pluginBuilder{}, + } +} + +// RegisterActive registers a new active (as opposed to passive) monitor. +func RegisterActive(name string, builder PluginBuilder) { + if err := globalPluginsReg.add(pluginBuilder{name, ActiveMonitor, builder}); err != nil { + panic(err) + } +} + +// ErrPluginAlreadyExists is returned when there is an attempt to register two plugins +// with the same name. +type ErrPluginAlreadyExists pluginBuilder + +func (m ErrPluginAlreadyExists) Error() string { + return fmt.Sprintf("monitor plugin '%s' already exists", m.name) +} + +func (r *pluginsReg) add(plugin pluginBuilder) error { + if _, exists := r.monitors[plugin.name]; exists { + return ErrPluginAlreadyExists(plugin) + } + r.monitors[plugin.name] = plugin + return nil +} + +func (r *pluginsReg) register(plugin pluginBuilder) error { + if _, found := r.monitors[plugin.name]; found { + return fmt.Errorf("monitor type %v already exists", plugin.name) + } + + r.monitors[plugin.name] = plugin + + return nil +} + +func (r *pluginsReg) get(name string) (pluginBuilder, bool) { + e, found := r.monitors[name] + return e, found +} + +func (r *pluginsReg) String() string { + var monitors []string + for m := range r.monitors { + monitors = append(monitors, m) + } + sort.Strings(monitors) + + return fmt.Sprintf("globalPluginsReg, monitor: %v", + strings.Join(monitors, ", ")) +} + +func (e *pluginBuilder) create(cfg *common.Config) ([]Job, error) { + return e.builder(e.name, cfg) +} + +func (t Type) String() string { + switch t { + case ActiveMonitor: + return "active" + case PassiveMonitor: + return "passive" + default: + return "unknown type" + } +} diff --git a/heartbeat/monitors/plugin_test.go b/heartbeat/monitors/plugin_test.go new file mode 100644 index 00000000000..b8b199afa72 --- /dev/null +++ b/heartbeat/monitors/plugin_test.go @@ -0,0 +1,249 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "reflect" + "testing" + + "github.com/elastic/beats/libbeat/common" +) + +func Test_newPluginsReg(t *testing.T) { + tests := []struct { + name string + want *pluginsReg + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := newPluginsReg(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("newPluginsReg() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestRegisterActive(t *testing.T) { + type args struct { + name string + builder PluginBuilder + } + tests := []struct { + name string + args args + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + RegisterActive(tt.args.name, tt.args.builder) + }) + } +} + +func TestMonitorPluginAlreadyExistsError_Error(t *testing.T) { + type fields struct { + name string + typ Type + builder PluginBuilder + } + tests := []struct { + name string + fields fields + want string + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := ErrPluginAlreadyExists{ + name: tt.fields.name, + typ: tt.fields.typ, + builder: tt.fields.builder, + } + if got := m.Error(); got != tt.want { + t.Errorf("ErrPluginAlreadyExists.Error() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_pluginsReg_add(t *testing.T) { + type fields struct { + monitors map[string]pluginBuilder + } + type args struct { + plugin pluginBuilder + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &pluginsReg{ + monitors: tt.fields.monitors, + } + if err := r.add(tt.args.plugin); (err != nil) != tt.wantErr { + t.Errorf("pluginsReg.add() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_pluginsReg_register(t *testing.T) { + type fields struct { + monitors map[string]pluginBuilder + } + type args struct { + plugin pluginBuilder + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &pluginsReg{ + monitors: tt.fields.monitors, + } + if err := r.register(tt.args.plugin); (err != nil) != tt.wantErr { + t.Errorf("pluginsReg.register() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_pluginsReg_get(t *testing.T) { + type fields struct { + monitors map[string]pluginBuilder + } + type args struct { + name string + } + tests := []struct { + name string + fields fields + args args + want pluginBuilder + want1 bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &pluginsReg{ + monitors: tt.fields.monitors, + } + got, got1 := r.get(tt.args.name) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("pluginsReg.get() got = %v, want %v", got, tt.want) + } + if got1 != tt.want1 { + t.Errorf("pluginsReg.get() got1 = %v, want %v", got1, tt.want1) + } + }) + } +} + +func Test_pluginsReg_String(t *testing.T) { + type fields struct { + monitors map[string]pluginBuilder + } + tests := []struct { + name string + fields fields + want string + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &pluginsReg{ + monitors: tt.fields.monitors, + } + if got := r.String(); got != tt.want { + t.Errorf("pluginsReg.String() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_pluginBuilder_create(t *testing.T) { + type fields struct { + name string + typ Type + builder PluginBuilder + } + type args struct { + cfg *common.Config + } + tests := []struct { + name string + fields fields + args args + want []Job + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &pluginBuilder{ + name: tt.fields.name, + typ: tt.fields.typ, + builder: tt.fields.builder, + } + got, err := e.create(tt.args.cfg) + if (err != nil) != tt.wantErr { + t.Errorf("pluginBuilder.create() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("pluginBuilder.create() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestType_String(t *testing.T) { + tests := []struct { + name string + t Type + want string + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.t.String(); got != tt.want { + t.Errorf("Type.String() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/heartbeat/monitors/pluginconf.go b/heartbeat/monitors/pluginconf.go new file mode 100644 index 00000000000..8f9bbc58b70 --- /dev/null +++ b/heartbeat/monitors/pluginconf.go @@ -0,0 +1,51 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" +) + +type PluginDisabledError struct{} + +func (e PluginDisabledError) Error() string { + return fmt.Sprintf("Monitor not loaded, plugin is disabled") +} + +type MonitorPluginInfo struct { + Type string `config:"type" validate:"required"` + Enabled bool `config:"enabled"` +} + +func pluginInfo(config *common.Config) (MonitorPluginInfo, error) { + mpi := MonitorPluginInfo{Enabled: true} + + if err := config.Unpack(&mpi); err != nil { + return mpi, errors.Wrap(err, "error unpacking monitor plugin config") + } + + if !mpi.Enabled { + return mpi, PluginDisabledError{} + } + + return mpi, nil +} diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go new file mode 100644 index 00000000000..8f32db39b07 --- /dev/null +++ b/heartbeat/monitors/task.go @@ -0,0 +1,150 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/beats/heartbeat/scheduler" + "github.com/elastic/beats/heartbeat/scheduler/schedule" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" +) + +type taskCanceller func() error + +type task struct { + job Job + config taskConfig + monitor *Monitor + processors *processors.Processors + cancelFn taskCanceller + client beat.Client +} + +type taskConfig struct { + Name string `config:"name"` + Type string `config:"type"` + Schedule *schedule.Schedule `config:"schedule" validate:"required"` + + // Fields and tags to add to monitor. + EventMetadata common.EventMetadata `config:",inline"` + Processors processors.PluginConfig `config:"processors"` +} + +// InvalidMonitorProcessorsError is used to indicate situations when processors could not be loaded. +// This special type is used because these errors are caught and handled gracefully. +type InvalidMonitorProcessorsError struct{ root error } + +func (e InvalidMonitorProcessorsError) Error() string { + return fmt.Sprintf("could not load monitor processors: %s", e.root) +} + +func newTask(job Job, config taskConfig, monitor *Monitor) (*task, error) { + t := &task{ + job: job, + config: config, + monitor: monitor, + } + + processors, err := processors.New(config.Processors) + if err != nil { + return nil, InvalidMonitorProcessorsError{err} + } + t.processors = processors + + if err != nil { + logp.Critical("Could not create client for monitor task %+v", t.monitor) + return nil, errors.Wrap(err, "could not create client for monitor task") + } + + return t, nil +} + +func (t *task) prepareSchedulerJob(meta common.MapStr, run jobRunner) scheduler.TaskFunc { + return func() []scheduler.TaskFunc { + event, next, err := run() + if err != nil { + logp.Err("Job %v failed with: ", err) + } + + if event.Fields != nil { + event.Fields.DeepUpdate(meta) + t.client.Publish(event) + } + + if len(next) == 0 { + return nil + } + + continuations := make([]scheduler.TaskFunc, len(next)) + for i, n := range next { + continuations[i] = t.prepareSchedulerJob(meta, n) + } + return continuations + } +} + +func (t *task) makeSchedulerTaskFunc() scheduler.TaskFunc { + name := t.config.Name + if name == "" { + name = t.config.Type + } + + meta := common.MapStr{ + "monitor": common.MapStr{ + "name": name, + "type": t.config.Type, + }, + } + + return t.prepareSchedulerJob(meta, t.job.Run) +} + +// Start schedules this task for execution. +func (t *task) Start() { + var err error + t.client, err = t.monitor.pipeline.ConnectWith(beat.ClientConfig{ + EventMetadata: t.config.EventMetadata, + Processor: t.processors, + }) + if err != nil { + logp.Err("could not start monitor: %v", err) + return + } + + tf := t.makeSchedulerTaskFunc() + t.cancelFn, err = t.monitor.scheduler.Add(t.config.Schedule, t.job.Name(), tf) + if err != nil { + logp.Err("could not start monitor: %v, err") + } +} + +// Stop unschedules this task from execution. +func (t *task) Stop() { + if t.cancelFn != nil { + t.cancelFn() + } + if t.client != nil { + t.client.Close() + } +} diff --git a/heartbeat/monitors/util.go b/heartbeat/monitors/util.go index 1e16b28562d..79519dcf71e 100644 --- a/heartbeat/monitors/util.go +++ b/heartbeat/monitors/util.go @@ -29,9 +29,16 @@ import ( "github.com/elastic/beats/heartbeat/look" ) +// TaskRunner describes a runnable task. +// Note that these tasks can decompose and produce continuations, +// along the line of a java fork join pool. +type TaskRunner interface { + Run() (common.MapStr, []TaskRunner, error) +} + type funcJob struct { settings JobSettings - run JobRunner + run jobRunner } type funcTask struct { @@ -114,7 +121,7 @@ func MakeJob(settings JobSettings, f func() (common.MapStr, []TaskRunner, error) }, }) - return &funcJob{settings, func() (beat.Event, []JobRunner, error) { + return &funcJob{settings, func() (beat.Event, []jobRunner, error) { // Create and run new annotated Job whenever the Jobs root is Task is executed. // This will set the jobs active start timestamp to the time.Now(). return annotated(settings, time.Now(), f)() @@ -126,8 +133,8 @@ func annotated( settings JobSettings, start time.Time, fn func() (common.MapStr, []TaskRunner, error), -) JobRunner { - return func() (beat.Event, []JobRunner, error) { +) jobRunner { + return func() (beat.Event, []jobRunner, error) { var event beat.Event fields, cont, err := fn() @@ -156,7 +163,7 @@ func annotated( event.Fields = fields } - jobCont := make([]JobRunner, len(cont)) + jobCont := make([]jobRunner, len(cont)) for i, c := range cont { jobCont[i] = annotated(settings, start, c.Run) } @@ -179,7 +186,7 @@ func MakeSimpleCont(f func() (common.MapStr, error)) TaskRunner { }) } -// MakePingIPFactory creates a factory for building a Task from a new IP address. +// MakePingIPFactory creates a jobFactory for building a Task from a new IP address. func MakePingIPFactory( f func(*net.IPAddr) (common.MapStr, error), ) func(*net.IPAddr) TaskRunner { @@ -428,7 +435,7 @@ func withStart(field string, start time.Time, r TaskRunner) TaskRunner { func (f *funcJob) Name() string { return f.settings.Name } -func (f *funcJob) Run() (beat.Event, []JobRunner, error) { return f.run() } +func (f *funcJob) Run() (beat.Event, []jobRunner, error) { return f.run() } func (f funcTask) Run() (common.MapStr, []TaskRunner, error) { return f.run() } diff --git a/heartbeat/tests/system/config/heartbeat.yml.j2 b/heartbeat/tests/system/config/heartbeat.yml.j2 index 475ff930678..4407d65e0c7 100644 --- a/heartbeat/tests/system/config/heartbeat.yml.j2 +++ b/heartbeat/tests/system/config/heartbeat.yml.j2 @@ -33,6 +33,13 @@ heartbeat.monitors: {% endif %} {% endfor -%} +{% if reload or reload_path -%} +heartbeat.config.monitors: + path: {{ reload_path|default("${path.config}/monitors.d/*.yml") }} + reload.period: 1s + reload.enabled: {{ reload|default("false")}} +{% endif -%} + {%- if shipper_name %} name: {{ shipper_name }} {% endif %} diff --git a/heartbeat/tests/system/heartbeat.py b/heartbeat/tests/system/heartbeat.py index d4bb45f5508..8bf11f4c768 100644 --- a/heartbeat/tests/system/heartbeat.py +++ b/heartbeat/tests/system/heartbeat.py @@ -1,7 +1,10 @@ import os import sys +import BaseHTTPServer +import threading -sys.path.append(os.path.join(os.path.dirname(__file__), '../../../libbeat/tests/system')) +sys.path.append(os.path.join(os.path.dirname( + __file__), '../../../libbeat/tests/system')) from beat.beat import TestCase @@ -11,5 +14,21 @@ class BaseTest(TestCase): @classmethod def setUpClass(self): self.beat_name = "heartbeat" - self.beat_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) + self.beat_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../")) super(BaseTest, self).setUpClass() + + def start_server(self, content, status_code): + class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler): + def do_GET(self): + self.send_response(status_code) + self.send_header('Content-Type', 'application/json') + self.end_headers() + self.wfile.write(content) + + server = BaseHTTPServer.HTTPServer(('localhost', 8185), HTTPHandler) + + thread = threading.Thread(target=server.serve_forever) + thread.start() + + return server diff --git a/heartbeat/tests/system/test_monitor.py b/heartbeat/tests/system/test_monitor.py index c00e847d811..2aa6c8afe34 100644 --- a/heartbeat/tests/system/test_monitor.py +++ b/heartbeat/tests/system/test_monitor.py @@ -1,6 +1,4 @@ from heartbeat import BaseTest -import BaseHTTPServer -import threading from parameterized import parameterized import os from nose.plugins.skip import SkipTest @@ -74,18 +72,3 @@ def test_tcp(self, port, status): # Currently skipped on Windows as fields.yml not generated raise SkipTest self.assert_fields_are_documented(output[0]) - - def start_server(self, content, status_code): - class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler): - def do_GET(self): - self.send_response(status_code) - self.send_header('Content-Type', 'application/json') - self.end_headers() - self.wfile.write(content) - - server = BaseHTTPServer.HTTPServer(('localhost', 8185), HTTPHandler) - - thread = threading.Thread(target=server.serve_forever) - thread.start() - - return server diff --git a/heartbeat/tests/system/test_reload.py b/heartbeat/tests/system/test_reload.py new file mode 100644 index 00000000000..c9af001451c --- /dev/null +++ b/heartbeat/tests/system/test_reload.py @@ -0,0 +1,121 @@ +from heartbeat import BaseTest +import nose.tools +import os + + +class Test(BaseTest): + def __init__(self, *args): + self.proc = None + super(Test, self).__init__(*args) + + def test_config_reload(self): + """ + Test a reload of a config + """ + server = self.start_server("hello world", 200) + try: + self.setup() + + cfg_file = "test.yml" + + self.write_dyn_config( + cfg_file, self.http_cfg("http://localhost:8185")) + + self.wait_until(lambda: self.output_has(lines=1)) + + self.assert_last_status("up") + + self.write_dyn_config( + cfg_file, self.http_cfg("http://localhost:8186")) + + self.wait_until(lambda: self.last_output_line()[ + "http.url"] == "http://localhost:8186") + + self.assert_last_status("down") + + self.proc.check_kill_and_wait() + finally: + server.shutdown() + + def test_config_remove(self): + """ + Test the removal of a dynamic config + """ + server = self.start_server("hello world", 200) + try: + self.setup() + + cfg_file = "test.yml" + + self.write_dyn_config( + cfg_file, self.http_cfg("http://localhost:8185")) + + self.wait_until(lambda: self.output_has(lines=2)) + + self.assert_last_status("up") + + os.remove(self.monitors_dir() + cfg_file) + + # Ensure the job was removed from the scheduler + self.wait_until(lambda: self.log_contains( + "Remove scheduler job 'http@http://localhost:8185")) + self.wait_until(lambda: self.log_contains( + "Job 'http@http://localhost:8185' returned")) + + self.proc.check_kill_and_wait() + finally: + server.shutdown() + + def test_config_add(self): + """ + Test the addition of a dynamic config + """ + self.setup() + + self.wait_until(lambda: self.log_contains( + "Starting reload procedure, current runners: 0")) + + server = self.start_server("hello world", 200) + try: + self.write_dyn_config( + "test.yml", self.http_cfg("http://localhost:8185")) + + self.wait_until(lambda: self.log_contains( + "Starting reload procedure, current runners: 1")) + + self.wait_until(lambda: self.output_has(lines=1)) + + self.proc.check_kill_and_wait() + finally: + server.shutdown() + + def setup(self): + os.mkdir(self.monitors_dir()) + self.render_config_template( + reload=True, + reload_path=self.monitors_dir() + "*.yml", + flush_min_events=1, + ) + + self.proc = self.start_beat() + + def write_dyn_config(self, filename, cfg): + with open(self.monitors_dir() + filename, 'w') as f: + f.write(cfg) + + def monitors_dir(self): + return self.working_dir + "/monitors.d" + + def assert_last_status(self, status): + nose.tools.eq_(self.last_output_line()["monitor.status"], status) + + def last_output_line(self): + return self.read_output()[-1] + + @staticmethod + def http_cfg(url): + return """ +- type: http + schedule: "@every 1s" + urls: ["{url}"] + """[1:-1].format(url=url) diff --git a/heartbeat/watcher/watch_config.go b/heartbeat/watcher/watch_config.go new file mode 100644 index 00000000000..3ebf128dc67 --- /dev/null +++ b/heartbeat/watcher/watch_config.go @@ -0,0 +1,32 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package watcher + +import "time" + +var defaultFilePollInterval = 5 * time.Second + +type watchConfig struct { + Path string `config:"watch.poll_file.path"` + Poll time.Duration `config:"watch.poll_file.interval" validate:"min=1"` +} + +// DefaultWatchConfig is used to initialize watch config data. +var DefaultWatchConfig = watchConfig{ + Poll: defaultFilePollInterval, +} diff --git a/libbeat/cfgfile/list_test.go b/libbeat/cfgfile/list_test.go index bdddcf221b8..e4851e5d1cb 100644 --- a/libbeat/cfgfile/list_test.go +++ b/libbeat/cfgfile/list_test.go @@ -34,6 +34,10 @@ type runner struct { stopped bool } +func (r *runner) String() string { + return "test runner" +} + func (r *runner) Start() { r.started = true } func (r *runner) Stop() { r.stopped = true } diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index e4bc314b00e..a1624835217 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -18,6 +18,7 @@ package cfgfile import ( + "fmt" "path/filepath" "sync" "time" @@ -68,6 +69,13 @@ type RunnerFactory interface { } type Runner interface { + // We include fmt.Stringer here because we do log debug messages that must print + // something for the given Runner. We need Runner implementers to consciously implement a + // String() method because the default behavior of `%s` is to print everything recursively + // in a struct, which could cause a race that would cause the race detector to fail. + // This is something that could be anticipated for the Runner interface specifically, because + // most runners will use a goroutine that modifies internal state. + fmt.Stringer Start() Stop() } diff --git a/metricbeat/mb/module/runner.go b/metricbeat/mb/module/runner.go index 84f24dec62b..c60a009caea 100644 --- a/metricbeat/mb/module/runner.go +++ b/metricbeat/mb/module/runner.go @@ -37,6 +37,11 @@ func init() { // Runner is a facade for a Wrapper that provides a simple interface // for starting and stopping a Module. type Runner interface { + // fmt.Stringer is required here because when used as a cfgfile.Runner + // we need a way to print a threadsafe set of fields since it will likely + // be printed from a concurrent goroutine. + fmt.Stringer + // Start starts the Module. If Start is called more than once, only the // first will start the Module. Start()