Skip to content

Commit

Permalink
[Heartbeat] Refactor config system (elastic#23467)
Browse files Browse the repository at this point in the history
* Refactors the config for synthetics suites to act as regular monitors under `heartbeat.monitors` rather than have the new top-level `synthetic.suites` syntax we've been using so far.
* Changes the behavior of local suites to copy their data into the container rather than run directly off the shared docker volume
* Adds a first-class notion of aliasing to monitor types, so that `synthetics/http` monitors show up as `http`, and not as a distinct type (applies to all monitor types)
* Simplifies the types associated with monitor plugins into a new `plugin.Plugin{}` struct, rather than passing around multiple values everywhere.
* See https://github.com/elastic/beats/pull/23467/files#diff-7f1e5387c4757cca1e98483a5678e377a28ca5f9d77b267a4121a14249c96b82R7 for an example of the new config syntax

#### More on the change to copying local tests into the container

The rationale here is that doing so resolves any file permissions issues that may be present due to the suite directory being shared to the container as read only OR due to incompatible UIDs between the docker container and the host.

Fixes elastic/synthetics#156
Fixes elastic#23823

As a note, no tests are added here due to the complexity of testing this small amount of I/O functionality, however, any issues should be caught by our E2E tests in https://github.com/elastic/synthetics/tree/master/__tests__/e2e . I've opened an issue to improve this situation here: elastic#23346

(cherry picked from commit fb25ded)
  • Loading branch information
andrewvc committed Feb 17, 2021
1 parent f32bf19 commit efbfb13
Show file tree
Hide file tree
Showing 41 changed files with 1,354 additions and 450 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Rename `s3` input to `aws-s3` input. {pull}23469[23469]

*Heartbeat*
- Adds negative body match. {pull}20728[20728]
- Refactor synthetics configuration to new syntax. {pull}23467[23467]

*Journalbeat*

Expand Down
69 changes: 13 additions & 56 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package beater

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -83,10 +82,11 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
func (bt *Heartbeat) Run(b *beat.Beat) error {
logp.Info("heartbeat is running! Hit CTRL-C to stop it.")

err := bt.RunStaticMonitors(b)
stopStaticMonitors, err := bt.RunStaticMonitors(b)
if err != nil {
return err
}
defer stopStaticMonitors()

if b.Manager.Enabled() {
bt.RunCentralMgmtMonitors(b)
Expand All @@ -102,13 +102,6 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
}
}

if len(bt.config.SyntheticSuites) > 0 {
err := bt.RunSyntheticSuiteMonitors(b)
if err != nil {
return err
}
}

if bt.config.Autodiscover != nil {
bt.autodiscover, err = bt.makeAutodiscover(b)
if err != nil {
Expand All @@ -131,22 +124,30 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
}

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error {
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
factory := monitors.NewFactory(b.Info, bt.scheduler, true)

var runners []cfgfile.Runner
for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg)
if err != nil {
if err == stdfields.ErrPluginDisabled {
continue // don't stop loading monitors just because they're disabled
}

return errors.Wrap(err, "could not create monitor")
return nil, errors.Wrap(err, "could not create monitor")
}

created.Start()
runners = append(runners, created)
}
return nil

stop = func() {
for _, runner := range runners {
runner.Stop()
}
}
return stop, nil
}

// RunCentralMgmtMonitors loads any central management configured configs.
Expand All @@ -170,50 +171,6 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {
return nil
}

// Provide hook to define journey list discovery from x-pack
type JourneyLister func(ctx context.Context, suiteFile string, params common.MapStr) ([]string, error)

var mainJourneyLister JourneyLister

func RegisterJourneyLister(jl JourneyLister) {
mainJourneyLister = jl
}

func (bt *Heartbeat) RunSyntheticSuiteMonitors(b *beat.Beat) error {
// If we are running without XPack this will be nil
if mainJourneyLister == nil {
return nil
}
for _, suite := range bt.config.SyntheticSuites {
logp.Info("Listing suite %s", suite.Path)
journeyNames, err := mainJourneyLister(context.TODO(), suite.Path, suite.Params)
if err != nil {
return err
}
factory := monitors.NewFactory(b.Info, bt.scheduler, false)
for _, name := range journeyNames {
cfg, err := common.NewConfigFrom(map[string]interface{}{
"type": "browser",
"path": suite.Path,
"schedule": suite.Schedule,
"params": suite.Params,
"journey_name": name,
"name": name,
"id": name,
})
if err != nil {
return err
}
created, err := factory.Create(b.Publisher, cfg)
if err != nil {
return errors.Wrap(err, "could not create monitor")
}
created.Start()
}
}
return nil
}

// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
autodiscover, err := autodiscover.NewAutodiscover(
Expand Down
9 changes: 1 addition & 8 deletions heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Config struct {
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
SyntheticSuites []*SyntheticSuite `config:"synthetic_suites"`
SyntheticSuites []*common.Config `config:"synthetic_suites"`
}

// Scheduler defines the syntax of a heartbeat.yml scheduler block.
Expand All @@ -41,12 +41,5 @@ type Scheduler struct {
Location string `config:"location"`
}

type SyntheticSuite struct {
Path string `config:"path"`
Name string `config:"id_prefix"`
Schedule string `config:"schedule"`
Params map[string]interface{} `config:"params"`
}

// DefaultConfig is the canonical instantiation of Config.
var DefaultConfig = Config{}
28 changes: 14 additions & 14 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"net/http"
"net/url"

"github.com/elastic/beats/v7/heartbeat/monitors"
"github.com/elastic/beats/v7/heartbeat/monitors/plugin"

"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -32,8 +33,7 @@ import (
)

