Skip to content

Commit

Permalink
Fix async publisher sending empty events (elastic#2455) (elastic#2462)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruflin authored and tsg committed Sep 7, 2016
1 parent 132a3e6 commit 0e9dc48
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Filebeat*
- Fix processor failure in Filebeat when using regex, contain, or equals with the message field. {issue}2178[2178]
- Fix async publisher sending empty events {pull}2455[2455]

*Winlogbeat*
- Fix corrupt registry file that occurs on power loss by disabling file write caching. {issue}2313[2313]
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func (f *Event) ToMapStr() common.MapStr {
return event
}

// HasData returns true if the event itself contains data
// Events without data are only state updates
func (e *Event) HasData() bool {
return e.Bytes > 0
}

// mergeJSONFields writes the JSON fields in the event map,
// respecting the KeysUnderRoot and OverwriteKeys configuration options.
// If MessageKey is defined, the Text value from the event always
Expand Down
12 changes: 8 additions & 4 deletions filebeat/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (p *syncLogPublisher) Start() {
pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
// Only send event with bytes read. 0 Bytes means state update only
if event.Bytes > 0 {
if event.HasData() {
pubEvents = append(pubEvents, event.ToMapStr())
}
}
Expand Down Expand Up @@ -172,9 +172,12 @@ func (p *asyncLogPublisher) Start() {
case <-p.done:
return
case events := <-p.in:
pubEvents := make([]common.MapStr, len(events))
for i, event := range events {
pubEvents[i] = event.ToMapStr()

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
if event.HasData() {
pubEvents = append(pubEvents, event.ToMapStr())
}
}

batch := &eventsBatch{
Expand All @@ -185,6 +188,7 @@ func (p *asyncLogPublisher) Start() {
publisher.Signal(batch), publisher.Guaranteed)

p.active.append(batch)

case <-ticker.C:
}

Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ filebeat.idle_timeout: 0.1s
{% if not skip_registry_config %}
filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}}
{%endif%}
filebeat.publish_async: {{publish_async}}


#================================ General =====================================
Expand Down
60 changes: 60 additions & 0 deletions filebeat/tests/system/test_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from filebeat import BaseTest

import os
import platform
import time
import shutil
import json
from nose.plugins.skip import Skip, SkipTest


# Additional tests: to be implemented
# * Check if registrar file can be configured, set config param
# * Check "updating" of registrar file
# * Check what happens when registrar file is deleted


class Test(BaseTest):

def test_registrar_file_content(self):
"""
Check if registrar file is created correctly and content is as expected
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
publish_async=True
)
os.mkdir(self.working_dir + "/log/")

filebeat = self.start_beat()

testfile = self.working_dir + "/log/test.log"
file = open(testfile, 'w')

iterations = 5
for n in range(0, iterations):
file.write("line " + str(n+1))
file.write("\n")

file.close()

# Let it read the file
self.wait_until(
lambda: self.output_has(lines=iterations), max_timeout=10)

self.wait_until(
lambda: self.output_has(lines=iterations), max_timeout=10)

# Wait until registry file is written
self.wait_until(
lambda: self.log_contains(
"Registry file updated."),
max_timeout=15)

filebeat.check_kill_and_wait()

data = self.get_registry()
assert len(data) == 1
assert self.output_has(lines=iterations)

0 comments on commit 0e9dc48

Please sign in to comment.