Skip to content

Commit

Permalink
fix(forwarder): cap number of concurrency SQS tasks
Browse files Browse the repository at this point in the history
The forwarder function processes SQS tasks concurrently. By default this
value was previously unbounded, which would cause significant delay when
redriving a Dead Letter Queue.

This commit changes the default to be set to the runtime number of CPUs,
which allows the behaviour to scale with the amount of memory allocated
to the lambda function.
  • Loading branch information
jta committed Jun 18, 2024
1 parent 07e5a5a commit a1d4e04
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 31 deletions.
16 changes: 8 additions & 8 deletions pkg/handler/forwarder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ var (
)

type Config struct {
DestinationURI string // S3 URI to write messages and copy files to
MaxFileSize int64 // maximum file size in bytes for the files to be processed
SourceBucketNames []string
SourceObjectKeys []string
Override Override
S3Client S3Client
GetTime func() *time.Time
MaxConcurrency int // fan out limit
DestinationURI string // S3 URI to write messages and copy files to
MaxFileSize int64 // maximum file size in bytes for the files to be processed
SourceBucketNames []string
SourceObjectKeys []string
Override Override
S3Client S3Client
GetTime func() *time.Time
MaxConcurrentTasks int // fan out limit
}

func (c *Config) Validate() error {
Expand Down
39 changes: 23 additions & 16 deletions pkg/handler/forwarder/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/url"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -36,13 +37,14 @@ type Override interface {

type Handler struct {
handler.Mux
MaxFileSize int64
DestinationURI *url.URL
S3Client S3Client
Override Override
ObjectPolicy interface{ Allow(string) bool }
Now func() time.Time
MaxConcurrency int

MaxFileSize int64
DestinationURI *url.URL
S3Client S3Client
Override Override
ObjectPolicy interface{ Allow(string) bool }
Now func() time.Time
MaxConcurrentTasks int
}

// GetCopyObjectInput constructs the input struct for CopyObject.
Expand Down Expand Up @@ -169,8 +171,8 @@ func (h *Handler) Handle(ctx context.Context, request events.SQSEvent) (response
releaseToken = func() {}
)

if h.MaxConcurrency > 0 {
limitCh := make(chan struct{}, h.MaxConcurrency)
if h.MaxConcurrentTasks > 0 {
limitCh := make(chan struct{}, h.MaxConcurrentTasks)
defer close(limitCh)
acquireToken = func() { limitCh <- struct{}{} }
releaseToken = func() { <-limitCh }
Expand Down Expand Up @@ -217,16 +219,21 @@ func New(cfg *Config) (h *Handler, err error) {

u, _ := url.ParseRequestURI(cfg.DestinationURI)

maxConcurrentTasks := cfg.MaxConcurrentTasks
if maxConcurrentTasks == 0 {
maxConcurrentTasks = runtime.NumCPU()
}

objectFilter, _ := NewObjectFilter(cfg.SourceBucketNames, cfg.SourceObjectKeys)

h = &Handler{
DestinationURI: u,
S3Client: cfg.S3Client,
MaxFileSize: cfg.MaxFileSize,
Override: cfg.Override,
ObjectPolicy: objectFilter,
Now: time.Now,
MaxConcurrency: cfg.MaxConcurrency,
DestinationURI: u,
S3Client: cfg.S3Client,
MaxFileSize: cfg.MaxFileSize,
Override: cfg.Override,
ObjectPolicy: objectFilter,
Now: time.Now,
MaxConcurrentTasks: maxConcurrentTasks,
}

return h, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/forwarder/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func TestHandler(t *testing.T) {
return nil, errSentinel
},
},
MaxConcurrency: 1, // ensure ordering
MaxConcurrentTasks: 1, // ensure ordering
},
ExpectedCopyCalls: 3, // Expect three unsuccessful calls to CopyObjectFunc
ExpectResponse: events.SQSEventResponse{
Expand Down
14 changes: 8 additions & 6 deletions pkg/lambda/forwarder/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Config struct {
PresetOverrides []string `env:"PRESET_OVERRIDES,default=aws/v1,infer/v1"`
SourceBucketNames []string `env:"SOURCE_BUCKET_NAMES"`
SourceObjectKeys []string `env:"SOURCE_OBJECT_KEYS"`
MaxConcurrentTasks int `env:"MAX_CONCURRENT_TASKS"`

Logging *logging.Config

Expand Down Expand Up @@ -136,12 +137,13 @@ func New(ctx context.Context, cfg *Config) (*Lambda, error) {
}

f, err := forwarder.New(&forwarder.Config{
DestinationURI: cfg.DestinationURI,
MaxFileSize: cfg.MaxFileSize,
S3Client: s3Client,
Override: append(override.Sets{customOverrides}, presets...),
SourceBucketNames: cfg.SourceBucketNames,
SourceObjectKeys: cfg.SourceObjectKeys,
DestinationURI: cfg.DestinationURI,
MaxFileSize: cfg.MaxFileSize,
S3Client: s3Client,
Override: append(override.Sets{customOverrides}, presets...),
SourceBucketNames: cfg.SourceBucketNames,
SourceObjectKeys: cfg.SourceObjectKeys,
MaxConcurrentTasks: cfg.MaxConcurrentTasks,
})
if err != nil {
return nil, fmt.Errorf("failed to create handler: %w", err)
Expand Down

0 comments on commit a1d4e04

Please sign in to comment.