func init() {
monitors.RegisterActive("http", create)
monitors.RegisterActive("synthetics/http", create)
plugin.Register("http", create, "synthetics/http")
}

var debugf = logp.MakeDebug("http")
Expand All @@ -42,15 +42,15 @@ var debugf = logp.MakeDebug("http")
func create(
name string,
cfg *common.Config,
) (js []jobs.Job, endpoints int, err error) {
) (p plugin.Plugin, err error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

tls, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

var body []byte
Expand All @@ -61,21 +61,21 @@ func create(
compression := config.Check.Request.Compression
enc, err = getContentEncoder(compression.Type, compression.Level)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

buf := bytes.NewBuffer(nil)
err = enc.Encode(buf, bytes.NewBufferString(config.Check.Request.SendBody))
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

body = buf.Bytes()
}

validator, err := makeValidateResponse(&config.Check.Response)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

// Determine whether we're using a proxy or not and then use that to figure out how to
Expand All @@ -87,7 +87,7 @@ func create(
if config.ProxyURL != "" || config.MaxRedirects > 0 {
transport, err := newRoundTripper(&config, tls)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

makeJob = func(urlStr string) (jobs.Job, error) {
Expand All @@ -99,24 +99,24 @@ func create(
}
}

js = make([]jobs.Job, len(config.Hosts))
js := make([]jobs.Job, len(config.Hosts))
for i, urlStr := range config.Hosts {
u, _ := url.Parse(urlStr)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

job, err := makeJob(urlStr)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

// Assign any execution errors to the error field and
// assign the url field
js[i] = wrappers.WithURLField(u, job)
}

return js, len(config.Hosts), nil
return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(config.Hosts)}, nil
}

func newRoundTripper(config *Config, tls *tlscommon.TLSConfig) (*http.Transport, error) {
Expand Down
18 changes: 9 additions & 9 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ func sendTLSRequest(t *testing.T, testURL string, useUrls bool, extraConfig map[
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, endpoints, err := create("tls", config)
p, err := create("tls", config)
require.NoError(t, err)

sched := schedule.MustParse("@every 1s")
job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "tls", Type: "http", Schedule: sched, Timeout: 1})[0]
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "tls", Type: "http", Schedule: sched, Timeout: 1})[0]

event := &beat.Event{}
_, err = job(event)
require.NoError(t, err)

require.Equal(t, 1, endpoints)
require.Equal(t, 1, p.Endpoints)

return event
}
Expand Down Expand Up @@ -318,11 +318,11 @@ func TestLargeResponse(t *testing.T) {
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, _, err := create("largeresp", config)
p, err := create("largeresp", config)
require.NoError(t, err)

sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]

event := &beat.Event{}
_, err = job(event)
Expand Down Expand Up @@ -532,11 +532,11 @@ func TestRedirect(t *testing.T) {
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, _, err := create("redirect", config)
p, err := create("redirect", config)
require.NoError(t, err)

sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]

// Run this test multiple times since in the past we had an issue where the redirects
// list was added onto by each request. See https://github.com/elastic/beats/pull/15944
Expand Down Expand Up @@ -579,11 +579,11 @@ func TestNoHeaders(t *testing.T) {
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, _, err := create("http", config)
p, err := create("http", config)
require.NoError(t, err)

sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]

event := &beat.Event{}
_, err = job(event)
Expand Down
26 changes: 14 additions & 12 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net"
"net/url"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"

"github.com/elastic/beats/v7/heartbeat/eventext"
"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/heartbeat/monitors"
Expand All @@ -35,30 +37,29 @@ import (
var debugf = logp.MakeDebug("icmp")

func init() {
monitors.RegisterActive("icmp", create)
monitors.RegisterActive("synthetics/icmp", create)
plugin.Register("icmp", create, "synthetics/icmp")
}

func create(
name string,
commonConfig *common.Config,
) (jobs []jobs.Job, endpoints int, err error) {
) (p plugin.Plugin, err error) {
loop, err := getStdLoop()
if err != nil {
logp.Warn("Failed to initialize ICMP loop %v", err)
return nil, 0, err
return plugin.Plugin{}, err
}

config := DefaultConfig
if err := commonConfig.Unpack(&config); err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

jf, err := newJobFactory(config, monitors.NewStdResolver(), loop)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}
return jf.makeJobs()
return jf.makePlugin()

}

Expand Down Expand Up @@ -89,29 +90,30 @@ func (jf *jobFactory) checkConfig() error {
return nil
}

func (jf *jobFactory) makeJobs() (j []jobs.Job, endpoints int, err error) {
func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) {
if err := jf.loop.checkNetworkMode(jf.ipVersion); err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

pingFactory := jf.pingIPFactory(&jf.config)

var j []jobs.Job
for _, host := range jf.config.Hosts {
job, err := monitors.MakeByHostJob(host, jf.config.Mode, monitors.NewStdResolver(), pingFactory)

if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

u, err := url.Parse(fmt.Sprintf("icmp://%s", host))
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

j = append(j, wrappers.WithURLField(u, job))
}

return j, len(jf.config.Hosts), nil
return plugin.Plugin{Jobs: j, Close: nil, Endpoints: len(jf.config.Hosts)}, nil
}

func (jf *jobFactory) pingIPFactory(config *Config) func(*net.IPAddr) jobs.Job {
Expand Down
Loading

0 comments on commit efbfb13

Please sign in to comment.