[Filebeat] Refactor AWS S3 input with workers #27199

Merged
merged 10 commits into from
Aug 12, 2021
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
@@ -295,7 +295,7 @@ https://github.com/elastic/beats/compare/v7.12.1...v7.13.0[View commits]
- Improve Cisco ASA/FTD parsing of messages {pull}23766[23766]
- Better support for identity FW messages.
- Change network.bytes, source.bytes, and destination.bytes to long from integer since value can exceed integer capacity.
- Add descriptions for various processors for easier pipeline editing in Kibana UI.
- Add descriptions for various processors for easier pipeline editing in Kibana UI.
- Fix usage of unallowed ECS event.outcome values in Cisco ASA/FTD pipeline. {pull}24744[24744].
- Fix IPtables Pipeline and Ubiquiti dashboard. {issue}24878[24878] {pull}24928[24928]
- Strip Azure Eventhub connection string in debug logs. {pulll}25066[25066]
@@ -1108,6 +1108,7 @@ https://github.com/elastic/beats/compare/v7.9.3\...v7.10.0[View commits]
- Adding support for Microsoft 365 Defender (Microsoft Threat Protection) {pull}21446[21446]
- Adding support for FIPS in s3 input {pull}21446[21446]
- Update Okta documentation for new stateful restarts. {pull}22091[22091]
- Use workers in `aws-s3` input to process SQS messages. {pull}27199[27199]

*Heartbeat*

488 changes: 274 additions & 214 deletions NOTICE.txt

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion go.mod
@@ -89,6 +89,7 @@ require (
github.com/gofrs/flock v0.7.2-0.20190320160742-5135e617513b
github.com/gofrs/uuid v3.3.0+incompatible
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.4.3
github.com/golang/snappy v0.0.1
github.com/gomodule/redigo v1.8.3
@@ -127,6 +128,7 @@ require (
github.com/mitchellh/mapstructure v1.3.3
github.com/morikuni/aec v1.0.0 // indirect
github.com/oklog/ulid v1.3.1
github.com/olekukonko/tablewriter v0.0.5
github.com/opencontainers/go-digest v1.0.0-rc1.0.20190228220655-ac19fd6e7483 // indirect
github.com/opencontainers/image-spec v1.0.2-0.20190823105129-775207bd45b6 // indirect
github.com/otiai10/copy v1.2.0
@@ -173,7 +175,7 @@ require (
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.org/x/text v0.3.5
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200731060945-b5fad4ed8dd6
golang.org/x/tools v0.1.1
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb
google.golang.org/grpc v1.29.1
9 changes: 8 additions & 1 deletion go.sum
@@ -355,6 +355,8 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4er
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -533,6 +535,8 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
@@ -568,6 +572,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.5.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -802,8 +808,9 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -67,16 +67,29 @@
#session_token: '${AWS_SESSION_TOKEN:"”}'
#credential_profile_name: test-aws-s3-input

# Queue url (required) to receive queue messages from
# SQS queue URL to receive messages from (required).
#queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-aws-s3-logs-queue"

# The duration (in seconds) that the received messages are hidden from subsequent
# retrieve requests after being retrieved by a ReceiveMessage request.
#visibility_timeout: 300
# Maximum number of SQS messages that can be inflight at any time.
#max_number_of_messages: 5

# Maximum duration of an AWS API call (excluding S3 GetObject calls).
#api_timeout: 120s

# Duration that received SQS messages are hidden from subsequent
# requests after being retrieved by a ReceiveMessage request.
#visibility_timeout: 300s

# List of S3 object metadata keys to include in events.
#include_s3_metadata: []

# The max number of times an SQS message should be received (retried) before deleting it.
#sqs.max_receive_count: 5

# Maximum duration for which the SQS ReceiveMessage call waits for a message
# to arrive in the queue before returning.
#sqs.wait_time: 20s

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
50 changes: 37 additions & 13 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -11,11 +11,15 @@
<titleabbrev>AWS S3</titleabbrev>
++++

Use the `aws-s3` input to retrieve logs from S3 objects that are pointed by
messages from specific SQS queues. This input can, for example, be used to
receive S3 server access logs to monitor detailed records for the requests that
Use the `aws-s3` input to retrieve logs from S3 objects that are pointed to by
S3 notification events read from an SQS queue. This input can, for example, be
used to receive S3 access logs to monitor detailed records for the requests that
are made to a bucket.

This input depends on S3 notifications delivered to an SQS queue for
`s3:ObjectCreated:*` events. You must create an SQS queue and configure S3
to publish events to the queue.

When processing an S3 object that is pointed to by an SQS message, if half of the
configured visibility timeout has passed and processing is still ongoing, the
visibility timeout of that SQS message is reset to make sure the message
@@ -39,8 +43,9 @@ The `aws-s3` input supports the following configuration options plus the
==== `api_timeout`

The maximum duration of the AWS API call. If it exceeds the timeout, the AWS API
call will be interrupted. The default AWS API call timeout for a message is 120
seconds. The maximum is half of the visibility timeout value.
call will be interrupted. The default AWS API timeout is `120s`.

The API timeout must be longer than the `sqs.wait_time` value.

[id="input-{type}-buffer_size"]
[float]
@@ -162,9 +167,8 @@ The default is `10 MiB`.
[float]
==== `max_number_of_messages`

The maximum number of messages to return. Amazon SQS never returns more messages
than this value (however, fewer messages might be returned). Valid values: 1 to
10. Default: 5.
The maximum number of SQS messages that can be inflight at any time. Defaults
to 5.

[id="input-{type}-parsers"]
[float]
@@ -212,11 +216,31 @@ URL of the AWS SQS queue that messages will be received from. Required.
[float]
==== `visibility_timeout`

The duration that the received messages are hidden from subsequent retrieve
requests after being retrieved by a ReceiveMessage request. This value needs to
be a lot bigger than {beatname_uc} collection frequency so if it took too long
to read the S3 log, this SQS message will not be reprocessed. The default
visibility timeout for a message is 300 seconds. The maximum is 12 hours.
The duration that the received SQS messages are hidden from subsequent retrieve
requests after being retrieved by a `ReceiveMessage` request. The default
visibility timeout is `300s`. The maximum is `12h`. {beatname_uc} will
automatically reset the visibility timeout of a message after 1/2 of the
duration passes to prevent a message that is still being processed from
returning to the queue.

[float]
==== `sqs.max_receive_count`

The maximum number of times an SQS message should be received (retried) before
deleting it. This feature prevents poison-pill messages (messages that can be
received but can't be processed) from consuming resources. The number of times
a message has been received is tracked using the `ApproximateReceiveCount` SQS
attribute. The default value is 5.

If you have configured a dead letter queue then you can set this value to
`-1` to disable deletion on failure.
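
One way to picture the decision described above is the following Go sketch. It is an assumption-laden illustration, not the input's code: `shouldDeleteOnFailure` is a hypothetical helper, the `>=` comparison is this sketch's reading of "maximum number of times", and any negative limit is treated the same as the documented `-1`. Only the `ApproximateReceiveCount` attribute name comes from the text.

```go
package main

import (
	"fmt"
	"strconv"
)

// shouldDeleteOnFailure reports whether a message whose processing failed
// should be deleted instead of being left in the queue for another attempt.
// attrs holds the SQS message attributes; maxReceiveCount mirrors
// sqs.max_receive_count.
func shouldDeleteOnFailure(attrs map[string]string, maxReceiveCount int) bool {
	if maxReceiveCount < 0 {
		// Deletion on failure is disabled (the docs use -1), for example
		// because a dead letter queue handles poison-pill messages.
		return false
	}
	count, err := strconv.Atoi(attrs["ApproximateReceiveCount"])
	if err != nil {
		// Attribute missing or malformed: keep the message for a retry.
		return false
	}
	return count >= maxReceiveCount
}

func main() {
	first := map[string]string{"ApproximateReceiveCount": "1"}
	fifth := map[string]string{"ApproximateReceiveCount": "5"}

	fmt.Println(shouldDeleteOnFailure(first, 5))  // false
	fmt.Println(shouldDeleteOnFailure(fifth, 5))  // true
	fmt.Println(shouldDeleteOnFailure(fifth, -1)) // false
}
```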

[float]
==== `sqs.wait_time`

The maximum duration that an SQS `ReceiveMessage` call should wait for a message
to arrive in the queue before returning. The default value is `20s`. The maximum
value is `20s`.

[float]
==== `aws credentials`
21 changes: 17 additions & 4 deletions x-pack/filebeat/filebeat.reference.yml
@@ -3117,16 +3117,29 @@ filebeat.inputs:
#session_token: '${AWS_SESSION_TOKEN:"”}'
#credential_profile_name: test-aws-s3-input

# Queue url (required) to receive queue messages from
# SQS queue URL to receive messages from (required).
#queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-aws-s3-logs-queue"

# The duration (in seconds) that the received messages are hidden from subsequent
# retrieve requests after being retrieved by a ReceiveMessage request.
#visibility_timeout: 300
# Maximum number of SQS messages that can be inflight at any time.
#max_number_of_messages: 5

# Maximum duration of an AWS API call (excluding S3 GetObject calls).
#api_timeout: 120s

# Duration that received SQS messages are hidden from subsequent
# requests after being retrieved by a ReceiveMessage request.
#visibility_timeout: 300s

# List of S3 object metadata keys to include in events.
#include_s3_metadata: []

# The max number of times an SQS message should be received (retried) before deleting it.
#sqs.max_receive_count: 5

# Maximum duration for which the SQS ReceiveMessage call waits for a message
# to arrive in the queue before returning.
#sqs.wait_time: 20s

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
62 changes: 0 additions & 62 deletions x-pack/filebeat/input/awss3/_meta/s3-input.asciidoc

This file was deleted.

3 changes: 3 additions & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/.gitignore
@@ -0,0 +1,3 @@
terraform/
outputs.yml
*.tfstate*
57 changes: 57 additions & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/README.md
@@ -0,0 +1,43 @@
# Terraform setup for AWS S3 Input Integration Tests

This directory contains a Terraform module that creates the AWS resources needed
for executing the integration tests for the `aws-s3` Filebeat input. It creates
an S3 bucket and SQS queue and configures S3 `ObjectCreated:*` notifications to
be delivered to SQS.

It outputs configuration information that is consumed by the tests to
`outputs.yml`. The AWS resources are randomly named to prevent name collisions
between multiple users.

### Usage

You must have the appropriate AWS environment variables for authentication set
before running Terraform or the integration tests. The AWS key must be
authorized to create and destroy S3 buckets and SQS queues.

1. Execute terraform in this directory to create the resources. This will also
write the `outputs.yml`.

`terraform apply`

2. (Optional) View the output configuration.

```yaml
"aws_region": "us-east-1"
"bucket_name": "filebeat-s3-integtest-8iok1h"
"queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-8iok1h"
```
3. Execute the integration test.
```
cd x-pack/filebeat/input/awss3
go test -tags aws,integration -run TestInputRun -v .
```

Comment on lines +34 to +35

Contributor:

@kaiyan-sheng would this work with our existing AWS CI integration? That would be great.

Contributor:

I think we have the AWS tests run weekly right now in the CI and this would work. @v1v Do you know if this is easy to add?

Member:

The AWS tests run on a weekly basis, as you said, and they run the commands defined in:

    cloud:
        cloud: "mage build test"
        withModule: true ## run the ITs only if the changeset affects a specific module.
        dirs: ## run the cloud tests for the given modules.
            - "x-pack/metricbeat/module/aws"
        when: ## Override the top-level when.
            parameters:
                - "awsCloudTests"
            comments:
                - "/test x-pack/metricbeat for aws cloud"
            labels:
                - "aws"
        stage: extended

`mage build test` runs in the folder `x-pack/metricbeat`, so if the above-mentioned command is part of the mage, then it should work.

The test requires applying a Terraform file (and destroying it at the end):
https://github.com/elastic/beats/blob/master/x-pack/filebeat/input/awss3/_meta/terraform/README.md#usage

Will this happen?

4. Clean up AWS resources. Execute terraform to remove the SQS queue and delete
the S3 bucket and its contents.

`terraform destroy`

