Skip to content

Commit

Permalink
Interrupt scan on shutdown (elastic#3035)
Browse files Browse the repository at this point in the history
So far before a shutdown a scan was completed. This could heavily slow down the shutdown with a large number of files as the scan had to be completed first.

Further changes:

* Fix flaky filebeat tests for shutdown and inactive on very fast / very slow machines
  • Loading branch information
ruflin authored and suraj-soni committed Dec 15, 2016
1 parent 1e3a381 commit 7b9e11a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff]
- Only load matching states into prospector to improve state handling {pull}2840[2840]
- Reset all states ttl on startup to make sure it is overwritten by new config {pull}2840[2840]
- Persist all states for files which fall under ignore_older to have consistent behaviour {pull}2859[2859]
- Improve shutdown behaviour with large number of files. {pull}3035[3035]

*Winlogbeat*

Expand Down
7 changes: 7 additions & 0 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ func (p *ProspectorLog) scan() {

for path, info := range p.getFiles() {

select {
case <-p.Prospector.done:
logp.Info("Scan aborted because prospector stopped.")
return
default:
}

logp.Debug("prospector", "Check file for harvesting: %s", path)

// Create new state for comparison
Expand Down
23 changes: 12 additions & 11 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ def test_restart_state_reset_ttl(self):

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test.log",
clean_inactive="10s",
clean_inactive="20s",
ignore_older="5s"
)
os.mkdir(self.working_dir + "/log/")
Expand All @@ -1167,28 +1167,29 @@ def test_restart_state_reset_ttl(self):
# Check that ttl > 0 was set because of clean_inactive
data = self.get_registry()
assert len(data) == 1
assert data[0]["ttl"] == 10 * 1000 * 1000 * 1000
assert data[0]["ttl"] == 20 * 1000 * 1000 * 1000

# No config file which does not match the existing state
# New config file which does not match the existing clean_inactive
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test.log",
clean_inactive="20s",
clean_inactive="40s",
ignore_older="5s",
)

filebeat = self.start_beat(output="filebeat2.log")

# Wait until new state is written

self.wait_until(
lambda: self.log_contains("Flushing spooler because of timeout. Events flushed: ", logfile="filebeat2.log"),
max_timeout=10)
lambda: self.log_contains("Flushing spooler because of timeout. Events flushed: ",
logfile="filebeat2.log"), max_timeout=10)

filebeat.check_kill_and_wait()

# Check that ttl was reset correctly
data = self.get_registry()
assert len(data) == 1
assert data[0]["ttl"] == 20 * 1000 * 1000 * 1000
assert data[0]["ttl"] == 40 * 1000 * 1000 * 1000

def test_restart_state_reset_ttl_with_space(self):
"""
Expand All @@ -1198,7 +1199,7 @@ def test_restart_state_reset_ttl_with_space(self):

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test file.log",
clean_inactive="10s",
clean_inactive="20s",
ignore_older="5s"
)
os.mkdir(self.working_dir + "/log/")
Expand All @@ -1220,12 +1221,12 @@ def test_restart_state_reset_ttl_with_space(self):
# Check that ttl > 0 was set because of clean_inactive
data = self.get_registry()
assert len(data) == 1
assert data[0]["ttl"] == 10 * 1000 * 1000 * 1000
assert data[0]["ttl"] == 20 * 1000 * 1000 * 1000

# new config file whith other clean_inactive
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test file.log",
clean_inactive="20s",
clean_inactive="40s",
ignore_older="5s",
)

Expand All @@ -1241,7 +1242,7 @@ def test_restart_state_reset_ttl_with_space(self):
# Check that ttl was reset correctly
data = self.get_registry()
assert len(data) == 1
assert data[0]["ttl"] == 20 * 1000 * 1000 * 1000
assert data[0]["ttl"] == 40 * 1000 * 1000 * 1000


def test_restart_state_reset_ttl_no_clean_inactive(self):
Expand Down
2 changes: 1 addition & 1 deletion filebeat/tests/system/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_shutdown_wait_ok(self):

# Wait until first flush
self.wait_until(
lambda: self.log_contains("Flushing spooler because spooler full"),
lambda: self.log_contains_count("Flushing spooler") > 1,
max_timeout=15)

filebeat.check_kill_and_wait()
Expand Down

0 comments on commit 7b9e11a

Please sign in to comment.