Skip to content

Commit

Permalink
fix: allow configurable forwarder log prefix
Browse files Browse the repository at this point in the history
The forwarder writes all SQS messages processed to a file in the
destination S3 bucket. Make the S3 prefix we use configurable (default
is `forwarder/`).

I also took the opportunity to use more precise language: SQS handles
messages, not events. As a result, we are logging messages, rather than
"recording results". I also fixed a few lint issues raised by
golangci-lint which had slipped when committing straight to master.
  • Loading branch information
jta committed Oct 5, 2023
1 parent 907c834 commit a7790b5
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 12 deletions.
2 changes: 1 addition & 1 deletion apps/forwarder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This serverless application forwards data to an Observe FileDrop. Data is
forwarded by a lambda in two ways:

- any `s3:ObjectCreated` events trigger a copy from the source bucket to the destination bucket.
- all events read out of an SQS queue are written to a file in the destination bucket.
- all messages read out of an SQS queue are written to a file in the destination bucket.

You can use the Observe Forwarder as a cost effective means of loading files
into Observe or for exporting event streams such as EventBridge or SNS data.
Expand Down
8 changes: 6 additions & 2 deletions cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import (

var env struct {
DestinationURI string `env:"DESTINATION_URI,required"`
LogPrefix string `env:"LOG_PREFIX,default=forwarder/"`
Verbosity int `env:"VERBOSITY,default=1"`
}

var logger logr.Logger
var handler *forwarder.Handler
var (
logger logr.Logger
handler *forwarder.Handler
)

func init() {
if err := realInit(); err != nil {
Expand Down Expand Up @@ -51,6 +54,7 @@ func realInit() error {

handler, err = forwarder.New(&forwarder.Config{
DestinationURI: env.DestinationURI,
LogPrefix: env.LogPrefix,
S3Client: s3client,
Logger: &logger,
})
Expand Down
3 changes: 2 additions & 1 deletion handlers/forwarder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ var (
)

type Config struct {
DestinationURI string
DestinationURI string // S3 URI to write messages and copy files to
LogPrefix string // prefix used when writing SQS messages to S3
S3Client S3Client
Logger *logr.Logger
}
Expand Down
14 changes: 8 additions & 6 deletions handlers/forwarder/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type S3Client interface {

type Handler struct {
DestinationURI *url.URL
LogPrefix string
S3Client S3Client
Logger logr.Logger
}
Expand All @@ -50,12 +51,12 @@ func GetCopyObjectInput(source, destination *url.URL) *s3.CopyObjectInput {
}
}

func GetRecordInput(lctx *lambdacontext.LambdaContext, destination *url.URL, r io.Reader) *s3.PutObjectInput {
func GetLogInput(lctx *lambdacontext.LambdaContext, prefix string, destination *url.URL, r io.Reader) *s3.PutObjectInput {
if lctx == nil || destination == nil {
return nil
}

key := strings.TrimLeft(fmt.Sprintf("%s/forwarder/%s/%s", strings.Trim(destination.Path, "/"), lctx.InvokedFunctionArn, lctx.AwsRequestID), "/")
key := strings.TrimLeft(fmt.Sprintf("%s/%s%s/%s", strings.Trim(destination.Path, "/"), prefix, lctx.InvokedFunctionArn, lctx.AwsRequestID), "/")

return &s3.PutObjectInput{
Bucket: &destination.Host,
Expand Down Expand Up @@ -88,15 +89,15 @@ func (h *Handler) Handle(ctx context.Context, request events.SQSEvent) (response
}
}()

var records bytes.Buffer
var messages bytes.Buffer
defer func() {
if err == nil {
logger.V(3).Info("recording results")
_, err = h.S3Client.PutObject(ctx, GetRecordInput(lctx, h.DestinationURI, &records))
logger.V(3).Info("logging messages")
_, err = h.S3Client.PutObject(ctx, GetLogInput(lctx, h.LogPrefix, h.DestinationURI, &messages))
}
}()

encoder := json.NewEncoder(&records)
encoder := json.NewEncoder(&messages)

for _, record := range request.Records {
m := &SQSMessage{SQSMessage: record}
Expand Down Expand Up @@ -128,6 +129,7 @@ func New(cfg *Config) (*Handler, error) {

h := &Handler{
DestinationURI: u,
LogPrefix: cfg.LogPrefix,
S3Client: cfg.S3Client,
Logger: logr.Discard(),
}
Expand Down
46 changes: 46 additions & 0 deletions handlers/forwarder/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
"os"
"testing"
Expand Down Expand Up @@ -188,3 +189,48 @@ func TestHandler(t *testing.T) {
})
}
}

func TestRecorder(t *testing.T) {
t.Parallel()

testcases := []struct {
Context *lambdacontext.LambdaContext
Prefix string
DestinationURI string
Expect *s3.PutObjectInput
}{
{
Context: &lambdacontext.LambdaContext{
InvokedFunctionArn: "arn:aws:lambda:us-east-1:123456789012:function:test",
AwsRequestID: "c8ee04d5-5925-541a-b113-5942a0fc5985",
},
Prefix: "test/",
DestinationURI: "s3://my-bucket/path/to",
Expect: &s3.PutObjectInput{
Bucket: aws.String("my-bucket"),
Key: aws.String("path/to/test/arn:aws:lambda:us-east-1:123456789012:function:test/c8ee04d5-5925-541a-b113-5942a0fc5985"),
ContentType: aws.String("application/x-ndjson"),
},
},
}

for i, tc := range testcases {
tc := tc
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Parallel()
u, err := url.ParseRequestURI(tc.DestinationURI)
if err != nil {
t.Fatal(err)
}

var body io.Reader

tc.Expect.Body = body
got := forwarder.GetLogInput(tc.Context, tc.Prefix, u, body)

if diff := cmp.Diff(got, tc.Expect, cmpopts.IgnoreUnexported(s3.PutObjectInput{})); diff != "" {
t.Fatal(diff)
}
})
}
}
4 changes: 2 additions & 2 deletions handlers/forwarder/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ func processS3Event(message []byte) (uris []*url.URL) {
}

type CopyEvent struct {
Copy []CopyRecord `"json"`
Copy []CopyRecord `json:"copy"`
}

type CopyRecord struct {
URI string `"uri"`
URI string `json:"uri"`
}

func processCopyEvent(message []byte) (uris []*url.URL) {
Expand Down

0 comments on commit a7790b5

Please sign in to comment.