diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2c2b126f7b6..a7236878b70 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -266,6 +266,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Ensure all responses sent by HTTP Endpoint are HTML-escaped. {pull}39329[39329] - Update CEL mito extensions to v1.11.0 to improve type checking. {pull}39460[39460] - Improve logging of request and response with request trace logging in error conditions. {pull}39455[39455] +- Implement Elastic Agent status and health reporting for CEL Filebeat input. {pull}39209[39209] - Add HTTP metrics to CEL input. {issue}39501[39501] {pull}39503[39503] *Auditbeat* diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index a8d2c0e8cb2..d84c9e39bfd 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -31,6 +31,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/ctxtool" @@ -50,13 +51,14 @@ type factory struct { // On stop the runner triggers the shutdown signal and waits until the input // has returned. type runner struct { - id string - log *logp.Logger - agent *beat.Info - wg sync.WaitGroup - sig ctxtool.CancelContext - input v2.Input - connector beat.PipelineConnector + id string + log *logp.Logger + agent *beat.Info + wg sync.WaitGroup + sig ctxtool.CancelContext + input v2.Input + connector beat.PipelineConnector + statusReporter status.StatusReporter } // RunnerFactory creates a cfgfile.RunnerFactory from an input Loader that is @@ -109,6 +111,10 @@ func (f *factory) Create( }, nil } +func (r *runner) SetStatusReporter(reported status.StatusReporter) { + r.statusReporter = reported +} + func (r *runner) String() string { return r.input.Name() } func (r *runner) Start() { @@ -121,10 +127,11 @@ func (r *runner) Start() { log.Infof("Input '%s' starting", name) err := r.input.Run( v2.Context{ - ID: r.id, - Agent: *r.agent, - Logger: log, - Cancelation: r.sig, + ID: r.id, + Agent: *r.agent, + Logger: log, + Cancelation: r.sig, + StatusReporter: r.statusReporter, }, r.connector, ) @@ -140,6 +147,7 @@ func (r *runner) Stop() { r.sig.Cancel() r.wg.Wait() r.log.Infof("Input '%s' stopped (runner)", r.input.Name()) + r.statusReporter = nil } func configID(config *conf.C) (string, error) { diff --git a/filebeat/input/v2/input.go b/filebeat/input/v2/input.go index 30b8ad333b1..9b78bc427ae 100644 --- a/filebeat/input/v2/input.go +++ b/filebeat/input/v2/input.go @@ -22,6 +22,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -83,6 +84,17 @@ type Context struct { // Cancelation is used by Beats to signal the input to shutdown. Cancelation Canceler + + // StatusReporter provides a method to update the status of the underlying unit + // that maps to the config. Note: Under standalone execution of Filebeat this is + // expected to be nil. + StatusReporter status.StatusReporter +} + +func (c Context) UpdateStatus(status status.Status, msg string) { + if c.StatusReporter != nil { + c.StatusReporter.UpdateStatus(status, msg) + } } // TestContext provides the Input Test function with common environmental diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index 1f16e23ab4f..d557ffa25c2 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/diagnostics" "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -153,6 +154,12 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error { r.logger.Debugf("Starting runner: %s", runner) r.runners[hash] = runner + if config.StatusReporter != nil { + if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok { + runnerWithStatus.SetStatusReporter(config.StatusReporter) + } + } + runner.Start() moduleStarts.Add(1) if config.DiagCallback != nil { diff --git a/libbeat/common/reload/reload.go b/libbeat/common/reload/reload.go index 279b7fd26b0..7021796d28c 100644 --- a/libbeat/common/reload/reload.go +++ b/libbeat/common/reload/reload.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -47,6 +48,11 @@ type ConfigWithMeta struct { // InputUnitID is the unit's ID that generated this ConfigWithMeta InputUnitID string + + // StatusReporter provides a method to update the status of the underlying unit + // that maps to the config. Note: Under standalone execution of a Beat this is + // expected to be nil. + StatusReporter status.StatusReporter } // ReloadableList provides a method to reload the configuration of a list of entities @@ -160,7 +166,7 @@ func (r *Registry) GetReloadableOutput() Reloadable { func (r *Registry) GetRegisteredNames() []string { r.RLock() defer r.RUnlock() - var names []string + names := make([]string, 0, len(r.confs)+len(r.confsLists)) for name := range r.confs { names = append(names, name) diff --git a/libbeat/management/management.go b/libbeat/management/management.go index 177642b3398..6770e87538d 100644 --- a/libbeat/management/management.go +++ b/libbeat/management/management.go @@ -21,49 +21,19 @@ import ( "sync" "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) -// Status describes the current status of the beat. -type Status int - -//go:generate stringer -type=Status -const ( - // Unknown is initial status when none has been reported. - Unknown Status = iota - // Starting is status describing application is starting. - Starting - // Configuring is status describing application is configuring. - Configuring - // Running is status describing application is running. - Running - // Degraded is status describing application is degraded. - Degraded - // Failed is status describing application is failed. This status should - // only be used in the case the beat should stop running as the failure - // cannot be recovered. - Failed - // Stopping is status describing application is stopping. - Stopping - // Stopped is status describing application is stopped. - Stopped -) - // DebugK used as key for all things central management var DebugK = "centralmgmt" -// StatusReporter provides a method to update current status of the beat. -type StatusReporter interface { - // UpdateStatus called when the status of the beat has changed. - UpdateStatus(status Status, msg string) -} - // Manager interacts with the beat to provide status updates and to receive // configurations. type Manager interface { - StatusReporter + status.StatusReporter // Enabled returns true if manager is enabled. Enabled() bool @@ -133,7 +103,7 @@ func NewManager(cfg *config.C, registry *reload.Registry) (Manager, error) { } return &fallbackManager{ logger: logp.NewLogger("mgmt"), - status: Unknown, + status: status.Unknown, msg: "", }, nil } @@ -152,13 +122,13 @@ func SetManagerFactory(factory ManagerFactory) { type fallbackManager struct { logger *logp.Logger lock sync.Mutex - status Status + status status.Status msg string stopFunc func() stopOnce sync.Once } -func (n *fallbackManager) UpdateStatus(status Status, msg string) { +func (n *fallbackManager) UpdateStatus(status status.Status, msg string) { n.lock.Lock() defer n.lock.Unlock() if n.status != status || n.msg != msg { diff --git a/libbeat/management/status/status.go b/libbeat/management/status/status.go new file mode 100644 index 00000000000..d4a9e5c3f8d --- /dev/null +++ b/libbeat/management/status/status.go @@ -0,0 +1,55 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package status + +// Status describes the current status of the beat. +type Status int + +//go:generate go run golang.org/x/tools/cmd/stringer -type=Status +const ( + // Unknown is initial status when none has been reported. + Unknown Status = iota + // Starting is status describing unit is starting. + Starting + // Configuring is status describing unit is configuring. + Configuring + // Running is status describing unit is running. + Running + // Degraded is status describing unit is degraded. + Degraded + // Failed is status describing unit is failed. This status should + // only be used in the case the beat should stop running as the failure + // cannot be recovered. + Failed + // Stopping is status describing unit is stopping. + Stopping + // Stopped is status describing unit is stopped. + Stopped +) + +// StatusReporter provides a method to update current status of a unit. +type StatusReporter interface { + // UpdateStatus updates the status of the unit. + UpdateStatus(status Status, msg string) +} + +// WithStatusReporter provides a method to set a status reporter +type WithStatusReporter interface { + // SetStatusReporter sets the status reporter + SetStatusReporter(reporter StatusReporter) +} diff --git a/libbeat/management/status_string.go b/libbeat/management/status/status_string.go similarity index 92% rename from libbeat/management/status_string.go rename to libbeat/management/status/status_string.go index d26703bb4f9..a26ebcc4322 100644 --- a/libbeat/management/status_string.go +++ b/libbeat/management/status/status_string.go @@ -17,7 +17,7 @@ // Code generated by "stringer -type=Status"; DO NOT EDIT. -package management +package status import "strconv" @@ -32,11 +32,12 @@ func _() { _ = x[Degraded-4] _ = x[Failed-5] _ = x[Stopping-6] + _ = x[Stopped-7] } -const _Status_name = "UnknownStartingConfiguringRunningDegradedFailedStopping" +const _Status_name = "UnknownStartingConfiguringRunningDegradedFailedStoppingStopped" -var _Status_index = [...]uint8{0, 7, 15, 26, 33, 41, 47, 55} +var _Status_index = [...]uint8{0, 7, 15, 26, 33, 41, 47, 55, 62} func (i Status) String() string { if i < 0 || i >= Status(len(_Status_index)-1) { diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 759809e6e80..be2e912e4ee 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -23,7 +23,7 @@ import ( "strings" "time" - retryablehttp "github.com/hashicorp/go-retryablehttp" + "github.com/hashicorp/go-retryablehttp" "github.com/icholy/digest" "github.com/rcrowley/go-metrics" "go.elastic.co/ecszap" @@ -39,6 +39,7 @@ import ( inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog" @@ -101,17 +102,26 @@ func (input) Test(src inputcursor.Source, _ v2.TestContext) error { // context cancellation or type invalidity errors, any other error will be retried. func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error { var cursor map[string]interface{} + env.UpdateStatus(status.Starting, "") if !crsr.IsNew() { // Allow the user to bootstrap the program if needed. err := crsr.Unpack(&cursor) if err != nil { + env.UpdateStatus(status.Failed, "failed to unpack cursor: "+err.Error()) return err } } - return input{}.run(env, src.(*source), cursor, pub) + + err := input{}.run(env, src.(*source), cursor, pub) + if err != nil { + env.UpdateStatus(status.Failed, "failed to run: "+err.Error()) + return err + } + env.UpdateStatus(status.Stopped, "") + return nil } // sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances. -// The request.tracer.filename may have ":" when a httpjson input has cursor config and +// The request.tracer.filename may have ":" when a cel input has cursor config and // the macOS Finder will treat this as path-separator and causes to show up strange filepaths. func sanitizeFileName(name string) string { name = strings.ReplaceAll(name, ":", string(filepath.Separator)) @@ -170,6 +180,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p goodURL := cfg.Resource.URL.String() state["url"] = goodURL metrics.resource.Set(goodURL) + env.UpdateStatus(status.Running, "") // On entry, state is expected to be in the shape: // // { @@ -209,6 +220,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p budget = *cfg.MaxExecutions waitUntil time.Time ) + // Keep track of whether CEL is degraded for this periodic run. + var isDegraded bool for { if wait := time.Until(waitUntil); wait > 0 { // We have a special-case wait for when we have a zero limit. @@ -242,7 +255,9 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p return err } log.Errorw("failed evaluation", "error", err) + env.UpdateStatus(status.Degraded, "failed evaluation: "+err.Error()) } + isDegraded = err != nil metrics.celProcessingTime.Update(time.Since(start).Nanoseconds()) if trace != nil { log.Debugw("final transaction", "transaction.id", trace.TxID()) @@ -349,6 +364,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p e, ok := state["events"] if !ok { log.Error("unexpected missing events array from evaluation") + env.UpdateStatus(status.Degraded, "unexpected missing events array from evaluation") + isDegraded = true } var events []interface{} switch e := e.(type) { @@ -362,6 +379,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p return nil } log.Errorw("single event object returned by evaluation", "event", e) + env.UpdateStatus(status.Degraded, "single event object returned by evaluation") + isDegraded = true events = []interface{}{e} // Make sure the cursor is not updated. delete(state, "cursor") @@ -387,6 +406,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p if ok { if len(cursors) != len(events) { log.Errorw("unexpected cursor list length", "cursors", len(cursors), "events", len(events)) + env.UpdateStatus(status.Degraded, "unexpected cursor list length") + isDegraded = true // But try to continue. if len(cursors) < len(events) { cursors = nil @@ -437,6 +458,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p if err != nil { hadPublicationError = true log.Errorw("error publishing event", "error", err) + env.UpdateStatus(status.Degraded, "error publishing event: "+err.Error()) + isDegraded = true cursors = nil // We are lost, so retry with this event's cursor, continue // but continue with the events that we have without // advancing the cursor. This allows us to potentially publish the @@ -453,6 +476,11 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p return err } } + + if !isDegraded { + env.UpdateStatus(status.Running, "") + } + metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds()) // Advance the cursor to the final state if there was no error during @@ -472,6 +500,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p budget-- if budget <= 0 { log.Warnw("exceeding maximum number of CEL executions", "limit", *cfg.MaxExecutions) + env.UpdateStatus(status.Degraded, "exceeding maximum number of CEL executions") return nil } } diff --git a/x-pack/filebeat/input/cel/integration_test.go b/x-pack/filebeat/input/cel/integration_test.go new file mode 100644 index 00000000000..fa9e5bf017c --- /dev/null +++ b/x-pack/filebeat/input/cel/integration_test.go @@ -0,0 +1,521 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package cel_test + +import ( + "fmt" + "net/http" + "net/http/httptest" + "os" + "reflect" + "testing" + "time" + + "google.golang.org/protobuf/types/known/structpb" + + "github.com/elastic/beats/v7/libbeat/tests/integration" + filebeat "github.com/elastic/beats/v7/x-pack/filebeat/cmd" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +// TestCheckinV2 is an integration test that checks that CEL input reports in +// the expected statuses. Specifically it configures a filebeat instance to +// run a CEL input with two streams as well as it monitors the reported state +// by spawning an elastic-agent V2 mock server. +// This test also spawns two http servers for making the CEL input streams +// to report different states that are checked to match the expected states. +func TestCheckinV2(t *testing.T) { + // make sure there is an ES instance running + integration.EnsureESIsRunning(t) + esConnectionDetails := integration.GetESURL(t, "http") + outputHosts := []interface{}{fmt.Sprintf("%s://%s:%s", esConnectionDetails.Scheme, esConnectionDetails.Hostname(), esConnectionDetails.Port())} + outputUsername := esConnectionDetails.User.Username() + outputPassword, _ := esConnectionDetails.User.Password() + outputProtocol := esConnectionDetails.Scheme + + invalidResponse := []byte("invalid json") + validResponse := []byte("{\"ip\":\"0.0.0.0\"}") + + // http server for the first CEL input stream + serverOneResponse := validResponse + svrOne := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write(serverOneResponse) + })) + defer svrOne.Close() + + // http server for the second CEL input stream + serverTwoResponse := validResponse + svrTwo := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write(serverTwoResponse) + })) + defer svrTwo.Close() + + // allStreams is an elastic-agent configuration with an ES output and one CEL + // input with two streams. + allStreams := []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 0, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "type": "elasticsearch", + "hosts": outputHosts, + "username": outputUsername, + "password": outputPassword, + "protocol": outputProtocol, + "enabled": true, + "ssl.verification_mode": "none", + }), + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 0, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "cel-cel-1e8b33de-d54a-45cd-90da-23ed71c482e5", + Type: "cel", + Name: "cel-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "use_output": "default", + "revision": 0, + }), + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "cel", + Version: "1.9.0", + }, + }, + Streams: []*proto.Stream{ + { + Id: "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2", + DataStream: &proto.DataStream{ + Dataset: "cel.cel", + }, + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "interval": "10s", + "program": `bytes(get(state.url).Body).as(body,{"events":[body.decode_json()]})`, + "redact.delete": false, + "regexp": nil, + "resource.url": svrOne.URL, + "publisher_pipeline.disable_host": true, + }), + }, + { + Id: "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2", + DataStream: &proto.DataStream{ + Dataset: "cel.cel", + }, + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "interval": "10s", + "program": `bytes(get(state.url).Body).as(body,{"events":[body.decode_json()]})`, + "redact.delete": false, + "regexp": nil, + "resource.url": svrTwo.URL, + "publisher_pipeline.disable_host": true, + }), + }, + }, + }, + }, + } + + // oneStream is an elastic-agent configuration with an ES output and one CEL + // input with one stream. Effectively this is the same as allStreams with + // stream cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2 removed. + oneStream := []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 0, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "type": "elasticsearch", + "hosts": outputHosts, + "username": outputUsername, + "password": outputPassword, + "protocol": outputProtocol, + "enabled": true, + "ssl.verification_mode": "none", + }), + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 0, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "cel-cel-1e8b33de-d54a-45cd-90da-23ed71c482e5", + Type: "cel", + Name: "cel-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "use_output": "default", + "revision": 0, + }), + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "cel", + Version: "1.9.0", + }, + }, + Streams: []*proto.Stream{ + { + Id: "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2", + DataStream: &proto.DataStream{ + Dataset: "cel.cel", + }, + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "interval": "10s", + "program": `bytes(get(state.url).Body).as(body,{"events":[body.decode_json()]})`, + "redact.delete": false, + "regexp": nil, + "resource.url": svrOne.URL, + "publisher_pipeline.disable_host": true, + }), + }, + }, + }, + }, + } + + // noStream is an elastic-agent configuration with just an ES output. + noStream := []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 0, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "type": "elasticsearch", + "hosts": outputHosts, + "username": outputUsername, + "password": outputPassword, + "protocol": outputProtocol, + "enabled": true, + "ssl.verification_mode": "none", + }), + }, + }, + } + + // elastic-agent management V2 mock server + observedStates := make(chan *proto.CheckinObserved) + expectedUnits := make(chan []*proto.UnitExpected) + done := make(chan struct{}) + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + select { + case observedStates <- observed: + return &proto.CheckinExpected{ + Units: <-expectedUnits, + } + case <-done: + return nil + } + }, + ActionImpl: func(response *proto.ActionResponse) error { + return nil + }, + } + if err := server.Start(); err != nil { + t.Fatalf("failed to start StubServerV2 server: %v", err) + } + defer server.Stop() + + // It's necessary to change os.Args so filebeat.Filebeat() can read the + // appropriate args at beat.Execute(). + initialOSArgs := os.Args + os.Args = []string{ + "filebeat", + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + "-E", "management.restart_on_output_change=true", + } + defer func() { + os.Args = initialOSArgs + }() + + beat := filebeat.Filebeat() + beatRunErr := make(chan error) + go func() { + defer close(beatRunErr) + beatRunErr <- beat.Execute() + }() + + // slice of funcs that check if the observed states match the expected ones. + // They return true if they match and false if they don't as well as a slice + // of units expected for the server to respond with. + checks := []func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected){ + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { + // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_HEALTHY { + return false, allStreams + } + + if !reflect.DeepEqual(map[string]interface{}{ + "streams": map[string]interface{}{ + "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }, + "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }, + }, + }, payload) { + return false, allStreams + } + + serverOneResponse = invalidResponse + + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { + // Wait for one degraded. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_DEGRADED { + return false, allStreams + } + + if !reflect.DeepEqual(map[string]interface{}{ + "streams": map[string]interface{}{ + "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{ + "status": "DEGRADED", + "error": "failed evaluation: failed eval: ERROR: :1:30: failed to unmarshal JSON message: invalid character 'i' looking for beginning of value\n | bytes(get(state.url).Body).as(body,{\"events\":[body.decode_json()]})\n | .............................^", + }, + "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }, + }, + }, payload) { + return false, allStreams + } + + serverTwoResponse = invalidResponse + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { + // Wait for all degraded. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_DEGRADED { + return false, allStreams + } + + if !reflect.DeepEqual(map[string]interface{}{ + "streams": map[string]interface{}{ + "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{ + "status": "DEGRADED", + "error": "failed evaluation: failed eval: ERROR: :1:30: failed to unmarshal JSON message: invalid character 'i' looking for beginning of value\n | bytes(get(state.url).Body).as(body,{\"events\":[body.decode_json()]})\n | .............................^", + }, + "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{ + "status": "DEGRADED", + "error": "failed evaluation: failed eval: ERROR: :1:30: failed to unmarshal JSON message: invalid character 'i' looking for beginning of value\n | bytes(get(state.url).Body).as(body,{\"events\":[body.decode_json()]})\n | .............................^", + }, + }, + }, payload) { + return false, allStreams + } + + serverOneResponse = validResponse + serverTwoResponse = validResponse + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { + // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_HEALTHY { + return false, allStreams + } + + if !reflect.DeepEqual(map[string]interface{}{ + "streams": map[string]interface{}{ + "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }, + "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }, + }, + }, payload) { + return false, allStreams + } + + serverTwoResponse = invalidResponse + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { + // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_DEGRADED { + return false, allStreams + } + + if !reflect.DeepEqual(map[string]interface{}{ + "streams": map[string]interface{}{ + "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }, + "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{ + "status": "DEGRADED", + "error": "failed evaluation: failed eval: ERROR: :1:30: failed to unmarshal JSON message: invalid character 'i' looking for beginning of value\n | bytes(get(state.url).Body).as(body,{\"events\":[body.decode_json()]})\n | .............................^", + }, + }, + }, payload) { + return false, allStreams + } + + setInputUnitsConfigStateIdx(oneStream, 1) + return true, oneStream + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_HEALTHY { + return false, oneStream + } + + if !reflect.DeepEqual(map[string]interface{}{ + "streams": map[string]interface{}{ + "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }, + }, + }, payload) { + return false, oneStream + } + setInputUnitsConfigStateIdx(noStream, 2) + return true, noStream + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { + _, payload := extractStateAndPayload(observed, "input-unit-1") + if payload != nil { + return false, noStream + } + + serverOneResponse = validResponse + serverTwoResponse = validResponse + + setInputUnitsConfigStateIdx(allStreams, 3) + return true, allStreams + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { + // Wait for all healthy. + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState != proto.State_HEALTHY { + return false, allStreams + } + + if !reflect.DeepEqual(map[string]interface{}{ + "streams": map[string]interface{}{ + "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }, + "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }, + }, + }, payload) { + return false, allStreams + } + + setInputUnitsConfigStateIdx(noStream, 4) + return true, noStream + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { + _, payload := extractStateAndPayload(observed, "input-unit-1") + if payload != nil { + return false, noStream + } + + return true, []*proto.UnitExpected{} + }, + func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) { + return len(observed.Units) == 0, []*proto.UnitExpected{} + }, + } + + const wait = 3 * time.Minute + timer := time.NewTimer(wait) + defer timer.Stop() + for len(checks) > 0 { + select { + case observed := <-observedStates: + matched, expected := checks[0](t, observed) + expectedUnits <- expected + if !matched { + continue + } + timer.Reset(wait) + checks = checks[1:] + case err := <-beatRunErr: + if err != nil { + t.Fatalf("beat run err: %v", err) + } + case <-timer.C: + t.Fatal("timeout waiting for checkin") + } + } +} + +func extractStateAndPayload(observed *proto.CheckinObserved, inputID string) (proto.State, map[string]interface{}) { + for _, unit := range observed.GetUnits() { + if unit.Id == inputID { + return unit.GetState(), unit.Payload.AsMap() + } + } + + return -1, nil +} + +func setInputUnitsConfigStateIdx(units []*proto.UnitExpected, idx uint64) { + for _, unit := range units { + if unit.Type != proto.UnitType_INPUT { + continue + } + + if unit.Config == nil { + return + } + unit.ConfigStateIdx = idx + unit.Config.Source.Fields["revision"] = structpb.NewNumberValue(float64(idx)) + } +} diff --git a/x-pack/libbeat/management/generate.go b/x-pack/libbeat/management/generate.go index 59537e06686..4d1ec6f5174 100644 --- a/x-pack/libbeat/management/generate.go +++ b/x-pack/libbeat/management/generate.go @@ -384,7 +384,7 @@ func groupByOutputs(outCfg *proto.UnitExpectedConfig) (*reload.ConfigWithMeta, e // We still need to emulate the InjectHeadersRule AST code, // I don't think we can get the `Headers()` data reported by the AgentInfo() sourceMap := outCfg.GetSource().AsMap() - outputType := outCfg.GetType() //nolint:typecheck // this is used, linter just doesn't seem to see it + outputType := outCfg.GetType() if outputType == "" { return nil, fmt.Errorf("output config does not have a configured type field") } diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 6152f4f5306..ea59249012a 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/features" lbmanagement "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -37,7 +38,7 @@ import ( // since there's a type disagreement with the `client.DiagnosticHook` argument, and due to licensing issues we can't import the agent client types into the reloader type diagnosticHandler struct { log *logp.Logger - client *client.Unit + client *agentUnit } func (handler diagnosticHandler) Register(name string, description string, filename string, contentType string, callback func() []byte) { @@ -70,13 +71,13 @@ type BeatV2Manager struct { // track individual units given to us by the V2 API mx sync.Mutex - units map[unitKey]*client.Unit + units map[unitKey]*agentUnit actions []client.Action forceReload bool // status is reported as a whole for every unit sent to this component // hopefully this can be improved in the future to be separated per unit - status lbmanagement.Status + status status.Status message string payload map[string]interface{} @@ -165,7 +166,8 @@ func NewV2AgentManager(config *conf.C, registry *reload.Registry) (lbmanagement. Meta: map[string]string{ "commit": version.Commit(), "build_time": version.BuildTime().String(), - }} + }, + } var agentClient client.V2 var err error if c.InsecureGRPCURLForTesting != "" && c.Enabled { @@ -202,8 +204,8 @@ func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agen config: config, logger: log.Named("V2-manager"), registry: registry, - units: make(map[unitKey]*client.Unit), - status: lbmanagement.Running, + units: make(map[unitKey]*agentUnit), + status: status.Running, message: "Healthy", stopChan: make(chan struct{}, 1), changeDebounce: time.Second, @@ -241,7 +243,7 @@ func (cm *BeatV2Manager) RegisterDiagnosticHook(name string, description string, } // UpdateStatus updates the manager with the current status for the beat. -func (cm *BeatV2Manager) UpdateStatus(status lbmanagement.Status, msg string) { +func (cm *BeatV2Manager) UpdateStatus(status status.Status, msg string) { cm.mx.Lock() defer cm.mx.Unlock() @@ -312,8 +314,8 @@ func (cm *BeatV2Manager) RegisterAction(action client.Action) { for _, unit := range cm.units { // actions are only registered on input units (not a requirement by Agent but // don't see a need in beats to support actions on an output at the moment) - if unit.Type() == client.UnitTypeInput { - unit.RegisterAction(action) + if clientUnit := unit; clientUnit != nil && clientUnit.Type() == client.UnitTypeInput { + clientUnit.RegisterAction(action) } } } @@ -341,8 +343,8 @@ func (cm *BeatV2Manager) UnregisterAction(action client.Action) { for _, unit := range cm.units { // actions are only registered on input units (not a requirement by Agent but // don't see a need in beats to support actions on an output at the moment) - if unit.Type() == client.UnitTypeInput { - unit.UnregisterAction(action) + if clientUnit := unit; clientUnit != nil && clientUnit.Type() == client.UnitTypeInput { + clientUnit.UnregisterAction(action) } } } @@ -364,7 +366,6 @@ func (cm *BeatV2Manager) SetPayload(payload map[string]interface{}) { // Errors while starting/reloading inputs are already reported by unit, but // the shutdown process is still not being handled by unit. func (cm *BeatV2Manager) updateStatuses() { - status := getUnitState(cm.status) message := cm.message payload := cm.payload @@ -375,7 +376,7 @@ func (cm *BeatV2Manager) updateStatuses() { // `reload` method and will be marked stopped in that code path) continue } - err := unit.UpdateState(status, message, payload) + err := unit.UpdateState(cm.status, message, payload) if err != nil { cm.logger.Errorf("Failed to update unit %s status: %s", unit.ID(), err) } @@ -386,21 +387,29 @@ func (cm *BeatV2Manager) updateStatuses() { // Unit manager // ================================ -func (cm *BeatV2Manager) addUnit(unit *client.Unit) { +func (cm *BeatV2Manager) upsertUnit(unit *client.Unit) { cm.mx.Lock() defer cm.mx.Unlock() - cm.units[unitKey{unit.Type(), unit.ID()}] = unit + + aUnit, ok := cm.units[unitKey{unit.Type(), unit.ID()}] + if ok { + aUnit.update(unit) + } else { + unitLogger := cm.logger.Named(fmt.Sprintf("state-unit-%s", unit.ID())) + aUnit = newAgentUnit(unit, unitLogger) + cm.units[unitKey{unit.Type(), unit.ID()}] = aUnit + } // update specific unit to starting - _ = unit.UpdateState(client.UnitStateStarting, "Starting", nil) + _ = aUnit.UpdateState(status.Starting, "Starting", nil) // register the already registered actions (only on input units) for _, action := range cm.actions { - unit.RegisterAction(action) + aUnit.RegisterAction(action) } } -func (cm *BeatV2Manager) modifyUnit(unit *client.Unit) { +func (cm *BeatV2Manager) updateUnit(unit *client.Unit) { // `unit` is already in `cm.units` no need to add it to the map again // but the lock still needs to be held so reload can be triggered cm.mx.Lock() @@ -411,27 +420,32 @@ func (cm *BeatV2Manager) modifyUnit(unit *client.Unit) { // is reflected here. As this deals with modifications, they're already present. // Only the state needs to be updated. + aUnit, ok := cm.units[unitKey{unit.Type(), unit.ID()}] + if !ok { + cm.logger.Infof("BeatV2Manager.updateUnit Unit %s not found", unit.ID()) + return + } + + aUnit.update(unit) + expected := unit.Expected() if expected.State == client.UnitStateStopped { // expected to be stopped; needs to stop this unit - _ = unit.UpdateState(client.UnitStateStopping, "Stopping", nil) + _ = aUnit.UpdateState(status.Stopping, "Stopping", nil) } else { // update specific unit to configuring - _ = unit.UpdateState(client.UnitStateConfiguring, "Configuring", nil) + _ = aUnit.UpdateState(status.Configuring, "Configuring", nil) } } -func (cm *BeatV2Manager) deleteUnit(unit *client.Unit) { - // a unit will only be deleted once it has reported stopped so nothing - // more needs to be done other than cleaning up the reference to the unit +func (cm *BeatV2Manager) softDeleteUnit(unit *client.Unit) { cm.mx.Lock() - delete(cm.units, unitKey{unit.Type(), unit.ID()}) - empty := len(cm.units) == 0 - cm.mx.Unlock() + defer cm.mx.Unlock() - // stop the entire beat when all units removed - if empty && cm.stopOnEmptyUnits { - cm.stopBeat() + key := unitKey{unit.Type(), unit.ID()} + + if aUnit, ok := cm.units[key]; ok { + aUnit.markAsDeleted() } } @@ -482,7 +496,7 @@ func (cm *BeatV2Manager) unitListen() { cm.logger.Debug("Received sighup, stopping") } cm.isRunning = false - cm.UpdateStatus(lbmanagement.Stopping, "Stopping") + cm.UpdateStatus(status.Stopping, "Stopping") return case change := <-cm.client.UnitChanges(): cm.logger.Infof( @@ -494,26 +508,39 @@ func (cm *BeatV2Manager) unitListen() { // Within the context of how we send config to beats, I'm not sure if there is a difference between // A unit add and a unit change, since either way we can't do much more than call the reloader case client.UnitChangedAdded: - cm.addUnit(change.Unit) + cm.upsertUnit(change.Unit) // reset can be called here because `<-t.C` is handled in the same select t.Reset(cm.changeDebounce) case client.UnitChangedModified: - cm.modifyUnit(change.Unit) + cm.updateUnit(change.Unit) // reset can be called here because `<-t.C` is handled in the same select t.Reset(cm.changeDebounce) case client.UnitChangedRemoved: - cm.deleteUnit(change.Unit) + // necessary to soft-delete here and follow up with the actual deletion of units + // in `<-t.C` to avoid deleting a unit that will be re-created before `<-t.C` + // expires where the respective runners will not reload; actual deleting here + // can cause a runner to lose ref to a unit + cm.softDeleteUnit(change.Unit) } case <-t.C: // a copy of the units is used for reload to prevent the holding of the `cm.mx`. // it could be possible that sending the configuration to reload could cause the `UpdateStatus` // to be called on the manager causing it to try and grab the `cm.mx` lock, causing a deadlock. cm.mx.Lock() - units := make(map[unitKey]*client.Unit, len(cm.units)) + units := make(map[unitKey]*agentUnit, len(cm.units)) for k, u := range cm.units { + if u.softDeleted { + delete(cm.units, k) + continue + } units[k] = u } cm.mx.Unlock() + + if len(cm.units) == 0 && cm.stopOnEmptyUnits { + cm.stopBeat() + } + cm.reload(units) if cm.forceReload { // Restart the debounce timer so we try to reload the inputs. @@ -528,7 +555,7 @@ func (cm *BeatV2Manager) stopBeat() { return } cm.logger.Debugf("Stopping beat") - cm.UpdateStatus(lbmanagement.Stopping, "Stopping") + cm.UpdateStatus(status.Stopping, "Stopping") cm.isRunning = false cm.stopMut.Lock() @@ -539,19 +566,19 @@ func (cm *BeatV2Manager) stopBeat() { cm.beatStop.Do(cm.stopFunc) } cm.client.Stop() - cm.UpdateStatus(lbmanagement.Stopped, "Stopped") + cm.UpdateStatus(status.Stopped, "Stopped") if cm.errCanceller != nil { cm.errCanceller() cm.errCanceller = nil } } -func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) { +func (cm *BeatV2Manager) reload(units map[unitKey]*agentUnit) { lowestLevel := client.UnitLogLevelError - var outputUnit *client.Unit - var inputUnits []*client.Unit - var stoppingUnits []*client.Unit - healthyInputs := map[string]*client.Unit{} + var outputUnit *agentUnit + var inputUnits []*agentUnit + var stoppingUnits []*agentUnit + healthyInputs := map[string]*agentUnit{} unitErrors := map[string][]error{} // as the very last action, set the state of the failed units @@ -559,7 +586,7 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) { for _, unit := range units { errs := unitErrors[unit.ID()] if len(errs) != 0 { - _ = unit.UpdateState(client.UnitStateFailed, errors.Join(errs...).Error(), nil) + _ = unit.UpdateState(status.Failed, errors.Join(errs...).Error(), nil) } } }() @@ -631,14 +658,14 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) { cm.logger.Errorw("could not start output", "error", err) msg := fmt.Sprintf("could not start output: %s", err) - if err := outputUnit.UpdateState(client.UnitStateFailed, msg, nil); err != nil { + if err := outputUnit.UpdateState(status.Failed, msg, nil); err != nil { cm.logger.Errorw("setting output state", "error", err) } return } - if err := outputUnit.UpdateState(client.UnitStateHealthy, "Healthy", nil); err != nil { + if err := outputUnit.UpdateState(status.Running, "Healthy", nil); err != nil { cm.logger.Errorw("setting output state", "error", err) } @@ -661,7 +688,7 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) { // report the stopping units as stopped for _, unit := range stoppingUnits { - _ = unit.UpdateState(client.UnitStateStopped, "Stopped", nil) + _ = unit.UpdateState(status.Stopped, "Stopped", nil) } // now update the statuses of all units that contain only healthy @@ -675,7 +702,7 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) { continue } - err := unit.UpdateState(client.UnitStateHealthy, "Healthy", nil) + err := unit.UpdateState(status.Running, "Healthy", nil) if err != nil { cm.logger.Errorf("Failed to update unit %s status: %s", unit.ID(), err) } @@ -688,7 +715,7 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) { // // In any other case, the bool is always false and the error will be non nil // if any error has occurred. -func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) (bool, error) { +func (cm *BeatV2Manager) reloadOutput(unit *agentUnit) (bool, error) { // Assuming that the output reloadable isn't a list, see createBeater() in cmd/instance/beat.go output := cm.registry.GetReloadableOutput() if output == nil { @@ -722,7 +749,7 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) (bool, error) { if cm.stopOnOutputReload && cm.lastOutputCfg != nil { cm.logger.Info("beat is restarting because output changed") - _ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil) + _ = unit.UpdateState(status.Stopping, "Restarting", nil) cm.Stop() return true, nil } @@ -745,7 +772,7 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) (bool, error) { return false, nil } -func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error { +func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error { obj := cm.registry.GetInputList() if obj == nil { return fmt.Errorf("failed to find beat reloadable type 'input'") @@ -768,9 +795,10 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error { } // add diag callbacks for unit // we want to add the diagnostic handler that's specific to the unit, and not the gobal diagnostic handler - for _, in := range inputCfg { + for idx, in := range inputCfg { in.DiagCallback = diagnosticHandler{client: unit, log: cm.logger.Named("diagnostic-manager")} in.InputUnitID = unit.ID() + in.StatusReporter = unit.GetReporterForStreamByIndex(idx) } inputCfgs[unit.ID()] = expected.Config inputBeatCfgs = append(inputBeatCfgs, inputCfg...) @@ -892,30 +920,6 @@ func (cm *BeatV2Manager) handleDebugYaml() []byte { return data } -func getUnitState(status lbmanagement.Status) client.UnitState { - switch status { - case lbmanagement.Unknown: - // must be started if its unknown - return client.UnitStateStarting - case lbmanagement.Starting: - return client.UnitStateStarting - case lbmanagement.Configuring: - return client.UnitStateConfiguring - case lbmanagement.Running: - return client.UnitStateHealthy - case lbmanagement.Degraded: - return client.UnitStateDegraded - case lbmanagement.Failed: - return client.UnitStateFailed - case lbmanagement.Stopping: - return client.UnitStateStopping - case lbmanagement.Stopped: - return client.UnitStateStopped - } - // unknown again? - return client.UnitStateStarting -} - func getZapcoreLevel(ll client.UnitLogLevel) (zapcore.Level, bool) { switch ll { case client.UnitLogLevelError: diff --git a/x-pack/libbeat/management/unit.go b/x-pack/libbeat/management/unit.go new file mode 100644 index 00000000000..729be62c77a --- /dev/null +++ b/x-pack/libbeat/management/unit.go @@ -0,0 +1,414 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "fmt" + "sync" + + "github.com/elastic/beats/v7/libbeat/management/status" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-libs/logp" +) + +// unitState is the current state of a unit +type unitState struct { + state status.Status + msg string +} + +type clientUnit interface { + ID() string + Type() client.UnitType + Expected() client.Expected + UpdateState(state client.UnitState, message string, payload map[string]interface{}) error + RegisterAction(action client.Action) + UnregisterAction(action client.Action) + RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook) +} + +// agentUnit implements status.StatusReporter and holds an unitState +// for the input as well as a unitState for each stream of +// the input in when this a client.UnitTypeInput. +type agentUnit struct { + softDeleted bool + mtx sync.Mutex + logger *logp.Logger + clientUnit clientUnit + inputLevelState unitState + streamIDs []string + streamStates map[string]unitState +} + +// getUnitState converts status.Status to client.UnitState +func getUnitState(s status.Status) client.UnitState { + switch s { + case status.Unknown: + // must be started if its unknown + return client.UnitStateStarting + case status.Starting: + return client.UnitStateStarting + case status.Configuring: + return client.UnitStateConfiguring + case status.Running: + return client.UnitStateHealthy + case status.Degraded: + return client.UnitStateDegraded + case status.Failed: + return client.UnitStateFailed + case status.Stopping: + return client.UnitStateStopping + case status.Stopped: + return client.UnitStateStopped + default: + // as this is an unknown state, return failed to get some attention + return client.UnitStateFailed + } +} + +// getUnitState converts status.Status to client.UnitState +func getStatus(s client.UnitState) status.Status { + switch s { + case client.UnitStateStarting: + return status.Starting + case client.UnitStateConfiguring: + return status.Configuring + case client.UnitStateHealthy: + return status.Running + case client.UnitStateDegraded: + return status.Degraded + case client.UnitStateFailed: + return status.Failed + case client.UnitStateStopping: + return status.Stopping + case client.UnitStateStopped: + return status.Stopped + default: + return status.Unknown + } +} + +func getStreamStates(expected client.Expected) (map[string]unitState, []string) { + expectedCfg := expected.Config + + if expectedCfg == nil { + return nil, nil + } + + streamStates := make(map[string]unitState, len(expectedCfg.Streams)) + streamIDs := make([]string, len(expectedCfg.Streams)) + + for idx, stream := range expectedCfg.Streams { + streamState := unitState{ + state: status.Unknown, + msg: "", + } + + if id := stream.GetId(); id != "" { + streamIDs[idx] = id + streamStates[id] = streamState + continue + } + + if cfgName := expectedCfg.GetName(); cfgName != "" { + id := fmt.Sprintf("%s.[%d]", cfgName, idx) + streamIDs[idx] = id + streamStates[id] = streamState + continue + } + + id := fmt.Sprintf("%s.[%d]", expectedCfg.GetId(), idx) + streamIDs[idx] = id + streamStates[id] = streamState + } + + return streamStates, streamIDs +} + +// newAgentUnit creates a new agentUnit. In case the supplied client.Unit is of type +// client.UnitTypeInput it initializes the streamStates with a unitState.Unknown +func newAgentUnit(cu clientUnit, log *logp.Logger) *agentUnit { + var ( + streamStates map[string]unitState + streamIDs []string + ) + + if cu.Type() == client.UnitTypeInput { + streamStates, streamIDs = getStreamStates(cu.Expected()) + } + + return &agentUnit{ + clientUnit: cu, + logger: log, + streamIDs: streamIDs, + streamStates: streamStates, + } +} + +// RegisterAction registers action handler for this unit. +func (u *agentUnit) RegisterAction(action client.Action) { + u.mtx.Lock() + defer u.mtx.Unlock() + + if u.clientUnit == nil { + return + } + + u.clientUnit.RegisterAction(action) +} + +// UnregisterAction unregisters action handler with the client. +func (u *agentUnit) UnregisterAction(action client.Action) { + u.mtx.Lock() + defer u.mtx.Unlock() + + if u.clientUnit == nil { + return + } + + u.clientUnit.UnregisterAction(action) +} + +func (u *agentUnit) RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook) { + u.mtx.Lock() + defer u.mtx.Unlock() + + if u.clientUnit == nil { + return + } + + u.clientUnit.RegisterDiagnosticHook(name, description, filename, contentType, hook) +} + +func (u *agentUnit) Expected() client.Expected { + u.mtx.Lock() + defer u.mtx.Unlock() + + if u.clientUnit == nil { + return client.Expected{} + } + + return u.clientUnit.Expected() +} + +func (u *agentUnit) ID() string { + u.mtx.Lock() + defer u.mtx.Unlock() + + if u.clientUnit == nil { + return "" + } + + return u.clientUnit.ID() +} + +// calcState calculates the current state of the unit. +func (u *agentUnit) calcState() (status.Status, string) { + // for type output return the unit state directly as it has no streams + if u.clientUnit.Type() == client.UnitTypeOutput { + return u.inputLevelState.state, u.inputLevelState.msg + } + + // if inputLevelState state is not running return the inputLevelState state + if u.inputLevelState.state != status.Running { + return u.inputLevelState.state, u.inputLevelState.msg + } + + // inputLevelState state is marked as running, check the stream states + reportedStatus := status.Running + reportedMsg := "Healthy" + for _, streamState := range u.streamStates { + switch streamState.state { + case status.Degraded: + if reportedStatus != status.Degraded { + reportedStatus = status.Degraded + reportedMsg = streamState.msg + } + case status.Failed: + // return the first failed stream + return streamState.state, streamState.msg + } + } + + return reportedStatus, reportedMsg +} + +// Type of the unit. +func (u *agentUnit) Type() client.UnitType { + u.mtx.Lock() + defer u.mtx.Unlock() + + if u.clientUnit == nil { + return client.UnitTypeInput + } + + return u.clientUnit.Type() +} + +// UpdateState updates the state for the unit. +func (u *agentUnit) UpdateState(state status.Status, msg string, payload map[string]interface{}) error { + u.mtx.Lock() + defer u.mtx.Unlock() + + if u.clientUnit == nil { + return nil + } + + if u.inputLevelState.state == state && u.inputLevelState.msg == msg { + return nil + } + + u.inputLevelState = unitState{ + state: state, + msg: msg, + } + + state, msg = u.calcState() + + if u.clientUnit.Type() == client.UnitTypeOutput || len(u.streamStates) == 0 { + return u.clientUnit.UpdateState(getUnitState(state), msg, payload) + } + + streamsPayload := make(map[string]interface{}, len(u.streamStates)) + + for streamID, streamState := range u.streamStates { + streamsPayload[streamID] = map[string]interface{}{ + "status": getUnitState(streamState.state).String(), + "error": streamState.msg, + } + } + + if payload == nil { + payload = make(map[string]interface{}) + } + + payload["streams"] = streamsPayload + + return u.clientUnit.UpdateState(getUnitState(state), msg, payload) +} + +// updateStateForStream updates the state for a specific stream in the agent unit. +func (u *agentUnit) updateStateForStream(streamID string, state status.Status, msg string) { + u.mtx.Lock() + defer u.mtx.Unlock() + + if u.clientUnit == nil || u.streamStates == nil { + return + } + + if _, ok := u.streamStates[streamID]; !ok { + return + } + + if u.streamStates[streamID].state == state { + return + } + + u.streamStates[streamID] = unitState{ + state: state, + msg: msg, + } + + state, msg = u.calcState() + + streamsPayload := make(map[string]interface{}, len(u.streamStates)) + + for id, streamState := range u.streamStates { + streamsPayload[id] = map[string]interface{}{ + "status": getUnitState(streamState.state).String(), + "error": streamState.msg, + } + } + + payload := map[string]interface{}{ + "streams": streamsPayload, + } + + if err := u.clientUnit.UpdateState(getUnitState(state), msg, payload); err != nil { + u.logger.Warnf("failed to update state for input %s: %v", u.ID(), err) + } +} + +func (u *agentUnit) update(cu *client.Unit) { + u.mtx.Lock() + defer u.mtx.Unlock() + + u.softDeleted = false + u.clientUnit = cu + + inputStatus := getStatus(cu.Expected().State) + if u.inputLevelState.state != inputStatus { + u.inputLevelState = unitState{ + state: inputStatus, + } + } + + newStreamStates, newStreamIDs := getStreamStates(cu.Expected()) + + for key, state := range newStreamStates { + if _, exists := u.streamStates[key]; exists { + continue + } + + u.streamStates[key] = state + } + + for key := range u.streamStates { + if _, exists := newStreamStates[key]; !exists { + delete(u.streamStates, key) + } + } + + switch { + case len(newStreamIDs) != len(u.streamIDs): + u.streamIDs = newStreamIDs + default: + for idx, streamID := range u.streamIDs { + if newStreamIDs[idx] != streamID { + u.streamIDs = newStreamIDs + break + } + } + } +} + +func (u *agentUnit) markAsDeleted() { + u.mtx.Lock() + defer u.mtx.Unlock() + + u.softDeleted = true +} + +// GetReporterForStreamByIndex returns a status reporter for the stream at the given index. +// Note if the index is out of range it returns nil. It is up to the caller to check the return value. +func (u *agentUnit) GetReporterForStreamByIndex(idx int) status.StatusReporter { + u.mtx.Lock() + defer u.mtx.Unlock() + + if idx >= len(u.streamIDs) { + return nil + } + + return &streamStatusReporter{ + id: u.streamIDs[idx], + unit: u, + } +} + +// streamStatusReporter implements status.StatusReporter +type streamStatusReporter struct { + id string + unit *agentUnit +} + +// UpdateStatus updates the status of the stream unit. +func (s *streamStatusReporter) UpdateStatus(state status.Status, msg string) { + s.unit.updateStateForStream(s.id, state, msg) +} + +// ID of the stream unit. +func (s *streamStatusReporter) ID() string { + return s.id +} diff --git a/x-pack/libbeat/management/unit_test.go b/x-pack/libbeat/management/unit_test.go new file mode 100644 index 00000000000..9684ff5e16e --- /dev/null +++ b/x-pack/libbeat/management/unit_test.go @@ -0,0 +1,215 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "testing" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/beats/v7/libbeat/management/status" +) + +func TestUnitUpdate(t *testing.T) { + + type StatusUpdate struct { + status status.Status + msg string + } + + const ( + Healthy = "Healthy" + Failed = "Failed" + Degraded = "Degraded" + ) + + unitCfg := &mockClientUnit{ + expected: client.Expected{ + Config: &proto.UnitExpectedConfig{ + Id: "inputLevelState-1", + Streams: []*proto.Stream{ + {Id: "stream-1"}, + {Id: "stream-2"}, + }, + }, + }, + } + + cases := []struct { + name string + unit *mockClientUnit + inputLevelStatus StatusUpdate + streamStates map[string]StatusUpdate + expectedUnitStatus client.UnitState + expectedUnitMsg string + }{ + { + name: "all running", + unit: unitCfg, + inputLevelStatus: StatusUpdate{status.Running, Healthy}, + streamStates: map[string]StatusUpdate{ + "stream-1": {status.Running, Healthy}, + "stream-2": {status.Running, Healthy}, + }, + expectedUnitStatus: client.UnitStateHealthy, + expectedUnitMsg: Healthy, + }, + { + name: "inputLevelState failed", + unit: unitCfg, + inputLevelStatus: StatusUpdate{status.Failed, Failed}, + streamStates: map[string]StatusUpdate{ + "stream-1": {status.Running, Healthy}, + "stream-2": {status.Running, Healthy}, + }, + expectedUnitStatus: client.UnitStateFailed, + expectedUnitMsg: Failed, + }, + { + name: "inputLevelState stopping", + unit: unitCfg, + inputLevelStatus: StatusUpdate{status.Stopping, ""}, + streamStates: map[string]StatusUpdate{ + "stream-1": {status.Running, Healthy}, + "stream-2": {status.Running, Healthy}, + }, + expectedUnitStatus: client.UnitStateStopping, + expectedUnitMsg: "", + }, + { + name: "inputLevelState configuring", + unit: unitCfg, + inputLevelStatus: StatusUpdate{status.Configuring, ""}, + streamStates: map[string]StatusUpdate{ + "stream-1": {status.Running, Healthy}, + "stream-2": {status.Running, Healthy}, + }, + expectedUnitStatus: client.UnitStateConfiguring, + expectedUnitMsg: "", + }, + { + name: "inputLevelState starting", + unit: unitCfg, + inputLevelStatus: StatusUpdate{status.Starting, ""}, + streamStates: map[string]StatusUpdate{ + "stream-1": {status.Running, Healthy}, + "stream-2": {status.Running, Healthy}, + }, + expectedUnitStatus: client.UnitStateStarting, + expectedUnitMsg: "", + }, + { + name: "inputLevelState degraded", + unit: unitCfg, + inputLevelStatus: StatusUpdate{status.Degraded, Degraded}, + streamStates: map[string]StatusUpdate{ + "stream-1": {status.Running, Healthy}, + "stream-2": {status.Running, Healthy}, + }, + expectedUnitStatus: client.UnitStateDegraded, + expectedUnitMsg: Degraded, + }, + { + name: "one stream failed the other running", + unit: unitCfg, + inputLevelStatus: StatusUpdate{status.Running, Healthy}, + streamStates: map[string]StatusUpdate{ + "stream-1": {status.Failed, Failed}, + "stream-2": {status.Running, Healthy}, + }, + expectedUnitStatus: client.UnitStateFailed, + expectedUnitMsg: Failed, + }, + { + name: "one stream failed the other degraded", + unit: unitCfg, + inputLevelStatus: StatusUpdate{status.Running, Healthy}, + streamStates: map[string]StatusUpdate{ + "stream-1": {status.Failed, Failed}, + "stream-2": {status.Degraded, Degraded}, + }, + expectedUnitStatus: client.UnitStateFailed, + expectedUnitMsg: Failed, + }, + { + name: "one stream running the other degraded", + unit: unitCfg, + inputLevelStatus: StatusUpdate{status.Running, Healthy}, + streamStates: map[string]StatusUpdate{ + "stream-1": {status.Running, Healthy}, + "stream-2": {status.Degraded, Degraded}, + }, + expectedUnitStatus: client.UnitStateDegraded, + expectedUnitMsg: Degraded, + }, + { + name: "both streams degraded", + unit: unitCfg, + inputLevelStatus: StatusUpdate{status.Running, Healthy}, + streamStates: map[string]StatusUpdate{ + "stream-1": {status.Degraded, Degraded}, + "stream-2": {status.Degraded, Degraded}, + }, + expectedUnitStatus: client.UnitStateDegraded, + expectedUnitMsg: Degraded, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + aUnit := newAgentUnit(c.unit, nil) + err := aUnit.UpdateState(c.inputLevelStatus.status, c.inputLevelStatus.msg, nil) + if err != nil { + t.Fatal(err) + } + + for id, state := range c.streamStates { + aUnit.updateStateForStream(id, state.status, state.msg) + } + + if c.unit.reportedState != c.expectedUnitStatus { + t.Errorf("expected unit status %s, got %s", c.expectedUnitStatus, aUnit.inputLevelState.state) + } + + if c.unit.reportedMsg != c.expectedUnitMsg { + t.Errorf("expected unit msg %s, got %s", c.expectedUnitStatus, aUnit.inputLevelState.state) + } + }) + } +} + +type mockClientUnit struct { + expected client.Expected + reportedState client.UnitState + reportedMsg string +} + +func (u *mockClientUnit) Expected() client.Expected { + return u.expected +} + +func (u *mockClientUnit) UpdateState(state client.UnitState, msg string, _ map[string]interface{}) error { + u.reportedState = state + u.reportedMsg = msg + return nil +} + +func (u *mockClientUnit) ID() string { + return "inputLevelState-1" +} + +func (u *mockClientUnit) Type() client.UnitType { + return client.UnitTypeInput +} + +func (u *mockClientUnit) RegisterAction(_ client.Action) { +} + +func (u *mockClientUnit) UnregisterAction(_ client.Action) { +} + +func (u *mockClientUnit) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ client.DiagnosticHook) { +}