Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(forwarder): add SourceObjectKeys #308

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions apps/forwarder/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Metadata:
default: Data Sources
Parameters:
- SourceBucketNames
- SourceObjectKeys
- SourceTopicArns
- SourceKMSKeyArns
- ContentTypeOverrides
Expand Down Expand Up @@ -69,6 +70,12 @@ Parameters:
to the forwarder.
Default: ''
AllowedPattern: "^[a-z0-9-.]*(\\*)?$"
SourceObjectKeys:
Type: CommaDelimitedList
Description: >-
A list of object keys which the forwarder should process. This list
applies across all source buckets, and supports wildcards.
Default: '*'
SourceTopicArns:
Type: CommaDelimitedList
Description: >-
Expand Down Expand Up @@ -154,11 +161,17 @@ Conditions:
UseDefaultVerbosity: !Equals
- !Ref Verbosity
- ''
DisableSourceS3: !Equals
- !Join
DisableSourceS3: !Or
- !Equals
- !Join
- ''
- !Ref SourceBucketNames
- ''
- !Equals
- !Join
- ''
- !Ref SourceObjectKeys
- ''
- !Ref SourceBucketNames
- ''
EnableSourceS3: !Not
- !Condition DisableSourceS3
DisableKMSDecrypt: !Equals
Expand Down Expand Up @@ -244,11 +257,15 @@ Resources:
{
"source": ["aws.s3"],
"detail-type": ["Object Created"],
"detail.bucket.name": [{"wildcard": "${wildcards}"}]
"detail.bucket.name": [{"wildcard": "${buckets}"}],
"detail.object.key": [{"wildcard": "${objects}"}]
}
- wildcards: !Join
- buckets: !Join
- '"}, {"wildcard":"'
- !Ref SourceBucketNames
objects: !Join
- '"}, {"wildcard":"'
- !Ref SourceObjectKeys
Targets:
- Arn: !GetAtt Queue.Arn
Id: "Forwarder"
Expand Down Expand Up @@ -343,6 +360,14 @@ Resources:
Action:
- s3:GetObject
- s3:GetObjectTagging
# NOTE: ideally we'd filter the resource list to the set of
# source object keys. That would require taking the cross
# product of SourceBucketNames and SourceObjectKeys, which
# can not be natively expressed in CloudFormation.
#
# We rely instead on filtering within the Lambda function,
# assisted by filtering at the event subscription layer to
# reduce the chances of false positives.
Resource: !Split
- ","
- !Sub
Expand Down Expand Up @@ -435,6 +460,9 @@ Resources:
SOURCE_BUCKET_NAMES: !Join
- ','
- !Ref SourceBucketNames
SOURCE_OBJECT_KEYS: !Join
- ','
- !Ref SourceObjectKeys
OTEL_EXPORTER_OTLP_ENDPOINT: !Ref DebugEndpoint
OTEL_TRACES_EXPORTER: !If [DisableOTEL, "none", "otlp"]
Outputs:
Expand Down
14 changes: 13 additions & 1 deletion docs/forwarder.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The forwarder stack can be configured with the following parameters:
| `DataAccessPointArn` | String | The access point ARN for your Filedrop. |
| `NameOverride` | String | Name of IAM role expected by Filedrop. This name will also be applied to the SQS Queue and Lambda Function processing events. In the absence of a value, the stack name will be used. |
| `SourceBucketNames` | CommaDelimitedList | A list of bucket names which the forwarder is allowed to read from. This list only affects permissions, and supports wildcards. In order to have files copied to Filedrop, you must also subscribe S3 Bucket Notifications to the forwarder. |
| `SourceObjectKeys` | CommaDelimitedList | A list of object keys which the forwarder should process. This list applies across all source buckets, and supports wildcards. |
| `SourceTopicArns` | CommaDelimitedList | A list of SNS topics the forwarder is allowed to be subscribed to. |
| `SourceKMSKeyArns` | CommaDelimitedList | A list of KMS Key ARNs the forwarder is allowed to use to decrypt objects in S3. |
| `ContentTypeOverrides` | CommaDelimitedList | A list of key value pairs. The key is a regular expression which is applied to the S3 source (<bucket>/<key>) of forwarded files. The value is the content type to set for matching files. For example, `\.json$=application/x-ndjson` would forward all files ending in `.json` as newline delimited JSON files. |
Expand Down Expand Up @@ -78,7 +79,8 @@ These parameters must be used to configure the Forwarder stack:
To forward files from an S3 bucket to the Filedrop:

1. Include the bucket name in `SourceBucketNames` or use a wildcard pattern.
2. Configure S3 Event Notifications to trigger the Forwarder's SQS queue.
2. Ensure any object key you want to forward matches the patterns in `SourceObjectKeys`.
3. Configure S3 Event Notifications to trigger the Forwarder's SQS queue.

