diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 82318b1637d..8b148b6f5cb 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] - Downgrade Elasticsearch per batch item failure log to debug level. {issue}3953[3953] - Allow log lines without a program name in the Syslog fileset. {pull}3944[3944] - Fix panic in JSON decoding code if the input line is "null". {pull}4042[4042] +- Properly shut down crawler in case one prospector is misconfigured. {pull}4037[4037] *Heartbeat* - Add default ports in HTTP monitor. {pull}3924[3924] diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 0276f4bfb99..d16f3bb9ba2 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -184,6 +184,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { err = crawler.Start(registrar, config.ProspectorReload) if err != nil { + crawler.Stop() return err } diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 9baf86f8084..1dfa8abaaa5 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -76,7 +76,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er err = p.LoadStates(states) if err != nil { - return fmt.Errorf("error loading states for propsector %v: %v", p.ID(), err) + return fmt.Errorf("error loading states for prospector %v: %v", p.ID(), err) } c.prospectors[p.ID()] = p diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index f2435bdc51d..604c5cdddd1 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -148,7 +148,7 @@ func (p *Prospector) Start() { logp.Info("Prospector channel stopped") return case <-p.beatDone: - logp.Info("Prospector channel stopped") + logp.Info("Prospector channel stopped because beat is stopping.") return case event := <-p.harvesterChan: // No stopping on error, because on error it is expected that beatDone is closed diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index 5f44f998595..9ddfb4c5af9 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -32,6 +32,10 @@ func NewLog(p *Prospector) (*Log, error) { config: p.config, } + if len(p.config.Paths) == 0 { + return nil, fmt.Errorf("each prospector must have at least one path defined") + } + return prospectorer, nil } diff --git a/filebeat/prospector/prospector_test.go b/filebeat/prospector/prospector_test.go index 0fdfcd28eee..c99a75e4867 100644 --- a/filebeat/prospector/prospector_test.go +++ b/filebeat/prospector/prospector_test.go @@ -28,6 +28,7 @@ func TestProspectorFileExclude(t *testing.T) { prospector := Prospector{ config: prospectorConfig{ + Paths: []string{"test.log"}, ExcludeFiles: []match.Matcher{match.MustCompile(`\.gz$`)}, }, } diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index b3c6d413d3f..929947d9180 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -82,6 +82,9 @@ filebeat.prospectors: max_lines: {{ max_lines|default(500) }} {% endif %} {% endif %} +{% if prospector_raw %} +{{prospector_raw}} +{% endif %} filebeat.spool_size: filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }} diff --git a/filebeat/tests/system/test_shutdown.py b/filebeat/tests/system/test_shutdown.py index 3ad8dc63f0d..a87ac65be78 100644 --- a/filebeat/tests/system/test_shutdown.py +++ b/filebeat/tests/system/test_shutdown.py @@ -146,3 +146,27 @@ def nasa_logs(self): self.copy_files(["logs/nasa-50k.log"], source_dir="../files", target_dir="log") + + def test_stopping_empty_path(self): + """ + Test filebeat stops properly when 1 prospector has an invalid config. + """ + + prospector_raw = """ +- input_type: log + paths: [] +""" + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + prospector_raw=prospector_raw, + ) + filebeat = self.start_beat() + time.sleep(2) + + # Wait until first flush + self.wait_until( + lambda: self.log_contains_count("No paths were defined for prospector") >= 1, + max_timeout=5) + + filebeat.check_wait(exit_code=1)