From a1d4e04270c9b7e713dd9ae54769b10d495e9052 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Taveira=20Ara=C3=BAjo?= Date: Fri, 14 Jun 2024 11:25:56 -0700 Subject: [PATCH] fix(forwarder): cap number of concurrency SQS tasks The forwarder function processes SQS tasks concurrently. By default this value was previously unbounded, which would cause significant delay when redriving a Dead Letter Queue. This commit changes the default to be set to the runtime number of CPUs, which allows the behaviour to scale with the amount of memory allocated to the lambda function. --- pkg/handler/forwarder/config.go | 16 +++++------ pkg/handler/forwarder/handler.go | 39 ++++++++++++++++----------- pkg/handler/forwarder/handler_test.go | 2 +- pkg/lambda/forwarder/lambda.go | 14 +++++----- 4 files changed, 40 insertions(+), 31 deletions(-) diff --git a/pkg/handler/forwarder/config.go b/pkg/handler/forwarder/config.go index dfb14763..f39404bf 100644 --- a/pkg/handler/forwarder/config.go +++ b/pkg/handler/forwarder/config.go @@ -15,14 +15,14 @@ var ( ) type Config struct { - DestinationURI string // S3 URI to write messages and copy files to - MaxFileSize int64 // maximum file size in bytes for the files to be processed - SourceBucketNames []string - SourceObjectKeys []string - Override Override - S3Client S3Client - GetTime func() *time.Time - MaxConcurrency int // fan out limit + DestinationURI string // S3 URI to write messages and copy files to + MaxFileSize int64 // maximum file size in bytes for the files to be processed + SourceBucketNames []string + SourceObjectKeys []string + Override Override + S3Client S3Client + GetTime func() *time.Time + MaxConcurrentTasks int // fan out limit } func (c *Config) Validate() error { diff --git a/pkg/handler/forwarder/handler.go b/pkg/handler/forwarder/handler.go index 54f98210..1a171958 100644 --- a/pkg/handler/forwarder/handler.go +++ b/pkg/handler/forwarder/handler.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net/url" + "runtime" "strings" "time" @@ -36,13 +37,14 @@ type Override interface { type Handler struct { handler.Mux - MaxFileSize int64 - DestinationURI *url.URL - S3Client S3Client - Override Override - ObjectPolicy interface{ Allow(string) bool } - Now func() time.Time - MaxConcurrency int + + MaxFileSize int64 + DestinationURI *url.URL + S3Client S3Client + Override Override + ObjectPolicy interface{ Allow(string) bool } + Now func() time.Time + MaxConcurrentTasks int } // GetCopyObjectInput constructs the input struct for CopyObject. @@ -169,8 +171,8 @@ func (h *Handler) Handle(ctx context.Context, request events.SQSEvent) (response releaseToken = func() {} ) - if h.MaxConcurrency > 0 { - limitCh := make(chan struct{}, h.MaxConcurrency) + if h.MaxConcurrentTasks > 0 { + limitCh := make(chan struct{}, h.MaxConcurrentTasks) defer close(limitCh) acquireToken = func() { limitCh <- struct{}{} } releaseToken = func() { <-limitCh } @@ -217,16 +219,21 @@ func New(cfg *Config) (h *Handler, err error) { u, _ := url.ParseRequestURI(cfg.DestinationURI) + maxConcurrentTasks := cfg.MaxConcurrentTasks + if maxConcurrentTasks == 0 { + maxConcurrentTasks = runtime.NumCPU() + } + objectFilter, _ := NewObjectFilter(cfg.SourceBucketNames, cfg.SourceObjectKeys) h = &Handler{ - DestinationURI: u, - S3Client: cfg.S3Client, - MaxFileSize: cfg.MaxFileSize, - Override: cfg.Override, - ObjectPolicy: objectFilter, - Now: time.Now, - MaxConcurrency: cfg.MaxConcurrency, + DestinationURI: u, + S3Client: cfg.S3Client, + MaxFileSize: cfg.MaxFileSize, + Override: cfg.Override, + ObjectPolicy: objectFilter, + Now: time.Now, + MaxConcurrentTasks: maxConcurrentTasks, } return h, nil diff --git a/pkg/handler/forwarder/handler_test.go b/pkg/handler/forwarder/handler_test.go index f32a660f..95716c20 100644 --- a/pkg/handler/forwarder/handler_test.go +++ b/pkg/handler/forwarder/handler_test.go @@ -275,7 +275,7 @@ func TestHandler(t *testing.T) { return nil, errSentinel }, }, - MaxConcurrency: 1, // ensure ordering + MaxConcurrentTasks: 1, // ensure ordering }, ExpectedCopyCalls: 3, // Expect three unsuccessful calls to CopyObjectFunc ExpectResponse: events.SQSEventResponse{ diff --git a/pkg/lambda/forwarder/lambda.go b/pkg/lambda/forwarder/lambda.go index e5abdba5..5538774c 100644 --- a/pkg/lambda/forwarder/lambda.go +++ b/pkg/lambda/forwarder/lambda.go @@ -38,6 +38,7 @@ type Config struct { PresetOverrides []string `env:"PRESET_OVERRIDES,default=aws/v1,infer/v1"` SourceBucketNames []string `env:"SOURCE_BUCKET_NAMES"` SourceObjectKeys []string `env:"SOURCE_OBJECT_KEYS"` + MaxConcurrentTasks int `env:"MAX_CONCURRENT_TASKS"` Logging *logging.Config @@ -136,12 +137,13 @@ func New(ctx context.Context, cfg *Config) (*Lambda, error) { } f, err := forwarder.New(&forwarder.Config{ - DestinationURI: cfg.DestinationURI, - MaxFileSize: cfg.MaxFileSize, - S3Client: s3Client, - Override: append(override.Sets{customOverrides}, presets...), - SourceBucketNames: cfg.SourceBucketNames, - SourceObjectKeys: cfg.SourceObjectKeys, + DestinationURI: cfg.DestinationURI, + MaxFileSize: cfg.MaxFileSize, + S3Client: s3Client, + Override: append(override.Sets{customOverrides}, presets...), + SourceBucketNames: cfg.SourceBucketNames, + SourceObjectKeys: cfg.SourceObjectKeys, + MaxConcurrentTasks: cfg.MaxConcurrentTasks, }) if err != nil { return nil, fmt.Errorf("failed to create handler: %w", err)