From bbf3db2765788accbfbcd34d9873ae3bf702e860 Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 2 May 2016 17:48:39 +0200 Subject: [PATCH] multiline reader normalizing multiline character --- CHANGELOG.asciidoc | 1 + filebeat/harvester/linereader.go | 6 ++-- filebeat/harvester/processor/multiline.go | 36 +++++++++++++------ .../harvester/processor/multiline_test.go | 5 +-- 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 38cbf6cfa6d..c70f370b163 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -104,6 +104,7 @@ https://github.com/elastic/beats/compare/v5.0.0alpha1...v5.0.0-alpha2[View commi - Fix issue with JSON decoding where `@timestamp` or `type` keys with the wrong type could cause Filebeat to crash. {issue}1378[1378] - Fix issue with JSON decoding where values having `null` as values could crash Filebeat. {issue}1466[1466] +- Multiline reader normalizing newline to use `\n`. {pull}1552[1552] *Winlogbeat* diff --git a/filebeat/harvester/linereader.go b/filebeat/harvester/linereader.go index 23b436d54d1..bc7ff79914c 100644 --- a/filebeat/harvester/linereader.go +++ b/filebeat/harvester/linereader.go @@ -33,15 +33,13 @@ func createLineReader( p = processor.NewJSONProcessor(p, jsonConfig) } + p = processor.NewStripNewline(p) if mlrConfig != nil { - p, err = processor.NewMultiline(p, maxBytes, mlrConfig) + p, err = processor.NewMultiline(p, "\n", maxBytes, mlrConfig) if err != nil { return nil, err } - - return processor.NewStripNewline(p), nil } - p = processor.NewStripNewline(p) return processor.NewLimitProcessor(p, maxBytes), nil } diff --git a/filebeat/harvester/processor/multiline.go b/filebeat/harvester/processor/multiline.go index e9b1202506f..87258377e08 100644 --- a/filebeat/harvester/processor/multiline.go +++ b/filebeat/harvester/processor/multiline.go @@ -22,10 +22,11 @@ import ( // Errors will force the multiline processor to return the currently active // multiline event first and finally return the actual error on next call to Next. type MultiLine struct { - reader LineProcessor - pred matcher - maxBytes int // bytes stored in content - maxLines int + reader LineProcessor + pred matcher + maxBytes int // bytes stored in content + maxLines int + separator []byte ts time.Time content []byte @@ -58,6 +59,7 @@ var ( // line events into stream of multi-line events. func NewMultiline( r LineProcessor, + separator string, maxBytes int, config *config.MultilineConfig, ) (*MultiLine, error) { @@ -102,11 +104,12 @@ func NewMultiline( } mlr := &MultiLine{ - reader: r, - pred: matcher, - state: (*MultiLine).readFirst, - maxBytes: maxBytes, - maxLines: maxLines, + reader: r, + pred: matcher, + state: (*MultiLine).readFirst, + maxBytes: maxBytes, + maxLines: maxLines, + separator: []byte(separator), } return mlr, nil } @@ -239,14 +242,25 @@ func (mlr *MultiLine) addLine(l Line) { return } - space := mlr.maxBytes - len(mlr.content) + sz := len(mlr.content) + addSeparator := len(mlr.content) > 0 && len(mlr.separator) > 0 + if addSeparator { + sz += len(mlr.separator) + } + + space := mlr.maxBytes - sz spaceLeft := (mlr.maxBytes <= 0 || space > 0) && (mlr.maxLines <= 0 || mlr.numLines < mlr.maxLines) if spaceLeft { if space < 0 || space > len(l.Content) { space = len(l.Content) } - mlr.content = append(mlr.content, l.Content[:space]...) + + tmp := mlr.content + if addSeparator { + tmp = append(tmp, mlr.separator...) + } + mlr.content = append(tmp, l.Content[:space]...) mlr.numLines++ } diff --git a/filebeat/harvester/processor/multiline_test.go b/filebeat/harvester/processor/multiline_test.go index d04c8de1f8e..8e97af9ef95 100644 --- a/filebeat/harvester/processor/multiline_test.go +++ b/filebeat/harvester/processor/multiline_test.go @@ -6,6 +6,7 @@ import ( "bytes" "errors" "os" + "strings" "testing" "time" @@ -89,7 +90,7 @@ func testMultilineOK(t *testing.T, cfg config.MultilineConfig, expected ...strin var tsZero time.Time assert.NotEqual(t, tsZero, line.Ts) - assert.Equal(t, expected[i], string(line.Content)) + assert.Equal(t, strings.TrimRight(expected[i], "\r\n "), string(line.Content)) assert.Equal(t, len(expected[i]), int(line.Bytes)) } } @@ -111,7 +112,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg config.Multil t.Fatalf("Failed to initialize line reader: %v", err) } - reader, err = NewMultiline(reader, 1<<20, &cfg) + reader, err = NewMultiline(NewStripNewline(reader), "\n", 1<<20, &cfg) if err != nil { t.Fatalf("failed to initializ reader: %v", err) }