Skip to content

Commit

Permalink
Modify clean_removed handling (#3827)
Browse files Browse the repository at this point in the history
* Modify clean_removed handling

Previously if a file could be found under the same name as a state, the state was not removed. But this could have been also an other file. With this change also the file itself is compared and if it is not the same file, the state will be removed. This has the affect that in case a file is renamed after monitoring the file finished, the state will be removed. In most cases this should not have any side affect.

The positive effect of this change is that there will be less left over states in the registry.

Closes #3789

* implement review changes
  • Loading branch information
ruflin authored and 7AC committed Apr 18, 2017
1 parent 6977885 commit caf398f
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Always use absolute path for event and registry. This can lead to issues when relative paths were used before. {pull}3328[3328]
- Remove code to convert states from 1.x. {pull}3767[3767]
- Remove deprecated config options force_close_files and close_older. {pull}3768[3768]
- Change clean_removed behaviour to also remove states for files which cannot be found anymore under the same name. {pull}3827[3827]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,12 @@ NOTE: Every time a file is renamed, the file state is updated and the counter fo
[[clean-removed]]
===== clean_removed

When this option is enabled, Filebeat cleans files from the registry if they cannot be found on disk anymore. This setting does not apply to renamed files or files that were moved to another directory that is still visible to Filebeat. This option is enabled by default.

When this option is enabled, Filebeat cleans files from the registry if they cannot be found on disk anymore under the last known name. This means also files which were renamed after the harvester was finished will be removed. This option is enabled by default.

If a shared drive disappears for a short period and appears again, all files will be read again from the beginning because the states were removed from the registry file. In such cases, we recommend that you disable the `clean_removed` option.

You must disable this option if you also disable `close_removed`.


[[scan-frequency]]
===== scan_frequency

Expand Down
36 changes: 27 additions & 9 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,42 @@ func (l *Log) Run() {
if l.config.CleanRemoved {
for _, state := range l.Prospector.states.GetStates() {
// os.Stat will return an error in case the file does not exist
_, err := os.Stat(state.Source)
stat, err := os.Stat(state.Source)
if err != nil {
// Only clean up files where state is Finished
if state.Finished {
state.TTL = 0
err := l.Prospector.updateState(input.NewEvent(state))
if err != nil {
logp.Err("File cleanup state update error: %s", err)
}
if os.IsNotExist(err) {
l.removeState(state)
logp.Debug("prospector", "Remove state for file as file removed: %s", state.Source)
} else {
logp.Debug("prospector", "State for file not removed because not finished: %s", state.Source)
logp.Err("Prospector state for %s was not removed: %s", state.Source, err)
}
} else {
// Check if existing source on disk and state are the same. Remove if not the case.
newState := file.NewState(stat, state.Source)
if !newState.FileStateOS.IsSame(state.FileStateOS) {
l.removeState(state)
logp.Debug("prospector", "Remove state for file as file removed or renamed: %s", state.Source)
}
}
}
}
}

func (l *Log) removeState(state file.State) {

// Only clean up files where state is Finished
if !state.Finished {
logp.Debug("prospector", "State for file not removed because harvester not finished: %s", state.Source)
return
}

state.TTL = 0
err := l.Prospector.updateState(input.NewEvent(state))
if err != nil {
logp.Err("File cleanup state update error: %s", err)
}

}

// getFiles returns all files which have to be harvested
// All globs are expanded and then directory and excluded files are removed
func (l *Log) getFiles() map[string]os.FileInfo {
Expand Down
4 changes: 3 additions & 1 deletion filebeat/tests/system/test_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def test_close_renamed(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test.log",
close_renamed="true",
clean_removed="false",
scan_frequency="0.1s"
)
os.mkdir(self.working_dir + "/log/")
Expand Down Expand Up @@ -620,7 +621,8 @@ def test_symlink_removed(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/symlink.log",
symlinks="true",
clean_removed="false"
clean_removed="false",
close_removed="false",
)

os.mkdir(self.working_dir + "/log/")
Expand Down
3 changes: 2 additions & 1 deletion filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ def test_rotating_file_with_restart(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
scan_frequency="1s",
close_inactive="1s"
close_inactive="1s",
clean_removed="false"
)

if os.name == "nt":
Expand Down

0 comments on commit caf398f

Please sign in to comment.