Skip to content

Commit

Permalink
[filebeat][filestream] Enable status reporter for filestream input (#…
Browse files Browse the repository at this point in the history
…40121)

* initial commit filestream status

* fix: test cleanup

* fix: move the statusReporter to correct place

* fix: remove test cases for now

* chore: add changelog

* fix: address review comments
  • Loading branch information
VihasMakwana authored Jul 10, 2024
1 parent 9848f68 commit 0e9c9de
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update Salesforce module to use new Salesforce input. {pull}37509[37509]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]

*Heartbeat*

Expand Down
7 changes: 6 additions & 1 deletion filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/debug"
"github.com/elastic/beats/v7/libbeat/reader/parser"
Expand Down Expand Up @@ -163,7 +164,11 @@ func (inp *filestream) Run(
})
defer streamCancel()

return inp.readFromSource(ctx, log, r, fs.newPath, state, publisher, metrics)
if err := inp.readFromSource(ctx, log, r, fs.newPath, state, publisher, metrics); err != nil {
ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while reading from source: %v", err))
return err
}
return nil
}

func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
inputv2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
)
Expand Down Expand Up @@ -204,7 +205,7 @@ func startHarvester(
if errors.Is(err, ErrHarvesterAlreadyRunning) {
return nil
}

ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while adding new reader to the bookkeeper %v", err))
return fmt.Errorf("error while adding new reader to the bookkeeper %w", err)
}

Expand All @@ -214,6 +215,7 @@ func startHarvester(
resource, err := lock(ctx, hg.store, srcID)
if err != nil {
hg.readers.remove(srcID)
ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while locking resource: %v", err))
return fmt.Errorf("error while locking resource: %w", err)
}
defer releaseResource(resource)
Expand All @@ -223,6 +225,7 @@ func startHarvester(
})
if err != nil {
hg.readers.remove(srcID)
ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while connecting to output with pipeline: %v", err))
return fmt.Errorf("error while connecting to output with pipeline: %w", err)
}
defer client.Close()
Expand All @@ -234,6 +237,7 @@ func startHarvester(
err = hg.harvester.Run(ctx, src, cursor, publisher, metrics)
if err != nil && !errors.Is(err, context.Canceled) {
hg.readers.remove(srcID)
ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while running harvester: %v", err))
return fmt.Errorf("error while running harvester: %w", err)
}
// If the context was not cancelled it means that the Harvester is stopping because of
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/go-concert/ctxtool"
)

Expand Down Expand Up @@ -53,6 +54,7 @@ func (inp *managedInput) Run(
ctx input.Context,
pipeline beat.PipelineConnector,
) (err error) {
ctx.UpdateStatus(status.Starting, "")
groupStore := inp.manager.getRetainedStore()
defer groupStore.Release()

Expand Down Expand Up @@ -85,6 +87,10 @@ func (inp *managedInput) Run(
defer prospectorStore.Release()
sourceStore := newSourceStore(prospectorStore, inp.sourceIdentifier)

// Mark it as running for now.
// Any errors encountered by harverter will change state to Degraded
ctx.UpdateStatus(status.Running, "")

inp.prospector.Run(ctx, sourceStore, hg)

// Notify the manager the input has stopped, currently that is used to
Expand Down

0 comments on commit 0e9c9de

Please sign in to comment.