Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #19552 to 6.8: Add max bytes in line limit #20087

Merged
merged 1 commit into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v6.8.0...6.8.1[Check the HEAD diff]

*Filebeat*

- Fix Filebeat OOMs on very long lines {issue}19500[19500], {pull}19552[19552]

*Heartbeat*

Expand Down
10 changes: 9 additions & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
var r reader.Reader
var err error

logp.Debug("harvester", "newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
Expand All @@ -570,7 +572,13 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize)
// Configure MaxBytes limit for EncodeReader as multiplied by 4
// for the worst case scenario where incoming UTF32 characters are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bloat or potential OOM with unexpectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := h.config.MaxBytes * 4

r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize, encReaderMaxBytes)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade
}

var r reader.Reader
r, err = readfile.NewEncodeReader(in, enc, 4096)
r, err = readfile.NewEncodeReader(in, enc, 4096, 4096)
if err != nil {
t.Fatalf("Failed to initialize line reader: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions filebeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ type EncoderReader struct {
func NewEncodeReader(
r io.Reader,
codec encoding.Encoding,
bufferSize int,
bufferSize, maxBytes int,
) (EncoderReader, error) {
eReader, err := NewLineReader(r, codec, bufferSize)
eReader, err := NewLineReader(r, codec, bufferSize, maxBytes)
return EncoderReader{eReader}, err
}

Expand Down
71 changes: 70 additions & 1 deletion filebeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package readfile

import (
"bytes"
"io"

"golang.org/x/text/encoding"
Expand All @@ -34,6 +35,7 @@ type LineReader struct {
reader io.Reader
codec encoding.Encoding
bufferSize int
maxBytes int
nl []byte
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
Expand All @@ -43,7 +45,7 @@ type LineReader struct {
}

// NewLineReader creates a new reader object
func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) {
func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize, maxBytes int) (*LineReader, error) {
encoder := codec.NewEncoder()

// Create newline char based on encoding
Expand All @@ -56,6 +58,7 @@ func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*L
reader: input,
codec: codec,
bufferSize: bufferSize,
maxBytes: maxBytes,
nl: nl,
decoder: codec.NewDecoder(),
inBuffer: streambuf.New(nil),
Expand Down Expand Up @@ -138,6 +141,29 @@ func (r *LineReader) advance() error {

// Check if buffer has newLine character
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)

// If max bytes limit per line is set, then drop the lines that are longer
if r.maxBytes != 0 {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
r.inBuffer.Reset()
r.inOffset = 0
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
if err != nil {
logp.Err("Error skipping until new line, err: %s", err)
return err
}
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}
}
}

// found encoded byte sequence for '\n' in buffer
Expand All @@ -163,6 +189,49 @@ func (r *LineReader) advance() error {
return err
}

// skipUntilNewLine throws away everything currently held in the input buffer
// and then keeps reading from the underlying reader, using buf as scratch
// space, until the encoded newline sequence is seen in a single read. The
// bytes that follow the newline in that read are appended to the input buffer.
//
// It returns the number of bytes discarded (the newline itself is not
// counted). A reader error is returned as-is together with the count so far;
// a read of zero bytes without an error yields streambuf.ErrNoMoreBytes.
func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
	// Everything buffered so far belongs to the line being dropped.
	dropped := r.inBuffer.Len()

	// Drain the buffer and restart the newline search from its beginning.
	advanceErr := r.inBuffer.Advance(dropped)
	r.inBuffer.Reset()
	r.inOffset = 0
	if advanceErr != nil {
		return 0, advanceErr
	}

	for {
		n, readErr := r.reader.Read(buf)
		found := false

		// Inspect whatever was read, even when the read also reported an error.
		if n > 0 {
			chunk := buf[:n]
			if pos := bytes.Index(chunk, r.nl); pos != -1 {
				// Keep the data after the newline for the next line.
				r.inBuffer.Append(chunk[pos+len(r.nl):])
				dropped += pos
				found = true
			} else {
				dropped += n
			}
		}

		if readErr != nil {
			return dropped, readErr
		}
		if n == 0 {
			return dropped, streambuf.ErrNoMoreBytes
		}
		if found {
			return dropped, nil
		}
	}
}

func (r *LineReader) decode(end int) (int, error) {
var err error
buffer := make([]byte, 1024)
Expand Down
143 changes: 141 additions & 2 deletions filebeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ package readfile

import (
"bytes"
"encoding/hex"
"io"
"math/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/text/transform"
Expand Down Expand Up @@ -68,7 +72,7 @@ func TestReaderEncodings(t *testing.T) {
}

// create line reader
reader, err := NewLineReader(buffer, codec, 1024)
reader, err := NewLineReader(buffer, codec, 1024, 1024)
if err != nil {
t.Errorf("failed to initialize reader: %v", err)
continue
Expand Down Expand Up @@ -159,7 +163,8 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
// initialize reader
buffer := bytes.NewBuffer(inputStream)
codec, _ := encoding.Plain(buffer)
reader, err := NewLineReader(buffer, codec, buffer.Len())
bufLen := buffer.Len()
reader, err := NewLineReader(buffer, codec, bufLen, bufLen)
if err != nil {
t.Fatalf("Error initializing reader: %v", err)
}
Expand All @@ -185,3 +190,137 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
func testReadLine(t *testing.T, line []byte) {
testReadLines(t, [][]byte{line})
}

// randomInt draws a pseudo-random integer from r in the inclusive range
// [min, max]. The range must contain at least one value, since r.Intn panics
// on a non-positive argument.
func randomInt(r *rand.Rand, min, max int) int {
	span := max + 1 - min
	return min + r.Intn(span)
}

// randomBool draws 0 or 1 from r via randomInt and reports whether the draw
// was non-zero.
func randomBool(r *rand.Rand) bool {
	return randomInt(r, 0, 1) != 0
}

// randomBytes returns sz pseudo-random bytes drawn from r.
//
// The bytes come from the supplied generator, not from the package-level
// math/rand source, so the result is fully determined by r's seed and state.
func randomBytes(r *rand.Rand, sz int) ([]byte, error) {
	buf := make([]byte, sz)
	if _, err := r.Read(buf); err != nil {
		return nil, err
	}
	return buf, nil
}

// randomString returns a string of exactly sz hexadecimal characters obtained
// by hex-encoding bytes from randomBytes. A size of zero yields an empty
// string without asking for any random data.
func randomString(r *rand.Rand, sz int) (string, error) {
	if sz == 0 {
		return "", nil
	}

	// Each byte encodes to two hex digits: request enough bytes to cover an
	// odd sz, then trim the encoded text back down to sz characters.
	raw, err := randomBytes(r, sz/2+sz%2)
	if err != nil {
		return "", err
	}

	encoded := hex.EncodeToString(raw)
	return encoded[:sz], nil
}

// setupTestMaxBytesLimit builds randomized input for the max-bytes tests.
//
// It generates between 11 and 142 lines of random hex text using a
// time-seeded generator. Line sizes are a mix of: sizes right around
// lineMaxLimit (one below, equal, one above), arbitrary sizes in
// [0, lineLen], and empty or single-character lines.
//
// It returns the generated lines (without terminators), the same lines
// concatenated with nl after each one, and any error from string generation.
func setupTestMaxBytesLimit(lineMaxLimit, lineLen int, nl []byte) (lines []string, data string, err error) {
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	lines = make([]string, randomInt(rnd, 11, 142))

	var sb strings.Builder
	for i := range lines {
		var sz int
		switch {
		case !randomBool(rnd):
			// Empty or one-character line (another possible boundary condition)
			sz = randomInt(rnd, 0, 1)
		case randomBool(rnd):
			// Non-empty line sitting right on the lineMaxLimit boundary
			sz = randomInt(rnd, lineMaxLimit-1, lineMaxLimit+1)
		default:
			// Non-empty line of arbitrary size
			sz = randomInt(rnd, 0, lineLen)
		}

		s, genErr := randomString(rnd, sz)
		if genErr != nil {
			return nil, "", genErr
		}

		lines[i] = s
		sb.WriteString(s)
		sb.Write(nl)
	}

	return lines, sb.String(), nil
}

// TestMaxBytesLimit feeds randomly generated lines to a LineReader configured
// with a max-bytes-per-line limit and checks that the reader returns exactly
// the generated lines whose length does not exceed the limit, in order, and
// nothing else.
func TestMaxBytesLimit(t *testing.T) {
	const (
		enc          = "plain"
		bufferSize   = 1024
		lineMaxLimit = 3012
		lineLen      = 5720 // exceeds lineMaxLimit
	)

	codecFactory, ok := encoding.FindEncoding(enc)
	if !ok {
		t.Fatalf("can not find encoding '%v'", enc)
	}

	buffer := bytes.NewBuffer(nil)
	codec, err := codecFactory(buffer)
	if err != nil {
		t.Fatal("failed to create codec:", err)
	}

	// Generate random lines lengths including empty lines
	nl := []byte("\n")
	lines, input, err := setupTestMaxBytesLimit(lineMaxLimit, lineLen, nl)
	if err != nil {
		t.Fatal("failed to generate random input:", err)
	}

	// Create line reader
	reader, err := NewLineReader(strings.NewReader(input), codec, bufferSize, lineMaxLimit)
	if err != nil {
		t.Fatal("failed to initialize reader:", err)
	}

	// nextExpected walks the generated lines, skipping those that are longer
	// than lineMaxLimit (the reader is expected to drop them), and reports
	// false once no deliverable line is left.
	idx := 0
	nextExpected := func() (string, bool) {
		for idx < len(lines) {
			line := lines[idx]
			idx++
			if len(line) <= lineMaxLimit {
				return line, true
			}
		}
		return "", false
	}

	// Read decoded lines and compare each against the next expected one
	for {
		b, n, err := reader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal("unexpected error:", err)
		}

		want, ok := nextExpected()
		if !ok {
			// Without this check a surplus empty line would silently match "".
			t.Fatalf("reader returned more lines than expected, extra line: %q", b)
		}

		gotLen := n - len(nl)
		got := string(b[:len(b)-len(nl)])
		if len(want) != gotLen {
			t.Fatalf("invalid line length, expected: %d got: %d", len(want), gotLen)
		}

		if want != got {
			t.Fatalf("lines do not match, expected: %s got: %s", want, got)
		}
	}

	// The reader reached EOF: every remaining generated line must be one
	// that was supposed to be dropped, otherwise lines were lost.
	if want, ok := nextExpected(); ok {
		t.Fatalf("reader hit EOF early, missing expected line of %d bytes", len(want))
	}
}
2 changes: 1 addition & 1 deletion filebeat/scripts/tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) {
}

var r reader.Reader
r, err = readfile.NewEncodeReader(f, enc, 4096)
r, err = readfile.NewEncodeReader(f, enc, 4096, 4096)
if err != nil {
return nil, err
}
Expand Down