Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Refactor config system #23467

Merged
merged 45 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
6a0b645
[Uptime] Copy synthetics suites to tmpdir before running
andrewvc Jan 4, 2021
2c48c55
Add changelog
andrewvc Jan 4, 2021
1ab0fd6
Fix var naming
andrewvc Jan 4, 2021
168c6a0
Remote journeyps
andrewvc Jan 5, 2021
e94fe34
Checkpoint
andrewvc Jan 5, 2021
a6581ba
Checkpoint
andrewvc Jan 5, 2021
3ba5203
Checkpoint
andrewvc Jan 6, 2021
d5aacfe
Checkpoint
andrewvc Jan 6, 2021
3e47422
Suites as jobs
andrewvc Jan 8, 2021
f7971f4
Checkpoint
andrewvc Jan 8, 2021
fd4d03d
Checkpoint
andrewvc Jan 10, 2021
d27f695
Sort of works
andrewvc Jan 10, 2021
47320ce
Reorg
andrewvc Jan 12, 2021
7c05aa6
Simplification attempt
andrewvc Jan 12, 2021
367548c
checkpoint
andrewvc Jan 12, 2021
70f5281
Things work again
andrewvc Jan 13, 2021
3c66ed1
Switch from multiple plugin registration to aliases
andrewvc Jan 14, 2021
a96277f
Cleanup
andrewvc Jan 14, 2021
44628b0
Suites almost working, inline works
andrewvc Jan 15, 2021
75250bb
Checkpoint
andrewvc Jan 15, 2021
54eff79
npm i + beginnings of cleanup
andrewvc Jan 15, 2021
c57c81e
Improve structure of monitor plugins
andrewvc Jan 16, 2021
9b9f6b1
More updates
andrewvc Jan 16, 2021
1c10922
Add some basic validation to sources
andrewvc Jan 16, 2021
faac3c9
Test fixes
andrewvc Jan 16, 2021
100dead
Test fixes
andrewvc Jan 16, 2021
5c7d981
Improve monitor tests
andrewvc Jan 20, 2021
0956aff
Refactor wrappers/monitors for greater testability
andrewvc Jan 21, 2021
d587a67
Merge remote-tracking branch 'origin/master' into remote-journeys
andrewvc Jan 24, 2021
8554370
checkpoint
andrewvc Jan 25, 2021
8081b1c
Add tests for new wrapper behavior
andrewvc Jan 29, 2021
2332b13
Additional tests
andrewvc Jan 29, 2021
fde5765
Add basic validations for local source
andrewvc Jan 29, 2021
c82c155
Add tests for local source
andrewvc Jan 30, 2021
24ced05
Minimize fixtures
andrewvc Jan 30, 2021
95d695d
Merge remote-tracking branch 'origin/master' into remote-journeys
andrewvc Jan 30, 2021
ebcfb03
Update go.sum
andrewvc Jan 30, 2021
34469bf
Fix linter errors
andrewvc Jan 30, 2021
8ed3ad1
Fix local source behavior to remove node_modules
andrewvc Feb 2, 2021
4002755
Improve test coverage
andrewvc Feb 2, 2021
b7c8d19
Add changelog
andrewvc Feb 2, 2021
6e1a49b
Update changelog
andrewvc Feb 3, 2021
3c47a5c
Merge remote-tracking branch 'origin/master' into remote-journeys
andrewvc Feb 9, 2021
4d8c07b
Merge remote-tracking branch 'origin/master' into remote-journeys
andrewvc Feb 17, 2021
8248900
Incorporate PR feedback
andrewvc Feb 17, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `nodes` to filebeat-kubernetes.yaml ClusterRole. {issue}24051[24051] {pull}24052[24052]

*Heartbeat*
- Adds negative body match. {pull}20728[20728]
- Refactor synthetics configuration to new syntax. {pull}23467[23467]

*Journalbeat*

Expand Down
69 changes: 13 additions & 56 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package beater

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -83,10 +82,11 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
func (bt *Heartbeat) Run(b *beat.Beat) error {
logp.Info("heartbeat is running! Hit CTRL-C to stop it.")

err := bt.RunStaticMonitors(b)
stopStaticMonitors, err := bt.RunStaticMonitors(b)
if err != nil {
return err
}
defer stopStaticMonitors()

if b.Manager.Enabled() {
bt.RunCentralMgmtMonitors(b)
Expand All @@ -102,13 +102,6 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
}
}

