Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent prospector scans from racing and leaking harvesters #2539

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/logp"
"sync"
)

var (
@@ -22,6 +23,7 @@ type ProspectorLog struct {
Prospector *Prospector
config prospectorConfig
lastClean time.Time
mutex sync.Mutex
}

func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {
@@ -54,6 +56,8 @@ func (p *ProspectorLog) Init() {
}

func (p *ProspectorLog) Run() {
p.mutex.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This indicates that you suspect that Run is called for the same prospector multiple times in parallel. This should not be the case as Run is not executed inside a go routine: https://github.com/elastic/beats/blob/master/filebeat/prospector/prospector.go#L145 Let me know if I miss some parts here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, sorry I missed that. The mutex is not required - not sure why I missed that.

defer p.mutex.Unlock()
logp.Debug("prospector", "Start next scan")

p.scan()
@@ -179,6 +183,7 @@ func (p *ProspectorLog) scan() {
// Decides if previous state exists
if lastState.IsEmpty() {
logp.Debug("prospector", "Start harvester for new file: %s", newState.Source)
p.Prospector.states.Update(newState)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New states should not be persisted before the harvester is started as during the startup of a harvester also some checks will be made and only then the state will be sent to the prospector.

err := p.Prospector.startHarvester(newState, 0)
if err != nil {
logp.Err("Harvester could not be started on new file: %s, Err: %s", newState.Source, err)