Skip to content

Commit

Permalink
Heartbeat Automatic Reload (elastic#8023)
Browse files Browse the repository at this point in the history
Add automatic reloading for heartbeat config files.

This deprecates the `watch.poll_file` options.

This patch also fixes a potential source of races in code using `cfgfile/Runner` by making that interface implement `Stringer`, the reason being that by default `cfgfile/Runner` can recursively print the backing structure, which can trigger a race.
  • Loading branch information
andrewvc authored Sep 14, 2018
1 parent 457a784 commit 037a4f2
Show file tree
Hide file tree
Showing 28 changed files with 1,223 additions and 556 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]

*Winlogbeat*

*Heartbeat*

- Add automatic config file reloading. {pull}8023[8023]

==== Deprecated

Expand All @@ -144,6 +147,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
*Filebeat*

*Heartbeat*
- watch.poll_file is now deprecated and superceded by automatic config file reloading.

*Metricbeat*
- Redis `info` `replication.master_offset` has been deprecated in favor of `replication.master.offset`.{pull}7695[7695]
Expand Down
45 changes: 29 additions & 16 deletions heartbeat/_meta/beat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ heartbeat.monitors:
ipv6: true
mode: any

# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# Total running time per ping test.
timeout: 16s

Expand All @@ -62,6 +54,15 @@ heartbeat.monitors:
# sub-dictionary. Default is false.
#fields_under_root: false

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

- type: tcp # monitor type `tcp`. Connect via TCP and optionally verify endpoint
# by sending/receiving a custom payload

Expand Down Expand Up @@ -98,14 +99,6 @@ heartbeat.monitors:
ipv6: true
mode: any

# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# List of ports to ping if host does not contain a port number
# ports: [80, 9200, 5044]

Expand Down Expand Up @@ -135,6 +128,16 @@ heartbeat.monitors:
# Required TLS protocols
#supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"]

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s


- type: http # monitor type `http`. Connect via HTTP an optionally verify response

# Monitor name used for job name and document type
Expand Down Expand Up @@ -205,6 +208,16 @@ heartbeat.monitors:
# Required response contents.
#body:

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s


heartbeat.scheduler:
# Limit number of concurrent tasks executed by heartbeat. The task limit if
# disabled if set to 0. The default is 0.
Expand Down
85 changes: 65 additions & 20 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,38 @@ import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/pkg/errors"

"github.com/elastic/beats/heartbeat/config"
"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/heartbeat/scheduler"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
)

// Heartbeat represents the root datastructure of this beat.
type Heartbeat struct {
done chan struct{}

scheduler *scheduler.Scheduler
manager *monitorManager
// config is used for iterating over elements of the config.
config config.Config
scheduler *scheduler.Scheduler
monitorReloader *cfgfile.Reloader
}

func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
// New creates a new heartbeat.
func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
cfgwarn.Beta("Heartbeat is beta software")

config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
parsedConfig := config.DefaultConfig
if err := rawConfig.Unpack(&parsedConfig); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}

limit := config.Scheduler.Limit
locationName := config.Scheduler.Location
limit := parsedConfig.Scheduler.Limit
locationName := parsedConfig.Scheduler.Location
if locationName == "" {
locationName = "Local"
}
Expand All @@ -56,36 +61,76 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
return nil, err
}

sched := scheduler.NewWithLocation(limit, location)
manager, err := newMonitorManager(b.Publisher, sched, monitors.Registry, config.Monitors)
if err != nil {
return nil, err
}
scheduler := scheduler.NewWithLocation(limit, location)

bt := &Heartbeat{
done: make(chan struct{}),
scheduler: sched,
manager: manager,
config: parsedConfig,
scheduler: scheduler,
}
return bt, nil
}

// Run executes the beat.
func (bt *Heartbeat) Run(b *beat.Beat) error {
logp.Info("heartbeat is running! Hit CTRL-C to stop it.")

err := bt.RunStaticMonitors(b)
if err != nil {
return err
}

if bt.config.ConfigMonitors.Enabled() {
bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors)
defer bt.monitorReloader.Stop()

err := bt.RunDynamicMonitors(b)
if err != nil {
return err
}
}

if err := bt.scheduler.Start(); err != nil {
return err
}
defer bt.scheduler.Stop()

<-bt.done

bt.manager.Stop()

logp.Info("Shutting down.")
return nil
}

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error {
factory := monitors.NewFactory(bt.scheduler, true)

for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg, nil)
if err != nil {
return errors.Wrap(err, "could not create monitor")
}
created.Start()
}
return nil
}

// RunDynamicMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunDynamicMonitors(b *beat.Beat) (err error) {
factory := monitors.NewFactory(bt.scheduler, false)

// Check monitor configs
if err := bt.monitorReloader.Check(factory); err != nil {
return err
}

// Execute the monitor
go bt.monitorReloader.Run(factory)

return nil
}

// Stop stops the beat.
func (bt *Heartbeat) Stop() {
close(bt.done)
}
Loading

0 comments on commit 037a4f2

Please sign in to comment.