**Note**: The Forwarder stack does not manage source buckets. You must manually set up the event notifications using one of the following methods:

Expand Down Expand Up @@ -154,6 +156,16 @@ In order to grant the Forwarder lambda function permission to use the KMS key fo
1. **Update your Forwarder stack**: include your KMS Key ARN in `SourceKMSKeyArns` in your forwarder stack.
2. **Update your KMS key policy**: your key policy must grant the Forwarder Lambda function permission to call `kms:Decrypt`. The [default KMS key policy](https://docs.aws.amazon.com/kms/latest/developerguide/key-policy-default.html) is sufficient to satisfy this constraint, since it will delegate access to the KMS key to IAM.

## Filtering Object Keys

The forwarder will only attempt to forward files for which it receives events. As a result, to ensure a subset of objects is not forwarded you should filter out bucket notifications delivered to the forwarder:
- S3 bucket notifications can be filtered by object [prefix or suffix](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-filtering.html)
- SNS topic subscriptions allow for [event filtering](https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html)
- EventBridge supports filtering events according to [event patterns](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html)

Given it is not always possible to filter events at their source, the Forwarder function restricts processing to objects that match a set of key patterns provided through the `SourceObjectKeys` parameter.This set of patterns supports any valid S3 object wildcard. For example, to only ingest data for a subset of account IDs that dump data to a logging account, we could set `SourceObjectKeys=*/AWSLogs/123456789012/*,*/AWSLogs/98987654321098/*`.


## HTTP destination

For backward compatability, the forwarder supports sending data to an HTTPS endpoint. Every `s3:CopyObject` triggers an `s3:GetObject` from the source. The source file is converted into newline delimited JSON and submitted over one or more HTTP POST requests. By default, a request body will not exceed 10MB when uncompressed.
Expand Down
26 changes: 24 additions & 2 deletions integration/tests/forwarder_s3.tftest.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ run "install_forwarder" {
parameters = {
DestinationUri = "s3://${run.target_bucket.id}/"
SourceBucketNames = "${join(",", [for k, v in run.sources.buckets : v.id])}"
SourceObjectKeys = "*/allowed/*"
SourceTopicArns = "arn:aws:sns:${run.setup.region}:${run.setup.account_id}:*"
NameOverride = run.setup.id
}
Expand Down Expand Up @@ -66,8 +67,9 @@ run "check_sqs" {
variables {
command = "./scripts/check_object_diff"
env_vars = {
SOURCE = run.sources.buckets["sqs"].id
DESTINATION = run.target_bucket.id
SOURCE = run.sources.buckets["sqs"].id
DESTINATION = run.target_bucket.id
OBJECT_PREFIX = "test/allowed/"
}
}

Expand All @@ -76,3 +78,23 @@ run "check_sqs" {
error_message = "Failed to copy object using SQS"
}
}

run "check_disallowed" {
module {
source = "observeinc/collection/aws//modules/testing/exec"
version = "2.9.0"
}

variables {
command = "./scripts/check_object_diff"
env_vars = {
SOURCE = run.sources.buckets["sqs"].id
DESTINATION = run.target_bucket.id
}
}

assert {
condition = output.exitcode != 0
error_message = "Succeeded copying object not in source object keys"
}
}
6 changes: 6 additions & 0 deletions pkg/handler/forwarder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

var (
ErrInvalidDestination = errors.New("invalid destination URI")
ErrInvalidFilter = errors.New("invalid source filter")
ErrMissingS3Client = errors.New("missing S3 client")
ErrPresetNotFound = errors.New("not found")
)
Expand All @@ -17,6 +18,7 @@ type Config struct {
DestinationURI string // S3 URI to write messages and copy files to
MaxFileSize int64 // maximum file size in bytes for the files to be processed
SourceBucketNames []string
SourceObjectKeys []string
Override Override
S3Client S3Client
GetTime func() *time.Time
Expand All @@ -39,6 +41,10 @@ func (c *Config) Validate() error {
}
}

if _, err := NewObjectFilter(c.SourceBucketNames, c.SourceObjectKeys); err != nil {
errs = append(errs, fmt.Errorf("%w: %w", ErrInvalidFilter, err))
}

if c.S3Client == nil {
errs = append(errs, ErrMissingS3Client)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/handler/forwarder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ func TestConfig(t *testing.T) {
},
ExpectError: forwarder.ErrInvalidDestination,
},
{
Config: forwarder.Config{
DestinationURI: "https://example.com",
S3Client: &awstest.S3Client{},
SourceBucketNames: []string{"bucket*"},
SourceObjectKeys: []string{"*/te?t/*"},
},
},
}

for i, tc := range testcases {
Expand Down
44 changes: 44 additions & 0 deletions pkg/handler/forwarder/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package forwarder

import (
"fmt"
"regexp"
)

var globOperators = regexp.MustCompile(`(\*|\?)`)

// ObjectFilter verifies if object is intended for processing
type ObjectFilter struct {
filters []*regexp.Regexp
}

// Allow verifies if object source should be accessed
func (o *ObjectFilter) Allow(source string) bool {
for _, re := range o.filters {
if re.MatchString(source) {
return true
}
}
return false
}

// NewObjectFilter initializes an ObjectFilter.
// This function will error if any bucket or object pattern are not valid glob expressions.
func NewObjectFilter(names, keys []string) (*ObjectFilter, error) {
var obj ObjectFilter
// TODO: for simplicity we compute the cross product of regular expressions. It
// would be more efficient to verify buckets and object key separately, but
// we don't expect either list to be very long.

for _, name := range names {
for _, key := range keys {
source := name + "/" + key
re, err := regexp.Compile(globOperators.ReplaceAllString(source, ".$1"))
if err != nil {
return nil, fmt.Errorf("failed to compile %s: %w", source, err)
}
obj.filters = append(obj.filters, re)
}
}
return &obj, nil
}
50 changes: 21 additions & 29 deletions pkg/handler/forwarder/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"io"
"net/url"
"path/filepath"
"strings"
"time"

Expand Down Expand Up @@ -37,13 +36,13 @@ type Override interface {

type Handler struct {
handler.Mux
MaxFileSize int64
DestinationURI *url.URL
S3Client S3Client
Override Override
SourceBucketNames []string
Now func() time.Time
MaxConcurrency int
MaxFileSize int64
DestinationURI *url.URL
S3Client S3Client
Override Override
ObjectPolicy interface{ Allow(string) bool }
Now func() time.Time
MaxConcurrency int
}

// GetCopyObjectInput constructs the input struct for CopyObject.
Expand Down Expand Up @@ -131,19 +130,20 @@ func (h *Handler) ProcessRecord(ctx context.Context, record *events.SQSMessage)
continue
}

if !h.isBucketAllowed(sourceURL.Host) {
logger.Info("Received event from a bucket not in the allowed list; skipping", "bucket", sourceURL.Host)
copyInput := GetCopyObjectInput(sourceURL, h.DestinationURI)

if !h.ObjectPolicy.Allow(aws.ToString(copyInput.CopySource)) {
logger.Info("Ignoring object not in allowed sources", "bucket", copyInput.Bucket, "key", copyInput.Key)
continue
}

if copyRecord.Size != nil && h.MaxFileSize > 0 && *copyRecord.Size > h.MaxFileSize {
logger.V(1).Info("object size exceeds the maximum file size limit; skipping copy",
"max", h.MaxFileSize, "size", *copyRecord.Size, "uri", copyRecord.URI)
// Log a warning and skip this object by continuing to the next iteration
continue
}

copyInput := GetCopyObjectInput(sourceURL, h.DestinationURI)

if h.Override != nil {
if h.Override.Apply(ctx, copyInput) && copyInput.Key == nil {
logger.V(6).Info("ignoring object")
Expand Down Expand Up @@ -210,31 +210,23 @@ func (h *Handler) Handle(ctx context.Context, request events.SQSEvent) (response
return
}

// isBucketAllowed checks if the given bucket is in the allowed list or matches a pattern.
func (h *Handler) isBucketAllowed(bucket string) bool {
for _, pattern := range h.SourceBucketNames {
if match, _ := filepath.Match(pattern, bucket); match {
return true
}
}
return false
}

func New(cfg *Config) (h *Handler, err error) {
if err := cfg.Validate(); err != nil {
return nil, err
}

u, _ := url.ParseRequestURI(cfg.DestinationURI)

objectFilter, _ := NewObjectFilter(cfg.SourceBucketNames, cfg.SourceObjectKeys)

h = &Handler{
DestinationURI: u,
S3Client: cfg.S3Client,
MaxFileSize: cfg.MaxFileSize,
Override: cfg.Override,
SourceBucketNames: cfg.SourceBucketNames,
Now: time.Now,
MaxConcurrency: cfg.MaxConcurrency,
DestinationURI: u,
S3Client: cfg.S3Client,
MaxFileSize: cfg.MaxFileSize,
Override: cfg.Override,
ObjectPolicy: objectFilter,
Now: time.Now,
MaxConcurrency: cfg.MaxConcurrency,
}

return h, nil
Expand Down
Loading