Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.15](backport #40121) [filebeat][filestream] Enable status reporter for filestream input #40168

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update Salesforce module to use new Salesforce input. {pull}37509[37509]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]
- Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075]
- Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258]

Expand Down
7 changes: 6 additions & 1 deletion filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/debug"
"github.com/elastic/beats/v7/libbeat/reader/parser"
Expand Down Expand Up @@ -164,7 +165,11 @@ func (inp *filestream) Run(
})
defer streamCancel()

return inp.readFromSource(ctx, log, r, fs.newPath, state, publisher, metrics)
if err := inp.readFromSource(ctx, log, r, fs.newPath, state, publisher, metrics); err != nil {
ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while reading from source: %v", err))
return err
}
return nil
}

func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
inputv2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
)
Expand Down Expand Up @@ -204,7 +205,7 @@ func startHarvester(
if errors.Is(err, ErrHarvesterAlreadyRunning) {
return nil
}

ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while adding new reader to the bookkeeper %v", err))
return fmt.Errorf("error while adding new reader to the bookkeeper %w", err)
}

Expand All @@ -214,6 +215,7 @@ func startHarvester(
resource, err := lock(ctx, hg.store, srcID)
if err != nil {
hg.readers.remove(srcID)
ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while locking resource: %v", err))
return fmt.Errorf("error while locking resource: %w", err)
}
defer releaseResource(resource)
Expand All @@ -223,6 +225,7 @@ func startHarvester(
})
if err != nil {
hg.readers.remove(srcID)
ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while connecting to output with pipeline: %v", err))
return fmt.Errorf("error while connecting to output with pipeline: %w", err)
}
defer client.Close()
Expand All @@ -234,6 +237,7 @@ func startHarvester(
err = hg.harvester.Run(ctx, src, cursor, publisher, metrics)
if err != nil && !errors.Is(err, context.Canceled) {
hg.readers.remove(srcID)
ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while running harvester: %v", err))
return fmt.Errorf("error while running harvester: %w", err)
}
// If the context was not cancelled it means that the Harvester is stopping because of
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/go-concert/ctxtool"
)

Expand Down Expand Up @@ -53,6 +54,7 @@ func (inp *managedInput) Run(
ctx input.Context,
pipeline beat.PipelineConnector,
) (err error) {
ctx.UpdateStatus(status.Starting, "")
groupStore := inp.manager.getRetainedStore()
defer groupStore.Release()

Expand Down Expand Up @@ -85,6 +87,10 @@ func (inp *managedInput) Run(
defer prospectorStore.Release()
sourceStore := newSourceStore(prospectorStore, inp.sourceIdentifier)

// Mark it as running for now.
// Any errors encountered by harverter will change state to Degraded
ctx.UpdateStatus(status.Running, "")

inp.prospector.Run(ctx, sourceStore, hg)

// Notify the manager the input has stopped, currently that is used to
Expand Down
Loading