diff --git a/apps/forwarder/template.yaml b/apps/forwarder/template.yaml index d59ca7e4..5b138ec4 100644 --- a/apps/forwarder/template.yaml +++ b/apps/forwarder/template.yaml @@ -25,6 +25,7 @@ Metadata: default: Data Sources Parameters: - SourceBucketNames + - SourceObjectKeys - SourceTopicArns - SourceKMSKeyArns - ContentTypeOverrides @@ -69,6 +70,12 @@ Parameters: to the forwarder. Default: '' AllowedPattern: "^[a-z0-9-.]*(\\*)?$" + SourceObjectKeys: + Type: CommaDelimitedList + Description: >- + A list of object keys which the forwarder should process. This list + applies across all source buckets, and supports wildcards. + Default: '*' SourceTopicArns: Type: CommaDelimitedList Description: >- @@ -154,11 +161,17 @@ Conditions: UseDefaultVerbosity: !Equals - !Ref Verbosity - '' - DisableSourceS3: !Equals - - !Join + DisableSourceS3: !Or + - !Equals + - !Join + - '' + - !Ref SourceBucketNames + - '' + - !Equals + - !Join + - '' + - !Ref SourceObjectKeys - '' - - !Ref SourceBucketNames - - '' EnableSourceS3: !Not - !Condition DisableSourceS3 DisableKMSDecrypt: !Equals @@ -244,11 +257,15 @@ Resources: { "source": ["aws.s3"], "detail-type": ["Object Created"], - "detail.bucket.name": [{"wildcard": "${wildcards}"}] + "detail.bucket.name": [{"wildcard": "${buckets}"}], + "detail.object.key": [{"wildcard": "${objects}"}] } - - wildcards: !Join + - buckets: !Join - '"}, {"wildcard":"' - !Ref SourceBucketNames + objects: !Join + - '"}, {"wildcard":"' + - !Ref SourceObjectKeys Targets: - Arn: !GetAtt Queue.Arn Id: "Forwarder" @@ -343,6 +360,14 @@ Resources: Action: - s3:GetObject - s3:GetObjectTagging + # NOTE: ideally we'd filter the resource list to the set of + # source object keys. That would require taking the cross + # product of SourceBucketNames and SourceObjectKeys, which + # can not be natively expressed in CloudFormation. + # + # We rely instead on filtering within the Lambda function, + # assisted by filtering at the event subscription layer to + # reduce the chances of false positives. Resource: !Split - "," - !Sub @@ -435,6 +460,9 @@ Resources: SOURCE_BUCKET_NAMES: !Join - ',' - !Ref SourceBucketNames + SOURCE_OBJECT_KEYS: !Join + - ',' + - !Ref SourceObjectKeys OTEL_EXPORTER_OTLP_ENDPOINT: !Ref DebugEndpoint OTEL_TRACES_EXPORTER: !If [DisableOTEL, "none", "otlp"] Outputs: diff --git a/docs/forwarder.md b/docs/forwarder.md index 9dd641ce..0dd5b988 100644 --- a/docs/forwarder.md +++ b/docs/forwarder.md @@ -40,6 +40,7 @@ The forwarder stack can be configured with the following parameters: | `DataAccessPointArn` | String | The access point ARN for your Filedrop. | | `NameOverride` | String | Name of IAM role expected by Filedrop. This name will also be applied to the SQS Queue and Lambda Function processing events. In the absence of a value, the stack name will be used. | | `SourceBucketNames` | CommaDelimitedList | A list of bucket names which the forwarder is allowed to read from. This list only affects permissions, and supports wildcards. In order to have files copied to Filedrop, you must also subscribe S3 Bucket Notifications to the forwarder. | +| `SourceObjectKeys` | CommaDelimitedList | A list of object keys which the forwarder should process. This list applies across all source buckets, and supports wildcards. | | `SourceTopicArns` | CommaDelimitedList | A list of SNS topics the forwarder is allowed to be subscribed to. | | `SourceKMSKeyArns` | CommaDelimitedList | A list of KMS Key ARNs the forwarder is allowed to use to decrypt objects in S3. | | `ContentTypeOverrides` | CommaDelimitedList | A list of key value pairs. The key is a regular expression which is applied to the S3 source (/) of forwarded files. The value is the content type to set for matching files. For example, `\.json$=application/x-ndjson` would forward all files ending in `.json` as newline delimited JSON files. | @@ -78,7 +79,8 @@ These parameters must be used to configure the Forwarder stack: To forward files from an S3 bucket to the Filedrop: 1. Include the bucket name in `SourceBucketNames` or use a wildcard pattern. -2. Configure S3 Event Notifications to trigger the Forwarder's SQS queue. +2. Ensure any object key you want to forward matches the patterns in `SourceObjectKeys`. +3. Configure S3 Event Notifications to trigger the Forwarder's SQS queue. **Note**: The Forwarder stack does not manage source buckets. You must manually set up the event notifications using one of the following methods: @@ -154,6 +156,16 @@ In order to grant the Forwarder lambda function permission to use the KMS key fo 1. **Update your Forwarder stack**: include your KMS Key ARN in `SourceKMSKeyArns` in your forwarder stack. 2. **Update your KMS key policy**: your key policy must grant the Forwarder Lambda function permission to call `kms:Decrypt`. The [default KMS key policy](https://docs.aws.amazon.com/kms/latest/developerguide/key-policy-default.html) is sufficient to satisfy this constraint, since it will delegate access to the KMS key to IAM. +## Filtering Object Keys + +The forwarder will only attempt to forward files for which it receives events. As a result, to ensure a subset of objects is not forwarded you should filter out bucket notifications delivered to the forwarder: +- S3 bucket notifications can be filtered by object [prefix or suffix](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-filtering.html) +- SNS topic subscriptions allow for [event filtering](https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html) +- EventBridge supports filtering events according to [event patterns](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html) + +Given it is not always possible to filter events at their source, the Forwarder function restricts processing to objects that match a set of key patterns provided through the `SourceObjectKeys` parameter.This set of patterns supports any valid S3 object wildcard. For example, to only ingest data for a subset of account IDs that dump data to a logging account, we could set `SourceObjectKeys=*/AWSLogs/123456789012/*,*/AWSLogs/98987654321098/*`. + + ## HTTP destination For backward compatability, the forwarder supports sending data to an HTTPS endpoint. Every `s3:CopyObject` triggers an `s3:GetObject` from the source. The source file is converted into newline delimited JSON and submitted over one or more HTTP POST requests. By default, a request body will not exceed 10MB when uncompressed. diff --git a/integration/tests/forwarder_s3.tftest.hcl b/integration/tests/forwarder_s3.tftest.hcl index d530ce4d..4c4a818f 100644 --- a/integration/tests/forwarder_s3.tftest.hcl +++ b/integration/tests/forwarder_s3.tftest.hcl @@ -35,6 +35,7 @@ run "install_forwarder" { parameters = { DestinationUri = "s3://${run.target_bucket.id}/" SourceBucketNames = "${join(",", [for k, v in run.sources.buckets : v.id])}" + SourceObjectKeys = "*/allowed/*" SourceTopicArns = "arn:aws:sns:${run.setup.region}:${run.setup.account_id}:*" NameOverride = run.setup.id } @@ -66,8 +67,9 @@ run "check_sqs" { variables { command = "./scripts/check_object_diff" env_vars = { - SOURCE = run.sources.buckets["sqs"].id - DESTINATION = run.target_bucket.id + SOURCE = run.sources.buckets["sqs"].id + DESTINATION = run.target_bucket.id + OBJECT_PREFIX = "test/allowed/" } } @@ -76,3 +78,23 @@ run "check_sqs" { error_message = "Failed to copy object using SQS" } } + +run "check_disallowed" { + module { + source = "observeinc/collection/aws//modules/testing/exec" + version = "2.9.0" + } + + variables { + command = "./scripts/check_object_diff" + env_vars = { + SOURCE = run.sources.buckets["sqs"].id + DESTINATION = run.target_bucket.id + } + } + + assert { + condition = output.exitcode != 0 + error_message = "Succeeded copying object not in source object keys" + } +} diff --git a/pkg/handler/forwarder/config.go b/pkg/handler/forwarder/config.go index 176021be..dfb14763 100644 --- a/pkg/handler/forwarder/config.go +++ b/pkg/handler/forwarder/config.go @@ -9,6 +9,7 @@ import ( var ( ErrInvalidDestination = errors.New("invalid destination URI") + ErrInvalidFilter = errors.New("invalid source filter") ErrMissingS3Client = errors.New("missing S3 client") ErrPresetNotFound = errors.New("not found") ) @@ -17,6 +18,7 @@ type Config struct { DestinationURI string // S3 URI to write messages and copy files to MaxFileSize int64 // maximum file size in bytes for the files to be processed SourceBucketNames []string + SourceObjectKeys []string Override Override S3Client S3Client GetTime func() *time.Time @@ -39,6 +41,10 @@ func (c *Config) Validate() error { } } + if _, err := NewObjectFilter(c.SourceBucketNames, c.SourceObjectKeys); err != nil { + errs = append(errs, fmt.Errorf("%w: %w", ErrInvalidFilter, err)) + } + if c.S3Client == nil { errs = append(errs, ErrMissingS3Client) } diff --git a/pkg/handler/forwarder/config_test.go b/pkg/handler/forwarder/config_test.go index cffcdf41..a45e4280 100644 --- a/pkg/handler/forwarder/config_test.go +++ b/pkg/handler/forwarder/config_test.go @@ -35,6 +35,14 @@ func TestConfig(t *testing.T) { }, ExpectError: forwarder.ErrInvalidDestination, }, + { + Config: forwarder.Config{ + DestinationURI: "https://example.com", + S3Client: &awstest.S3Client{}, + SourceBucketNames: []string{"bucket*"}, + SourceObjectKeys: []string{"*/te?t/*"}, + }, + }, } for i, tc := range testcases { diff --git a/pkg/handler/forwarder/filter.go b/pkg/handler/forwarder/filter.go new file mode 100644 index 00000000..144525a9 --- /dev/null +++ b/pkg/handler/forwarder/filter.go @@ -0,0 +1,44 @@ +package forwarder + +import ( + "fmt" + "regexp" +) + +var globOperators = regexp.MustCompile(`(\*|\?)`) + +// ObjectFilter verifies if object is intended for processing +type ObjectFilter struct { + filters []*regexp.Regexp +} + +// Allow verifies if object source should be accessed +func (o *ObjectFilter) Allow(source string) bool { + for _, re := range o.filters { + if re.MatchString(source) { + return true + } + } + return false +} + +// NewObjectFilter initializes an ObjectFilter. +// This function will error if any bucket or object pattern are not valid glob expressions. +func NewObjectFilter(names, keys []string) (*ObjectFilter, error) { + var obj ObjectFilter + // TODO: for simplicity we compute the cross product of regular expressions. It + // would be more efficient to verify buckets and object key separately, but + // we don't expect either list to be very long. + + for _, name := range names { + for _, key := range keys { + source := name + "/" + key + re, err := regexp.Compile(globOperators.ReplaceAllString(source, ".$1")) + if err != nil { + return nil, fmt.Errorf("failed to compile %s: %w", source, err) + } + obj.filters = append(obj.filters, re) + } + } + return &obj, nil +} diff --git a/pkg/handler/forwarder/handler.go b/pkg/handler/forwarder/handler.go index 2f52b63f..54f98210 100644 --- a/pkg/handler/forwarder/handler.go +++ b/pkg/handler/forwarder/handler.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "net/url" - "path/filepath" "strings" "time" @@ -37,13 +36,13 @@ type Override interface { type Handler struct { handler.Mux - MaxFileSize int64 - DestinationURI *url.URL - S3Client S3Client - Override Override - SourceBucketNames []string - Now func() time.Time - MaxConcurrency int + MaxFileSize int64 + DestinationURI *url.URL + S3Client S3Client + Override Override + ObjectPolicy interface{ Allow(string) bool } + Now func() time.Time + MaxConcurrency int } // GetCopyObjectInput constructs the input struct for CopyObject. @@ -131,10 +130,13 @@ func (h *Handler) ProcessRecord(ctx context.Context, record *events.SQSMessage) continue } - if !h.isBucketAllowed(sourceURL.Host) { - logger.Info("Received event from a bucket not in the allowed list; skipping", "bucket", sourceURL.Host) + copyInput := GetCopyObjectInput(sourceURL, h.DestinationURI) + + if !h.ObjectPolicy.Allow(aws.ToString(copyInput.CopySource)) { + logger.Info("Ignoring object not in allowed sources", "bucket", copyInput.Bucket, "key", copyInput.Key) continue } + if copyRecord.Size != nil && h.MaxFileSize > 0 && *copyRecord.Size > h.MaxFileSize { logger.V(1).Info("object size exceeds the maximum file size limit; skipping copy", "max", h.MaxFileSize, "size", *copyRecord.Size, "uri", copyRecord.URI) @@ -142,8 +144,6 @@ func (h *Handler) ProcessRecord(ctx context.Context, record *events.SQSMessage) continue } - copyInput := GetCopyObjectInput(sourceURL, h.DestinationURI) - if h.Override != nil { if h.Override.Apply(ctx, copyInput) && copyInput.Key == nil { logger.V(6).Info("ignoring object") @@ -210,16 +210,6 @@ func (h *Handler) Handle(ctx context.Context, request events.SQSEvent) (response return } -// isBucketAllowed checks if the given bucket is in the allowed list or matches a pattern. -func (h *Handler) isBucketAllowed(bucket string) bool { - for _, pattern := range h.SourceBucketNames { - if match, _ := filepath.Match(pattern, bucket); match { - return true - } - } - return false -} - func New(cfg *Config) (h *Handler, err error) { if err := cfg.Validate(); err != nil { return nil, err @@ -227,14 +217,16 @@ func New(cfg *Config) (h *Handler, err error) { u, _ := url.ParseRequestURI(cfg.DestinationURI) + objectFilter, _ := NewObjectFilter(cfg.SourceBucketNames, cfg.SourceObjectKeys) + h = &Handler{ - DestinationURI: u, - S3Client: cfg.S3Client, - MaxFileSize: cfg.MaxFileSize, - Override: cfg.Override, - SourceBucketNames: cfg.SourceBucketNames, - Now: time.Now, - MaxConcurrency: cfg.MaxConcurrency, + DestinationURI: u, + S3Client: cfg.S3Client, + MaxFileSize: cfg.MaxFileSize, + Override: cfg.Override, + ObjectPolicy: objectFilter, + Now: time.Now, + MaxConcurrency: cfg.MaxConcurrency, } return h, nil diff --git a/pkg/handler/forwarder/handler_test.go b/pkg/handler/forwarder/handler_test.go index 5cefff8e..f32a660f 100644 --- a/pkg/handler/forwarder/handler_test.go +++ b/pkg/handler/forwarder/handler_test.go @@ -123,6 +123,7 @@ func TestHandler(t *testing.T) { MaxFileSize: 20, DestinationURI: "s3://my-bucket", SourceBucketNames: []string{"observeinc*"}, + SourceObjectKeys: []string{"*"}, S3Client: &awstest.S3Client{ CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { return nil, nil @@ -141,6 +142,7 @@ func TestHandler(t *testing.T) { MaxFileSize: 1, DestinationURI: "s3://my-bucket", SourceBucketNames: []string{"observeinc*"}, + SourceObjectKeys: []string{"*"}, S3Client: &awstest.S3Client{ CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { return nil, nil @@ -158,6 +160,7 @@ func TestHandler(t *testing.T) { Config: forwarder.Config{ DestinationURI: "s3://my-bucket", SourceBucketNames: []string{"observeinc*"}, + SourceObjectKeys: []string{"*"}, S3Client: &awstest.S3Client{ CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { return nil, errSentinel @@ -177,6 +180,7 @@ func TestHandler(t *testing.T) { Config: forwarder.Config{ DestinationURI: "s3://my-bucket", SourceBucketNames: []string{"observeinc*"}, + SourceObjectKeys: []string{"*"}, S3Client: &awstest.S3Client{ PutObjectFunc: func(_ context.Context, _ *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) { return nil, errSentinel @@ -192,6 +196,25 @@ func TestHandler(t *testing.T) { Config: forwarder.Config{ DestinationURI: "s3://my-bucket", SourceBucketNames: []string{"doesntexist"}, + SourceObjectKeys: []string{"*"}, + S3Client: &awstest.S3Client{ + CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { + return nil, nil + }, + }, + }, + ExpectedCopyCalls: 0, + ExpectResponse: events.SQSEventResponse{ + // Expect no batch item failures as the file should be skipped, not failed + }, + }, + { + // Source key isn't in source object keys + RequestFile: "testdata/event.json", + Config: forwarder.Config{ + DestinationURI: "s3://my-bucket", + SourceBucketNames: []string{"*"}, + SourceObjectKeys: []string{"nope"}, S3Client: &awstest.S3Client{ CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { return nil, nil @@ -210,6 +233,7 @@ func TestHandler(t *testing.T) { MaxFileSize: 50, // Adjust size limit to allow the file to be copied DestinationURI: "s3://my-bucket", SourceBucketNames: []string{"doesntexist", "observeinc-filedrop-hoho-us-west-2-7xmjt"}, // List includes the exact bucket name + SourceObjectKeys: []string{"ds*"}, S3Client: &awstest.S3Client{ CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { return nil, nil // Mock successful copy @@ -227,6 +251,7 @@ func TestHandler(t *testing.T) { Config: forwarder.Config{ DestinationURI: "s3://my-bucket", SourceBucketNames: []string{"observeinc*"}, + SourceObjectKeys: []string{"*"}, S3Client: &awstest.S3Client{ CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { return nil, nil // Mock successful copy @@ -244,6 +269,7 @@ func TestHandler(t *testing.T) { Config: forwarder.Config{ DestinationURI: "s3://my-bucket", SourceBucketNames: []string{"observeinc*"}, + SourceObjectKeys: []string{"*"}, S3Client: &awstest.S3Client{ CopyObjectFunc: func(_ context.Context, _ *s3.CopyObjectInput, _ ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { return nil, errSentinel diff --git a/pkg/lambda/forwarder/lambda.go b/pkg/lambda/forwarder/lambda.go index adb0a75e..e5abdba5 100644 --- a/pkg/lambda/forwarder/lambda.go +++ b/pkg/lambda/forwarder/lambda.go @@ -37,6 +37,7 @@ type Config struct { ContentTypeOverrides []*override.Rule `env:"CONTENT_TYPE_OVERRIDES"` PresetOverrides []string `env:"PRESET_OVERRIDES,default=aws/v1,infer/v1"` SourceBucketNames []string `env:"SOURCE_BUCKET_NAMES"` + SourceObjectKeys []string `env:"SOURCE_OBJECT_KEYS"` Logging *logging.Config @@ -140,6 +141,7 @@ func New(ctx context.Context, cfg *Config) (*Lambda, error) { S3Client: s3Client, Override: append(override.Sets{customOverrides}, presets...), SourceBucketNames: cfg.SourceBucketNames, + SourceObjectKeys: cfg.SourceObjectKeys, }) if err != nil { return nil, fmt.Errorf("failed to create handler: %w", err)