diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 07f79171c5c..4a231243d41 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -57,6 +57,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
 
 *Filebeat*
 - Fix processor failure in Filebeat when using regex, contain, or equals with the message field. {issue}2178[2178]
+- Fix async publisher sending empty events. {pull}2455[2455]
 
 *Winlogbeat*
 - Fix corrupt registry file that occurs on power loss by disabling file write caching. {issue}2313[2313]
diff --git a/filebeat/input/event.go b/filebeat/input/event.go
index 2b8b526e478..9bd43214c89 100644
--- a/filebeat/input/event.go
+++ b/filebeat/input/event.go
@@ -48,6 +48,12 @@ func (f *Event) ToMapStr() common.MapStr {
 	return event
 }
 
+// HasData returns true if the event itself contains data.
+// Events without data are only state updates.
+func (f *Event) HasData() bool {
+	return f.Bytes > 0
+}
+
 // mergeJSONFields writes the JSON fields in the event map,
 // respecting the KeysUnderRoot and OverwriteKeys configuration options.
 // If MessageKey is defined, the Text value from the event always
diff --git a/filebeat/publish/publish.go b/filebeat/publish/publish.go
index 1a680ab37ba..bf811505a82 100644
--- a/filebeat/publish/publish.go
+++ b/filebeat/publish/publish.go
@@ -111,7 +111,7 @@ func (p *syncLogPublisher) Start() {
 			pubEvents := make([]common.MapStr, 0, len(events))
 			for _, event := range events {
 				// Only send event with bytes read. 0 Bytes means state update only
-				if event.Bytes > 0 {
+				if event.HasData() {
 					pubEvents = append(pubEvents, event.ToMapStr())
 				}
 			}
@@ -172,9 +172,12 @@ func (p *asyncLogPublisher) Start() {
 			case <-p.done:
 				return
 			case events := <-p.in:
-				pubEvents := make([]common.MapStr, len(events))
-				for i, event := range events {
-					pubEvents[i] = event.ToMapStr()
+				pubEvents := make([]common.MapStr, 0, len(events))
+				for _, event := range events {
+					// Only send event with bytes read. 0 Bytes means state update only
+					if event.HasData() {
+						pubEvents = append(pubEvents, event.ToMapStr())
+					}
 				}
 
 				batch := &eventsBatch{
@@ -185,6 +188,7 @@ func (p *asyncLogPublisher) Start() {
 					publisher.Signal(batch), publisher.Guaranteed)
 
 				p.active.append(batch)
+
 			case <-ticker.C:
 			}
 
diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2
index 972644cb407..49572333508 100644
--- a/filebeat/tests/system/config/filebeat.yml.j2
+++ b/filebeat/tests/system/config/filebeat.yml.j2
@@ -74,6 +74,7 @@ filebeat.idle_timeout: 0.1s
 {% if not skip_registry_config %}
 filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}}
 {%endif%}
+filebeat.publish_async: {{publish_async|default("false")}}
 
 #================================ General =====================================
 
diff --git a/filebeat/tests/system/test_publisher.py b/filebeat/tests/system/test_publisher.py
new file mode 100644
index 00000000000..35b7c234d6e
--- /dev/null
+++ b/filebeat/tests/system/test_publisher.py
@@ -0,0 +1,50 @@
+from filebeat import BaseTest
+
+import os
+
+
+# Additional tests: to be implemented
+# * Check if registrar file can be configured, set config param
+# * Check "updating" of registrar file
+# * Check what happens when registrar file is deleted
+
+
+class Test(BaseTest):
+
+    def test_registrar_file_content(self):
+        """
+        Check that with publish_async enabled the written lines reach the
+        output and the registry contains one entry for the file
+        """
+
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/log/*",
+            publish_async=True
+        )
+        os.mkdir(self.working_dir + "/log/")
+
+        filebeat = self.start_beat()
+
+        testfile = self.working_dir + "/log/test.log"
+
+        iterations = 5
+        with open(testfile, 'w') as f:
+            for n in range(0, iterations):
+                f.write("line " + str(n + 1))
+                f.write("\n")
+
+        # Let it read the file
+        self.wait_until(
+            lambda: self.output_has(lines=iterations), max_timeout=10)
+
+        # Wait until registry file is written
+        self.wait_until(
+            lambda: self.log_contains(
+                "Registry file updated."),
+            max_timeout=15)
+
+        filebeat.check_kill_and_wait()
+
+        data = self.get_registry()
+        assert len(data) == 1
+        assert self.output_has(lines=iterations)