From 231da37819a4787690dc75c2136843e78a4c34d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Alvarez=20Pi=C3=B1eiro?= <95703246+emilioalvap@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:09:28 +0200 Subject: [PATCH] [Heartbeat] Add managed status reporter at monitor factory level (#41077) * [Heartbeat] Add status reporting for monitors when running under elastic-agent (cherry picked from commit c70d2d8ce0c990de16528fcfae812a44a23ba435) --- CHANGELOG.next.asciidoc | 3 ++ heartbeat/monitors/monitor.go | 18 +++++++++ heartbeat/monitors/monitor_test.go | 61 ++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 05a1e91cdbc..09cf1937f84 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -190,6 +190,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Heartbeat* - Added status to monitor run log report. +- Upgrade node to latest LTS v18.20.3. {pull}40038[40038] +- Add journey duration to synthetics browser events. {pull}40230[40230] +- Add monitor status reporter under managed mode. {pull}41077[41077] *Metricbeat* diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index 29e7713145c..79390a60ef0 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/heartbeat/scheduler" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/management/status" ) // ErrMonitorDisabled is returned when the monitor plugin is marked as disabled. 
@@ -71,6 +72,12 @@ type Monitor struct { stats plugin.RegistryRecorder monitorStateTracker *monitorstate.Tracker + statusReporter status.StatusReporter +} + +// SetStatusReporter stores the reporter that updateStatus forwards status changes to; while it is nil, updateStatus does nothing. +func (m *Monitor) SetStatusReporter(statusReporter status.StatusReporter) { + m.statusReporter = statusReporter } // String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe @@ -175,6 +182,9 @@ func newMonitorUnsafe( logp.L().Error(fullErr) p.Jobs = []jobs.Job{func(event *beat.Event) ([]jobs.Job, error) { + // If a statusReporter is set, as it is when running in managed mode, update the input status + // to failed and include the monitor ID and the error in the message. + m.updateStatus(status.Failed, fmt.Sprintf("monitor could not be started: %s, err: %s", m.stdFields.ID, fullErr)) return nil, fullErr }} @@ -237,6 +247,7 @@ func (m *Monitor) Start() { m.stats.StartMonitor(int64(m.endpoints)) m.state = MON_STARTED + m.updateStatus(status.Running, "") } // Stop stops the monitor without freeing it in global dedup @@ -262,4 +273,11 @@ func (m *Monitor) Stop() { m.stats.StopMonitor(int64(m.endpoints)) m.state = MON_STOPPED + m.updateStatus(status.Stopped, "") +} + +func (m *Monitor) updateStatus(status status.Status, msg string) { + if m.statusReporter != nil { + m.statusReporter.UpdateStatus(status, msg) + } } diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index 3176d27a2fa..1ea3c52d6ca 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -18,12 +18,14 @@ package monitors import ( + "fmt" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-libs/config" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" @@ -32,7 +34,9 @@ import ( "github.com/elastic/go-lookslike/testslike" "github.com/elastic/go-lookslike/validator" + 
"github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/heartbeat/scheduler" + "github.com/elastic/beats/v7/libbeat/management/status" ) // TestMonitorBasic tests a basic config @@ -131,3 +135,60 @@ func TestCheckInvalidConfig(t *testing.T) { require.Error(t, checkMonitorConfig(serverMonConf, reg)) } + +type MockStatusReporter struct { + us func(status status.Status, msg string) +} + +func (sr *MockStatusReporter) UpdateStatus(status status.Status, msg string) { + sr.us(status, msg) +} + +func TestStatusReporter(t *testing.T) { + confMap := map[string]interface{}{ + "type": "fail", + "urls": []string{"http://example.net"}, + "schedule": "@every 1ms", + "name": "myName", + "id": "myId", + } + conf, err := config.NewConfigFrom(confMap) + require.NoError(t, err) + + reg, _, _ := mockPluginsReg() + pipel := &MockPipeline{} + monReg := monitoring.NewRegistry() + + mockDegradedPluginFactory := plugin.PluginFactory{ + Name: "fail", + Aliases: []string{"failAlias"}, + Make: func(s string, config *config.C) (plugin.Plugin, error) { + return plugin.Plugin{}, fmt.Errorf("error plugin") + }, + Stats: plugin.NewPluginCountersRecorder("fail", monReg), + } + reg.Add(mockDegradedPluginFactory) + + sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, true) + defer sched.Stop() + + c, err := pipel.Connect() + require.NoError(t, err) + m, err := newMonitor(conf, reg, c, sched.Add, nil, nil) + require.NoError(t, err) + + // Record whether the mock reporter is ever handed status.Failed during the run_once execution + var failed bool = false + m.SetStatusReporter(&MockStatusReporter{ + us: func(s status.Status, msg string) { + if s == status.Failed { + failed = true + } + }, + }) + m.Start() + + sched.WaitForRunOnce() + + require.True(t, failed) +}