feat(forwarder): filter s3 buckets (#116)
* feat(forwarder): filter s3 buckets

* test(forwarder): add go-unit tests for source bucket filtering
obs-gh-colinhutchinson authored Dec 1, 2023
1 parent c7d3836 commit 247ba0a
Showing 5 changed files with 74 additions and 12 deletions.
3 changes: 3 additions & 0 deletions apps/forwarder/template.yaml
@@ -291,6 +291,9 @@ Resources:
CONTENT_TYPE_OVERRIDES: !Join
- ","
- !Ref ContentTypeOverrides
SOURCE_BUCKET_NAMES: !Join
- ','
- !Ref SourceBucketNames
Outputs:
Function:
Description: "Lambda Function ARN"
2 changes: 2 additions & 0 deletions cmd/forwarder/main.go
@@ -20,6 +20,7 @@ var env struct {
Verbosity int `env:"VERBOSITY,default=1"`
MaxFileSize int64 `env:"MAX_FILE_SIZE"`
ContentTypeOverrides []string `env:"CONTENT_TYPE_OVERRIDES"`
SourceBucketNames []string `env:"SOURCE_BUCKET_NAMES"`
}

var (
@@ -61,6 +62,7 @@ func realInit() error {
S3Client: s3client,
Logger: &logger,
ContentTypeOverrides: env.ContentTypeOverrides,
SourceBucketNames: env.SourceBucketNames,
})
if err != nil {
return fmt.Errorf("failed to create handler: %w", err)
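For context on how the two new settings connect: the template joins the `SourceBucketNames` parameter into a single comma-separated `SOURCE_BUCKET_NAMES` value, and `main.go` decodes it back into a `[]string` through the `env` struct tag. The diff does not show which env-decoding library backs those tags, so the sketch below only illustrates the assumed comma-splitting step with `strings.Split`; `parseBucketPatterns` is a hypothetical helper, not code from this repository.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// parseBucketPatterns is a hypothetical stand-in for the env-tag decoding in
// cmd/forwarder/main.go: it splits the comma-joined SOURCE_BUCKET_NAMES value
// into individual bucket name patterns, dropping empty entries.
func parseBucketPatterns(raw string) []string {
	var patterns []string
	for _, p := range strings.Split(raw, ",") {
		if p = strings.TrimSpace(p); p != "" {
			patterns = append(patterns, p)
		}
	}
	return patterns
}

func main() {
	// Example value in the shape the template's !Join would produce.
	os.Setenv("SOURCE_BUCKET_NAMES", "observeinc*,acme-logs-prod")
	fmt.Println(parseBucketPatterns(os.Getenv("SOURCE_BUCKET_NAMES")))
	// Prints: [observeinc* acme-logs-prod]
}
```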
1 change: 1 addition & 0 deletions handler/forwarder/config.go
@@ -19,6 +19,7 @@ type Config struct {
LogPrefix string // prefix used when writing SQS messages to S3
MaxFileSize int64 // maximum file size in bytes for the files to be processed
ContentTypeOverrides []string // list of key pair values containing regular expressions to content type values
SourceBucketNames []string // list of source bucket names; shell-style glob patterns are allowed
S3Client S3Client
Logger *logr.Logger
}
29 changes: 23 additions & 6 deletions handler/forwarder/handler.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/url"
"path/filepath"
"strings"

"github.com/aws/aws-lambda-go/events"
@@ -34,6 +35,7 @@ type Handler struct {
LogPrefix string
S3Client S3Client
ContentTypeOverride Matcher
SourceBucketNames []string
}

// GetCopyObjectInput constructs the input struct for CopyObject.
@@ -101,19 +103,23 @@ func (h *Handler) Handle(ctx context.Context, request events.SQSEvent) (response
m := &SQSMessage{SQSMessage: record}
copyRecords := m.GetObjectCreated()
for _, copyRecord := range copyRecords {
sourceURL, err := url.Parse(copyRecord.URI)
if err != nil {
logger.Error(err, "error parsing source URI", "SourceURI", copyRecord.URI)
continue
}

if !h.isBucketAllowed(sourceURL.Host) {
logger.Info("Received event from a bucket not in the allowed list; skipping", "bucket", sourceURL.Host)
continue
}
if copyRecord.Size != nil && h.MaxFileSize > 0 && *copyRecord.Size > h.MaxFileSize {
logger.V(1).Info("object size exceeds the maximum file size limit; skipping copy",
"max", h.MaxFileSize, "size", *copyRecord.Size, "uri", copyRecord.URI)
// Log a warning and skip this object by continuing to the next iteration
continue
}

sourceURL, err := url.Parse(copyRecord.URI)
if err != nil {
logger.Error(err, "error parsing source URI", "SourceURI", copyRecord.URI)
continue
}

copyInput := GetCopyObjectInput(sourceURL, h.DestinationURI)

if contentType := h.ContentTypeOverride.Match(sourceURL.String()); contentType != "" {
@@ -140,6 +146,16 @@
return response, nil
}

// isBucketAllowed checks if the given bucket is in the allowed list or matches a pattern.
func (h *Handler) isBucketAllowed(bucket string) bool {
for _, pattern := range h.SourceBucketNames {
if match, _ := filepath.Match(pattern, bucket); match {
return true
}
}
return false
}

func New(cfg *Config) (h *Handler, err error) {
if err := cfg.Validate(); err != nil {
return nil, err
@@ -155,6 +171,7 @@ func New(cfg *Config) (h *Handler, err error) {
S3Client: cfg.S3Client,
MaxFileSize: cfg.MaxFileSize,
ContentTypeOverride: m,
SourceBucketNames: cfg.SourceBucketNames,
}

if cfg.Logger != nil {
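Because `isBucketAllowed` uses `filepath.Match`, each entry in `SourceBucketNames` is treated as a shell-style glob rather than a plain string or regular expression, and a malformed pattern simply fails to match since the error is discarded. A minimal standalone sketch of that behavior (the second bucket name is made up for illustration):

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// Patterns as they might appear in SourceBucketNames.
	patterns := []string{"doesntexist", "observeinc*"}
	buckets := []string{
		"observeinc-filedrop-hoho-us-west-2-7xmjt", // matched by "observeinc*"
		"some-other-bucket",                        // matches nothing, so its events are skipped
	}
	for _, bucket := range buckets {
		allowed := false
		for _, pattern := range patterns {
			// filepath.Match treats the pattern as a glob; the error (malformed
			// pattern) is ignored here, mirroring isBucketAllowed.
			if ok, _ := filepath.Match(pattern, bucket); ok {
				allowed = true
				break
			}
		}
		fmt.Printf("%-45s allowed=%v\n", bucket, allowed)
	}
}
```

Since `*` in a glob never crosses a `/` separator and S3 bucket names cannot contain slashes, the pattern behaves as a straightforward wildcard here.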
51 changes: 45 additions & 6 deletions handler/forwarder/handler_test.go
@@ -108,8 +108,9 @@ func TestHandler(t *testing.T) {
// File size does not exceed MaxFileSize limit
RequestFile: "testdata/event.json",
Config: forwarder.Config{
MaxFileSize: 20,
DestinationURI: "s3://my-bucket",
MaxFileSize: 20,
DestinationURI: "s3://my-bucket",
SourceBucketNames: []string{"observeinc*"},
S3Client: &handlertest.S3Client{
CopyObjectFunc: func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) {
return nil, nil
@@ -125,8 +126,9 @@
// File size exceeds MaxFileSize limit
RequestFile: "testdata/event.json",
Config: forwarder.Config{
MaxFileSize: 1,
DestinationURI: "s3://my-bucket",
MaxFileSize: 1,
DestinationURI: "s3://my-bucket",
SourceBucketNames: []string{"observeinc*"},
S3Client: &handlertest.S3Client{
CopyObjectFunc: func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) {
return nil, nil
@@ -142,7 +144,8 @@
// Failing a copy should fail the individual item in the queue affected
RequestFile: "testdata/event.json",
Config: forwarder.Config{
DestinationURI: "s3://my-bucket",
DestinationURI: "s3://my-bucket",
SourceBucketNames: []string{"observeinc*"},
S3Client: &handlertest.S3Client{
CopyObjectFunc: func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) {
copyFuncCallCount++
@@ -161,7 +164,8 @@
// Failing to put a record to the destination URI is a terminal condition. Error out.
RequestFile: "testdata/event.json",
Config: forwarder.Config{
DestinationURI: "s3://my-bucket",
DestinationURI: "s3://my-bucket",
SourceBucketNames: []string{"observeinc*"},
S3Client: &handlertest.S3Client{
PutObjectFunc: func(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
return nil, errSentinel
@@ -171,6 +175,41 @@
ExpectedCopyCalls: 1,
ExpectErr: errSentinel,
},
{
// Source bucket isn't in source bucket names
RequestFile: "testdata/event.json",
Config: forwarder.Config{
DestinationURI: "s3://my-bucket",
SourceBucketNames: []string{"doesntexist"},
S3Client: &handlertest.S3Client{
CopyObjectFunc: func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) {
return nil, nil
},
},
},
ExpectedCopyCalls: 0,
ExpectResponse: events.SQSEventResponse{
// Expect no batch item failures as the file should be skipped, not failed
},
},
{
// Successful copy where source bucket matches a name in SourceBucketNames
RequestFile: "testdata/event.json",
Config: forwarder.Config{
MaxFileSize: 50, // Adjust size limit to allow the file to be copied
DestinationURI: "s3://my-bucket",
SourceBucketNames: []string{"doesntexist", "observeinc-filedrop-hoho-us-west-2-7xmjt"}, // List includes the exact bucket name
S3Client: &handlertest.S3Client{
CopyObjectFunc: func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) {
return nil, nil // Mock successful copy
},
},
},
ExpectedCopyCalls: 1, // Expect one successful call to CopyObjectFunc
ExpectResponse: events.SQSEventResponse{
// Expect no batch item failures as the copy should be successful
},
},
}

for _, tc := range testcases {
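A note on the updated fixtures: because `isBucketAllowed` returns `false` when no pattern matches, an empty `SourceBucketNames` list would cause every object to be skipped, so each test case in this change supplies at least one entry — either the `observeinc*` glob or the exact bucket name used in `testdata/event.json`.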
