Skip to content

Commit

Permalink
Limit the number of bytes read by LineReader in Filebeat (elastic#19552)
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksmaus authored Jul 21, 2020
1 parent 7007d97 commit 0e049f0
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix bug with empty filter values in system/service {pull}19812[19812]
- Fix S3 input to trim delimiter /n from each log line. {pull}19972[19972]
- Ignore missing in Zeek module when dropping unecessary fields. {pull}19984[19984]
- Fix Filebeat OOMs on very long lines {issue}19500[19500], {pull}19552[19552]

*Heartbeat*

Expand Down
9 changes: 9 additions & 0 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,8 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
var r reader.Reader
var err error

logp.Debug("harvester", "newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
Expand All @@ -644,10 +646,17 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

// Configure MaxBytes limit for EncodeReader as multiplied by 4
// for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := h.config.MaxBytes * 4

r, err = readfile.NewEncodeReader(reader, readfile.Config{
Codec: h.encoding,
BufferSize: h.config.BufferSize,
Terminator: h.config.LineTerminator,
MaxBytes: encReaderMaxBytes,
})
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Config struct {
Codec encoding.Encoding
BufferSize int
Terminator LineTerminator
MaxBytes int
}

// New creates a new Encode reader from input reader by applying
Expand Down
86 changes: 78 additions & 8 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
)

const unlimited = 0

// lineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
reader io.Reader
bufferSize int
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
inBuffer *streambuf.Buffer
Expand Down Expand Up @@ -62,6 +65,7 @@ func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
return &LineReader{
reader: input,
bufferSize: config.BufferSize,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
Expand Down Expand Up @@ -121,17 +125,17 @@ func (r *LineReader) advance() error {
// Initial check if buffer has already a newLine character
idx := r.inBuffer.IndexFrom(r.inOffset, r.nl)

// fill inBuffer until newline sequence has been found in input buffer
// Fill inBuffer until newline sequence has been found in input buffer
for idx == -1 {
// increase search offset to reduce iterations on buffer when looping
// Increase search offset to reduce iterations on buffer when looping
newOffset := r.inBuffer.Len() - len(r.nl)
if newOffset > r.inOffset {
r.inOffset = newOffset
}

buf := make([]byte, r.bufferSize)

// try to read more bytes into buffer
// Try to read more bytes into buffer
n, err := r.reader.Read(buf)

// Appends buffer also in case of err
Expand All @@ -140,16 +144,39 @@ func (r *LineReader) advance() error {
return err
}

// empty read => return buffer error (more bytes required error)
// Empty read => return buffer error (more bytes required error)
if n == 0 {
return streambuf.ErrNoMoreBytes
}

// Check if buffer has newLine character
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)

// If max bytes limit per line is set, then drop the lines that are longer
if r.maxBytes != 0 {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
r.inBuffer.Reset()
r.inOffset = 0
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
if err != nil {
r.logger.Error("Error skipping until new line, err:", err)
return err
}
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}
}
}

// found encoded byte sequence for newline in buffer
// Found encoded byte sequence for newline in buffer
// -> decode input sequence into outBuffer
sz, err := r.decode(idx + len(r.nl))
if err != nil {
Expand All @@ -158,20 +185,63 @@ func (r *LineReader) advance() error {
sz = idx + len(r.nl)
}

// consume transformed bytes from input buffer
// Consume transformed bytes from input buffer
err = r.inBuffer.Advance(sz)
r.inBuffer.Reset()

// continue scanning input buffer from last position + 1
// Continue scanning input buffer from last position + 1
r.inOffset = idx + 1 - sz
if r.inOffset < 0 {
// fix inOffset if newline has encoding > 8bits + firl line has been decoded
// Fix inOffset if newline has encoding > 8bits + firl line has been decoded
r.inOffset = 0
}

return err
}

func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
// The length of the line skipped
skipped := r.inBuffer.Len()

// Clean up the buffer
err := r.inBuffer.Advance(skipped)
r.inBuffer.Reset()

// Reset inOffset
r.inOffset = 0

if err != nil {
return 0, err
}

// Read until the new line is found
for idx := -1; idx == -1; {
n, err := r.reader.Read(buf)

// Check bytes read for newLine
if n > 0 {
idx = bytes.Index(buf[:n], r.nl)

if idx != -1 {
r.inBuffer.Append(buf[idx+len(r.nl) : n])
skipped += idx
} else {
skipped += n
}
}

if err != nil {
return skipped, err
}

if n == 0 {
return skipped, streambuf.ErrNoMoreBytes
}
}

return skipped, nil
}

func (r *LineReader) decode(end int) (int, error) {
var err error
buffer := make([]byte, 1024)
Expand Down
Loading

0 comments on commit 0e049f0

Please sign in to comment.