diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 4b34f1010b76..c2826be5b452 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v5.0.0-beta1...master[Check the HEAD di *Topbeat* *Filebeat* +- Fix input buffer on encoding problem *Winlogbeat* diff --git a/filebeat/harvester/reader/line.go b/filebeat/harvester/reader/line.go index ca7aac09d57b..de9c0e62e4e3 100644 --- a/filebeat/harvester/reader/line.go +++ b/filebeat/harvester/reader/line.go @@ -131,6 +131,8 @@ func (l *Line) advance() error { sz, err := l.decode(idx + len(l.nl)) if err != nil { logp.Err("Error decoding line: %s", err) + // In case of error increase size by unencoded length + sz = idx + len(l.nl) } // consume transformed bytes from input buffer @@ -157,19 +159,20 @@ func (l *Line) decode(end int) (int, error) { var nDst, nSrc int nDst, nSrc, err = l.decoder.Transform(buffer, inBytes[start:end], false) - - start += nSrc - - l.outBuffer.Write(buffer[:nDst]) - if err != nil { - if err == transform.ErrShortDst { // continue transforming - // Reset error as decoding continues - err = nil - continue + // Check if error is different from destination buffer too short + if err != transform.ErrShortDst { + l.outBuffer.Write(inBytes[0:end]) + start = end + break } - break + + // Reset error as decoding continues + err = nil } + + start += nSrc + l.outBuffer.Write(buffer[:nDst]) } l.byteCount += start diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py index 9af6c1119a98..fa4fcef612f0 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -1,3 +1,5 @@ +# coding=utf-8 + from filebeat import BaseTest import os import codecs @@ -756,3 +758,48 @@ def test_truncate(self): # Check that only 1 registry entry as original was only truncated data = self.get_registry() assert len(data) == 1 + + + def test_decode_error(self): + """ + Tests that in case of a decoding error it is handled gracefully + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + encoding="GBK", # Set invalid encoding for entry below which is actually uft-8 + ) + + os.mkdir(self.working_dir + "/log/") + + logfile = self.working_dir + "/log/test.log" + + with open(logfile, 'w') as file: + file.write("hello world1" + "\n") + + file.write('' + '\n') + file.write("hello world2" + "\n") + + filebeat = self.start_beat() + + # Make sure both files were read + self.wait_until( + lambda: self.output_has(lines=3), + max_timeout=10) + + # Wait until error shows up + self.wait_until( + lambda: self.log_contains("Error decoding line: simplifiedchinese: invalid GBK encoding"), + max_timeout=5) + + filebeat.check_kill_and_wait() + + # Check that only 1 registry entry as original was only truncated + data = self.get_registry() + assert len(data) == 1 + + output = self.read_output_json() + assert output[2]["message"] == "hello world2" + + + +