if len(bt.config.SyntheticSuites) > 0 {
err := bt.RunSyntheticSuiteMonitors(b)
if err != nil {
return err
}
}

if bt.config.Autodiscover != nil {
bt.autodiscover, err = bt.makeAutodiscover(b)
if err != nil {
Expand All @@ -131,22 +124,30 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
}

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error {
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be change to RunMonitors, seems that there is only one type of monitor at this level of the code.

factory := monitors.NewFactory(b.Info, bt.scheduler, true)

var runners []cfgfile.Runner
for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg)
if err != nil {
if err == stdfields.ErrPluginDisabled {
continue // don't stop loading monitors just because they're disabled
}

return errors.Wrap(err, "could not create monitor")
return nil, errors.Wrap(err, "could not create monitor")
}

created.Start()
runners = append(runners, created)
}
return nil

stop = func() {
for _, runner := range runners {
runner.Stop()
}
}
return stop, nil
}

// RunCentralMgmtMonitors loads any central management configured configs.
Expand All @@ -170,50 +171,6 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {
return nil
}

// Provide hook to define journey list discovery from x-pack
type JourneyLister func(ctx context.Context, suiteFile string, params common.MapStr) ([]string, error)

var mainJourneyLister JourneyLister

func RegisterJourneyLister(jl JourneyLister) {
mainJourneyLister = jl
}

func (bt *Heartbeat) RunSyntheticSuiteMonitors(b *beat.Beat) error {
// If we are running without XPack this will be nil
if mainJourneyLister == nil {
return nil
}
for _, suite := range bt.config.SyntheticSuites {
logp.Info("Listing suite %s", suite.Path)
journeyNames, err := mainJourneyLister(context.TODO(), suite.Path, suite.Params)
if err != nil {
return err
}
factory := monitors.NewFactory(b.Info, bt.scheduler, false)
for _, name := range journeyNames {
cfg, err := common.NewConfigFrom(map[string]interface{}{
"type": "browser",
"path": suite.Path,
"schedule": suite.Schedule,
"params": suite.Params,
"journey_name": name,
"name": name,
"id": name,
})
if err != nil {
return err
}
created, err := factory.Create(b.Publisher, cfg)
if err != nil {
return errors.Wrap(err, "could not create monitor")
}
created.Start()
}
}
return nil
}

// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
autodiscover, err := autodiscover.NewAutodiscover(
Expand Down
9 changes: 1 addition & 8 deletions heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Config struct {
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
SyntheticSuites []*SyntheticSuite `config:"synthetic_suites"`
SyntheticSuites []*common.Config `config:"synthetic_suites"`
}

// Scheduler defines the syntax of a heartbeat.yml scheduler block.
Expand All @@ -41,12 +41,5 @@ type Scheduler struct {
Location string `config:"location"`
}

type SyntheticSuite struct {
Path string `config:"path"`
Name string `config:"id_prefix"`
Schedule string `config:"schedule"`
Params map[string]interface{} `config:"params"`
}

// DefaultConfig is the canonical instantiation of Config.
var DefaultConfig = Config{}
28 changes: 14 additions & 14 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"net/http"
"net/url"

"github.com/elastic/beats/v7/heartbeat/monitors"
"github.com/elastic/beats/v7/heartbeat/monitors/plugin"

"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -32,8 +33,7 @@ import (
)

func init() {
monitors.RegisterActive("http", create)
monitors.RegisterActive("synthetics/http", create)
plugin.Register("http", create, "synthetics/http")
}

