diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c2fe8728bcf..38b6f2dcacd 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -30,7 +30,7 @@ https://github.com/elastic/beats/compare/v5.3.1...master[Check the HEAD diff] *Affecting all Beats* *Filebeat* - +- Properly shut down crawler in case one prospector is misconfigured. {pull}4037[4037] *Heartbeat* @@ -138,8 +138,6 @@ https://github.com/elastic/beats/compare/v5.2.2...v5.3.0[View commits] filebeat.config_dir. {pull}3573[3573] - Fix empty registry file on machine crash. {issue}3537[3537] -- Fix empty registry file on machine crash. {issue}3537[3537] - *Metricbeat* - Add error handling to system process metricset for when Linux cgroups are missing from the kernel. {pull}3692[3692] diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 878b830a7fc..21467ace431 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -177,6 +177,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { err = crawler.Start(registrar, config.ProspectorReload) if err != nil { + crawler.Stop() return err } diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 9baf86f8084..1dfa8abaaa5 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -76,7 +76,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er err = p.LoadStates(states) if err != nil { - return fmt.Errorf("error loading states for propsector %v: %v", p.ID(), err) + return fmt.Errorf("error loading states for prospector %v: %v", p.ID(), err) } c.prospectors[p.ID()] = p diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 398dcde286e..2e3a8420b54 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -134,7 +134,7 @@ func (p *Prospector) Start() { logp.Info("Prospector channel stopped") return case <-p.beatDone: - logp.Info("Prospector channel stopped") + logp.Info("Prospector channel stopped because beat is stopping.") return case event := <-p.harvesterChan: // No stopping on error, because on error it is expected that beatDone is closed diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index e696e30af3a..fb3c3304245 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -30,6 +30,10 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) { config: p.config, } + if len(p.config.Paths) == 0 { + return nil, fmt.Errorf("each prospector must have at least one path defined") + } + return prospectorer, nil } diff --git a/filebeat/prospector/prospector_test.go b/filebeat/prospector/prospector_test.go index d4e95795764..4876f34e7f6 100644 --- a/filebeat/prospector/prospector_test.go +++ b/filebeat/prospector/prospector_test.go @@ -28,6 +28,7 @@ func TestProspectorFileExclude(t *testing.T) { prospector := Prospector{ config: prospectorConfig{ + Paths: []string{"test.log"}, ExcludeFiles: []match.Matcher{match.MustCompile(`\.gz$`)}, }, } diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index ce19ea2a534..b2b5c4c0b35 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -70,6 +70,9 @@ filebeat.prospectors: max_lines: {{ max_lines|default(500) }} {% endif %} {% endif %} +{% if prospector_raw %} +{{prospector_raw}} +{% endif %} filebeat.spool_size: filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }} diff --git a/filebeat/tests/system/test_shutdown.py b/filebeat/tests/system/test_shutdown.py index 3ad8dc63f0d..a87ac65be78 100644 --- a/filebeat/tests/system/test_shutdown.py +++ b/filebeat/tests/system/test_shutdown.py @@ -146,3 +146,27 @@ def nasa_logs(self): self.copy_files(["logs/nasa-50k.log"], source_dir="../files", target_dir="log") + + def test_stopping_empty_path(self): + """ + Test filebeat stops properly when 1 prospector has an invalid config. + """ + + prospector_raw = """ +- input_type: log + paths: [] +""" + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + prospector_raw=prospector_raw, + ) + filebeat = self.start_beat() + time.sleep(2) + + # Wait until first flush + self.wait_until( + lambda: self.log_contains_count("No paths were defined for prospector") >= 1, + max_timeout=5) + + filebeat.check_wait(exit_code=1)