Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not close filestream harvester if an unexpected error is returned when close.on_state_change.* is enabled #26411

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix bug in aws-s3 input where the end of gzipped log files might have been discarded. {pull}26260[26260]
- Fix bug in `httpjson` that prevented `first_event` getting updated. {pull}26407[26407]
- Fix bug in the Syslog input that misparsed rfc5424 days starting with 0. {pull}26419[26419]
- Do not close filestream harvester if an unexpected error is returned when close.on_state_change.* is enabled. {pull}26411[26411]

*Filebeat*

Expand Down
13 changes: 10 additions & 3 deletions filebeat/input/filestream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,29 @@ func (f *logFile) shouldBeClosed() bool {

info, statErr := f.file.Stat()
if statErr != nil {
// return early if the file does not exist anymore and the reader should be closed
if f.closeRemoved && errors.Is(statErr, os.ErrNotExist) {
f.log.Debugf("close.on_state_change.removed is enabled and file %s has been removed", f.file.Name())
return true
}

// If an unexpected error happens we keep the reader open hoping once everything will go back to normal.
f.log.Errorf("Unexpected error reading from %s; error: %s", f.file.Name(), statErr)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one still might produce a false-positive log message. The shouldBeClosed method is called by a separate go-routine independent from the reader go-routine. When the reader go-routine shuts down (cancelled event) while shouldBeClosed is run we will see an error telling us that f is invalid because it was already closed.

In order to prevent the false-positive message, we either need a mutex or some ref-counting on f + a wrapper on the reader that allows us to unblock a pending read call if the file gets closed async.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try to solve the race in a separate PR.

return true
return false
}

if f.closeRenamed {
// Check if the file can still be found under the same path
if !isSameFile(f.file.Name(), info) {
f.log.Debugf("close_renamed is enabled and file %s has been renamed", f.file.Name())
f.log.Debugf("close.on_state_change.renamed is enabled and file %s has been renamed", f.file.Name())
return true
}
}

if f.closeRemoved {
// Check if the file name exists. See https://github.com/elastic/filebeat/issues/93
if file.IsRemoved(f.file) {
f.log.Debugf("close_removed is enabled and file %s has been removed", f.file.Name())
f.log.Debugf("close.on_state_change.removed is enabled and file %s has been removed", f.file.Name())
return true
}
}
Expand Down