Skip to content

Commit

Permalink
feat(forwarder): handle records concurrently (#226)
Browse files Browse the repository at this point in the history
Copying large objects can take a while when executing across regions.
This commit ensures we handle each SQS record concurrently, which should
be safe since:
- `s3:CopyObject` does not consume Lambda resources, so we are not at
  risk of running out of memory or CPU
- our fan-out factor is capped by the batch size of the SQS queue,
  which is quite small compared to the AWS API limits.

Each SQS record itself may contain a list of files to copy. This list is
still processed serially for now. In practice we do not use this
capability yet.
  • Loading branch information
jta authored Apr 19, 2024
1 parent 031ce4d commit cc97274
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 73 deletions.
1 change: 1 addition & 0 deletions handler/forwarder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Config struct {
Override Override
S3Client S3Client
GetTime func() *time.Time
MaxConcurrency int // maximum number of SQS records handled concurrently; values <= 0 disable the limit
}

func (c *Config) Validate() error {
Expand Down
115 changes: 72 additions & 43 deletions handler/forwarder/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
Expand Down Expand Up @@ -41,6 +42,7 @@ type Handler struct {
Override Override
SourceBucketNames []string
Now func() time.Time
MaxConcurrency int
}

// GetCopyObjectInput constructs the input struct for CopyObject.
Expand Down Expand Up @@ -105,65 +107,91 @@ func (h *Handler) WriteSQS(ctx context.Context, r io.Reader) error {
return nil
}

func (h *Handler) Handle(ctx context.Context, request events.SQSEvent) (response events.SQSEventResponse, err error) {
func (h *Handler) ProcessRecord(ctx context.Context, record *events.SQSMessage) error {
logger := logr.FromContextOrDiscard(ctx)

var messages bytes.Buffer
defer func() {
if err == nil {
logger.V(3).Info("logging messages")
err = h.WriteSQS(ctx, &messages)
copyRecords := GetObjectCreated(record)
for _, copyRecord := range copyRecords {
sourceURL, err := url.Parse(copyRecord.URI)
if err != nil {
logger.Error(err, "error parsing source URI", "SourceURI", copyRecord.URI)
continue
}
}()

encoder := json.NewEncoder(&messages)
if !h.isBucketAllowed(sourceURL.Host) {
logger.Info("Received event from a bucket not in the allowed list; skipping", "bucket", sourceURL.Host)
continue
}
if copyRecord.Size != nil && h.MaxFileSize > 0 && *copyRecord.Size > h.MaxFileSize {
logger.V(1).Info("object size exceeds the maximum file size limit; skipping copy",
"max", h.MaxFileSize, "size", *copyRecord.Size, "uri", copyRecord.URI)
// Log a warning and skip this object by continuing to the next iteration
continue
}

for _, record := range request.Records {
m := &SQSMessage{SQSMessage: record}
copyRecords := m.GetObjectCreated()
for _, copyRecord := range copyRecords {
sourceURL, err := url.Parse(copyRecord.URI)
if err != nil {
logger.Error(err, "error parsing source URI", "SourceURI", copyRecord.URI)
continue
}
copyInput := GetCopyObjectInput(sourceURL, h.DestinationURI)

if !h.isBucketAllowed(sourceURL.Host) {
logger.Info("Received event from a bucket not in the allowed list; skipping", "bucket", sourceURL.Host)
continue
}
if copyRecord.Size != nil && h.MaxFileSize > 0 && *copyRecord.Size > h.MaxFileSize {
logger.V(1).Info("object size exceeds the maximum file size limit; skipping copy",
"max", h.MaxFileSize, "size", *copyRecord.Size, "uri", copyRecord.URI)
// Log a warning and skip this object by continuing to the next iteration
if h.Override != nil {
if h.Override.Apply(ctx, copyInput) && copyInput.Key == nil {
logger.V(6).Info("ignoring object")
continue
}
}

if _, err := h.S3Client.CopyObject(ctx, copyInput); err != nil {
return fmt.Errorf("error copying file %q: %w", copyRecord.URI, err)
}
}
return nil
}

copyInput := GetCopyObjectInput(sourceURL, h.DestinationURI)
func (h *Handler) Handle(ctx context.Context, request events.SQSEvent) (response events.SQSEventResponse, err error) {
resultCh := make(chan *SQSMessage, len(request.Records))
defer close(resultCh)

if h.Override != nil {
if h.Override.Apply(ctx, copyInput) && copyInput.Key == nil {
logger.V(6).Info("ignoring object")
continue
}
}
var (
acquireToken = func() {}
releaseToken = func() {}
)

if h.MaxConcurrency > 0 {
limitCh := make(chan struct{}, h.MaxConcurrency)
defer close(limitCh)
acquireToken = func() { limitCh <- struct{}{} }
releaseToken = func() { <-limitCh }
}

if _, cerr := h.S3Client.CopyObject(ctx, copyInput); cerr != nil {
logger.Error(cerr, "error copying file", "SourceURI", copyRecord.URI)
m.ErrorMessage = cerr.Error()
response.BatchItemFailures = append(response.BatchItemFailures, events.SQSBatchItemFailure{
ItemIdentifier: record.MessageId,
})
break
for _, record := range request.Records {
acquireToken()
go func(m events.SQSMessage) {
defer releaseToken()
result := &SQSMessage{SQSMessage: m}
if err := h.ProcessRecord(ctx, &m); err != nil {
result.ErrorMessage = err.Error()
}
}
resultCh <- result
}(record)
}

if err := encoder.Encode(m); err != nil {
return response, fmt.Errorf("failed to encode message: %w", err)
var messages bytes.Buffer
encoder := json.NewEncoder(&messages)
for i := 0; i < len(request.Records); i++ {
result := <-resultCh
if result.ErrorMessage != "" {
response.BatchItemFailures = append(response.BatchItemFailures, events.SQSBatchItemFailure{
ItemIdentifier: result.SQSMessage.MessageId,
})
}
if e := encoder.Encode(result); e != nil {
err = errors.Join(err, fmt.Errorf("failed to encode message: %w", e))
}
}

if err == nil {
err = h.WriteSQS(ctx, &messages)
}

return response, nil
return
}

// isBucketAllowed checks if the given bucket is in the allowed list or matches a pattern.
Expand All @@ -190,6 +218,7 @@ func New(cfg *Config) (h *Handler, err error) {
Override: cfg.Override,
SourceBucketNames: cfg.SourceBucketNames,
Now: time.Now,
MaxConcurrency: cfg.MaxConcurrency,
}

return h, nil
Expand Down
55 changes: 46 additions & 9 deletions handler/forwarder/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/url"
"os"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -99,14 +100,12 @@ var errSentinel = errors.New("sentinel error")
func TestHandler(t *testing.T) {
t.Parallel()

var copyFuncCallCount int

testcases := []struct {
RequestFile string
Config forwarder.Config
ExpectErr error
ExpectResponse events.SQSEventResponse
ExpectedCopyCalls int
ExpectedCopyCalls int64
}{
{
// File size does not exceed MaxFileSize limit
Expand Down Expand Up @@ -152,7 +151,6 @@ func TestHandler(t *testing.T) {
SourceBucketNames: []string{"observeinc*"},
S3Client: &handlertest.S3Client{
CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) {
copyFuncCallCount++
return nil, errSentinel
},
},
Expand Down Expand Up @@ -214,6 +212,45 @@ func TestHandler(t *testing.T) {
// Expect no batch item failures as the copy should be successful
},
},
{
// Successful copy of multiple records
RequestFile: "testdata/multiple-records.json",
Config: forwarder.Config{
DestinationURI: "s3://my-bucket",
SourceBucketNames: []string{"observeinc*"},
S3Client: &handlertest.S3Client{
CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) {
return nil, nil // Mock successful copy
},
},
},
ExpectedCopyCalls: 3, // Expect three successful calls to CopyObjectFunc
ExpectResponse: events.SQSEventResponse{
// Expect no batch item failures as the copy should be successful
},
},
{
// Unsuccessful copy of multiple records
RequestFile: "testdata/multiple-records.json",
Config: forwarder.Config{
DestinationURI: "s3://my-bucket",
SourceBucketNames: []string{"observeinc*"},
S3Client: &handlertest.S3Client{
CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) {
return nil, errSentinel
},
},
MaxConcurrency: 1, // ensure ordering
},
ExpectedCopyCalls: 3, // Expect three unsuccessful calls to CopyObjectFunc
ExpectResponse: events.SQSEventResponse{
BatchItemFailures: []events.SQSBatchItemFailure{
{ItemIdentifier: "6aa4e980-26f6-46f4-bb73-fa6e82257191"},
{ItemIdentifier: "6aa4e980-26f6-46f4-bb73-fa6e82257192"},
{ItemIdentifier: "6aa4e980-26f6-46f4-bb73-fa6e82257193"},
},
},
},
}

for _, tc := range testcases {
Expand All @@ -228,21 +265,21 @@ func TestHandler(t *testing.T) {
}

// Initialize the local counter for each test case
localCopyFuncCallCount := 0
var localCopyFuncCallCount atomic.Int64

// Save the original CopyObjectFunc
originalCopyObjectFunc := mockS3Client.CopyObjectFunc

// Wrap the CopyObjectFunc to increment the counter
mockS3Client.CopyObjectFunc = func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) {
localCopyFuncCallCount++
localCopyFuncCallCount.Add(1)
if originalCopyObjectFunc != nil {
return originalCopyObjectFunc(ctx, params, optFns...)
}
return nil, nil // Or appropriate default behavior
}

data, err := os.ReadFile("testdata/event.json")
data, err := os.ReadFile(tc.RequestFile)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -270,8 +307,8 @@ func TestHandler(t *testing.T) {
}

// Assert the expected number of CopyObjectFunc calls
if localCopyFuncCallCount != tc.ExpectedCopyCalls {
t.Errorf("Expected CopyObjectFunc to be called %d times, but was called %d times", tc.ExpectedCopyCalls, localCopyFuncCallCount)
if v := localCopyFuncCallCount.Load(); v != tc.ExpectedCopyCalls {
t.Errorf("Expected CopyObjectFunc to be called %d times, but was called %d times", tc.ExpectedCopyCalls, v)
}

mockS3Client.CopyObjectFunc = originalCopyObjectFunc
Expand Down
2 changes: 1 addition & 1 deletion handler/forwarder/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type CopyEvent struct {
Copy []CopyRecord `json:"copy"`
}

func (m *SQSMessage) GetObjectCreated() (copyRecords []CopyRecord) {
func GetObjectCreated(m *events.SQSMessage) (copyRecords []CopyRecord) {
message := []byte(m.Body)

var snsEntity events.SNSEntity
Expand Down
2 changes: 1 addition & 1 deletion handler/forwarder/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestObjectCreated(t *testing.T) {
}

// Directly compare the CopyRecords obtained from GetObjectCreated
copyRecords := message.GetObjectCreated()
copyRecords := forwarder.GetObjectCreated(&message.SQSMessage)
if diff := cmp.Diff(copyRecords, tc.Expected); diff != "" {
t.Errorf("unexpected result: %s", diff)
}
Expand Down
38 changes: 19 additions & 19 deletions handler/forwarder/testdata/event.json
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
{
"Records": [
{
"messageId": "6aa4e980-26f6-46f4-bb73-fa6e82257191",
"receiptHandle": "AQEBdiYF/uR3IpR5MARpzj0ELM7jWFWuuRoei5gup23IMOTSOT8tJB+i3yTl7oykC/UGx7S8bVs+oFkYi8oeK8SANvAYzudqi7WjrXpJMaQfHhebNemzKsBGQg/NRPSOUc+s12APZAp/6r/uGH2K8dBeQuXCD4j2x8w3356cUJ6Iy/sVUvY+KzxP6x7poz+89nQOqFS6g7JS5KVRlDlEpyAdrVGD9kaDI47gQ49A/SlbSwDCBP3AekiyY2gzI4sbPRdhAO0GMpSiNENKcBCgEcaJD6uh7QwnD+NcdF7IO5XktokrKdG7OeoHgKFNQ06nWZDTzxZTLJrk2sEK+Ta5objqGWCRd0PhTMlZNXRd5aEmYimd+1zLBciY1H/yOzUkVQ0cXYmSJ7q8VYk5goF+FBlUXEeImPXbD+E8p4yOtFp7QIRBzmPm3so7onJGXr7g5++I",
"body": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-west-2\",\"eventTime\":\"2023-09-19T03:49:59.565Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AIDA2YN7JF3XJWA6ZRUAE\"},\"requestParameters\":{\"sourceIPAddress\":\"192.184.182.96\"},\"responseElements\":{\"x-amz-request-id\":\"2PSYP94SQN8SDTJE\",\"x-amz-id-2\":\"uL6hVdR9zmVu2Alc0ZXeqnUiHq80WZsEmBj98oY2SX6r5T4Kc3NE6RKn9VjbiQAXGDZ5dKKPp++u+K7mSYJNI/Ai/aW5E5at\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"tf-s3-queue-20230829163550234000000002\",\"bucket\":{\"name\":\"observeinc-filedrop-hoho-us-west-2-7xmjt\",\"ownerIdentity\":{\"principalId\":\"A21JCN1A8EHLG1\"},\"arn\":\"arn:aws:s3:::observeinc-filedrop-hoho-us-west-2-7xmjt\"},\"object\":{\"key\":\"ds101/test3.json\",\"size\":16,\"eTag\":\"ed818579e8cee1d812a77f19efa5e56a\",\"versionId\":\"gv3S8Vnq7asERIoRjci5ssZkjTCjbVjY\",\"sequencer\":\"0065091A67786DA574\"}}}]}",
"attributes": {
"ApproximateReceiveCount": "3",
"SentTimestamp": "1695095400530",
"SenderId": "AIDAJFWZWTE5KRAMGW5A2",
"ApproximateFirstReceiveTimestamp": "1695095400531"
},
"messageAttributes": {},
"md5OfMessageAttributes": null,
"md5OfBody": "b0a876f5ce4a6e24a1ea1db02ea02bd3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-west-2:739672403694:observeinc-filedrop-hoho-us-west-2-7xmjt",
"awsRegion": "us-west-2"
}
]
"Records": [
{
"messageId": "6aa4e980-26f6-46f4-bb73-fa6e82257191",
"receiptHandle": "AQEBdiYF/uR3IpR5MARpzj0ELM7jWFWuuRoei5gup23IMOTSOT8tJB+i3yTl7oykC/UGx7S8bVs+oFkYi8oeK8SANvAYzudqi7WjrXpJMaQfHhebNemzKsBGQg/NRPSOUc+s12APZAp/6r/uGH2K8dBeQuXCD4j2x8w3356cUJ6Iy/sVUvY+KzxP6x7poz+89nQOqFS6g7JS5KVRlDlEpyAdrVGD9kaDI47gQ49A/SlbSwDCBP3AekiyY2gzI4sbPRdhAO0GMpSiNENKcBCgEcaJD6uh7QwnD+NcdF7IO5XktokrKdG7OeoHgKFNQ06nWZDTzxZTLJrk2sEK+Ta5objqGWCRd0PhTMlZNXRd5aEmYimd+1zLBciY1H/yOzUkVQ0cXYmSJ7q8VYk5goF+FBlUXEeImPXbD+E8p4yOtFp7QIRBzmPm3so7onJGXr7g5++I",
"body": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-west-2\",\"eventTime\":\"2023-09-19T03:49:59.565Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AIDA2YN7JF3XJWA6ZRUAE\"},\"requestParameters\":{\"sourceIPAddress\":\"192.184.182.96\"},\"responseElements\":{\"x-amz-request-id\":\"2PSYP94SQN8SDTJE\",\"x-amz-id-2\":\"uL6hVdR9zmVu2Alc0ZXeqnUiHq80WZsEmBj98oY2SX6r5T4Kc3NE6RKn9VjbiQAXGDZ5dKKPp++u+K7mSYJNI/Ai/aW5E5at\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"tf-s3-queue-20230829163550234000000002\",\"bucket\":{\"name\":\"observeinc-filedrop-hoho-us-west-2-7xmjt\",\"ownerIdentity\":{\"principalId\":\"A21JCN1A8EHLG1\"},\"arn\":\"arn:aws:s3:::observeinc-filedrop-hoho-us-west-2-7xmjt\"},\"object\":{\"key\":\"ds101/test3.json\",\"size\":16,\"eTag\":\"ed818579e8cee1d812a77f19efa5e56a\",\"versionId\":\"gv3S8Vnq7asERIoRjci5ssZkjTCjbVjY\",\"sequencer\":\"0065091A67786DA574\"}}}]}",
"attributes": {
"ApproximateReceiveCount": "3",
"SentTimestamp": "1695095400530",
"SenderId": "AIDAJFWZWTE5KRAMGW5A2",
"ApproximateFirstReceiveTimestamp": "1695095400531"
},
"messageAttributes": {},
"md5OfMessageAttributes": null,
"md5OfBody": "b0a876f5ce4a6e24a1ea1db02ea02bd3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-west-2:739672403694:observeinc-filedrop-hoho-us-west-2-7xmjt",
"awsRegion": "us-west-2"
}
]
}
Loading

0 comments on commit cc97274

Please sign in to comment.