var debugf = logp.MakeDebug("http")
Expand All @@ -42,15 +42,15 @@ var debugf = logp.MakeDebug("http")
func create(
name string,
cfg *common.Config,
) (js []jobs.Job, endpoints int, err error) {
) (p plugin.Plugin, err error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

tls, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

var body []byte
Expand All @@ -61,21 +61,21 @@ func create(
compression := config.Check.Request.Compression
enc, err = getContentEncoder(compression.Type, compression.Level)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

buf := bytes.NewBuffer(nil)
err = enc.Encode(buf, bytes.NewBufferString(config.Check.Request.SendBody))
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

body = buf.Bytes()
}

validator, err := makeValidateResponse(&config.Check.Response)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

// Determine whether we're using a proxy or not and then use that to figure out how to
Expand All @@ -87,7 +87,7 @@ func create(
if config.ProxyURL != "" || config.MaxRedirects > 0 {
transport, err := newRoundTripper(&config, tls)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

makeJob = func(urlStr string) (jobs.Job, error) {
Expand All @@ -99,24 +99,24 @@ func create(
}
}

js = make([]jobs.Job, len(config.Hosts))
js := make([]jobs.Job, len(config.Hosts))
for i, urlStr := range config.Hosts {
u, _ := url.Parse(urlStr)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

job, err := makeJob(urlStr)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

// Assign any execution errors to the error field and
// assign the url field
js[i] = wrappers.WithURLField(u, job)
}

return js, len(config.Hosts), nil
return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(config.Hosts)}, nil
}

func newRoundTripper(config *Config, tls *tlscommon.TLSConfig) (*http.Transport, error) {
Expand Down
18 changes: 9 additions & 9 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ func sendTLSRequest(t *testing.T, testURL string, useUrls bool, extraConfig map[
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, endpoints, err := create("tls", config)
p, err := create("tls", config)
require.NoError(t, err)

sched := schedule.MustParse("@every 1s")
job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "tls", Type: "http", Schedule: sched, Timeout: 1})[0]
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "tls", Type: "http", Schedule: sched, Timeout: 1})[0]

event := &beat.Event{}
_, err = job(event)
require.NoError(t, err)

require.Equal(t, 1, endpoints)
require.Equal(t, 1, p.Endpoints)

return event
}
Expand Down Expand Up @@ -318,11 +318,11 @@ func TestLargeResponse(t *testing.T) {
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, _, err := create("largeresp", config)
p, err := create("largeresp", config)
require.NoError(t, err)

sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]

event := &beat.Event{}
_, err = job(event)
Expand Down Expand Up @@ -532,11 +532,11 @@ func TestRedirect(t *testing.T) {
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, _, err := create("redirect", config)
p, err := create("redirect", config)
require.NoError(t, err)

sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]

// Run this test multiple times since in the past we had an issue where the redirects
// list was added onto by each request. See https://github.com/elastic/beats/pull/15944
Expand Down Expand Up @@ -579,11 +579,11 @@ func TestNoHeaders(t *testing.T) {
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, _, err := create("http", config)
p, err := create("http", config)
require.NoError(t, err)

sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]

event := &beat.Event{}
_, err = job(event)
Expand Down
26 changes: 14 additions & 12 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net"
"net/url"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"

"github.com/elastic/beats/v7/heartbeat/eventext"
"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/heartbeat/monitors"
Expand All @@ -35,30 +37,29 @@ import (
var debugf = logp.MakeDebug("icmp")

func init() {
monitors.RegisterActive("icmp", create)
monitors.RegisterActive("synthetics/icmp", create)
plugin.Register("icmp", create, "synthetics/icmp")
}

func create(
name string,
commonConfig *common.Config,
) (jobs []jobs.Job, endpoints int, err error) {
) (p plugin.Plugin, err error) {
loop, err := getStdLoop()
if err != nil {
logp.Warn("Failed to initialize ICMP loop %v", err)
return nil, 0, err
return plugin.Plugin{}, err
}

config := DefaultConfig
if err := commonConfig.Unpack(&config); err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

jf, err := newJobFactory(config, monitors.NewStdResolver(), loop)
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}
return jf.makeJobs()
return jf.makePlugin()

}

Expand Down Expand Up @@ -89,29 +90,30 @@ func (jf *jobFactory) checkConfig() error {
return nil
}

func (jf *jobFactory) makeJobs() (j []jobs.Job, endpoints int, err error) {
func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the names returned parameters? Seems that the code always defines the return in the statement of the function.

if err := jf.loop.checkNetworkMode(jf.ipVersion); err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

pingFactory := jf.pingIPFactory(&jf.config)

var j []jobs.Job
for _, host := range jf.config.Hosts {
job, err := monitors.MakeByHostJob(host, jf.config.Mode, monitors.NewStdResolver(), pingFactory)

if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

u, err := url.Parse(fmt.Sprintf("icmp://%s", host))
if err != nil {
return nil, 0, err
return plugin.Plugin{}, err
}

j = append(j, wrappers.WithURLField(u, job))
}

return j, len(jf.config.Hosts), nil
return plugin.Plugin{Jobs: j, Close: nil, Endpoints: len(jf.config.Hosts)}, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having the caller know if Close is set, it would be better to always set it (to an empty function).

Or add a Close() function directly on the plugin that does the checking and calling of the actual function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do in a follow-up!

}

func (jf *jobFactory) pingIPFactory(config *Config) func(*net.IPAddr) jobs.Job {
Expand Down
Loading