Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly shut down crawler in case one prospector is misconfigured #4037

Merged
merged 1 commit into from
Apr 19, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
@@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Downgrade Elasticsearch per batch item failure log to debug level. {issue}3953[3953]
- Allow log lines without a program name in the Syslog fileset. {pull}3944[3944]
- Fix panic in JSON decoding code if the input line is "null". {pull}4042[4042]
- Properly shut down crawler in case one prospector is misconfigured. {pull}4037[4037]

*Heartbeat*
- Add default ports in HTTP monitor. {pull}3924[3924]
1 change: 1 addition & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
@@ -184,6 +184,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

err = crawler.Start(registrar, config.ProspectorReload)
if err != nil {
crawler.Stop()
return err
}

2 changes: 1 addition & 1 deletion filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er

err = p.LoadStates(states)
if err != nil {
return fmt.Errorf("error loading states for propsector %v: %v", p.ID(), err)
return fmt.Errorf("error loading states for prospector %v: %v", p.ID(), err)
}

c.prospectors[p.ID()] = p
2 changes: 1 addition & 1 deletion filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
@@ -148,7 +148,7 @@ func (p *Prospector) Start() {
logp.Info("Prospector channel stopped")
return
case <-p.beatDone:
logp.Info("Prospector channel stopped")
logp.Info("Prospector channel stopped because beat is stopping.")
return
case event := <-p.harvesterChan:
// No stopping on error, because on error it is expected that beatDone is closed
4 changes: 4 additions & 0 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
@@ -32,6 +32,10 @@ func NewLog(p *Prospector) (*Log, error) {
config: p.config,
}

if len(p.config.Paths) == 0 {
return nil, fmt.Errorf("each prospector must have at least one path defined")
}

return prospectorer, nil
}

1 change: 1 addition & 0 deletions filebeat/prospector/prospector_test.go
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ func TestProspectorFileExclude(t *testing.T) {

prospector := Prospector{
config: prospectorConfig{
Paths: []string{"test.log"},
ExcludeFiles: []match.Matcher{match.MustCompile(`\.gz$`)},
},
}
3 changes: 3 additions & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
@@ -82,6 +82,9 @@ filebeat.prospectors:
max_lines: {{ max_lines|default(500) }}
{% endif %}
{% endif %}
{% if prospector_raw %}
{{prospector_raw}}
{% endif %}

filebeat.spool_size:
filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }}
24 changes: 24 additions & 0 deletions filebeat/tests/system/test_shutdown.py
Original file line number Diff line number Diff line change
@@ -146,3 +146,27 @@ def nasa_logs(self):
self.copy_files(["logs/nasa-50k.log"],
source_dir="../files",
target_dir="log")

def test_stopping_empty_path(self):
"""
Test filebeat stops properly when 1 prospector has an invalid config.
"""

prospector_raw = """
- input_type: log
paths: []
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
prospector_raw=prospector_raw,
)
filebeat = self.start_beat()
time.sleep(2)

# Wait until first flush
self.wait_until(
lambda: self.log_contains_count("No paths were defined for prospector") >= 1,
max_timeout=5)

filebeat.check_wait(exit_code=1)