Skip to content

Commit

Permalink
multiline reader normalizing multiline character
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed May 3, 2016
1 parent 2d983b2 commit bbf3db2
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ https://github.com/elastic/beats/compare/v5.0.0alpha1...v5.0.0-alpha2[View commi
- Fix issue with JSON decoding where `@timestamp` or `type` keys with the wrong type could cause Filebeat
to crash. {issue}1378[1378]
- Fix issue with JSON decoding where values having `null` as values could crash Filebeat. {issue}1466[1466]
- Multiline reader normalizing newline to use `\n`. {pull}1552[1552]
*Winlogbeat*
Expand Down
6 changes: 2 additions & 4 deletions filebeat/harvester/linereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ func createLineReader(
p = processor.NewJSONProcessor(p, jsonConfig)
}

p = processor.NewStripNewline(p)
if mlrConfig != nil {
p, err = processor.NewMultiline(p, maxBytes, mlrConfig)
p, err = processor.NewMultiline(p, "\n", maxBytes, mlrConfig)
if err != nil {
return nil, err
}

return processor.NewStripNewline(p), nil
}

p = processor.NewStripNewline(p)
return processor.NewLimitProcessor(p, maxBytes), nil
}
36 changes: 25 additions & 11 deletions filebeat/harvester/processor/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
// Errors will force the multiline processor to return the currently active
// multiline event first and finally return the actual error on next call to Next.
type MultiLine struct {
reader LineProcessor
pred matcher
maxBytes int // bytes stored in content
maxLines int
reader LineProcessor
pred matcher
maxBytes int // bytes stored in content
maxLines int
separator []byte

ts time.Time
content []byte
Expand Down Expand Up @@ -58,6 +59,7 @@ var (
// line events into stream of multi-line events.
func NewMultiline(
r LineProcessor,
separator string,
maxBytes int,
config *config.MultilineConfig,
) (*MultiLine, error) {
Expand Down Expand Up @@ -102,11 +104,12 @@ func NewMultiline(
}

mlr := &MultiLine{
reader: r,
pred: matcher,
state: (*MultiLine).readFirst,
maxBytes: maxBytes,
maxLines: maxLines,
reader: r,
pred: matcher,
state: (*MultiLine).readFirst,
maxBytes: maxBytes,
maxLines: maxLines,
separator: []byte(separator),
}
return mlr, nil
}
Expand Down Expand Up @@ -239,14 +242,25 @@ func (mlr *MultiLine) addLine(l Line) {
return
}

space := mlr.maxBytes - len(mlr.content)
sz := len(mlr.content)
addSeparator := len(mlr.content) > 0 && len(mlr.separator) > 0
if addSeparator {
sz += len(mlr.separator)
}

space := mlr.maxBytes - sz
spaceLeft := (mlr.maxBytes <= 0 || space > 0) &&
(mlr.maxLines <= 0 || mlr.numLines < mlr.maxLines)
if spaceLeft {
if space < 0 || space > len(l.Content) {
space = len(l.Content)
}
mlr.content = append(mlr.content, l.Content[:space]...)

tmp := mlr.content
if addSeparator {
tmp = append(tmp, mlr.separator...)
}
mlr.content = append(tmp, l.Content[:space]...)
mlr.numLines++
}

Expand Down
5 changes: 3 additions & 2 deletions filebeat/harvester/processor/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"errors"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -89,7 +90,7 @@ func testMultilineOK(t *testing.T, cfg config.MultilineConfig, expected ...strin
var tsZero time.Time

assert.NotEqual(t, tsZero, line.Ts)
assert.Equal(t, expected[i], string(line.Content))
assert.Equal(t, strings.TrimRight(expected[i], "\r\n "), string(line.Content))
assert.Equal(t, len(expected[i]), int(line.Bytes))
}
}
Expand All @@ -111,7 +112,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg config.Multil
t.Fatalf("Failed to initialize line reader: %v", err)
}

reader, err = NewMultiline(reader, 1<<20, &cfg)
reader, err = NewMultiline(NewStripNewline(reader), "\n", 1<<20, &cfg)
if err != nil {
t.Fatalf("failed to initializ reader: %v", err)
}
Expand Down

0 comments on commit bbf3db2

Please sign in to comment.