Skip to content

Commit

Permalink
chore(middleware-sdk-s3): use splitStream utility function to inspect…
Browse files Browse the repository at this point in the history
… 200-error
  • Loading branch information
kuhe committed Jul 16, 2024
1 parent 23ff9d3 commit 7a04eaa
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 24 deletions.
2 changes: 2 additions & 0 deletions packages/middleware-sdk-s3/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
"@smithy/smithy-client": "^3.1.8",
"@smithy/types": "^3.3.0",
"@smithy/util-config-provider": "^3.0.0",
"@smithy/util-stream": "^3.0.6",
"@smithy/util-utf8": "^3.0.0",
"tslib": "^2.6.2"
},
"devDependencies": {
Expand Down
51 changes: 35 additions & 16 deletions packages/middleware-sdk-s3/src/throw-200-exceptions.spec.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
import { HttpRequest, HttpResponse } from "@smithy/protocol-http";
import { toUtf8 } from "@smithy/util-utf8";
import { Readable } from "stream";

import { throw200ExceptionsMiddleware } from "./throw-200-exceptions";

describe("throw200ExceptionsMiddlewareOptions", () => {
const mockNextHandler = jest.fn();
const mockStreamCollector = jest.fn();
const mockUtf8Encoder = jest.fn();
const mockResponse = jest.fn();
const mockConfig = {
streamCollector: mockStreamCollector,
utf8Encoder: mockUtf8Encoder,
utf8Encoder: toUtf8,
};

beforeEach(() => {
jest.clearAllMocks();
});

describe("tests for statusCode < 200 and >= 300", () => {
mockStreamCollector.mockResolvedValue(Buffer.from(""));
mockUtf8Encoder.mockReturnValue("");

it.each([199, 300])("results for statusCode %i", async (statusCode) => {
mockNextHandler.mockReturnValue({
response: mockResponse,
Expand All @@ -39,13 +35,11 @@ describe("throw200ExceptionsMiddlewareOptions", () => {

it("should throw if response body is empty", async () => {
expect.assertions(3);
mockStreamCollector.mockResolvedValue(Buffer.from(""));
mockUtf8Encoder.mockReturnValue("");
mockNextHandler.mockReturnValue({
response: new HttpResponse({
statusCode: 200,
headers: {},
body: "",
body: Readable.from(Buffer.from("")),
}),
});
const handler = throw200ExceptionsMiddleware(mockConfig)(mockNextHandler, {
Expand Down Expand Up @@ -73,13 +67,11 @@ describe("throw200ExceptionsMiddlewareOptions", () => {
<RequestId>656c76696e6727732072657175657374</RequestId>
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
</Error>`;
mockStreamCollector.mockResolvedValue(Buffer.from(errorBody));
mockUtf8Encoder.mockReturnValue(errorBody);
mockNextHandler.mockReturnValue({
response: new HttpResponse({
statusCode: 200,
headers: {},
body: "",
body: Readable.from(Buffer.from(errorBody)),
}),
});
const handler = throw200ExceptionsMiddleware(mockConfig)(mockNextHandler, {} as any);
Expand All @@ -106,13 +98,40 @@ describe("throw200ExceptionsMiddlewareOptions", () => {
<Message>Access Denied</Message>
</Error>
</DeleteResult>`;
mockStreamCollector.mockResolvedValue(Buffer.from(errorBody));
mockUtf8Encoder.mockReturnValue(errorBody);
mockNextHandler.mockReturnValue({
response: new HttpResponse({
statusCode: 200,
headers: {},
body: "",
body: Readable.from(Buffer.from(errorBody)),
}),
});
const handler = throw200ExceptionsMiddleware(mockConfig)(mockNextHandler, {} as any);
const { response } = await handler({
input: {},
request: new HttpRequest({
hostname: "s3.us-east-1.amazonaws.com",
}),
});
expect(HttpResponse.isInstance(response)).toBe(true);
// @ts-ignore
expect(response.statusCode).toEqual(200);
});

/**
* This is an exception to the specification. We cannot afford to read
* a streaming body for its entire duration just to check for an extremely unlikely
* terminating XML tag if the stream is very long.
*/
it("should not throw if the Error tag is on an excessively long body", async () => {
const errorBody = `<?xml version="1.0" encoding="UTF-8"?>
<Error xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
${"a".repeat(3000)}
</Error>`;
mockNextHandler.mockReturnValue({
response: new HttpResponse({
statusCode: 200,
headers: {},
body: Readable.from(Buffer.from(errorBody)),
}),
});
const handler = throw200ExceptionsMiddleware(mockConfig)(mockNextHandler, {} as any);
Expand Down
38 changes: 30 additions & 8 deletions packages/middleware-sdk-s3/src/throw-200-exceptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import {
RelativeMiddlewareOptions,
StreamCollector,
} from "@smithy/types";
import { headStream, splitStream } from "@smithy/util-stream";

type PreviouslyResolved = {
streamCollector: StreamCollector;
utf8Encoder: Encoder;
};

Expand All @@ -22,6 +22,13 @@ const THROW_IF_EMPTY_BODY: Record<string, boolean> = {
CompleteMultipartUploadCommand: true,
};

/**
 * @internal
 * Upper bound, in bytes, on how much of a response body stream is inspected
 * when looking for an error-like 200 status. It is passed as the byte limit
 * to headStream, and the error check then examines the tail of the bytes
 * collected that way.
 *
 * This is a deliberate trade-off: a body longer than this limit is not read
 * to its end, so an error tag that terminates a very long body is not detected.
 */
const MAX_BYTES_TO_INSPECT = 3000;

/**
* In case of an internal error/terminated connection, S3 operations may return 200 errors. CopyObject, UploadPartCopy,
* CompleteMultipartUpload may return empty payload or payload with only xml Preamble.
Expand All @@ -36,12 +43,23 @@ export const throw200ExceptionsMiddleware =
if (!HttpResponse.isInstance(response)) {
return result;
}
const { statusCode, body } = response;
const { statusCode, body: sourceBody } = response;
if (statusCode < 200 || statusCode >= 300) {
return result;
}

const bodyBytes: Uint8Array = await collectBody(body, config);
let bodyCopy = sourceBody;
let body = sourceBody;

if (sourceBody && typeof sourceBody === "object" && !(sourceBody instanceof Uint8Array)) {
[bodyCopy, body] = await splitStream(sourceBody);
}

const bodyBytes: Uint8Array = await collectBody(bodyCopy, {
streamCollector: async (stream: any) => {
return headStream(stream, MAX_BYTES_TO_INSPECT);
},
});
const bodyStringTail = config.utf8Encoder(bodyBytes.subarray(bodyBytes.length - 16));

// Throw on 200 response with empty body, legacy behavior allowlist.
Expand All @@ -56,14 +74,18 @@ export const throw200ExceptionsMiddleware =
response.statusCode = 400;
}

// The body stream is consumed and paused at this point, so replace response.body with the collected bytes
// so that the deserializer can consume the body as normal.
response.body = bodyBytes;
// restore split body to the response for deserialization.
response.body = body;
return result;
};

// Collect low-level response body stream to Uint8Array.
const collectBody = (streamBody: any = new Uint8Array(), context: PreviouslyResolved): Promise<Uint8Array> => {
/**
* @internal
*/
const collectBody = (
streamBody: any = new Uint8Array(),
context: { streamCollector: StreamCollector }
): Promise<Uint8Array> => {
if (streamBody instanceof Uint8Array) {
return Promise.resolve(streamBody);
}
Expand Down

0 comments on commit 7a04eaa

Please sign in to comment.