diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index b1d5cf5a7eb..9fc182da2ca 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -588,6 +588,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Mark `filestream` input beta. {pull}25560[25560]
 - Update PanOS module to parse Global Protect & User ID logs. {issue}24722[24722] {issue}24724[24724] {pull}24927[24927]
 - Add HMAC signature validation support for http_endpoint input. {pull}24918[24918]
+- Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710]
 
 *Heartbeat*
 
diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
index e757ad6faea..894a0aff926 100644
--- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -43,6 +43,22 @@ call will be interrupted. The default AWS API call timeout for a message is 120
 seconds. The minimum is 0 seconds. The maximum is half of the visibility timeout
 value.
 
+[id="input-{type}-buffer_size"]
+[float]
+==== `buffer_size`
+
+The size in bytes of the buffer that each harvester uses when fetching a file.
+This only applies to non-JSON logs.
+The default is 16384.
+
+[id="input-{type}-encoding"]
+[float]
+==== `encoding`
+
+The file encoding to use for reading data that contains international
+characters. This only applies to non-JSON logs. See <<_encoding_5>>.
+
+
 [float]
 ==== `expand_event_list_from_field`
 
@@ -91,7 +107,9 @@ setting. If `file_selectors` is given, then any global
 `expand_event_list_from_field` value is ignored in favor of the ones
 specified in the `file_selectors`. Regex syntax is the same as the Go
 language. Files that don't match one of the regexes won't be
-processed.
+processed. <<input-{type}-buffer_size,`buffer_size`>>, <<input-{type}-encoding,`encoding`>>,
+<<input-{type}-max_bytes,`max_bytes`>> and <<input-{type}-multiline,`multiline`>> may also be
+set for each file selector.
 
 ["source", "yml"]
 ----
 file_selectors:
   - regex: '/CloudTrail/'
     expand_event_list_from_field: 'Records'
   - regex: '/CloudTrail-Digest/'
 ----
@@ -106,12 +124,33 @@ file_selectors:
 Enabling this option changes the service name from `s3` to `s3-fips` for
 connecting to the correct service endpoint. For example:
 `s3-fips.us-gov-east-1.amazonaws.com`.
 
+[id="input-{type}-max_bytes"]
+[float]
+==== `max_bytes`
+
+The maximum number of bytes that a single log message can have. All
+bytes after `max_bytes` are discarded and not sent. This setting is
+especially useful for multiline log messages, which can get
+large. This only applies to non-JSON logs. The default is 10MB
+(10485760).
+
 [float]
 ==== `max_number_of_messages`
 
 The maximum number of messages to return. Amazon SQS never returns more messages
 than this value (however, fewer messages might be returned).
 Valid values: 1 to 10. Default: 5.
 
+[id="input-{type}-multiline"]
+[float]
+==== `multiline`
+
+beta[]
+
+Options that control how {beatname_uc} deals with log messages that
+span multiple lines. This only applies to non-JSON logs. See
+<<multiline-examples>> for more information about configuring
+multiline options.
+
 [float]
 ==== `queue_url`
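The docs above describe each new reader setting separately; an end-to-end example may help. The block below is illustrative only and is not part of the patch: the queue URL is hypothetical, and the multiline pattern assumes a log format whose continuation lines begin with whitespace.

["source","yml"]
----
filebeat.inputs:
- type: aws-s3
  queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue  # hypothetical
  encoding: plain
  buffer_size: 16384     # the default, shown for completeness
  max_bytes: 10485760    # the default, shown for completeness
  multiline:
    pattern: '^\s'       # assumed: continuation lines start with whitespace
    negate: false
    match: after
----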
diff --git a/x-pack/filebeat/input/awss3/collector.go b/x-pack/filebeat/input/awss3/collector.go
index 8f7bb152295..3735467385c 100644
--- a/x-pack/filebeat/input/awss3/collector.go
+++ b/x-pack/filebeat/input/awss3/collector.go
@@ -13,6 +13,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net/http"
 	"net/url"
 	"strings"
@@ -29,6 +30,10 @@ import (
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/logp"
+	"github.com/elastic/beats/v7/libbeat/reader"
+	"github.com/elastic/beats/v7/libbeat/reader/multiline"
+	"github.com/elastic/beats/v7/libbeat/reader/readfile"
+	"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
 	"github.com/elastic/go-concert/unison"
 )
 
@@ -50,6 +55,11 @@ type s3Info struct {
 	region                   string
 	arn                      string
 	expandEventListFromField string
+	maxBytes                 int
+	multiline                *multiline.Config
+	lineTerminator           readfile.LineTerminator
+	encoding                 string
+	bufferSize               int
 }
 
 type bucket struct {
@@ -273,6 +283,11 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
 				key:                      filename,
 				arn:                      record.S3.bucket.Arn,
 				expandEventListFromField: c.config.ExpandEventListFromField,
+				maxBytes:                 c.config.MaxBytes,
+				multiline:                c.config.Multiline,
+				lineTerminator:           c.config.LineTerminator,
+				encoding:                 c.config.Encoding,
+				bufferSize:               c.config.BufferSize,
 			})
 			continue
 		}
@@ -282,15 +297,30 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
 				continue
 			}
 			if fs.Regex.MatchString(filename) {
-				s3Infos = append(s3Infos, s3Info{
+				info := s3Info{
 					region:                   record.AwsRegion,
 					name:                     record.S3.bucket.Name,
 					key:                      filename,
 					arn:                      record.S3.bucket.Arn,
 					expandEventListFromField: fs.ExpandEventListFromField,
-				})
-				break
+					maxBytes:                 fs.MaxBytes,
+					multiline:                fs.Multiline,
+					lineTerminator:           fs.LineTerminator,
+					encoding:                 fs.Encoding,
+					bufferSize:               fs.BufferSize,
+				}
+				if info.bufferSize == 0 {
+					info.bufferSize = c.config.BufferSize
+				}
+				if info.maxBytes == 0 {
+					info.maxBytes = c.config.MaxBytes
+				}
+				if info.lineTerminator == 0 {
+					info.lineTerminator = c.config.LineTerminator
+				}
+				s3Infos = append(s3Infos, info)
 			}
+			break
 		}
 	}
 	return s3Infos, nil
@@ -348,67 +378,78 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
 	defer resp.Body.Close()
 
-	reader := bufio.NewReader(resp.Body)
+	bodyReader := bufio.NewReader(resp.Body)
 
-	isS3ObjGzipped, err := isStreamGzipped(reader)
+	isS3ObjGzipped, err := isStreamGzipped(bodyReader)
 	if err != nil {
 		c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err))
 		return err
 	}
 	if isS3ObjGzipped {
-		gzipReader, err := gzip.NewReader(reader)
+		gzipReader, err := gzip.NewReader(bodyReader)
 		if err != nil {
 			c.logger.Error(fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
 			return err
 		}
-		reader = bufio.NewReader(gzipReader)
-		gzipReader.Close()
+		defer gzipReader.Close()
+		bodyReader = bufio.NewReader(gzipReader)
 	}
 
 	// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
 	if resp.ContentType != nil && *resp.ContentType == "application/json" || info.expandEventListFromField != "" {
-		decoder := json.NewDecoder(reader)
+		decoder := json.NewDecoder(bodyReader)
 		err := c.decodeJSON(decoder, objectHash, info, s3Ctx)
 		if err != nil {
-			c.logger.Error(fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
-			return err
+			return fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
 		}
 		return nil
 	}
 
 	// handle s3 objects that are not json content-type
-	offset := 0
-	for {
-		log, err := readStringAndTrimDelimiter(reader)
-		if err == io.EOF {
-			// create event for last line
-			offset += len([]byte(log))
-			event := createEvent(log, offset, info, objectHash, s3Ctx)
-			err = c.forwardEvent(event)
-			if err != nil {
-				c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
-				return err
-			}
-			return nil
-		} else if err != nil {
-			c.logger.Error(fmt.Errorf("readStringAndTrimDelimiter failed: %w", err))
-			return err
-		}
+	encodingFactory, ok := encoding.FindEncoding(info.encoding)
+	if !ok || encodingFactory == nil {
+		return fmt.Errorf("unable to find '%v' encoding", info.encoding)
+	}
+	enc, err := encodingFactory(bodyReader)
+	if err != nil {
+		return fmt.Errorf("failed to initialize encoding: %v", err)
+	}
+	var r reader.Reader
+	r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{
+		Codec:      enc,
+		BufferSize: info.bufferSize,
+		Terminator: info.lineTerminator,
+		MaxBytes:   info.maxBytes * 4,
+	})
+	r = readfile.NewStripNewline(r, info.lineTerminator)
 
-		if log == "" {
-			continue
+	if info.multiline != nil {
+		r, err = multiline.New(r, "\n", info.maxBytes, info.multiline)
+		if err != nil {
+			return fmt.Errorf("error setting up multiline: %v", err)
 		}
+	}
+
+	r = readfile.NewLimitReader(r, info.maxBytes)
 
-		// create event per log line
-		offset += len([]byte(log))
-		event := createEvent(log, offset, info, objectHash, s3Ctx)
-		err = c.forwardEvent(event)
+	var offset int64
+	for {
+		message, err := r.Next()
+		if err == io.EOF {
+			// No more lines
+			break
+		}
 		if err != nil {
-			c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
-			return err
+			return fmt.Errorf("error reading message: %w", err)
+		}
+		event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx)
+		offset += int64(message.Bytes)
+		if err = c.forwardEvent(event); err != nil {
+			return fmt.Errorf("forwardEvent failed: %w", err)
 		}
 	}
+	return nil
 }
 
 func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
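For reference, the reader chain assembled in the hunk above (encode reader, newline stripper, optional multiline joiner, byte limiter) can be exercised in isolation. A minimal sketch against an in-memory buffer instead of an S3 object body; the sample text, buffer sizes, and multiline pattern are invented, and the multiline config is unpacked the same way the input's settings would be:

["source","go"]
----
package main

import (
	"bufio"
	"fmt"
	"io"
	"io/ioutil"
	"strings"

	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/reader"
	"github.com/elastic/beats/v7/libbeat/reader/multiline"
	"github.com/elastic/beats/v7/libbeat/reader/readfile"
	"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
)

func main() {
	// Two events: a trace whose continuation lines start with whitespace,
	// followed by an ordinary single-line message.
	body := bufio.NewReader(strings.NewReader(
		"panic: oops\n  at main.go:10\n  at main.go:20\nplain line\n"))

	encodingFactory, ok := encoding.FindEncoding("plain")
	if !ok {
		panic("unable to find 'plain' encoding")
	}
	enc, err := encodingFactory(body)
	if err != nil {
		panic(err)
	}

	// Line reader plus newline stripper, mirroring createEventsFromS3Info.
	var r reader.Reader
	r, err = readfile.NewEncodeReader(ioutil.NopCloser(body), readfile.Config{
		Codec:      enc,
		BufferSize: 4096,
		Terminator: readfile.LineFeed,
		MaxBytes:   1024 * 1024,
	})
	if err != nil {
		panic(err)
	}
	r = readfile.NewStripNewline(r, readfile.LineFeed)

	// Join continuation lines (those starting with whitespace) onto the
	// line before them, as a filebeat multiline block would.
	var mlConfig multiline.Config
	cfg := common.MustNewConfigFrom(common.MapStr{
		"pattern": `^\s`,
		"match":   "after",
	})
	if err := cfg.Unpack(&mlConfig); err != nil {
		panic(err)
	}
	if r, err = multiline.New(r, "\n", 1024*1024, &mlConfig); err != nil {
		panic(err)
	}
	r = readfile.NewLimitReader(r, 1024*1024)

	for {
		msg, err := r.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		fmt.Printf("event (%d bytes): %q\n", msg.Bytes, string(msg.Content))
	}
}
----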
@@ -498,7 +539,7 @@ func (c *s3Collector) convertJSONToEvent(jsonFields interface{}, offset int, obj
 	logOriginal := string(vJSON)
 	log := trimLogDelimiter(logOriginal)
 	offset += len([]byte(log))
-	event := createEvent(log, offset, s3Info, objectHash, s3Ctx)
+	event := createEvent(log, int64(offset), s3Info, objectHash, s3Ctx)
 
 	err := c.forwardEvent(event)
 	if err != nil {
@@ -540,15 +581,7 @@ func trimLogDelimiter(log string) string {
 	return strings.TrimSuffix(log, "\n")
 }
 
-func readStringAndTrimDelimiter(reader *bufio.Reader) (string, error) {
-	logOriginal, err := reader.ReadString('\n')
-	if err != nil {
-		return logOriginal, err
-	}
-	return trimLogDelimiter(logOriginal), nil
-}
-
-func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
+func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
 	s3Ctx.Inc()
 
 	event := beat.Event{
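`isStreamGzipped` is called in collector.go above, but its body is outside this diff. A minimal sketch of the usual technique: peek at the stream's first bytes and compare them against the two-byte gzip magic number without consuming the reader. This is an illustrative stand-in, not necessarily the repository's exact implementation:

["source","go"]
----
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// isStreamGzipped reports whether the buffered stream begins with the
// gzip magic number (0x1f 0x8b). Peek does not advance the reader, so
// the bytes remain available to whatever reads the stream next.
func isStreamGzipped(r *bufio.Reader) (bool, error) {
	buf, err := r.Peek(2)
	if err != nil && err != io.EOF {
		return false, err
	}
	return bytes.HasPrefix(buf, []byte{0x1f, 0x8b}), nil
}

func main() {
	var compressed bytes.Buffer
	gw := gzip.NewWriter(&compressed)
	io.WriteString(gw, "hello\n")
	gw.Close()

	for _, body := range []io.Reader{&compressed, strings.NewReader("hello\n")} {
		gzipped, err := isStreamGzipped(bufio.NewReader(body))
		if err != nil {
			panic(err)
		}
		fmt.Println("gzipped:", gzipped) // true, then false
	}
}
----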
"github.com/elastic/beats/v7/libbeat/reader/readfile" + "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" ) // MockS3Client struct is used for unit tests. @@ -238,23 +241,43 @@ func TestNewS3BucketReader(t *testing.T) { resp, err := req.Send(ctx) assert.NoError(t, err) - reader := bufio.NewReader(resp.Body) + bodyReader := bufio.NewReader(resp.Body) defer resp.Body.Close() + encFactory, ok := encoding.FindEncoding("plain") + if !ok { + t.Fatalf("unable to find 'plain' encoding") + } + + enc, err := encFactory(bodyReader) + if err != nil { + t.Fatalf("failed to initialize encoding: %v", err) + } + + var r reader.Reader + r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{ + Codec: enc, + BufferSize: 4096, + Terminator: readfile.LineFeed, + }) + if err != nil { + t.Fatalf("Failed to initialize line reader: %v", err) + } + + r = readfile.NewStripNewline(r, readfile.LineFeed) + for i := 0; i < 3; i++ { + msg, err := r.Next() switch i { case 0: - log, err := readStringAndTrimDelimiter(reader) assert.NoError(t, err) - assert.Equal(t, s3LogString1Trimmed, log) + assert.Equal(t, s3LogString1Trimmed, string(msg.Content)) case 1: - log, err := readStringAndTrimDelimiter(reader) assert.NoError(t, err) - assert.Equal(t, s3LogString2Trimmed, log) + assert.Equal(t, s3LogString2Trimmed, string(msg.Content)) case 2: - log, err := readStringAndTrimDelimiter(reader) assert.Error(t, io.EOF, err) - assert.Equal(t, "", log) + assert.Equal(t, "", string(msg.Content)) } } } @@ -300,12 +323,12 @@ func TestCreateEvent(t *testing.T) { break } if err == io.EOF { - event := createEvent(log, len([]byte(log)), s3Info, s3ObjectHash, s3Context) + event := createEvent(log, int64(len([]byte(log))), s3Info, s3ObjectHash, s3Context) events = append(events, event) break } - event := createEvent(log, len([]byte(log)), s3Info, s3ObjectHash, s3Context) + event := createEvent(log, int64(len([]byte(log))), s3Info, s3ObjectHash, s3Context) events = append(events, event) } diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index c40a493b8c7..2552095a105 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -9,25 +9,39 @@ import ( "regexp" "time" + "github.com/dustin/go-humanize" + + "github.com/elastic/beats/v7/libbeat/reader/multiline" + "github.com/elastic/beats/v7/libbeat/reader/readfile" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) type config struct { - APITimeout time.Duration `config:"api_timeout"` - ExpandEventListFromField string `config:"expand_event_list_from_field"` - FileSelectors []FileSelectorCfg `config:"file_selectors"` - FipsEnabled bool `config:"fips_enabled"` - MaxNumberOfMessages int `config:"max_number_of_messages"` - QueueURL string `config:"queue_url" validate:"nonzero,required"` - VisibilityTimeout time.Duration `config:"visibility_timeout"` - AwsConfig awscommon.ConfigAWS `config:",inline"` + APITimeout time.Duration `config:"api_timeout"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` + FileSelectors []FileSelectorCfg `config:"file_selectors"` + FipsEnabled bool `config:"fips_enabled"` + MaxNumberOfMessages int `config:"max_number_of_messages"` + QueueURL string `config:"queue_url" validate:"nonzero,required"` + VisibilityTimeout time.Duration `config:"visibility_timeout"` + AwsConfig awscommon.ConfigAWS `config:",inline"` + MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` + Multiline *multiline.Config `config:"multiline"` 
diff --git a/x-pack/filebeat/input/awss3/ftest/sample2.txt b/x-pack/filebeat/input/awss3/ftest/sample2.txt
new file mode 100644
index 00000000000..431f86fa9a8
--- /dev/null
+++ b/x-pack/filebeat/input/awss3/ftest/sample2.txt
@@ -0,0 +1,8 @@
+
+ A
+ B
+ C
+
+ D
+ E
+ F
diff --git a/x-pack/filebeat/input/awss3/s3_integration_test.go b/x-pack/filebeat/input/awss3/s3_integration_test.go
index f2eca42787e..0c99686aebf 100644
--- a/x-pack/filebeat/input/awss3/s3_integration_test.go
+++ b/x-pack/filebeat/input/awss3/s3_integration_test.go
@@ -11,7 +11,7 @@ import (
 	"context"
 	"net/http"
 	"os"
-	"path/filepath"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -33,12 +33,11 @@ import (
 )
 
 const (
-	fileName          = "sample1.txt"
+	fileName1         = "sample1.txt"
+	fileName2         = "sample2.txt"
 	visibilityTimeout = 300 * time.Second
 )
 
-var filePath = filepath.Join("ftest", fileName)
-
 // GetConfigForTest function gets aws credentials for integration tests.
 func getConfigForTest(t *testing.T) config {
 	t.Helper()
@@ -77,8 +76,23 @@ func getConfigForTest(t *testing.T) config {
 }
 
 func defaultTestConfig() *common.Config {
-	return common.MustNewConfigFrom(map[string]interface{}{
+	return common.MustNewConfigFrom(common.MapStr{
 		"queue_url": os.Getenv("QUEUE_URL"),
+		"file_selectors": []common.MapStr{
+			{
+				"regex":     strings.Replace(fileName1, ".", "\\.", -1),
+				"max_bytes": 4096,
+			},
+			{
+				"regex":     strings.Replace(fileName2, ".", "\\.", -1),
+				"max_bytes": 4096,
+				"multiline": common.MapStr{
+					"pattern": "^")
+			assert.Contains(t, message, "")
+		default:
+			t.Fatalf("object key %s is unknown", objectKey)
+		}
+	}
 	})
 }
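The integration test above builds its configuration programmatically with `common.MapStr`; in standalone YAML it would look roughly like the block below. The test's actual multiline pattern and assertions are truncated in this diff, so the pattern shown is only an assumption that groups the indented lines of sample2.txt into multiline events:

["source","yml"]
----
filebeat.inputs:
- type: aws-s3
  queue_url: ${QUEUE_URL}
  file_selectors:
    - regex: 'sample1\.txt'
      max_bytes: 4096
    - regex: 'sample2\.txt'
      max_bytes: 4096
      multiline:
        pattern: '^\s'   # assumed; the test's real pattern is cut off above
        match: after
----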