Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x](backport #27199) [Filebeat] Refactor AWS S3 input with workers #27338

Merged
merged 1 commit into from
Aug 13, 2021

Conversation

mergify[bot]
Copy link
Contributor

@mergify mergify bot commented Aug 12, 2021

This is an automatic backport of pull request #27199 done by Mergify.
Cherry-pick of 7c76158 has failed:

On branch mergify/bp/7.x/pr-27199
Your branch is up to date with 'origin/7.x'.

You are currently cherry-picking commit 7c7615875.
  (fix conflicts and run "git cherry-pick --continue")
  (use "git cherry-pick --skip" to skip this patch)
  (use "git cherry-pick --abort" to cancel the cherry-pick operation)

Changes to be committed:
	modified:   CHANGELOG.asciidoc
	modified:   NOTICE.txt
	modified:   go.mod
	modified:   go.sum
	modified:   x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl
	modified:   x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
	modified:   x-pack/filebeat/filebeat.reference.yml
	deleted:    x-pack/filebeat/input/awss3/_meta/s3-input.asciidoc
	new file:   x-pack/filebeat/input/awss3/_meta/terraform/.gitignore
	new file:   x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl
	new file:   x-pack/filebeat/input/awss3/_meta/terraform/README.md
	new file:   x-pack/filebeat/input/awss3/_meta/terraform/main.tf
	new file:   x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf
	new file:   x-pack/filebeat/input/awss3/_meta/terraform/variables.tf
	new file:   x-pack/filebeat/input/awss3/acker.go
	new file:   x-pack/filebeat/input/awss3/acker_test.go
	modified:   x-pack/filebeat/input/awss3/config.go
	modified:   x-pack/filebeat/input/awss3/config_test.go
	modified:   x-pack/filebeat/input/awss3/input.go
	new file:   x-pack/filebeat/input/awss3/input_benchmark_test.go
	new file:   x-pack/filebeat/input/awss3/input_integration_test.go
	new file:   x-pack/filebeat/input/awss3/interfaces.go
	new file:   x-pack/filebeat/input/awss3/mock_interfaces_test.go
	new file:   x-pack/filebeat/input/awss3/mock_publisher_test.go
	new file:   x-pack/filebeat/input/awss3/s3.go
	deleted:    x-pack/filebeat/input/awss3/s3_integration_test.go
	new file:   x-pack/filebeat/input/awss3/s3_test.go
	new file:   x-pack/filebeat/input/awss3/semaphore.go
	new file:   x-pack/filebeat/input/awss3/semaphore_test.go
	new file:   x-pack/filebeat/input/awss3/sqs.go
	new file:   x-pack/filebeat/input/awss3/sqs_s3_event.go
	new file:   x-pack/filebeat/input/awss3/sqs_s3_event_test.go
	new file:   x-pack/filebeat/input/awss3/sqs_test.go
	new file:   x-pack/filebeat/input/awss3/testdata/aws-cloudtrail.json.gz
	new file:   x-pack/filebeat/input/awss3/testdata/events-array.json
	new file:   x-pack/filebeat/input/awss3/testdata/invalid.json
	new file:   x-pack/filebeat/input/awss3/testdata/log.json
	new file:   x-pack/filebeat/input/awss3/testdata/log.ndjson
	renamed:    x-pack/filebeat/input/awss3/testdata/sample1.txt -> x-pack/filebeat/input/awss3/testdata/log.txt
	new file:   x-pack/filebeat/input/awss3/testdata/multiline.json
	new file:   x-pack/filebeat/input/awss3/testdata/multiline.json.gz
	renamed:    x-pack/filebeat/input/awss3/testdata/sample2.txt -> x-pack/filebeat/input/awss3/testdata/multiline.txt

Unmerged paths:
  (use "git add/rm <file>..." as appropriate to mark resolution)
	deleted by them: x-pack/filebeat/input/awss3/collector.go
	deleted by them: x-pack/filebeat/input/awss3/collector_test.go

To fix up this pull request, you can check it out locally. See documentation: https://docs.github.com/en/github/collaborating-with-pull-requests/reviewing-changes-in-pull-requests/checking-out-pull-requests-locally


Mergify commands and options

More conditions and actions can be found in the documentation.

You can also trigger Mergify actions by commenting on this pull request:

  • @Mergifyio refresh will re-evaluate the rules
  • @Mergifyio rebase will rebase this PR on its base branch
  • @Mergifyio update will merge the base branch into this PR
  • @Mergifyio backport <destination> will backport this PR on <destination> branch

Additionally, on Mergify dashboard you can:

  • look at your merge queues
  • generate the Mergify configuration with the config editor.

Finally, you can contact us on https://mergify.io/

@mergify mergify bot added backport conflicts There is a conflict in the backported pull request labels Aug 12, 2021
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Aug 12, 2021
@elasticmachine
Copy link
Collaborator

Pinging @elastic/integrations (Team:Integrations)

@elasticmachine
Copy link
Collaborator

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Aug 12, 2021
@elasticmachine
Copy link
Collaborator

elasticmachine commented Aug 12, 2021

💚 Build Succeeded

the below badges are clickable and redirect to their specific view in the CI or DOCS
Pipeline View Test View Changes Artifacts preview preview

Expand to view the summary

Build stats

  • Start Time: 2021-08-13T00:56:53.925+0000

  • Duration: 211 min 10 sec

  • Commit: 39b0e75

Test stats 🧪

Test Results
Failed 0
Passed 52531
Skipped 5265
Total 57796

Trends 🧪

Image of Build Times

Image of Tests

💚 Flaky test report

Tests succeeded.

Expand to view the summary

Test stats 🧪

Test Results
Failed 0
Passed 52531
Skipped 5265
Total 57796

* Refactor AWS S3 input with workers

This changes the AWS S3 input to allow it to process more SQS messages in parallel
by having workers that are fully utilized while there are SQS message to process.

The previous design processed SQS messages in batches ranging from 1 to 10 in size.
It waited until all messages were processed before requesting more. This left some
workers idle toward the the end of processing the batch. This also limited the maximum
number of messages processed in parallel to 10 because that is the largest request
size allowed by SQS.

The refactored input uses ephemeral goroutines as workers to process SQS messages. It
receives as many SQS messages as there are free workers. The total number of workers
is controlled by `max_number_of_messages` (same as before but without an upper limit).

Other changes

Prevent poison pill messages

When an S3 object processing error occurs the SQS message is returned to the after
the visibility timeout expires. This allows it to be reprocessed again or moved to
the SQS dead letter queue (if configured). But if no dead letter queue policy is
configured and the error is permanent (reprocessing won't fix it) then the message
would continuosly be reprocessed. On error the input will now check the
`ApproximateReceiveCount` attribute of the SQS message and delete it if it exceeds
the configured maximum retries.

Removal of api_timeout from S3 GetObject calls

The `api_timeout` has been removed from the S3 `GetObject` call. This limited the
maximum amount of time for processing the object since the response body is processed
as a stream while the request is open. Requests can still timeout in the server due
to inactivity.

Improved debug logs

The log messages have been enriched with more data about the related SQS message and
S3 object. For example the SQS `message_id`, `s3_bucket`, and `s3_object` are
included in some messages.

`DEBUG   [aws-s3.sqs_s3_event]   awss3/s3.go:127 End S3 object processing.       {"id": "test_id", "queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-lxlmx6", "message_id": "a11de9f9-0a68-4c4e-a09d-979b87602958", "s3_bucket": "filebeat-s3-integtest-lxlmx6", "s3_object": "events-array.json", "elapsed_time_ns": 23262327}`

Increased test coverage

The refactored input has about 88% test coverage.

The specific AWS API methods used by the input were turned into interfaces to allow
for easier testing. The unit tests mock the AWS interfaces.

The parts of the input were separted into three components listed below. There's a defined
interface for each to allow for mock testing there too. To test the interactions between
these components go-mock is used to generate mocks and then assert the expectations.

1. The SQS receiver. (sqs.go)
2. The S3 Notification Event handler. (sqs_s3_event.go)
3. The S3 Object reader. (s3.go)

Terraform setup for integration test

Setup for executing the integration tests is now handled by Terraform.
See _meta/terraform/README.md for instructions.

Benchmark test

I added a benchmark that tests the input in isolation with mocked SQS and S3 responeses.
It uses a 7KB cloudtrail json.gz file containing about 60 messages for its input.
This removes any variability related to the network, but also means these do not reflect
real-work rates. They can be used to measure the effect of future changes.

+-------------------+--------------------+------------------+--------------------+------+
| MAX MSGS INFLIGHT |   EVENTS PER SEC   | S3 BYTES PER SEC |     TIME (SEC)     | CPUS |
+-------------------+--------------------+------------------+--------------------+------+
|                 1 | 23019.782175720782 | 3.0 MB           |        1.257266458 |   12 |
|                 2 |  36237.53174269319 | 4.8 MB           |        1.158798571 |   12 |
|                 4 |  56456.84532752983 | 7.5 MB           |        1.138285351 |   12 |
|                 8 |  90485.15755430676 | 12 MB            |        1.117244007 |   12 |
|                16 |  103853.8984324643 | 14 MB            |        1.165541225 |   12 |
|                32 | 110380.28141417276 | 15 MB            |        1.110814345 |   12 |
|                64 | 116074.13735061679 | 15 MB            |        1.408100062 |   12 |
|               128 | 114854.80273666105 | 15 MB            | 1.5331357140000001 |   12 |
|               256 | 118767.73924992209 | 16 MB            |        2.041783413 |   12 |
|               512 |  122933.1033660647 | 16 MB            |        1.255463303 |   12 |
|              1024 | 124222.51861746894 | 17 MB            |        1.505765638 |   12 |
+-------------------+--------------------+------------------+--------------------+------+

Relates #25750

* Use InitializeAWSConfig

* Add s3Lister interface for mocking pagination of S3 ListObjects calls

* Add new config parameters to reference.yml

* Optimize uploading b/c it was slow in aws v2 sdk
@andrewkroh andrewkroh force-pushed the mergify/bp/7.x/pr-27199 branch from 6b41f01 to 39b0e75 Compare August 13, 2021 00:56
@andrewkroh andrewkroh merged commit caaedae into 7.x Aug 13, 2021
@mergify mergify bot deleted the mergify/bp/7.x/pr-27199 branch August 13, 2021 10:36
@mergify
Copy link
Contributor Author

mergify bot commented Nov 11, 2021

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b mergify/bp/7.x/pr-27199 upstream/mergify/bp/7.x/pr-27199
git merge upstream/7.x
git push upstream mergify/bp/7.x/pr-27199

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport conflicts There is a conflict in the backported pull request Team:Integrations Label for the Integrations team
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants