-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix concurrency bugs that could cause data loss in the aws-s3
input
#39131
Conversation
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed
|
This pull request is now in conflicts. Could you fix it? 🙏
|
Pinging @elastic/elastic-agent (Team:Elastic-Agent) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, great work!
- the worker/reader flow it is easier to read and follow
- the state logic is much more manageable
- It’s nice we are now using github.com/aws/aws-sdk-go-v2/aws/retry
// Failed is true when ProcessS3Object returned an error other than | ||
// s3DownloadError. | ||
// Before 8.14, this field was called "error". However, that field was | ||
// set for many ephemeral reasons including client-side rate limiting | ||
// (see https://github.com/elastic/beats/issues/39114). Now that we | ||
// don't treat download errors as permanent, the field name was changed | ||
// so that users upgrading from old versions aren't prevented from | ||
// retrying old download failures. | ||
Failed bool `json:"failed" struct:"failed"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call! I like the new name and semantics, as well as the possibility of retrying past ephemeral failures.
…#39131) This is a cleanup of concurrency and error handling in the `aws-s3` input that could cause several known bugs: - Memory leaks ([1](elastic/integrations#9463), [2](#39052)). This issue was caused because the input could run several scans of its s3 bucket simultaneously, which led to the cleanup routine `s3Poller.Purge` being called many times concurrently. Inefficiencies in this function caused it to accumulate over time, creating many copies of the state data which could overload process memory. Fixed by: * Changing the `s3Poller` run loop to only run one scan at a time, and wait for it to complete before starting the next one. * Having each object persist its own state after completing, instead of waiting until the end of a scan and writing an entire bucket worth of metadata at once. - This also allowed the removal of other metadata: there is no longer any reason to track the detailed acknowledgment state of each "listing" (page of ~1K events during bucket enumeration), so the `states` helper object is now much simpler. - Skipped data due to buggy last-modified calculations ([3](#39065)). The most recent scanned timestamp was calculated incorrectly, causing the input to skip a growing number of events as ingestion progressed. * Fixed by removing the bucket-wide last modified check entirely. This feature was already risky, since objects with earlier creation timestamps can appear after ones with later timestamps, so there is always the possibility to miss objects. Since the value was calculated incorrectly and was discarded between runs, we can remove it without breaking compatibility and reimplement it more safely in the future if needed. - Skipped data because rate limiting is treated as permanent failure ([4](#39114)). The input treats all error types the same, which causes many objects to be skipped for ephemeral errors. * Fixed by creating an error, `errS3DownloadFailure`, that is returned when processing failure is caused by a download error. In this case, the S3 workers will not persist the failure to the `states` table, so the object will be retried on the next bucket scan. When this happens the worker also sleeps (using an exponential backoff) before trying the next object. * Exponential backoff was also added to the bucket scanning loop for page listing errors, so the bucket scan is not restarted needlessly. (cherry picked from commit e588628) # Conflicts: # x-pack/filebeat/input/awss3/input.go
…ss in the `aws-s3` input (#39262) * Fix concurrency bugs that could cause data loss in the `aws-s3` input (#39131) This is a cleanup of concurrency and error handling in the `aws-s3` input that could cause several known bugs: - Memory leaks ([1](elastic/integrations#9463), [2](#39052)). This issue was caused because the input could run several scans of its s3 bucket simultaneously, which led to the cleanup routine `s3Poller.Purge` being called many times concurrently. Inefficiencies in this function caused it to accumulate over time, creating many copies of the state data which could overload process memory. Fixed by: * Changing the `s3Poller` run loop to only run one scan at a time, and wait for it to complete before starting the next one. * Having each object persist its own state after completing, instead of waiting until the end of a scan and writing an entire bucket worth of metadata at once. - This also allowed the removal of other metadata: there is no longer any reason to track the detailed acknowledgment state of each "listing" (page of ~1K events during bucket enumeration), so the `states` helper object is now much simpler. - Skipped data due to buggy last-modified calculations ([3](#39065)). The most recent scanned timestamp was calculated incorrectly, causing the input to skip a growing number of events as ingestion progressed. * Fixed by removing the bucket-wide last modified check entirely. This feature was already risky, since objects with earlier creation timestamps can appear after ones with later timestamps, so there is always the possibility to miss objects. Since the value was calculated incorrectly and was discarded between runs, we can remove it without breaking compatibility and reimplement it more safely in the future if needed. - Skipped data because rate limiting is treated as permanent failure ([4](#39114)). The input treats all error types the same, which causes many objects to be skipped for ephemeral errors. * Fixed by creating an error, `errS3DownloadFailure`, that is returned when processing failure is caused by a download error. In this case, the S3 workers will not persist the failure to the `states` table, so the object will be retried on the next bucket scan. When this happens the worker also sleeps (using an exponential backoff) before trying the next object. * Exponential backoff was also added to the bucket scanning loop for page listing errors, so the bucket scan is not restarted needlessly. (cherry picked from commit e588628) # Conflicts: # x-pack/filebeat/input/awss3/input.go * fix merge --------- Co-authored-by: Fae Charlton <fae.charlton@elastic.co>
@faec there should be a changelog entry added for these fixes |
This is a cleanup of concurrency and error handling in the
aws-s3
input that could cause several known bugs:s3Poller.Purge
being called many times concurrently. Inefficiencies in this function caused it to accumulate over time, creating many copies of the state data which could overload process memory. Fixed by:s3Poller
run loop to only run one scan at a time, and wait for it to complete before starting the next one.states
helper object is now much simpler.errS3DownloadFailure
, that is returned when processing failure is caused by a download error. In this case, the S3 workers will not persist the failure to thestates
table, so the object will be retried on the next bucket scan. When this happens the worker also sleeps (using an exponential backoff) before trying the next object.Checklist
I have made corresponding changes to the documentationI have made corresponding change to the default configuration filesCHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Results
Comparison when ingesting a bucket of 1.9 million objects using the configuration (bucket/auth data redacted):
Without this PR
After ingesting 218K events in 1:15, ingestion stopped permanently.
With this PR
1.9 million events ingested in 3 hours. Ingestion then continues at a much lower rate as the input begins the next bucket scan, picking up new entries and retrying failures from the last pass.
This ingestion is now output-limited -- the slowdown visible around 11:30 was caused by Elasticsearch-side throttling producing
429 Too Many Requests
responses, not by any issue with the input.Related issues
aws-s3
input writes to Filebeat registry without proper synchronization #39052aws-s3
input skips events and slows ingestion based on object creation timestamp #39065aws-s3
input treats client rate limiting as permanent failure #39114