[Filebeat] Refactor AWS S3 input with workers (#27199)
* Refactor AWS S3 input with workers

This changes the AWS S3 input to allow it to process more SQS messages in parallel
by having workers that are fully utilized while there are SQS messages to process.

The previous design processed SQS messages in batches ranging from 1 to 10 in size.
It waited until all messages were processed before requesting more. This left some
workers idle toward the end of the batch. It also limited the maximum number of
messages processed in parallel to 10 because that is the largest request size
allowed by SQS.

The refactored input uses ephemeral goroutines as workers to process SQS messages. It
receives as many SQS messages as there are free workers. The total number of workers
is controlled by `max_number_of_messages` (same as before but without an upper limit).

Other changes

Prevent poison pill messages

When an S3 object processing error occurs, the SQS message is returned to the queue
after the visibility timeout expires. This allows it to be reprocessed or moved to
the SQS dead letter queue (if configured). But if no dead letter queue policy is
configured and the error is permanent (reprocessing won't fix it), then the message
would be continuously reprocessed. On error the input will now check the
`ApproximateReceiveCount` attribute of the SQS message and delete it if it exceeds
the configured maximum retries.
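The decision described above amounts to a small guard on the error path. A hedged sketch, with an invented `onProcessingError` helper standing in for the input's actual error handling:

```go
package main

import (
	"fmt"
	"strconv"
)

// Hypothetical retry cap; the real input reads this from
// sqs.max_receive_count in its configuration.
const maxReceiveCount = 5

// onProcessingError decides what to do with an SQS message whose S3
// object failed to process: "delete" drops it permanently (giving up on
// a likely poison pill), "return" lets it reappear after the visibility
// timeout for another attempt.
func onProcessingError(attributes map[string]string) string {
	n, err := strconv.Atoi(attributes["ApproximateReceiveCount"])
	if err == nil && n >= maxReceiveCount {
		return "delete" // received too many times: poison pill
	}
	return "return" // retry after the visibility timeout expires
}

func main() {
	fmt.Println(onProcessingError(map[string]string{"ApproximateReceiveCount": "2"})) // return
	fmt.Println(onProcessingError(map[string]string{"ApproximateReceiveCount": "5"})) // delete
}
```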

Removal of api_timeout from S3 GetObject calls

The `api_timeout` has been removed from the S3 `GetObject` call. It limited the
maximum amount of time for processing the object, since the response body is processed
as a stream while the request is open. Requests can still time out on the server side
due to inactivity.

Improved debug logs

The log messages have been enriched with more data about the related SQS message and
S3 object. For example the SQS `message_id`, `s3_bucket`, and `s3_object` are
included in some messages.

`DEBUG   [aws-s3.sqs_s3_event]   awss3/s3.go:127 End S3 object processing.       {"id": "test_id", "queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-lxlmx6", "message_id": "a11de9f9-0a68-4c4e-a09d-979b87602958", "s3_bucket": "filebeat-s3-integtest-lxlmx6", "s3_object": "events-array.json", "elapsed_time_ns": 23262327}`

Increased test coverage

The refactored input has about 88% test coverage.

The specific AWS API methods used by the input were turned into interfaces to allow
for easier testing. The unit tests mock the AWS interfaces.

The input was separated into the three components listed below. There's a defined
interface for each to allow for mock testing there too. To test the interactions between
these components, go-mock is used to generate mocks and then assert the expectations.

1. The SQS receiver. (sqs.go)
2. The S3 Notification Event handler. (sqs_s3_event.go)
3. The S3 Object reader. (s3.go)

Terraform setup for integration test

Setup for executing the integration tests is now handled by Terraform.
See _meta/terraform/README.md for instructions.

Benchmark test

I added a benchmark that tests the input in isolation with mocked SQS and S3 responses.
It uses a 7KB cloudtrail json.gz file containing about 60 messages as its input.
This removes any variability related to the network, but also means the numbers do not
reflect real-world rates. They can be used to measure the effect of future changes.

+-------------------+--------------------+------------------+--------------------+------+
| MAX MSGS INFLIGHT |   EVENTS PER SEC   | S3 BYTES PER SEC |     TIME (SEC)     | CPUS |
+-------------------+--------------------+------------------+--------------------+------+
|                 1 | 23019.782175720782 | 3.0 MB           |        1.257266458 |   12 |
|                 2 |  36237.53174269319 | 4.8 MB           |        1.158798571 |   12 |
|                 4 |  56456.84532752983 | 7.5 MB           |        1.138285351 |   12 |
|                 8 |  90485.15755430676 | 12 MB            |        1.117244007 |   12 |
|                16 |  103853.8984324643 | 14 MB            |        1.165541225 |   12 |
|                32 | 110380.28141417276 | 15 MB            |        1.110814345 |   12 |
|                64 | 116074.13735061679 | 15 MB            |        1.408100062 |   12 |
|               128 | 114854.80273666105 | 15 MB            | 1.5331357140000001 |   12 |
|               256 | 118767.73924992209 | 16 MB            |        2.041783413 |   12 |
|               512 |  122933.1033660647 | 16 MB            |        1.255463303 |   12 |
|              1024 | 124222.51861746894 | 17 MB            |        1.505765638 |   12 |
+-------------------+--------------------+------------------+--------------------+------+

Relates #25750

* Use InitializeAWSConfig

* Add s3Lister interface for mocking pagination of S3 ListObjects calls

* Add new config parameters to reference.yml

* Optimize uploading b/c it was slow in aws v2 sdk

(cherry picked from commit 7c76158)

# Conflicts:
#	x-pack/filebeat/input/awss3/collector.go
#	x-pack/filebeat/input/awss3/collector_test.go
andrewkroh authored and mergify-bot committed Aug 12, 2021
1 parent 7e43fd9 commit 6b41f01
Showing 42 changed files with 3,583 additions and 694 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
@@ -290,7 +290,7 @@ https://github.com/elastic/beats/compare/v7.12.1...v7.13.0[View commits]
- Improve Cisco ASA/FTD parsing of messages {pull}23766[23766]
- Better support for identity FW messages.
- Change network.bytes, source.bytes, and destination.bytes to long from integer since value can exceed integer capacity.
- Add descriptions for various processors for easier pipeline editing in Kibana UI.
- Fix usage of unallowed ECS event.outcome values in Cisco ASA/FTD pipeline. {pull}24744[24744].
- Fix IPtables Pipeline and Ubiquiti dashboard. {issue}24878[24878] {pull}24928[24928]
- Strip Azure Eventhub connection string in debug logs. {pull}25066[25066]
@@ -1103,6 +1103,7 @@ https://github.com/elastic/beats/compare/v7.9.3\...v7.10.0[View commits]
- Adding support for Microsoft 365 Defender (Microsoft Threat Protection) {pull}21446[21446]
- Adding support for FIPS in s3 input {pull}21446[21446]
- Update Okta documentation for new stateful restarts. {pull}22091[22091]
- Use workers in `aws-s3` input to process SQS messages. {pull}27199[27199]

*Heartbeat*

488 changes: 274 additions & 214 deletions NOTICE.txt


4 changes: 3 additions & 1 deletion go.mod
@@ -89,6 +89,7 @@ require (
github.com/gofrs/flock v0.7.2-0.20190320160742-5135e617513b
github.com/gofrs/uuid v3.3.0+incompatible
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.4.3
github.com/golang/snappy v0.0.1
github.com/gomodule/redigo v1.8.3
@@ -127,6 +128,7 @@ (
github.com/mitchellh/mapstructure v1.3.3
github.com/morikuni/aec v1.0.0 // indirect
github.com/oklog/ulid v1.3.1
github.com/olekukonko/tablewriter v0.0.5
github.com/opencontainers/go-digest v1.0.0-rc1.0.20190228220655-ac19fd6e7483 // indirect
github.com/opencontainers/image-spec v1.0.2-0.20190823105129-775207bd45b6 // indirect
github.com/otiai10/copy v1.2.0
@@ -173,7 +175,7 @@ require (
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.org/x/text v0.3.5
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200731060945-b5fad4ed8dd6
golang.org/x/tools v0.1.1
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb
google.golang.org/grpc v1.29.1
9 changes: 8 additions & 1 deletion go.sum
@@ -355,6 +355,8 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4er
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -531,6 +533,8 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
@@ -566,6 +570,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.5.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -800,8 +806,9 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -67,16 +67,29 @@
#session_token: '${AWS_SESSION_TOKEN:""}'
#credential_profile_name: test-aws-s3-input

# Queue url (required) to receive queue messages from
# SQS queue URL to receive messages from (required).
#queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-aws-s3-logs-queue"

# The duration (in seconds) that the received messages are hidden from subsequent
# retrieve requests after being retrieved by a ReceiveMessage request.
#visibility_timeout: 300
# Maximum number of SQS messages that can be inflight at any time.
#max_number_of_messages: 5

# Maximum duration of an AWS API call (excluding S3 GetObject calls).
#api_timeout: 120s

# Duration that received SQS messages are hidden from subsequent
# requests after being retrieved by a ReceiveMessage request.
#visibility_timeout: 300s

# List of S3 object metadata keys to include in events.
#include_s3_metadata: []

# The max number of times an SQS message should be received (retried) before deleting it.
#sqs.max_receive_count: 5

# Maximum duration for which the SQS ReceiveMessage call waits for a message
# to arrive in the queue before returning.
#sqs.wait_time: 20s

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
50 changes: 37 additions & 13 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -11,11 +11,15 @@
<titleabbrev>AWS S3</titleabbrev>
++++

Use the `aws-s3` input to retrieve logs from S3 objects that are pointed by
messages from specific SQS queues. This input can, for example, be used to
receive S3 server access logs to monitor detailed records for the requests that
Use the `aws-s3` input to retrieve logs from S3 objects that are pointed to by
S3 notification events read from an SQS queue. This input can, for example, be
used to receive S3 access logs to monitor detailed records for the requests that
are made to a bucket.

This input depends on S3 notifications delivered to an SQS queue for
`s3:ObjectCreated:*` events. You must create an SQS queue and configure S3
to publish events to the queue.

When processing an S3 object referenced by an SQS message, if half of the set
visibility timeout has passed and the processing is still ongoing, then the
visibility timeout of that SQS message will be reset to make sure the message
@@ -39,8 +43,9 @@ The `aws-s3` input supports the following configuration options plus the
==== `api_timeout`

The maximum duration of the AWS API call. If it exceeds the timeout, the AWS API
call will be interrupted. The default AWS API call timeout for a message is 120
seconds. The maximum is half of the visibility timeout value.
call will be interrupted. The default AWS API timeout is `120s`.

The API timeout must be longer than the `sqs.wait_time` value.

[id="input-{type}-buffer_size"]
[float]
@@ -162,9 +167,8 @@ The default is `10 MiB`.
[float]
==== `max_number_of_messages`

The maximum number of messages to return. Amazon SQS never returns more messages
than this value (however, fewer messages might be returned). Valid values: 1 to
10. Default: 5.
The maximum number of SQS messages that can be inflight at any time. Defaults
to 5.

[id="input-{type}-parsers"]
[float]
@@ -212,11 +216,31 @@ URL of the AWS SQS queue that messages will be received from. Required.
[float]
==== `visibility_timeout`

The duration that the received messages are hidden from subsequent retrieve
requests after being retrieved by a ReceiveMessage request. This value needs to
be a lot bigger than {beatname_uc} collection frequency so if it took too long
to read the S3 log, this SQS message will not be reprocessed. The default
visibility timeout for a message is 300 seconds. The maximum is 12 hours.
The duration that the received SQS messages are hidden from subsequent retrieve
requests after being retrieved by a `ReceiveMessage` request. The default
visibility timeout is `300s`. The maximum is `12h`. {beatname_uc} will
automatically reset the visibility timeout of a message after 1/2 of the
duration passes to prevent a message that is still being processed from
returning to the queue.

[float]
==== `sqs.max_receive_count`

The maximum number of times an SQS message should be received (retried) before
deleting it. This feature prevents poison-pill messages (messages that can be
received but can't be processed) from consuming resources. The number of times
a message has been received is tracked using the `ApproximateReceiveCount` SQS
attribute. The default value is 5.

If you have configured a dead letter queue then you can set this value to
`-1` to disable deletion on failure.

[float]
==== `sqs.wait_time`

The maximum duration that an SQS `ReceiveMessage` call should wait for a message
to arrive in the queue before returning. The default value is `20s`. The maximum
value is `20s`.

[float]
==== `aws credentials`
21 changes: 17 additions & 4 deletions x-pack/filebeat/filebeat.reference.yml
@@ -3117,16 +3117,29 @@ filebeat.inputs:
#session_token: '${AWS_SESSION_TOKEN:""}'
#credential_profile_name: test-aws-s3-input

# Queue url (required) to receive queue messages from
# SQS queue URL to receive messages from (required).
#queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-aws-s3-logs-queue"

# The duration (in seconds) that the received messages are hidden from subsequent
# retrieve requests after being retrieved by a ReceiveMessage request.
#visibility_timeout: 300
# Maximum number of SQS messages that can be inflight at any time.
#max_number_of_messages: 5

# Maximum duration of an AWS API call (excluding S3 GetObject calls).
#api_timeout: 120s

# Duration that received SQS messages are hidden from subsequent
# requests after being retrieved by a ReceiveMessage request.
#visibility_timeout: 300s

# List of S3 object metadata keys to include in events.
#include_s3_metadata: []

# The max number of times an SQS message should be received (retried) before deleting it.
#sqs.max_receive_count: 5

# Maximum duration for which the SQS ReceiveMessage call waits for a message
# to arrive in the queue before returning.
#sqs.wait_time: 20s

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
62 changes: 0 additions & 62 deletions x-pack/filebeat/input/awss3/_meta/s3-input.asciidoc

This file was deleted.

3 changes: 3 additions & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/.gitignore
@@ -0,0 +1,3 @@
terraform/
outputs.yml
*.tfstate*
57 changes: 57 additions & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl


43 changes: 43 additions & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/README.md
@@ -0,0 +1,43 @@
# Terraform setup for AWS S3 Input Integration Tests

This directory contains a Terraform module that creates the AWS resources needed
for executing the integration tests for the `aws-s3` Filebeat input. It creates
an S3 bucket and SQS queue and configures S3 `ObjectCreated:*` notifications to
be delivered to SQS.

It outputs configuration information that is consumed by the tests to
`outputs.yml`. The AWS resources are randomly named to prevent name collisions
between multiple users.

### Usage

You must have the appropriate AWS environment variables for authentication set
before running Terraform or the integration tests. The AWS key must be
authorized to create and destroy S3 buckets and SQS queues.

1. Execute terraform in this directory to create the resources. This will also
write the `outputs.yml`.

`terraform apply`

2. (Optional) View the output configuration.

```yaml
"aws_region": "us-east-1"
"bucket_name": "filebeat-s3-integtest-8iok1h"
"queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-8iok1h"
```
3. Execute the integration test.
```
cd x-pack/filebeat/inputs/awss3
go test -tags aws,integration -run TestInputRun -v .
```

4. Cleanup AWS resources. Execute terraform to remove the SQS queue and delete
the S3 bucket and its contents.

`terraform destroy`


