[8.14](backport #39131) Fix concurrency bugs that could cause data loss in the `aws-s3` input (#39262)

* Fix concurrency bugs that could cause data loss in the `aws-s3` input (#39131)

This is a cleanup of concurrency and error handling in the `aws-s3` input, addressing several known bugs:

- Memory leaks ([1](elastic/integrations#9463), [2](#39052)). These occurred because the input could run several scans of its S3 bucket simultaneously, which led to the cleanup routine `s3Poller.Purge` being called many times concurrently. Inefficiencies in this function caused these overlapping calls to accumulate over time, creating many copies of the state data, which could overload process memory. Fixed by:
  * Changing the `s3Poller` run loop to run only one scan at a time and wait for it to complete before starting the next one (see the run-loop sketch after this list).
  * Having each object persist its own state after completing, instead of waiting until the end of a scan and writing an entire bucket's worth of metadata at once.
    - This also allowed the removal of other metadata: there is no longer any reason to track the detailed acknowledgment state of each "listing" (page of ~1K events during bucket enumeration), so the `states` helper object is now much simpler.
- Skipped data due to buggy last-modified calculations ([3](#39065)). The most recently scanned timestamp was calculated incorrectly, causing the input to skip a growing number of events as ingestion progressed.
  * Fixed by removing the bucket-wide last-modified check entirely. This feature was already risky, since objects with earlier creation timestamps can appear after ones with later timestamps, so there is always a possibility of missing objects. Since the value was calculated incorrectly and discarded between runs, we can remove it without breaking compatibility and reimplement it more safely in the future if needed.
- Skipped data because rate limiting was treated as a permanent failure ([4](#39114)). The input treated all error types the same, which caused many objects to be skipped after ephemeral errors.
  * Fixed by adding an error, `errS3DownloadFailure`, that is returned when a processing failure is caused by a download error. In this case, the S3 workers do not persist the failure to the `states` table, so the object is retried on the next bucket scan. When this happens, the worker also sleeps (with exponential backoff) before trying the next object (see the retry sketch after this list).
  * Exponential backoff was also added to the bucket scanning loop for page listing errors, so the bucket scan is not restarted needlessly.
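
For illustration, here is a minimal sketch of the reworked polling behaviour from the first fix. All names (`pollLoop`, `scanBucket`, `processObject`, `stateStore`) are hypothetical stand-ins, not the input's actual API:

```go
// Package awss3sketch illustrates the run-loop change; it is not the real input code.
package awss3sketch

import (
	"context"
	"time"
)

// object and stateStore stand in for the input's real types.
type object struct{ key string }

type stateStore struct{}

// Commit persists the state of one finished object (illustrative).
func (s *stateStore) Commit(key string) {}

func scanBucket(ctx context.Context) []object           { return nil }
func processObject(ctx context.Context, o object) error { return nil }

// pollLoop runs exactly one bucket scan at a time and persists each object's
// state as soon as that object finishes, rather than writing a whole bucket's
// worth of metadata at the end of the scan.
func pollLoop(ctx context.Context, states *stateStore, interval time.Duration) {
	for ctx.Err() == nil {
		start := time.Now()

		for _, obj := range scanBucket(ctx) {
			if err := processObject(ctx, obj); err != nil {
				continue // failures are handled separately; see the retry sketch below
			}
			states.Commit(obj.key)
		}

		// Wait out the remainder of the poll interval before the next scan,
		// so scans never overlap and cleanup never runs concurrently with itself.
		if rest := interval - time.Since(start); rest > 0 {
			select {
			case <-ctx.Done():
			case <-time.After(rest):
			}
		}
	}
}
```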

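A similarly hedged sketch of the retry behaviour from the last fix: `errS3DownloadFailure` is the error introduced by this change, while the worker loop, the helpers, and the stdlib-only backoff are simplified stand-ins for the input's real code:

```go
// Package awss3sketch (continued): illustrative handling of download failures.
package awss3sketch

import (
	"errors"
	"fmt"
	"time"
)

// errS3DownloadFailure marks failures caused by the S3 download itself
// (for example rate limiting), as opposed to errors while decoding or
// publishing the object's contents.
var errS3DownloadFailure = errors.New("S3 object download failed")

// download and publish stand in for the input's real helpers.
func download(key string) ([]byte, error) { return nil, nil }
func publish(data []byte) error           { return nil }

func processKey(key string) error {
	data, err := download(key)
	if err != nil {
		// Wrap the sentinel so callers can tell transient download problems
		// apart from permanent processing errors.
		return fmt.Errorf("downloading %q: %w", key, errS3DownloadFailure)
	}
	return publish(data)
}

// worker skips state persistence and backs off exponentially after a download
// failure, so the object is retried on the next bucket scan instead of being
// recorded as done.
func worker(keys <-chan string, commit func(key string)) {
	wait, maxWait := time.Second, 2*time.Minute
	for key := range keys {
		err := processKey(key)
		switch {
		case err == nil:
			commit(key)        // success: persist this object's state
			wait = time.Second // reset the backoff
		case errors.Is(err, errS3DownloadFailure):
			time.Sleep(wait) // transient: do not commit; retried on the next scan
			wait *= 2
			if wait > maxWait {
				wait = maxWait
			}
		default:
			commit(key) // other failures are still recorded (illustrative)
		}
	}
}
```
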
(cherry picked from commit e588628)

# Conflicts:
#	x-pack/filebeat/input/awss3/input.go

* fix merge

---------

Co-authored-by: Fae Charlton <fae.charlton@elastic.co>
mergify[bot] and faec authored Apr 29, 2024
1 parent 12d9312 commit fbc2db5
Showing 10 changed files with 246 additions and 915 deletions.
40 changes: 21 additions & 19 deletions x-pack/filebeat/input/awss3/input.go
@@ -13,6 +13,7 @@ import (
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/smithy-go"
@@ -21,7 +22,6 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/statestore"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/go-concert/unison"
@@ -99,21 +99,6 @@ func (in *s3Input) Test(ctx v2.TestContext) error {
}

func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
var err error

persistentStore, err := in.store.Access()
if err != nil {
return fmt.Errorf("can not access persistent store: %w", err)
}

defer persistentStore.Close()

states := newStates(inputContext)
err = states.readStatesFrom(persistentStore)
if err != nil {
return fmt.Errorf("can not start persistent store: %w", err)
}

// Wrap the input Context's cancellation Done channel in a context.Context. This
// goroutine stops when the parent closes the Done channel.
ctx, cancelInputCtx := context.WithCancel(context.Background())
@@ -168,8 +153,20 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}
defer client.Close()

// Connect to the registry and create our states lookup
persistentStore, err := in.store.Access()
if err != nil {
return fmt.Errorf("can not access persistent store: %w", err)
}
defer persistentStore.Close()

states, err := newStates(inputContext, persistentStore)
if err != nil {
return fmt.Errorf("can not start persistent store: %w", err)
}

// Create S3 receiver and S3 notification processor.
poller, err := in.createS3Lister(inputContext, ctx, client, persistentStore, states)
poller, err := in.createS3Lister(inputContext, ctx, client, states)
if err != nil {
return fmt.Errorf("failed to initialize s3 poller: %w", err)
}
@@ -240,7 +237,7 @@ func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.Endpoint
return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil
}

func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, persistentStore *statestore.Store, states *states) (*s3Poller, error) {
func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, states *states) (*s3Poller, error) {
var bucketName string
var bucketID string
if in.config.NonAWSBucketName != "" {
@@ -260,6 +257,12 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
o.UsePathStyle = in.config.PathStyle

o.Retryer = retry.NewStandard(func(so *retry.StandardOptions) {
so.MaxAttempts = 5
// Recover quickly when requests start working again
so.NoRetryIncrement = 100
})
})
regionName, err := getRegionForBucket(cancelCtx, s3Client, bucketName)
if err != nil {
@@ -305,7 +308,6 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
client,
s3EventHandlerFactory,
states,
persistentStore,
bucketID,
in.config.BucketListPrefix,
in.awsConfig.Region,
14 changes: 6 additions & 8 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
@@ -8,14 +8,15 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"

@@ -132,7 +133,7 @@ type constantS3 struct {
var _ s3API = (*constantS3)(nil)

func newConstantS3(t testing.TB) *constantS3 {
data, err := ioutil.ReadFile(cloudtrailTestFile)
data, err := os.ReadFile(cloudtrailTestFile)
if err != nil {
t.Fatal(err)
}
@@ -342,14 +343,11 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
return
}

err = store.Set(awsS3WriteCommitPrefix+"bucket"+listPrefix, &commitWriteState{time.Time{}})
if err != nil {
errChan <- err
return
}
states, err := newStates(inputCtx, store)
assert.NoError(t, err, "states creation should succeed")

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{}, numberOfWorkers)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, states, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)

if err := s3Poller.Poll(ctx); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
