diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 68c4e5f203d4..a4327a88b0f6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -29,6 +29,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update Salesforce module to use new Salesforce input. {pull}37509[37509] - Tag events that come from a filestream in "take over" mode. {pull}39828[39828] - Fix high IO and handling of a corrupted registry log file. {pull}35893[35893] +- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121] - Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075] - Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258] diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 6bacf4e1b8cf..c2efe2c50cdd 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/libbeat/common/match" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/debug" "github.com/elastic/beats/v7/libbeat/reader/parser" @@ -164,7 +165,11 @@ func (inp *filestream) Run( }) defer streamCancel() - return inp.readFromSource(ctx, log, r, fs.newPath, state, publisher, metrics) + if err := inp.readFromSource(ctx, log, r, fs.newPath, state, publisher, metrics); err != nil { + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while reading from source: %v", err)) + return err + } + return nil } func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state { diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index 41cfc83857f3..a7f70c6d31fa 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/filebeat/input/filestream/internal/task" inputv2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/ctxtool" ) @@ -204,7 +205,7 @@ func startHarvester( if errors.Is(err, ErrHarvesterAlreadyRunning) { return nil } - + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while adding new reader to the bookkeeper %v", err)) return fmt.Errorf("error while adding new reader to the bookkeeper %w", err) } @@ -214,6 +215,7 @@ func startHarvester( resource, err := lock(ctx, hg.store, srcID) if err != nil { hg.readers.remove(srcID) + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while locking resource: %v", err)) return fmt.Errorf("error while locking resource: %w", err) } defer releaseResource(resource) @@ -223,6 +225,7 @@ func startHarvester( }) if err != nil { hg.readers.remove(srcID) + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while connecting to output with pipeline: %v", err)) return fmt.Errorf("error while connecting to output with pipeline: %w", err) } defer client.Close() @@ -234,6 +237,7 @@ func startHarvester( err = hg.harvester.Run(ctx, src, cursor, publisher, metrics) if err != nil && !errors.Is(err, context.Canceled) { hg.readers.remove(srcID) + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while running harvester: %v", err)) return fmt.Errorf("error while running harvester: %w", err) } // If the context was not cancelled it means that the Harvester is stopping because of diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go index db3e713fbdd4..88adb5622ce7 100644 --- a/filebeat/input/filestream/internal/input-logfile/input.go +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -25,6 +25,7 @@ import ( input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/acker" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/go-concert/ctxtool" ) @@ -53,6 +54,7 @@ func (inp *managedInput) Run( ctx input.Context, pipeline beat.PipelineConnector, ) (err error) { + ctx.UpdateStatus(status.Starting, "") groupStore := inp.manager.getRetainedStore() defer groupStore.Release() @@ -85,6 +87,10 @@ func (inp *managedInput) Run( defer prospectorStore.Release() sourceStore := newSourceStore(prospectorStore, inp.sourceIdentifier) + // Mark it as running for now. + // Any errors encountered by harverter will change state to Degraded + ctx.UpdateStatus(status.Running, "") + inp.prospector.Run(ctx, sourceStore, hg) // Notify the manager the input has stopped, currently that is used to