Skip to content

Commit

Permalink
Merge branch 'main' into apigateway-websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
bmoffatt authored Apr 16, 2024
2 parents 2da54a0 + de51f68 commit 9acd9d1
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 12 deletions.
2 changes: 1 addition & 1 deletion events/README_S3_Object_Lambda.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

func handler(ctx context.Context, event events.S3ObjectLambdaEvent) error {
url := event.GetObjectContext.InputS3Url
url := event.GetObjectContext.InputS3URL
resp, err := http.Get(url)
if err != nil {
return err
Expand Down
22 changes: 22 additions & 0 deletions events/s3_batch_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,28 @@ type S3BatchJobTask struct {
S3BucketARN string `json:"s3BucketArn"`
}

// S3BatchJobEventV2 encapsulates the detail of a s3 batch job
type S3BatchJobEventV2 struct {
InvocationSchemaVersion string `json:"invocationSchemaVersion"`
InvocationID string `json:"invocationId"`
Job S3BatchJobV2 `json:"job"`
Tasks []S3BatchJobTaskV2 `json:"tasks"`
}

// S3BatchJobV2 whichs have the job id
type S3BatchJobV2 struct {
ID string `json:"id"`
UserArguments map[string]string `json:"userArguments"`
}

// S3BatchJobTaskV2 represents one task in the s3 batch job and have all task details
type S3BatchJobTaskV2 struct {
TaskID string `json:"taskId"`
S3Key string `json:"s3Key"`
S3VersionID string `json:"s3VersionId"`
S3Bucket string `json:"s3Bucket"`
}

// S3BatchJobResponse is the response of a iven s3 batch job with the results
type S3BatchJobResponse struct {
InvocationSchemaVersion string `json:"invocationSchemaVersion"`
Expand Down
22 changes: 21 additions & 1 deletion events/s3_batch_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestS3BatchJobEventMarshaling(t *testing.T) {

// 1. read JSON from file
inputJSON := test.ReadJSONFromFile(t, "./testdata/s3-batch-job-event-request.json")
inputJSON := test.ReadJSONFromFile(t, "./testdata/s3-batch-job-event-request-1.0.json")

// 2. de-serialize into Go object
var inputEvent S3BatchJobEvent
Expand All @@ -31,6 +31,26 @@ func TestS3BatchJobEventMarshaling(t *testing.T) {
assert.JSONEq(t, string(inputJSON), string(outputJSON))
}

func TestS3BatchJobEventV2Marshaling(t *testing.T) {
// 1. read JSON from file
inputJSON := test.ReadJSONFromFile(t, "./testdata/s3-batch-job-event-request-2.0.json")

// 2. de-serialize into Go object
var inputEvent S3BatchJobEventV2
if err := json.Unmarshal(inputJSON, &inputEvent); err != nil {
t.Errorf("could not unmarshal event. details: %v", err)
}

// 3. serialize to JSON
outputJSON, err := json.Marshal(inputEvent)
if err != nil {
t.Errorf("could not marshal event. details: %v", err)
}

// 4. check result
assert.JSONEq(t, string(inputJSON), string(outputJSON))
}

func TestS3BatchJobResponseMarshaling(t *testing.T) {

// 1. read JSON from file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
"s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket"
}
]
}
}
19 changes: 19 additions & 0 deletions events/testdata/s3-batch-job-event-request-2.0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"invocationSchemaVersion": "2.0",
"invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
"job": {
"id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce",
"userArguments": {
"k1": "v1",
"k2": "v2"
}
},
"tasks": [
{
"taskId": "dGFza2lkZ29lc2hlcmUK",
"s3Key": "customerImage1.jpg",
"s3VersionId": "jbo9_jhdPEyB4RrmOxWS0kU0EoNrU_oI",
"s3Bucket": "awsexamplebucket"
}
]
}
46 changes: 45 additions & 1 deletion lambda/invoke_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@ func handleInvoke(invoke *invoke, handler *handlerOptions) error {
func reportFailure(invoke *invoke, invokeErr *messages.InvokeResponse_Error) error {
errorPayload := safeMarshal(invokeErr)
log.Printf("%s", errorPayload)
if err := invoke.failure(bytes.NewReader(errorPayload), contentTypeJSON); err != nil {

causeForXRay, err := json.Marshal(makeXRayError(invokeErr))
if err != nil {
return fmt.Errorf("unexpected error occured when serializing the function error cause for X-Ray: %v", err)
}

if err := invoke.failure(bytes.NewReader(errorPayload), contentTypeJSON, causeForXRay); err != nil {
return fmt.Errorf("unexpected error occurred when sending the function error to the API: %v", err)
}
return nil
Expand Down Expand Up @@ -166,3 +172,41 @@ func safeMarshal(v interface{}) []byte {
}
return payload
}

type xrayException struct {
Type string `json:"type"`
Message string `json:"message"`
Stack []*messages.InvokeResponse_Error_StackFrame `json:"stack"`
}

type xrayError struct {
WorkingDirectory string `json:"working_directory"`
Exceptions []xrayException `json:"exceptions"`
Paths []string `json:"paths"`
}

func makeXRayError(invokeResponseError *messages.InvokeResponse_Error) *xrayError {
paths := make([]string, 0, len(invokeResponseError.StackTrace))
visitedPaths := make(map[string]struct{}, len(invokeResponseError.StackTrace))
for _, frame := range invokeResponseError.StackTrace {
if _, exists := visitedPaths[frame.Path]; !exists {
visitedPaths[frame.Path] = struct{}{}
paths = append(paths, frame.Path)
}
}

cwd, _ := os.Getwd()
exceptions := []xrayException{{
Type: invokeResponseError.Type,
Message: invokeResponseError.Message,
Stack: invokeResponseError.StackTrace,
}}
if exceptions[0].Stack == nil {
exceptions[0].Stack = []*messages.InvokeResponse_Error_StackFrame{}
}
return &xrayError{
WorkingDirectory: cwd,
Paths: paths,
Exceptions: exceptions,
}
}
106 changes: 106 additions & 0 deletions lambda/invoke_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,110 @@ func TestCustomErrorMarshaling(t *testing.T) {
}
}

func TestXRayCausePlumbing(t *testing.T) {
errors := []error{
errors.New("barf"),
messages.InvokeResponse_Error{
Type: "yoloError",
Message: "hello yolo",
StackTrace: []*messages.InvokeResponse_Error_StackFrame{
{Label: "yolo", Path: "yolo", Line: 2},
{Label: "hi", Path: "hello/hello", Line: 12},
},
},
messages.InvokeResponse_Error{
Type: "yoloError",
Message: "hello yolo",
StackTrace: []*messages.InvokeResponse_Error_StackFrame{
{Label: "hi", Path: "hello/hello", Line: 12},
{Label: "hihi", Path: "hello/hello", Line: 13},
{Label: "yolo", Path: "yolo", Line: 2},
{Label: "hi", Path: "hello/hello", Line: 14},
},
},
messages.InvokeResponse_Error{
Type: "yoloError",
Message: "hello yolo",
StackTrace: []*messages.InvokeResponse_Error_StackFrame{},
},
messages.InvokeResponse_Error{
Type: "yoloError",
Message: "hello yolo",
},
}
wd, _ := os.Getwd()
expected := []string{
`{
"working_directory":"` + wd + `",
"paths": [],
"exceptions": [{
"type": "errorString",
"message": "barf",
"stack": []
}]
}`,
`{
"working_directory":"` + wd + `",
"paths": ["yolo", "hello/hello"],
"exceptions": [{
"type": "yoloError",
"message": "hello yolo",
"stack": [
{"label": "yolo", "path": "yolo", "line": 2},
{"label": "hi", "path": "hello/hello", "line": 12}
]
}]
}`,
`{
"working_directory":"` + wd + `",
"paths": ["hello/hello", "yolo"],
"exceptions": [{
"type": "yoloError",
"message": "hello yolo",
"stack": [
{"label": "hi", "path": "hello/hello", "line": 12},
{"label": "hihi", "path": "hello/hello", "line": 13},
{"label": "yolo", "path": "yolo", "line": 2},
{"label": "hi", "path": "hello/hello", "line": 14}
]
}]
}`,
`{
"working_directory":"` + wd + `",
"paths": [],
"exceptions": [{
"type": "yoloError",
"message": "hello yolo",
"stack": []
}]
}`,
`{
"working_directory":"` + wd + `",
"paths": [],
"exceptions": [{
"type": "yoloError",
"message": "hello yolo",
"stack": []
}]
}`,
}
require.Equal(t, len(errors), len(expected))
ts, record := runtimeAPIServer(``, len(errors))
defer ts.Close()
n := 0
handler := NewHandler(func() error {
defer func() { n++ }()
return errors[n]
})
endpoint := strings.Split(ts.URL, "://")[1]
expectedError := fmt.Sprintf("failed to GET http://%s/2018-06-01/runtime/invocation/next: got unexpected status code: 410", endpoint)
assert.EqualError(t, startRuntimeAPILoop(endpoint, handler), expectedError)
for i := range errors {
assert.JSONEq(t, expected[i], string(record.xrayCauses[i]))
}

}

func TestRuntimeAPIContextPlumbing(t *testing.T) {
handler := NewHandler(func(ctx context.Context) (interface{}, error) {
lc, _ := lambdacontext.FromContext(ctx)
Expand Down Expand Up @@ -271,6 +375,7 @@ type requestRecord struct {
nPosts int
responses [][]byte
contentTypes []string
xrayCauses []string
}

type eventMetadata struct {
Expand Down Expand Up @@ -336,6 +441,7 @@ func runtimeAPIServer(eventPayload string, failAfter int, overrides ...eventMeta
w.WriteHeader(http.StatusAccepted)
record.responses = append(record.responses, response.Bytes())
record.contentTypes = append(record.contentTypes, r.Header.Get("Content-Type"))
record.xrayCauses = append(record.xrayCauses, r.Header.Get(headerXRayErrorCause))
default:
w.WriteHeader(http.StatusBadRequest)
}
Expand Down
14 changes: 10 additions & 4 deletions lambda/runtime_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ const (
headerCognitoIdentity = "Lambda-Runtime-Cognito-Identity"
headerClientContext = "Lambda-Runtime-Client-Context"
headerInvokedFunctionARN = "Lambda-Runtime-Invoked-Function-Arn"
headerXRayErrorCause = "Lambda-Runtime-Function-Xray-Error-Cause"
trailerLambdaErrorType = "Lambda-Runtime-Function-Error-Type"
trailerLambdaErrorBody = "Lambda-Runtime-Function-Error-Body"
contentTypeJSON = "application/json"
contentTypeBytes = "application/octet-stream"
apiVersion = "2018-06-01"
xrayErrorCauseMaxSize = 1024 * 1024
)

type runtimeAPIClient struct {
Expand Down Expand Up @@ -57,17 +59,17 @@ type invoke struct {
// - An invoke is not complete until next() is called again!
func (i *invoke) success(body io.Reader, contentType string) error {
url := i.client.baseURL + i.id + "/response"
return i.client.post(url, body, contentType)
return i.client.post(url, body, contentType, nil)
}

// failure sends the payload to the Runtime API. This marks the function's invoke as a failure.
// Notes:
// - The execution of the function process continues, and is billed, until next() is called again!
// - A Lambda Function continues to be re-used for future invokes even after a failure.
// If the error is fatal (panic, unrecoverable state), exit the process immediately after calling failure()
func (i *invoke) failure(body io.Reader, contentType string) error {
func (i *invoke) failure(body io.Reader, contentType string, causeForXRay []byte) error {
url := i.client.baseURL + i.id + "/error"
return i.client.post(url, body, contentType)
return i.client.post(url, body, contentType, causeForXRay)
}

// next connects to the Runtime API and waits for a new invoke Request to be available.
Expand Down Expand Up @@ -108,7 +110,7 @@ func (c *runtimeAPIClient) next() (*invoke, error) {
}, nil
}

func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string) error {
func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string, xrayErrorCause []byte) error {
b := newErrorCapturingReader(body)
req, err := http.NewRequest(http.MethodPost, url, b)
if err != nil {
Expand All @@ -118,6 +120,10 @@ func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string)
req.Header.Set("User-Agent", c.userAgent)
req.Header.Set("Content-Type", contentType)

if xrayErrorCause != nil && len(xrayErrorCause) < xrayErrorCauseMaxSize {
req.Header.Set(headerXRayErrorCause, string(xrayErrorCause))
}

resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to POST to %s: %v", url, err)
Expand Down
8 changes: 4 additions & 4 deletions lambda/runtime_api_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestClientDoneAndError(t *testing.T) {
assert.NoError(t, err)
})
t.Run(fmt.Sprintf("happy Error with payload[%d]", i), func(t *testing.T) {
err := invoke.failure(bytes.NewReader(payload), contentTypeJSON)
err := invoke.failure(bytes.NewReader(payload), contentTypeJSON, nil)
assert.NoError(t, err)
})
}
Expand All @@ -105,7 +105,7 @@ func TestInvalidRequestsForMalformedEndpoint(t *testing.T) {
require.Error(t, err)
err = (&invoke{client: newRuntimeAPIClient("🚨")}).success(nil, "")
require.Error(t, err)
err = (&invoke{client: newRuntimeAPIClient("🚨")}).failure(nil, "")
err = (&invoke{client: newRuntimeAPIClient("🚨")}).failure(nil, "", nil)
require.Error(t, err)
}

Expand Down Expand Up @@ -145,7 +145,7 @@ func TestStatusCodes(t *testing.T) {
require.NoError(t, err)
})
t.Run("failure should not error", func(t *testing.T) {
err := invoke.failure(nil, "")
err := invoke.failure(nil, "", nil)
require.NoError(t, err)
})
} else {
Expand All @@ -158,7 +158,7 @@ func TestStatusCodes(t *testing.T) {
}
})
t.Run("failure should error", func(t *testing.T) {
err := invoke.failure(nil, "")
err := invoke.failure(nil, "", nil)
require.Error(t, err)
if i != 301 && i != 302 && i != 303 {
assert.Contains(t, err.Error(), "unexpected status code")
Expand Down

0 comments on commit 9acd9d1

Please sign in to comment.