Add multiline support to awss3 input (#25710)

* Add multiline support to awss3 input

- only applies to non-JSON logs

Closes #25249

Co-authored-by: Andrew Kroh <andrew.kroh@elastic.co>
(cherry picked from commit 5f242e3)
leehinman committed May 17, 2021
1 parent 61dab38 commit b601a2c
Showing 7 changed files with 230 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -586,6 +586,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]
- New module `zookeeper` for Zookeeper service and audit logs {issue}25061[25061] {pull}25128[25128]
- Add parsing for `haproxy.http.request.raw_request_line` field {issue}25480[25480] {pull}25482[25482]
- Mark `filestream` input beta. {pull}25560[25560]
- Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710]

*Heartbeat*

41 changes: 40 additions & 1 deletion x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -43,6 +43,22 @@ call will be interrupted.
The default AWS API call timeout for a message is 120 seconds. The minimum
is 0 seconds. The maximum is half of the visibility timeout value.

[id="input-{type}-buffer_size"]
[float]
==== `buffer_size`

The size in bytes of the buffer that each harvester uses when fetching a file.
This only applies to non-JSON logs.
The default is 16384.
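
For example, to raise the buffer to 32 KiB (a sketch; the queue URL is a
placeholder):

["source", "yml"]
----
filebeat.inputs:
- type: aws-s3
  queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
  buffer_size: 32768
----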

[id="input-{type}-encoding"]
[float]
==== `encoding`

The file encoding to use for reading data that contains international
characters. This only applies to non-JSON logs. See <<_encoding_5>>.
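
For example, to read logs in a non-default encoding (a sketch; the queue URL
is a placeholder and `gbk` is illustrative):

["source", "yml"]
----
filebeat.inputs:
- type: aws-s3
  queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
  encoding: gbk
----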


[float]
==== `expand_event_list_from_field`

@@ -91,7 +107,9 @@ setting. If `file_selectors` is given, then any global
`expand_event_list_from_field` value is ignored in favor of the ones
specified in the `file_selectors`. Regex syntax is the same as the Go
language. Files that don't match one of the regexes won't be
processed.
processed. <<input-aws-s3-multiline>>, <<input-aws-s3-max_bytes>>,
<<input-aws-s3-buffer_size>> and <<input-aws-s3-encoding>> may also be
set for each file selector.

["source", "yml"]
----
@@ -106,12 +124,33 @@ file_selectors:
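
A sketch of per-selector overrides (the regexes, pattern, and values here are
hypothetical):

["source", "yml"]
----
file_selectors:
  - regex: '/CloudTrail/'
    expand_event_list_from_field: Records
  - regex: '/app-logs/'
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after
    max_bytes: 20971520
    buffer_size: 32768
----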

Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`.

[id="input-{type}-max_bytes"]
[float]
==== `max_bytes`

The maximum number of bytes that a single log message can have. All
bytes after `max_bytes` are discarded and not sent. This setting is
especially useful for multiline log messages, which can get
large. This only applies to non-JSON logs. The default is 10MB
(10485760).
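
For example, to allow messages of up to 20 MiB (a sketch; the value is
illustrative):

["source", "yml"]
----
max_bytes: 20971520
----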

[float]
==== `max_number_of_messages`
The maximum number of messages to return. Amazon SQS never returns more messages
than this value (however, fewer messages might be returned).
Valid values: 1 to 10. Default: 5.

[id="input-{type}-multiline"]
[float]
==== `multiline`

beta[]

Options that control how {beatname_uc} deals with log messages that
span multiple lines. This only applies to non-JSON logs. See
<<multiline-examples>> for more information about configuring
multiline options.
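
For example, a sketch that joins indented continuation lines (such as stack
traces) into the preceding event; the queue URL is a placeholder:

["source", "yml"]
----
filebeat.inputs:
- type: aws-s3
  queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
  multiline.pattern: '^[[:space:]]'
  multiline.negate: false
  multiline.match: after
----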

[float]
==== `queue_url`

Expand Down
125 changes: 79 additions & 46 deletions x-pack/filebeat/input/awss3/collector.go
@@ -13,6 +13,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
@@ -29,6 +30,10 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/multiline"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
"github.com/elastic/go-concert/unison"
)

@@ -50,6 +55,11 @@ type s3Info struct {
region string
arn string
expandEventListFromField string
maxBytes int
multiline *multiline.Config
lineTerminator readfile.LineTerminator
encoding string
bufferSize int
}

type bucket struct {
@@ -273,6 +283,11 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: c.config.ExpandEventListFromField,
maxBytes: c.config.MaxBytes,
multiline: c.config.Multiline,
lineTerminator: c.config.LineTerminator,
encoding: c.config.Encoding,
bufferSize: c.config.BufferSize,
})
continue
}
@@ -282,15 +297,30 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
continue
}
if fs.Regex.MatchString(filename) {
s3Infos = append(s3Infos, s3Info{
info := s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: fs.ExpandEventListFromField,
})
break
maxBytes: fs.MaxBytes,
multiline: fs.Multiline,
lineTerminator: fs.LineTerminator,
encoding: fs.Encoding,
bufferSize: fs.BufferSize,
}
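// Zero values mean the selector did not set these options; fall back
// to the input-level settings.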
if info.bufferSize == 0 {
info.bufferSize = c.config.BufferSize
}
if info.maxBytes == 0 {
info.maxBytes = c.config.MaxBytes
}
if info.lineTerminator == 0 {
info.lineTerminator = c.config.LineTerminator
}
s3Infos = append(s3Infos, info)
}
break
}
}
return s3Infos, nil
@@ -348,67 +378,78 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,

defer resp.Body.Close()

reader := bufio.NewReader(resp.Body)
bodyReader := bufio.NewReader(resp.Body)

isS3ObjGzipped, err := isStreamGzipped(reader)
isS3ObjGzipped, err := isStreamGzipped(bodyReader)
if err != nil {
c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err))
return err
}

if isS3ObjGzipped {
gzipReader, err := gzip.NewReader(reader)
gzipReader, err := gzip.NewReader(bodyReader)
if err != nil {
c.logger.Error(fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
return err
}
reader = bufio.NewReader(gzipReader)
gzipReader.Close()
defer gzipReader.Close()
bodyReader = bufio.NewReader(gzipReader)
}

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.expandEventListFromField != "" {
decoder := json.NewDecoder(reader)
decoder := json.NewDecoder(bodyReader)
err := c.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
c.logger.Error(fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
return err
return fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
}
return nil
}

// handle s3 objects that are not json content-type
offset := 0
for {
log, err := readStringAndTrimDelimiter(reader)
if err == io.EOF {
// create event for last line
offset += len([]byte(log))
event := createEvent(log, offset, info, objectHash, s3Ctx)
err = c.forwardEvent(event)
if err != nil {
c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
return err
}
return nil
} else if err != nil {
c.logger.Error(fmt.Errorf("readStringAndTrimDelimiter failed: %w", err))
return err
}
encodingFactory, ok := encoding.FindEncoding(info.encoding)
if !ok || encodingFactory == nil {
return fmt.Errorf("unable to find '%v' encoding", info.encoding)
}
enc, err := encodingFactory(bodyReader)
if err != nil {
return fmt.Errorf("failed to initialize encoding: %v", err)
}
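// Wrap the body in a line reader that decodes the configured encoding.
// The 4x multiplier leaves room for multi-byte encodings; the decoded
// message size is enforced by the LimitReader below.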
var r reader.Reader
r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{
Codec: enc,
BufferSize: info.bufferSize,
Terminator: info.lineTerminator,
MaxBytes: info.maxBytes * 4,
})
if err != nil {
return fmt.Errorf("failed to create encode reader: %w", err)
}
r = readfile.NewStripNewline(r, info.lineTerminator)

if log == "" {
continue
if info.multiline != nil {
r, err = multiline.New(r, "\n", info.maxBytes, info.multiline)
if err != nil {
return fmt.Errorf("error setting up multiline: %v", err)
}
}

r = readfile.NewLimitReader(r, info.maxBytes)

// create event per log line
offset += len([]byte(log))
event := createEvent(log, offset, info, objectHash, s3Ctx)
err = c.forwardEvent(event)
var offset int64
for {
message, err := r.Next()
if err == io.EOF {
// No more lines
break
}
if err != nil {
c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
return err
return fmt.Errorf("error reading message: %w", err)
}
event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx)
offset += int64(message.Bytes)
if err = c.forwardEvent(event); err != nil {
return fmt.Errorf("forwardEvent failed: %w", err)
}
}
return nil
}

func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
@@ -498,7 +539,7 @@ func (c *s3Collector) convertJSONToEvent(jsonFields interface{}, offset int, obj
logOriginal := string(vJSON)
log := trimLogDelimiter(logOriginal)
offset += len([]byte(log))
event := createEvent(log, offset, s3Info, objectHash, s3Ctx)
event := createEvent(log, int64(offset), s3Info, objectHash, s3Ctx)

err := c.forwardEvent(event)
if err != nil {
@@ -540,15 +581,7 @@ func trimLogDelimiter(log string) string {
return strings.TrimSuffix(log, "\n")
}

func readStringAndTrimDelimiter(reader *bufio.Reader) (string, error) {
logOriginal, err := reader.ReadString('\n')
if err != nil {
return logOriginal, err
}
return trimLogDelimiter(logOriginal), nil
}

func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
s3Ctx.Inc()

event := beat.Event{
41 changes: 32 additions & 9 deletions x-pack/filebeat/input/awss3/collector_test.go
@@ -24,6 +24,9 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
)

// MockS3Client struct is used for unit tests.
@@ -238,23 +241,43 @@ func TestNewS3BucketReader(t *testing.T) {

resp, err := req.Send(ctx)
assert.NoError(t, err)
reader := bufio.NewReader(resp.Body)
bodyReader := bufio.NewReader(resp.Body)
defer resp.Body.Close()

encFactory, ok := encoding.FindEncoding("plain")
if !ok {
t.Fatalf("unable to find 'plain' encoding")
}

enc, err := encFactory(bodyReader)
if err != nil {
t.Fatalf("failed to initialize encoding: %v", err)
}

var r reader.Reader
r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{
Codec: enc,
BufferSize: 4096,
Terminator: readfile.LineFeed,
})
if err != nil {
t.Fatalf("Failed to initialize line reader: %v", err)
}

r = readfile.NewStripNewline(r, readfile.LineFeed)

for i := 0; i < 3; i++ {
msg, err := r.Next()
switch i {
case 0:
log, err := readStringAndTrimDelimiter(reader)
assert.NoError(t, err)
assert.Equal(t, s3LogString1Trimmed, log)
assert.Equal(t, s3LogString1Trimmed, string(msg.Content))
case 1:
log, err := readStringAndTrimDelimiter(reader)
assert.NoError(t, err)
assert.Equal(t, s3LogString2Trimmed, log)
assert.Equal(t, s3LogString2Trimmed, string(msg.Content))
case 2:
log, err := readStringAndTrimDelimiter(reader)
assert.Error(t, io.EOF, err)
assert.Equal(t, "", log)
assert.Equal(t, "", string(msg.Content))
}
}
}
@@ -300,12 +323,12 @@ func TestCreateEvent(t *testing.T) {
break
}
if err == io.EOF {
event := createEvent(log, len([]byte(log)), s3Info, s3ObjectHash, s3Context)
event := createEvent(log, int64(len([]byte(log))), s3Info, s3ObjectHash, s3Context)
events = append(events, event)
break
}

event := createEvent(log, len([]byte(log)), s3Info, s3ObjectHash, s3Context)
event := createEvent(log, int64(len([]byte(log))), s3Info, s3ObjectHash, s3Context)
events = append(events, event)
}
