Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Refactor AWS S3 input with workers #27199

Merged
merged 10 commits into from
Aug 12, 2021

Conversation

andrewkroh
Copy link
Member

@andrewkroh andrewkroh commented Aug 2, 2021

What does this PR do?

sqs-workers

This changes the AWS S3 input to allow it to process more SQS messages in parallel
by having workers that are fully utilized while there are SQS message to process.

The previous design processed SQS messages in batches ranging from 1 to 10 in size.
It waited until all messages were processed before requesting more. This left some
workers idle toward the end of processing the batch (as seen in the monitoring metrics
below). This also limited the maximum number of messages processed in parallel to
10 because that is the largest request size allowed by SQS.

Screen Shot 2021-07-29 at 10 31 57 AM

The refactored input uses ephemeral goroutines as workers to process SQS messages. It
receives as many SQS messages as there are free workers. The total number of workers
is controlled by max_number_of_messages (same as before but without an upper limit).

s3-worker

Other changes

Prevent poison pill messages

When an S3 object processing error occurs the SQS message is returned to the after
the visibility timeout expires. This allows it to be reprocessed again or moved to
the SQS dead letter queue (if configured). But if no dead letter queue policy is
configured and the error is permanent (reprocessing won't fix it) then the message
would continuosly be reprocessed. On error the input will now check the
ApproximateReceiveCount attribute of the SQS message and delete it if it exceeds
the configured maximum retries.

Removal of api_timeout from S3 GetObject calls

The api_timeout has been removed from the S3 GetObject call. This limited the
maximum amount of time for processing the object since the response body is processed
as a stream while the request is open. Requests can still timeout in the server due
to inactivity.

Improved debug logs

The log messages have been enriched with more data about the related SQS message and
S3 object. For example the SQS message_id, s3_bucket, and s3_object are
included in some messages.

DEBUG [aws-s3.sqs_s3_event] awss3/s3.go:127 End S3 object processing. {"id": "test_id", "queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-lxlmx6", "message_id": "a11de9f9-0a68-4c4e-a09d-979b87602958", "s3_bucket": "filebeat-s3-integtest-lxlmx6", "s3_object": "events-array.json", "elapsed_time_ns": 23262327}

Increased test coverage

The refactored input has about 88% test coverage.

The specific AWS API methods used by the input were turned into interfaces to allow
for easier testing. The unit tests mock the AWS interfaces.

The parts of the input were separted into three components listed below. There's a defined
interface for each to allow for mock testing there too. To test the interactions between
these components go-mock is used to generate mocks and then assert the expectations.

  1. The SQS receiver. (sqs.go)
  2. The S3 Notification Event handler. (sqs_s3_event.go)
  3. The S3 Object reader. (s3.go)

Terraform setup for integration test

Setup for executing the integration tests is now handled by Terraform.
See _meta/terraform/README.md for instructions.

Benchmark test

I added a benchmark that tests the input in isolation with mocked SQS and S3 responeses.
It uses a 7KB cloudtrail json.gz file containing about 60 messages for its input.
This removes any variability related to the network, but also means these do not reflect
real-work rates. They can be used to measure the effect of future changes.

+-------------------+--------------------+------------------+--------------------+------+
| MAX MSGS INFLIGHT |   EVENTS PER SEC   | S3 BYTES PER SEC |     TIME (SEC)     | CPUS |
+-------------------+--------------------+------------------+--------------------+------+
|                 1 | 36380.253532518276 | 1.2 MB           |        1.243366816 |   12 |
|                 2 | 61727.549671738896 | 2.1 MB           | 1.0951187980000001 |   12 |
|                 4 |  86218.70431874547 | 3.0 MB           |        1.208577661 |   12 |
|                 8 | 131900.69854257331 | 4.5 MB           |        1.179751144 |   12 |
|                16 |   151824.404438336 | 5.2 MB           |        1.083857372 |   12 |
|                32 | 155548.56015625654 | 5.3 MB           |        1.170502638 |   12 |
|                64 | 166188.27838709904 | 5.7 MB           |         1.17403587 |   12 |
|               128 | 185429.33410590928 | 6.4 MB           |        3.380193339 |   12 |
|               256 | 186181.15705271234 | 6.4 MB           |         1.66313823 |   12 |
|               512 |  197793.9906993712 | 7.3 MB           |        1.230735065 |   12 |
|              1024 | 211492.91373843007 | 7.4 MB           |        1.246637513 |   12 |
+-------------------+--------------------+------------------+--------------------+------+

Relates #25750

Why is it important?

This enabled easier vertical scaling of Filebeat aws-s3 input and helps it better utilize the CPU that's available.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Author's Checklist

  • Update documentation
  • Make max receive message retries configurable.
  • Streaming JSON parser code needs cleaned up
  • Check for any recent changes since May that were not merged into refactoring
  • Changelog

Related issues

@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Aug 2, 2021
This changes the AWS S3 input to allow it to process more SQS messages in parallel
by having workers that are fully utilized while there are SQS message to process.

The previous design processed SQS messages in batches ranging from 1 to 10 in size.
It waited until all messages were processed before requesting more. This left some
workers idle toward the the end of processing the batch. This also limited the maximum
number of messages processed in parallel to 10 because that is the largest request
size allowed by SQS.

The refactored input uses ephemeral goroutines as workers to process SQS messages. It
receives as many SQS messages as there are free workers. The total number of workers
is controlled by `max_number_of_messages` (same as before but without an upper limit).

Other changes

Prevent poison pill messages

When an S3 object processing error occurs the SQS message is returned to the after
the visibility timeout expires. This allows it to be reprocessed again or moved to
the SQS dead letter queue (if configured). But if no dead letter queue policy is
configured and the error is permanent (reprocessing won't fix it) then the message
would continuosly be reprocessed. On error the input will now check the
`ApproximateReceiveCount` attribute of the SQS message and delete it if it exceeds
the configured maximum retries.

Removal of api_timeout from S3 GetObject calls

The `api_timeout` has been removed from the S3 `GetObject` call. This limited the
maximum amount of time for processing the object since the response body is processed
as a stream while the request is open. Requests can still timeout in the server due
to inactivity.

Improved debug logs

The log messages have been enriched with more data about the related SQS message and
S3 object. For example the SQS `message_id`, `s3_bucket`, and `s3_object` are
included in some messages.

`DEBUG   [aws-s3.sqs_s3_event]   awss3/s3.go:127 End S3 object processing.       {"id": "test_id", "queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-lxlmx6", "message_id": "a11de9f9-0a68-4c4e-a09d-979b87602958", "s3_bucket": "filebeat-s3-integtest-lxlmx6", "s3_object": "events-array.json", "elapsed_time_ns": 23262327}`

Increased test coverage

The refactored input has about 88% test coverage.

The specific AWS API methods used by the input were turned into interfaces to allow
for easier testing. The unit tests mock the AWS interfaces.

The parts of the input were separted into three components listed below. There's a defined
interface for each to allow for mock testing there too. To test the interactions between
these components go-mock is used to generate mocks and then assert the expectations.

1. The SQS receiver. (sqs.go)
2. The S3 Notification Event handler. (sqs_s3_event.go)
3. The S3 Object reader. (s3.go)

Terraform setup for integration test

Setup for executing the integration tests is now handled by Terraform.
See _meta/terraform/README.md for instructions.

Benchmark test

I added a benchmark that tests the input in isolation with mocked SQS and S3 responeses.
It uses a 7KB cloudtrail json.gz file containing about 60 messages for its input.
This removes any variability related to the network, but also means these do not reflect
real-work rates. They can be used to measure the effect of future changes.

+-------------------+--------------------+------------------+--------------------+------+
| MAX MSGS INFLIGHT |   EVENTS PER SEC   | S3 BYTES PER SEC |     TIME (SEC)     | CPUS |
+-------------------+--------------------+------------------+--------------------+------+
|                 1 | 23019.782175720782 | 3.0 MB           |        1.257266458 |   12 |
|                 2 |  36237.53174269319 | 4.8 MB           |        1.158798571 |   12 |
|                 4 |  56456.84532752983 | 7.5 MB           |        1.138285351 |   12 |
|                 8 |  90485.15755430676 | 12 MB            |        1.117244007 |   12 |
|                16 |  103853.8984324643 | 14 MB            |        1.165541225 |   12 |
|                32 | 110380.28141417276 | 15 MB            |        1.110814345 |   12 |
|                64 | 116074.13735061679 | 15 MB            |        1.408100062 |   12 |
|               128 | 114854.80273666105 | 15 MB            | 1.5331357140000001 |   12 |
|               256 | 118767.73924992209 | 16 MB            |        2.041783413 |   12 |
|               512 |  122933.1033660647 | 16 MB            |        1.255463303 |   12 |
|              1024 | 124222.51861746894 | 17 MB            |        1.505765638 |   12 |
+-------------------+--------------------+------------------+--------------------+------+

Relates elastic#25750

Docs update
@andrewkroh andrewkroh force-pushed the feature/fb/aws-s3-refactor branch from 2776dfd to d9e2eb3 Compare August 2, 2021 19:11
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Aug 2, 2021
@andrewkroh andrewkroh marked this pull request as ready for review August 2, 2021 19:17
@elasticmachine
Copy link
Collaborator

Pinging @elastic/integrations (Team:Integrations)

@elasticmachine
Copy link
Collaborator

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

@elasticmachine
Copy link
Collaborator

elasticmachine commented Aug 2, 2021

💚 Build Succeeded

the below badges are clickable and redirect to their specific view in the CI or DOCS
Pipeline View Test View Changes Artifacts preview preview

Expand to view the summary

Build stats

  • Start Time: 2021-08-11T20:10:48.769+0000

  • Duration: 218 min 16 sec

  • Commit: d246c04

Test stats 🧪

Test Results
Failed 0
Passed 53000
Skipped 5318
Total 58318

Trends 🧪

Image of Build Times

Image of Tests

💚 Flaky test report

Tests succeeded.

Expand to view the summary

Test stats 🧪

Test Results
Failed 0
Passed 53000
Skipped 5318
Total 58318

@kaiyan-sheng kaiyan-sheng requested a review from aspacca August 2, 2021 20:55
@aspacca
Copy link

aspacca commented Aug 4, 2021

LGTM

Comment on lines +34 to +35
cd x-pack/filebeat/inputs/awss3
go test -tags aws,integration -run TestInputRun -v .
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaiyan-sheng would this work with our existing AWS ci integration? that would be great.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have the aws tests run weekly right now in the CI and this would work. @v1v Do you know if this is easy to add?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AWS tests run on a weekly basis, as you said, and it runs the commands defined in:

  • cloud:
    cloud: "mage build test"
    withModule: true ## run the ITs only if the changeset affects a specific module.
    dirs: ## run the cloud tests for the given modules.
    - "x-pack/metricbeat/module/aws"
    when: ## Override the top-level when.
    parameters:
    - "awsCloudTests"
    comments:
    - "/test x-pack/metricbeat for aws cloud"
    labels:
    - "aws"
    stage: extended

mage build test runs in the folder x-pack/metricbeat so, if the above-mentioned command is part of the mage then it should work.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test requires applying a terraform file (and destroying it at the end):
https://github.com/elastic/beats/blob/master/x-pack/filebeat/input/awss3/_meta/terraform/README.md#usage

will this happen?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +236 to +237
acker := newEventACKTracker(ctx)
defer acker.Wait()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually one doubt here: do we really want to wait for all the events to be acked for the worker to shutdown?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. S3 event notifications for ObjectCreated sometimes contain information for more than object. This function returns after all the events generated from each of those objects (usually it's just 1 object) has been ACKed. Then the caller of this function can stop updating the SQS visibility timeout for this one SQS message and decide based on whether an error occurred while processing the S3 object if it should delete the SQS message or try to process it again.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I understand the reason for waiting in the PR, but I was wondering if we should wait for ACK for actually handling the SQS message.
The current implementation doesn't have this: in theory ACK could be blocked for a certain amount of time (network issue reaching elasticsearch, for example), filebeat queue should be able to later catch up. While here we will block starting new workers if I got it right.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for waiting until all ACKs are received before deleting the SQS message (and freeing the worker) is that the queued events are non-persistent by default. If we delete the SQS message before receiving all ACKs then Filebeat is vulnerable to data loss if the process is killed before the queue is emptied (events are ACKed).

By waiting until all ACKs are received we ensure that another instance of Filebeat will process the same message again if this process is killed before all events are ACKed.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should not delete the SQS message before receiving all ACKs, but by making the worker wait for the ACKs we prevent it from filling the Filebeat queue in cases of backpressure. If the ACK wait / keepalive / delete message handling was done somewhere else (in a goroutine) that would free the worker for keep processing other SQS messages and filling the queue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could write it that way.

The current implementation is derived from having a single tuning knob of max_number_of_messages that controls the number of inflight messages. If we change the model then we'd add another option that controls the number of workers independently of the max_number_of_messages.

I'm hesitant to change it now without doing some testing to see if that is actually a problem. I think that if you size the max_number_of_messages parameter appropriately based on the files you're processing and your queue.mem.events size then you should be able to keep the internal memory queue full in this model without much of a problem.


ctrl, ctx := gomock.WithContext(ctx, t)
defer ctrl.Finish()
mockS3Pager := newMockS3Pager(ctrl, 1, fakeObjects)
Copy link
Member Author

@andrewkroh andrewkroh Aug 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aspacca I updated the S3 interface to account for your need to call ListObjects. I included a helper to mock the S3 pagination calls for your tests.

Copy link
Contributor

@leehinman leehinman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Just a question about the acker wait function.

x-pack/filebeat/input/awss3/acker.go Show resolved Hide resolved
@aspacca aspacca requested a review from faec August 9, 2021 15:06
@aspacca
Copy link

aspacca commented Aug 11, 2021

/test

@andrewkroh andrewkroh requested a review from a team August 11, 2021 14:43
@aspacca
Copy link

aspacca commented Aug 11, 2021

LGTM

please, @andrewkroh , could you wait for #27126 to be merged and adapt any needed changes in your PR?

E2E tests failure should be fixed if you merge latest master

@andrewkroh
Copy link
Member Author

Sure, I'll wait for that and then fix any merge conflicts.

Copy link
Contributor

@kaiyan-sheng kaiyan-sheng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @andrewkroh for this enhancement. Should filebeat.reference.yml also get updated in the aws-s3 input section? Other than that, everything looks good.

@andrewkroh
Copy link
Member Author

I'll add the two new settings to filebeat.reference.yml when I do the merge for #27126.

sqs.max_receive_count

sqs.wait_time

@elastic elastic deleted a comment from mergify bot Aug 11, 2021
@elastic elastic deleted a comment from mergify bot Aug 11, 2021
@andrewkroh andrewkroh merged commit 7c76158 into elastic:master Aug 12, 2021
@aspacca aspacca added the backport-v7.15.0 Automated backport with mergify label Aug 12, 2021
mergify bot pushed a commit that referenced this pull request Aug 12, 2021
* Refactor AWS S3 input with workers

This changes the AWS S3 input to allow it to process more SQS messages in parallel
by having workers that are fully utilized while there are SQS message to process.

The previous design processed SQS messages in batches ranging from 1 to 10 in size.
It waited until all messages were processed before requesting more. This left some
workers idle toward the the end of processing the batch. This also limited the maximum
number of messages processed in parallel to 10 because that is the largest request
size allowed by SQS.

The refactored input uses ephemeral goroutines as workers to process SQS messages. It
receives as many SQS messages as there are free workers. The total number of workers
is controlled by `max_number_of_messages` (same as before but without an upper limit).

Other changes

Prevent poison pill messages

When an S3 object processing error occurs the SQS message is returned to the after
the visibility timeout expires. This allows it to be reprocessed again or moved to
the SQS dead letter queue (if configured). But if no dead letter queue policy is
configured and the error is permanent (reprocessing won't fix it) then the message
would continuosly be reprocessed. On error the input will now check the
`ApproximateReceiveCount` attribute of the SQS message and delete it if it exceeds
the configured maximum retries.

Removal of api_timeout from S3 GetObject calls

The `api_timeout` has been removed from the S3 `GetObject` call. This limited the
maximum amount of time for processing the object since the response body is processed
as a stream while the request is open. Requests can still timeout in the server due
to inactivity.

Improved debug logs

The log messages have been enriched with more data about the related SQS message and
S3 object. For example the SQS `message_id`, `s3_bucket`, and `s3_object` are
included in some messages.

`DEBUG   [aws-s3.sqs_s3_event]   awss3/s3.go:127 End S3 object processing.       {"id": "test_id", "queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-lxlmx6", "message_id": "a11de9f9-0a68-4c4e-a09d-979b87602958", "s3_bucket": "filebeat-s3-integtest-lxlmx6", "s3_object": "events-array.json", "elapsed_time_ns": 23262327}`

Increased test coverage

The refactored input has about 88% test coverage.

The specific AWS API methods used by the input were turned into interfaces to allow
for easier testing. The unit tests mock the AWS interfaces.

The parts of the input were separted into three components listed below. There's a defined
interface for each to allow for mock testing there too. To test the interactions between
these components go-mock is used to generate mocks and then assert the expectations.

1. The SQS receiver. (sqs.go)
2. The S3 Notification Event handler. (sqs_s3_event.go)
3. The S3 Object reader. (s3.go)

Terraform setup for integration test

Setup for executing the integration tests is now handled by Terraform.
See _meta/terraform/README.md for instructions.

Benchmark test

I added a benchmark that tests the input in isolation with mocked SQS and S3 responeses.
It uses a 7KB cloudtrail json.gz file containing about 60 messages for its input.
This removes any variability related to the network, but also means these do not reflect
real-work rates. They can be used to measure the effect of future changes.

+-------------------+--------------------+------------------+--------------------+------+
| MAX MSGS INFLIGHT |   EVENTS PER SEC   | S3 BYTES PER SEC |     TIME (SEC)     | CPUS |
+-------------------+--------------------+------------------+--------------------+------+
|                 1 | 23019.782175720782 | 3.0 MB           |        1.257266458 |   12 |
|                 2 |  36237.53174269319 | 4.8 MB           |        1.158798571 |   12 |
|                 4 |  56456.84532752983 | 7.5 MB           |        1.138285351 |   12 |
|                 8 |  90485.15755430676 | 12 MB            |        1.117244007 |   12 |
|                16 |  103853.8984324643 | 14 MB            |        1.165541225 |   12 |
|                32 | 110380.28141417276 | 15 MB            |        1.110814345 |   12 |
|                64 | 116074.13735061679 | 15 MB            |        1.408100062 |   12 |
|               128 | 114854.80273666105 | 15 MB            | 1.5331357140000001 |   12 |
|               256 | 118767.73924992209 | 16 MB            |        2.041783413 |   12 |
|               512 |  122933.1033660647 | 16 MB            |        1.255463303 |   12 |
|              1024 | 124222.51861746894 | 17 MB            |        1.505765638 |   12 |
+-------------------+--------------------+------------------+--------------------+------+

Relates #25750

* Use InitializeAWSConfig

* Add s3Lister interface for mocking pagination of S3 ListObjects calls

* Add new config parameters to reference.yml

* Optimize uploading b/c it was slow in aws v2 sdk

(cherry picked from commit 7c76158)

# Conflicts:
#	x-pack/filebeat/input/awss3/collector.go
#	x-pack/filebeat/input/awss3/collector_test.go
andrewkroh added a commit that referenced this pull request Aug 13, 2021
* Refactor AWS S3 input with workers

This changes the AWS S3 input to allow it to process more SQS messages in parallel
by having workers that are fully utilized while there are SQS message to process.

The previous design processed SQS messages in batches ranging from 1 to 10 in size.
It waited until all messages were processed before requesting more. This left some
workers idle toward the the end of processing the batch. This also limited the maximum
number of messages processed in parallel to 10 because that is the largest request
size allowed by SQS.

The refactored input uses ephemeral goroutines as workers to process SQS messages. It
receives as many SQS messages as there are free workers. The total number of workers
is controlled by `max_number_of_messages` (same as before but without an upper limit).

Other changes

Prevent poison pill messages

When an S3 object processing error occurs the SQS message is returned to the after
the visibility timeout expires. This allows it to be reprocessed again or moved to
the SQS dead letter queue (if configured). But if no dead letter queue policy is
configured and the error is permanent (reprocessing won't fix it) then the message
would continuosly be reprocessed. On error the input will now check the
`ApproximateReceiveCount` attribute of the SQS message and delete it if it exceeds
the configured maximum retries.

Removal of api_timeout from S3 GetObject calls

The `api_timeout` has been removed from the S3 `GetObject` call. This limited the
maximum amount of time for processing the object since the response body is processed
as a stream while the request is open. Requests can still timeout in the server due
to inactivity.

Improved debug logs

The log messages have been enriched with more data about the related SQS message and
S3 object. For example the SQS `message_id`, `s3_bucket`, and `s3_object` are
included in some messages.

`DEBUG   [aws-s3.sqs_s3_event]   awss3/s3.go:127 End S3 object processing.       {"id": "test_id", "queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-lxlmx6", "message_id": "a11de9f9-0a68-4c4e-a09d-979b87602958", "s3_bucket": "filebeat-s3-integtest-lxlmx6", "s3_object": "events-array.json", "elapsed_time_ns": 23262327}`

Increased test coverage

The refactored input has about 88% test coverage.

The specific AWS API methods used by the input were turned into interfaces to allow
for easier testing. The unit tests mock the AWS interfaces.

The parts of the input were separted into three components listed below. There's a defined
interface for each to allow for mock testing there too. To test the interactions between
these components go-mock is used to generate mocks and then assert the expectations.

1. The SQS receiver. (sqs.go)
2. The S3 Notification Event handler. (sqs_s3_event.go)
3. The S3 Object reader. (s3.go)

Terraform setup for integration test

Setup for executing the integration tests is now handled by Terraform.
See _meta/terraform/README.md for instructions.

Benchmark test

I added a benchmark that tests the input in isolation with mocked SQS and S3 responeses.
It uses a 7KB cloudtrail json.gz file containing about 60 messages for its input.
This removes any variability related to the network, but also means these do not reflect
real-work rates. They can be used to measure the effect of future changes.

+-------------------+--------------------+------------------+--------------------+------+
| MAX MSGS INFLIGHT |   EVENTS PER SEC   | S3 BYTES PER SEC |     TIME (SEC)     | CPUS |
+-------------------+--------------------+------------------+--------------------+------+
|                 1 | 23019.782175720782 | 3.0 MB           |        1.257266458 |   12 |
|                 2 |  36237.53174269319 | 4.8 MB           |        1.158798571 |   12 |
|                 4 |  56456.84532752983 | 7.5 MB           |        1.138285351 |   12 |
|                 8 |  90485.15755430676 | 12 MB            |        1.117244007 |   12 |
|                16 |  103853.8984324643 | 14 MB            |        1.165541225 |   12 |
|                32 | 110380.28141417276 | 15 MB            |        1.110814345 |   12 |
|                64 | 116074.13735061679 | 15 MB            |        1.408100062 |   12 |
|               128 | 114854.80273666105 | 15 MB            | 1.5331357140000001 |   12 |
|               256 | 118767.73924992209 | 16 MB            |        2.041783413 |   12 |
|               512 |  122933.1033660647 | 16 MB            |        1.255463303 |   12 |
|              1024 | 124222.51861746894 | 17 MB            |        1.505765638 |   12 |
+-------------------+--------------------+------------------+--------------------+------+

Relates #25750

* Use InitializeAWSConfig

* Add s3Lister interface for mocking pagination of S3 ListObjects calls

* Add new config parameters to reference.yml

* Optimize uploading b/c it was slow in aws v2 sdk
andrewkroh added a commit that referenced this pull request Aug 13, 2021
* Refactor AWS S3 input with workers

This changes the AWS S3 input to allow it to process more SQS messages in parallel
by having workers that are fully utilized while there are SQS message to process.

The previous design processed SQS messages in batches ranging from 1 to 10 in size.
It waited until all messages were processed before requesting more. This left some
workers idle toward the the end of processing the batch. This also limited the maximum
number of messages processed in parallel to 10 because that is the largest request
size allowed by SQS.

The refactored input uses ephemeral goroutines as workers to process SQS messages. It
receives as many SQS messages as there are free workers. The total number of workers
is controlled by `max_number_of_messages` (same as before but without an upper limit).

Other changes

Prevent poison pill messages

When an S3 object processing error occurs the SQS message is returned to the after
the visibility timeout expires. This allows it to be reprocessed again or moved to
the SQS dead letter queue (if configured). But if no dead letter queue policy is
configured and the error is permanent (reprocessing won't fix it) then the message
would continuosly be reprocessed. On error the input will now check the
`ApproximateReceiveCount` attribute of the SQS message and delete it if it exceeds
the configured maximum retries.

Removal of api_timeout from S3 GetObject calls

The `api_timeout` has been removed from the S3 `GetObject` call. This limited the
maximum amount of time for processing the object since the response body is processed
as a stream while the request is open. Requests can still timeout in the server due
to inactivity.

Improved debug logs

The log messages have been enriched with more data about the related SQS message and
S3 object. For example the SQS `message_id`, `s3_bucket`, and `s3_object` are
included in some messages.

`DEBUG   [aws-s3.sqs_s3_event]   awss3/s3.go:127 End S3 object processing.       {"id": "test_id", "queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-lxlmx6", "message_id": "a11de9f9-0a68-4c4e-a09d-979b87602958", "s3_bucket": "filebeat-s3-integtest-lxlmx6", "s3_object": "events-array.json", "elapsed_time_ns": 23262327}`

Increased test coverage

The refactored input has about 88% test coverage.

The specific AWS API methods used by the input were turned into interfaces to allow
for easier testing. The unit tests mock the AWS interfaces.

The parts of the input were separted into three components listed below. There's a defined
interface for each to allow for mock testing there too. To test the interactions between
these components go-mock is used to generate mocks and then assert the expectations.

1. The SQS receiver. (sqs.go)
2. The S3 Notification Event handler. (sqs_s3_event.go)
3. The S3 Object reader. (s3.go)

Terraform setup for integration test

Setup for executing the integration tests is now handled by Terraform.
See _meta/terraform/README.md for instructions.

Benchmark test

I added a benchmark that tests the input in isolation with mocked SQS and S3 responeses.
It uses a 7KB cloudtrail json.gz file containing about 60 messages for its input.
This removes any variability related to the network, but also means these do not reflect
real-work rates. They can be used to measure the effect of future changes.

+-------------------+--------------------+------------------+--------------------+------+
| MAX MSGS INFLIGHT |   EVENTS PER SEC   | S3 BYTES PER SEC |     TIME (SEC)     | CPUS |
+-------------------+--------------------+------------------+--------------------+------+
|                 1 | 23019.782175720782 | 3.0 MB           |        1.257266458 |   12 |
|                 2 |  36237.53174269319 | 4.8 MB           |        1.158798571 |   12 |
|                 4 |  56456.84532752983 | 7.5 MB           |        1.138285351 |   12 |
|                 8 |  90485.15755430676 | 12 MB            |        1.117244007 |   12 |
|                16 |  103853.8984324643 | 14 MB            |        1.165541225 |   12 |
|                32 | 110380.28141417276 | 15 MB            |        1.110814345 |   12 |
|                64 | 116074.13735061679 | 15 MB            |        1.408100062 |   12 |
|               128 | 114854.80273666105 | 15 MB            | 1.5331357140000001 |   12 |
|               256 | 118767.73924992209 | 16 MB            |        2.041783413 |   12 |
|               512 |  122933.1033660647 | 16 MB            |        1.255463303 |   12 |
|              1024 | 124222.51861746894 | 17 MB            |        1.505765638 |   12 |
+-------------------+--------------------+------------------+--------------------+------+

Relates #25750

* Use InitializeAWSConfig

* Add s3Lister interface for mocking pagination of S3 ListObjects calls

* Add new config parameters to reference.yml

* Optimize uploading b/c it was slow in aws v2 sdk

Co-authored-by: Andrew Kroh <andrew.kroh@elastic.co>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport-v7.15.0 Automated backport with mergify enhancement Filebeat Filebeat Team:Integrations Label for the Integrations team
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants