[Filebeat] aws-s3 input visibility timeout extensions lead to deadlocked ACKHandlers #25750

Closed
andrewkroh opened this issue May 18, 2021 · 3 comments
Assignees: andrewkroh
Labels: bug, Filebeat, Team:Integrations

@andrewkroh
Member

The error channel used by the aws-s3 input is not properly handled, leading to deadlocked channel sends. A goroutine is started to receive from the unbuffered error channel, but this goroutine can stop while the error channel is still in use. When that happens, the next sender blocks forever because the receiver has already exited. The receiver is processKeepAlive and the sender is an ACKHandler.
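
For illustration, here is a minimal, self-contained Go sketch of that channel pattern (not the actual Beats code): a receiver goroutine drains an unbuffered error channel but returns while other senders still hold a reference to it, so the next send blocks forever.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	errC := make(chan error) // unbuffered, like the input's error channel

	// Receiver: stands in for processKeepAlive. It returns after one receive,
	// leaving errC with no reader.
	go func() {
		fmt.Println("keep-alive saw:", <-errC)
	}()

	errC <- errors.New("first error") // delivered to the receiver

	// Sender: stands in for an ACKHandler reporting a result later.
	done := make(chan struct{})
	go func() {
		errC <- errors.New("second error") // blocks forever: nothing receives
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("ack handler finished")
	case <-time.After(time.Second):
		fmt.Println("ack handler is stuck in the channel send (deadlocked)")
	}
}
```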

Eventually you reach a state where queue.mem is full but no events are being removed, because the ACKHandlers are blocked. These deadlocked ACKHandlers are unrecoverable. With the output down, the `active` count in the metrics slowly increases while `acked` remains at 12.

    "pipeline": {
      "clients": 1,
      "events": {
        "active": 22,
        "dropped": 0,
        "failed": 0,
        "filtered": 0,
        "published": 34,
        "retry": 144,
        "total": 34
      },
      "queue": {
        "acked": 12,
        "max_events": 128
      }
    }

Here's a breakdown of a goroutine dump.

processMessage is blocked on publishing.

goroutine 758 [select, 48 minutes]:
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*openState).publish(0xc00060c528, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005edcb0, 0x10cb57680, 0xc00059a4e0, 0x0, 0x1, ...)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/produce.go:133 +0xe5
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackProducer).Publish(0xc00060c500, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005edcb0, 0x10cb57680, 0xc00059a4e0, 0x0, 0x1, ...)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/produce.go:88 +0x125
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).publish(0xc000a12480, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/pipeline/client.go:134 +0x3bd
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Publish(0xc000a12480, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/pipeline/client.go:80 +0x9d
github.com/elastic/beats/v7/filebeat/beater.(*countingClient).Publish(0xc0004a0f20, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0)
	/Users/akroh/go/src/github.com/elastic/beats/filebeat/beater/channels.go:136 +0x7c
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).forwardEvent(0xc000a2e000, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0, 0xc000cc1cc0, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:590 +0x66
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).convertJSONToEvent(0xc000a2e000, 0x10caa5580, 0xc00059bb30, 0x13ef, 0xc000354200, 0xa, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:581 +0x1f8
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).jsonFieldsType(0xc000a2e000, 0x10caa5580, 0xc00059b170, 0x1026, 0xc000354200, 0xa, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:551 +0x191
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).decodeJSON(0xc000a2e000, 0xc000bd06e0, 0xc000354200, 0xa, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, 0xc000b40696, 0x9, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:513 +0x14f
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).createEventsFromS3Info(0xc000a2e000, 0x10d5ab5e0, 0xc000a244f0, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, 0xc000b40696, 0x9, 0xc000cc1cc0, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:436 +0xc88
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).handleS3Objects(0xc000a2e000, 0x10d5ab5e0, 0xc000a244f0, 0xc00017e880, 0x1, 0x1, 0xc000118ea0, 0x0, 0x0)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:363 +0x291
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).processMessage(0xc000a2e000, 0x10d5ab5e0, 0xc000a244f0, 0x0, 0xc0009c6450, 0xc0009c6480, 0x0, 0x0, 0xc0009c64b0, 0xc0009c64e0, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:161 +0x2d4
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).processor.func1(0x0, 0xc00041e7c0)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:139 +0xdd
github.com/elastic/go-concert/unison.(*MultiErrGroup).Go.func1(0xc00059a3f0, 0xc00060dae0)
	/Users/akroh/go/pkg/mod/github.com/elastic/go-concert@v0.1.0/unison/multierrgroup.go:42 +0x64
created by github.com/elastic/go-concert/unison.(*MultiErrGroup).Go
	/Users/akroh/go/pkg/mod/github.com/elastic/go-concert@v0.1.0/unison/multierrgroup.go:40 +0x66

The SQS reader goroutine is blocked waiting for processMessage to finish.

goroutine 59 [semacquire, 48 minutes]:
sync.runtime_Semacquire(0xc00059a418)
	/Users/akroh/.gvm/versions/go1.15.7.darwin.amd64/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc00059a410)
	/Users/akroh/.gvm/versions/go1.15.7.darwin.amd64/src/sync/waitgroup.go:130 +0x65
github.com/elastic/go-concert/unison.(*MultiErrGroup).Wait(0xc00059a3f0, 0x0, 0x0, 0x0)
	/Users/akroh/go/pkg/mod/github.com/elastic/go-concert@v0.1.0/unison/multierrgroup.go:54 +0x53
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).processor(0xc000a2e000, 0xc0004de230, 0x4a, 0xc0005f7a80, 0x1, 0x1, 0x78, 0x10d5ab5e0, 0xc000a244f0, 0x10d586ea0, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:149 +0x47b
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).run(0xc000a2e000)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:123 +0x2a5
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Input).Run(0xc000a16d80, 0xc000a24170, 0xc000a26260, 0x10, 0x10cfd27b1, 0x8, 0x10cfd27b1, 0x8, 0x10cfcb939, 0x5, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/input.go:72 +0x12b
github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start.func1(0xc000a24170, 0x10cfcdb1e, 0x6, 0xc0002ce620)
	/Users/akroh/go/src/github.com/elastic/beats/filebeat/input/v2/compat/compat.go:112 +0x1e8
created by github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start
	/Users/akroh/go/src/github.com/elastic/beats/filebeat/input/v2/compat/compat.go:110 +0x7d

ACKs are blocked.

goroutine 90 [chan send, 1 minutes]:
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Context).done(0xc0009e2b40)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:683 +0xa5
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.newACKHandler.func1(0xe, 0xc000a8d000, 0xe, 0x100)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/input.go:137 +0x67
github.com/elastic/beats/v7/libbeat/common/acker.(*eventDataACKer).onACK(0xc00041e480, 0xe, 0xe)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:257 +0xd8
github.com/elastic/beats/v7/libbeat/common/acker.(*trackingACKer).ACKEvents(0xc000a222a0, 0xe)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:206 +0x262
github.com/elastic/beats/v7/libbeat/common/acker.(*clientOnlyACKer).ACKEvents(0xc0004a0e00, 0xe)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:329 +0x73
github.com/elastic/beats/v7/libbeat/common/acker.ackerList.ACKEvents(0xc0004a0e40, 0x2, 0x2, 0xe)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:294 +0x64
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*bufferingEventLoop).processACK(0xc000787720, 0xc0014d8180, 0xc0014d8180, 0xe)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/eventloop.go:548 +0x22d
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).handleBatchSig(0xc0002bf1d0, 0xc0005f3eb8)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/ackloop.go:120 +0x9f
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).run(0xc0002bf1d0)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/ackloop.go:83 +0x18f
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue.func2(0xc0002c9ab0, 0xc0002bf1d0)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/broker.go:180 +0x59
created by github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/broker.go:178 +0x3fe

For confirmed bugs, please report:

  • Version: 7.12.1 (testing with 5f242e3)
  • Operating System: All
  • Steps to Reproduce: Block the output or slow down processing enough for visibility_timeout/2 to pass while an SQS message is being processed (that's 2.5 minutes with the defaults).
@elasticmachine
Collaborator

Pinging @elastic/integrations (Team:Integrations)

@elasticmachine
Collaborator

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

@andrewkroh andrewkroh self-assigned this May 18, 2021
@kaiyan-sheng
Contributor

Thank you for finding/fixing this issue!!!

andrewkroh added a commit to andrewkroh/beats that referenced this issue May 19, 2021
This is a small fix to stop the SQS keep-alive routine from returning when no error occurs while updating the visibility timeout.
I also made the error channel buffered (size 1) to allow the ACKHandler to unblock even if the channel receiver goroutine has exited.
There are still problems with the channel handling that need to be fixed, but IMO this should correct the most common cause.

Relates elastic#25750
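
For context, a minimal sketch of the mitigation described in that commit (simplified, not the input's actual code): with a buffer of size 1 a sender can deposit an error even when the receiving goroutine has already returned, and a non-blocking send keeps the sender from ever stalling.

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	errC := make(chan error, 1) // buffered: one error fits with no reader present

	// First send succeeds immediately even though nobody is receiving yet.
	errC <- errors.New("visibility timeout update failed")

	// A defensive non-blocking send avoids blocking if the buffer is full.
	select {
	case errC <- errors.New("second error"):
		fmt.Println("queued second error")
	default:
		fmt.Println("buffer full; dropped second error instead of blocking")
	}

	fmt.Println("received:", <-errC)
}
```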
andrewkroh added a commit that referenced this issue May 19, 2021

* Rename ftest to conventional testdata

* Add missing encoder reader error check

* Use time.Duration to represent durations in code

* Mitigate deadlock in aws-s3

Relates #25750
andrewkroh added a commit to andrewkroh/beats that referenced this issue Aug 2, 2021
This changes the AWS S3 input to allow it to process more SQS messages in parallel
by having workers that are fully utilized whenever there are SQS messages to process.

The previous design processed SQS messages in batches of 1 to 10 and waited until all
messages in a batch were processed before requesting more. This left some workers idle
toward the end of each batch, and it limited the maximum number of messages processed
in parallel to 10 because that is the largest request size allowed by SQS.

The refactored input uses ephemeral goroutines as workers to process SQS messages. It
receives as many SQS messages as there are free workers. The total number of workers
is controlled by `max_number_of_messages` (same as before but without an upper limit).
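
For illustration, a minimal sketch of that worker model using hypothetical names (not the input's actual code): a counting semaphore sized by `max_number_of_messages` bounds how many SQS messages are in flight, and each message gets its own short-lived goroutine, so capacity is freed per message rather than per batch.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// processSQSMessage stands in for downloading the S3 object and publishing events.
func processSQSMessage(msg string) {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("done:", msg)
}

func main() {
	const maxNumberOfMessages = 3 // mirrors the max_number_of_messages setting
	sem := make(chan struct{}, maxNumberOfMessages)
	var wg sync.WaitGroup

	for _, msg := range []string{"m1", "m2", "m3", "m4", "m5", "m6", "m7"} {
		sem <- struct{}{} // wait for a free worker slot
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot as soon as this one message is done
			processSQSMessage(m)
		}(msg)
	}
	wg.Wait()
}
```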

Other changes

Prevent poison pill messages

When an S3 object processing error occurs, the SQS message is returned to the queue after
the visibility timeout expires. This allows it to be reprocessed or moved to the SQS
dead letter queue (if configured). But if no dead letter queue policy is configured and
the error is permanent (reprocessing won't fix it), the message would be reprocessed
continuously. On error, the input now checks the `ApproximateReceiveCount` attribute of
the SQS message and deletes the message if the count exceeds the configured maximum retries.
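
As a rough sketch of that guard (hypothetical helper and limit, not the input's actual code), the receive count reported by SQS is parsed and compared against an assumed retry limit:

```go
package main

import (
	"fmt"
	"strconv"
)

const maxReceiveCount = 5 // assumed configurable maximum number of retries

// shouldGiveUp reports whether a failed message has been received too many
// times to be worth retrying. attrs stands in for the SQS message attributes.
func shouldGiveUp(attrs map[string]string) bool {
	n, err := strconv.Atoi(attrs["ApproximateReceiveCount"])
	if err != nil {
		return false // attribute missing or malformed: keep the default retry behavior
	}
	return n > maxReceiveCount
}

func main() {
	attrs := map[string]string{"ApproximateReceiveCount": "6"}
	if shouldGiveUp(attrs) {
		fmt.Println("retries exhausted: delete the SQS message")
	} else {
		fmt.Println("leave the message for the visibility timeout to return it to the queue")
	}
}
```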

Removal of api_timeout from S3 GetObject calls

The `api_timeout` has been removed from the S3 `GetObject` call because it limited the
maximum amount of time available to process the object (the response body is processed
as a stream while the request is open). Requests can still time out on the server side
due to inactivity.

Improved debug logs

The log messages have been enriched with more data about the related SQS message and
S3 object. For example, the SQS `message_id`, `s3_bucket`, and `s3_object` are
included in some messages.

`DEBUG   [aws-s3.sqs_s3_event]   awss3/s3.go:127 End S3 object processing.       {"id": "test_id", "queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-lxlmx6", "message_id": "a11de9f9-0a68-4c4e-a09d-979b87602958", "s3_bucket": "filebeat-s3-integtest-lxlmx6", "s3_object": "events-array.json", "elapsed_time_ns": 23262327}`

Increased test coverage

The refactored input has about 88% test coverage.

The specific AWS API methods used by the input were turned into interfaces to allow
for easier testing. The unit tests mock the AWS interfaces.

The input was separated into the three components listed below, each with a defined
interface to allow mock testing there as well. To test the interactions between these
components, go-mock is used to generate mocks and assert the expectations.

1. The SQS receiver. (sqs.go)
2. The S3 Notification Event handler. (sqs_s3_event.go)
3. The S3 Object reader. (s3.go)
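
For illustration, a minimal sketch of this interface-based testing approach with hypothetical names (the real input defines its own interfaces and uses generated go-mock mocks):

```go
package main

import (
	"context"
	"fmt"
)

// sqsReceiver is the narrow slice of the SQS API the input depends on.
type sqsReceiver interface {
	ReceiveMessages(ctx context.Context, max int) ([]string, error)
}

// fakeSQS is a hand-written stand-in for a generated mock, backed by a slice.
type fakeSQS struct{ queued []string }

func (f *fakeSQS) ReceiveMessages(ctx context.Context, max int) ([]string, error) {
	if max > len(f.queued) {
		max = len(f.queued)
	}
	out := f.queued[:max]
	f.queued = f.queued[max:]
	return out, nil
}

func main() {
	var r sqsReceiver = &fakeSQS{queued: []string{"msg-1", "msg-2", "msg-3"}}
	msgs, _ := r.ReceiveMessages(context.Background(), 2)
	fmt.Println("received:", msgs) // the code under test never touches real SQS
}
```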

Terraform setup for integration test

Setup for executing the integration tests is now handled by Terraform.
See _meta/terraform/README.md for instructions.

Benchmark test

I added a benchmark that tests the input in isolation with mocked SQS and S3 responses.
It uses a 7 KB CloudTrail json.gz file containing about 60 messages as its input.
This removes any variability related to the network, but it also means the numbers do not
reflect real-world rates. They can be used to measure the effect of future changes.

+-------------------+--------------------+------------------+--------------------+------+
| MAX MSGS INFLIGHT |   EVENTS PER SEC   | S3 BYTES PER SEC |     TIME (SEC)     | CPUS |
+-------------------+--------------------+------------------+--------------------+------+
|                 1 | 23019.782175720782 | 3.0 MB           |        1.257266458 |   12 |
|                 2 |  36237.53174269319 | 4.8 MB           |        1.158798571 |   12 |
|                 4 |  56456.84532752983 | 7.5 MB           |        1.138285351 |   12 |
|                 8 |  90485.15755430676 | 12 MB            |        1.117244007 |   12 |
|                16 |  103853.8984324643 | 14 MB            |        1.165541225 |   12 |
|                32 | 110380.28141417276 | 15 MB            |        1.110814345 |   12 |
|                64 | 116074.13735061679 | 15 MB            |        1.408100062 |   12 |
|               128 | 114854.80273666105 | 15 MB            | 1.5331357140000001 |   12 |
|               256 | 118767.73924992209 | 16 MB            |        2.041783413 |   12 |
|               512 |  122933.1033660647 | 16 MB            |        1.255463303 |   12 |
|              1024 | 124222.51861746894 | 17 MB            |        1.505765638 |   12 |
+-------------------+--------------------+------------------+--------------------+------+

Relates elastic#25750

Docs update
andrewkroh added a commit that referenced this issue Aug 12, 2021

* Refactor AWS S3 input with workers

* Use InitializeAWSConfig

* Add s3Lister interface for mocking pagination of S3 ListObjects calls

* Add new config parameters to reference.yml

* Optimize uploading b/c it was slow in aws v2 sdk

Relates #25750