From a7790b5fa1c25789664484a88671d349b0524831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Taveira=20Ara=C3=BAjo?= Date: Thu, 5 Oct 2023 08:58:57 -0700 Subject: [PATCH] fix: allow configurable forwarder log prefix The forwarder writes all SQS messages processed to a file in the destination S3 bucket. Make the S3 prefix we use configurable (default is `forwarder/`). I also took the opportunity to use more precise language: SQS handles messages, not events. As a result, we are logging messages, rather than "recording results". I also fixed a few lint issues raised by golangci-lint which had slipped when committing straight to master. --- apps/forwarder/README.md | 2 +- cmd/forwarder/main.go | 8 ++++-- handlers/forwarder/config.go | 3 +- handlers/forwarder/handler.go | 14 +++++---- handlers/forwarder/handler_test.go | 46 ++++++++++++++++++++++++++++++ handlers/forwarder/message.go | 4 +-- 6 files changed, 65 insertions(+), 12 deletions(-) diff --git a/apps/forwarder/README.md b/apps/forwarder/README.md index 680cc3b7..34c3167f 100644 --- a/apps/forwarder/README.md +++ b/apps/forwarder/README.md @@ -4,7 +4,7 @@ This serverless application forwards data to an Observe FileDrop. Data is forwarded by a lambda in two ways: - any `s3:ObjectCreated` events trigger a copy from the source bucket to the destination bucket. -- all events read out of an SQS queue are written to a file in the destination bucket. +- all messages read out of an SQS queue are written to a file in the destination bucket. You can use the Observe Forwarder as a cost effective means of loading files into Observe or for exporting event streams such as EventBridge or SNS data. diff --git a/cmd/forwarder/main.go b/cmd/forwarder/main.go index f9798884..965baede 100644 --- a/cmd/forwarder/main.go +++ b/cmd/forwarder/main.go @@ -18,11 +18,14 @@ import ( var env struct { DestinationURI string `env:"DESTINATION_URI,required"` + LogPrefix string `env:"LOG_PREFIX,default=forwarder/"` Verbosity int `env:"VERBOSITY,default=1"` } -var logger logr.Logger -var handler *forwarder.Handler +var ( + logger logr.Logger + handler *forwarder.Handler +) func init() { if err := realInit(); err != nil { @@ -51,6 +54,7 @@ func realInit() error { handler, err = forwarder.New(&forwarder.Config{ DestinationURI: env.DestinationURI, + LogPrefix: env.LogPrefix, S3Client: s3client, Logger: &logger, }) diff --git a/handlers/forwarder/config.go b/handlers/forwarder/config.go index 33b6c9c5..e7aeb5a6 100644 --- a/handlers/forwarder/config.go +++ b/handlers/forwarder/config.go @@ -14,7 +14,8 @@ var ( ) type Config struct { - DestinationURI string + DestinationURI string // S3 URI to write messages and copy files to + LogPrefix string // prefix used when writing SQS messages to S3 S3Client S3Client Logger *logr.Logger } diff --git a/handlers/forwarder/handler.go b/handlers/forwarder/handler.go index 2d68a047..25327b69 100644 --- a/handlers/forwarder/handler.go +++ b/handlers/forwarder/handler.go @@ -26,6 +26,7 @@ type S3Client interface { type Handler struct { DestinationURI *url.URL + LogPrefix string S3Client S3Client Logger logr.Logger } @@ -50,12 +51,12 @@ func GetCopyObjectInput(source, destination *url.URL) *s3.CopyObjectInput { } } -func GetRecordInput(lctx *lambdacontext.LambdaContext, destination *url.URL, r io.Reader) *s3.PutObjectInput { +func GetLogInput(lctx *lambdacontext.LambdaContext, prefix string, destination *url.URL, r io.Reader) *s3.PutObjectInput { if lctx == nil || destination == nil { return nil } - key := strings.TrimLeft(fmt.Sprintf("%s/forwarder/%s/%s", strings.Trim(destination.Path, "/"), lctx.InvokedFunctionArn, lctx.AwsRequestID), "/") + key := strings.TrimLeft(fmt.Sprintf("%s/%s%s/%s", strings.Trim(destination.Path, "/"), prefix, lctx.InvokedFunctionArn, lctx.AwsRequestID), "/") return &s3.PutObjectInput{ Bucket: &destination.Host, @@ -88,15 +89,15 @@ func (h *Handler) Handle(ctx context.Context, request events.SQSEvent) (response } }() - var records bytes.Buffer + var messages bytes.Buffer defer func() { if err == nil { - logger.V(3).Info("recording results") - _, err = h.S3Client.PutObject(ctx, GetRecordInput(lctx, h.DestinationURI, &records)) + logger.V(3).Info("logging messages") + _, err = h.S3Client.PutObject(ctx, GetLogInput(lctx, h.LogPrefix, h.DestinationURI, &messages)) } }() - encoder := json.NewEncoder(&records) + encoder := json.NewEncoder(&messages) for _, record := range request.Records { m := &SQSMessage{SQSMessage: record} @@ -128,6 +129,7 @@ func New(cfg *Config) (*Handler, error) { h := &Handler{ DestinationURI: u, + LogPrefix: cfg.LogPrefix, S3Client: cfg.S3Client, Logger: logr.Discard(), } diff --git a/handlers/forwarder/handler_test.go b/handlers/forwarder/handler_test.go index 27947a58..d2aeedc0 100644 --- a/handlers/forwarder/handler_test.go +++ b/handlers/forwarder/handler_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/url" "os" "testing" @@ -188,3 +189,48 @@ func TestHandler(t *testing.T) { }) } } + +func TestRecorder(t *testing.T) { + t.Parallel() + + testcases := []struct { + Context *lambdacontext.LambdaContext + Prefix string + DestinationURI string + Expect *s3.PutObjectInput + }{ + { + Context: &lambdacontext.LambdaContext{ + InvokedFunctionArn: "arn:aws:lambda:us-east-1:123456789012:function:test", + AwsRequestID: "c8ee04d5-5925-541a-b113-5942a0fc5985", + }, + Prefix: "test/", + DestinationURI: "s3://my-bucket/path/to", + Expect: &s3.PutObjectInput{ + Bucket: aws.String("my-bucket"), + Key: aws.String("path/to/test/arn:aws:lambda:us-east-1:123456789012:function:test/c8ee04d5-5925-541a-b113-5942a0fc5985"), + ContentType: aws.String("application/x-ndjson"), + }, + }, + } + + for i, tc := range testcases { + tc := tc + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + t.Parallel() + u, err := url.ParseRequestURI(tc.DestinationURI) + if err != nil { + t.Fatal(err) + } + + var body io.Reader + + tc.Expect.Body = body + got := forwarder.GetLogInput(tc.Context, tc.Prefix, u, body) + + if diff := cmp.Diff(got, tc.Expect, cmpopts.IgnoreUnexported(s3.PutObjectInput{})); diff != "" { + t.Fatal(diff) + } + }) + } +} diff --git a/handlers/forwarder/message.go b/handlers/forwarder/message.go index 13397ca4..48f20109 100644 --- a/handlers/forwarder/message.go +++ b/handlers/forwarder/message.go @@ -60,11 +60,11 @@ func processS3Event(message []byte) (uris []*url.URL) { } type CopyEvent struct { - Copy []CopyRecord `"json"` + Copy []CopyRecord `json:"copy"` } type CopyRecord struct { - URI string `"uri"` + URI string `json:"uri"` } func processCopyEvent(message []byte) (uris []*url.URL) {