diff --git a/packages/node-http-handler/src/node-http-handler.ts b/packages/node-http-handler/src/node-http-handler.ts index 2cbe7aeb67ab..9db581383689 100644 --- a/packages/node-http-handler/src/node-http-handler.ts +++ b/packages/node-http-handler/src/node-http-handler.ts @@ -6,8 +6,6 @@ import { Agent as hsAgent, request as hsRequest, RequestOptions } from "https"; import { NODEJS_TIMEOUT_ERROR_CODES } from "./constants"; import { getTransformedHeaders } from "./get-transformed-headers"; -import { setConnectionTimeout } from "./set-connection-timeout"; -import { setSocketTimeout } from "./set-socket-timeout"; import { writeRequestBody } from "./write-request-body"; /** @@ -15,12 +13,28 @@ import { writeRequestBody } from "./write-request-body"; */ export interface NodeHttpHandlerOptions { /** + * @deprecated Use {@link requestTimeout} + * + * Note:{@link NodeHttpHandler} will resolve request timeout via nullish coalescing the following fields: + * {@link requestTimeout} ?? {@link connectionTimeout} ?? {@link socketTimeout} ?? {@link DEFAULT_REQUEST_TIMEOUT} + * * The maximum time in milliseconds that the connection phase of a request * may take before the connection attempt is abandoned. */ connectionTimeout?: number; /** + * The maximum time in milliseconds that the connection phase of a request + * may take before the connection attempt is abandoned. + */ + requestTimeout?: number; + + /** + * @deprecated Use {@link requestTimeout} + * + * Note:{@link NodeHttpHandler} will resolve request timeout via nullish coalescing the following fields: + * {@link requestTimeout} ?? {@link connectionTimeout} ?? {@link socketTimeout} ?? {@link DEFAULT_REQUEST_TIMEOUT} + * * The maximum time in milliseconds that a socket may remain idle before it * is closed. */ @@ -31,12 +45,15 @@ export interface NodeHttpHandlerOptions { } interface ResolvedNodeHttpHandlerConfig { + requestTimeout: number; connectionTimeout?: number; socketTimeout?: number; httpAgent: hAgent; httpsAgent: hsAgent; } +export const DEFAULT_REQUEST_TIMEOUT = 0; + export class NodeHttpHandler implements HttpHandler { private config?: ResolvedNodeHttpHandlerConfig; private readonly configProvider: Promise; @@ -59,12 +76,14 @@ export class NodeHttpHandler implements HttpHandler { } private resolveDefaultConfig(options?: NodeHttpHandlerOptions | void): ResolvedNodeHttpHandlerConfig { - const { connectionTimeout, socketTimeout, httpAgent, httpsAgent } = options || {}; + const { requestTimeout, connectionTimeout, socketTimeout, httpAgent, httpsAgent } = options || {}; const keepAlive = true; const maxSockets = 50; + return { connectionTimeout, socketTimeout, + requestTimeout: requestTimeout ?? connectionTimeout ?? socketTimeout ?? DEFAULT_REQUEST_TIMEOUT, httpAgent: httpAgent || new hAgent({ keepAlive, maxSockets }), httpsAgent: httpsAgent || new hsAgent({ keepAlive, maxSockets }), }; @@ -123,9 +142,11 @@ export class NodeHttpHandler implements HttpHandler { } }); - // wire-up any timeout logic - setConnectionTimeout(req, reject, this.config.connectionTimeout); - setSocketTimeout(req, reject, this.config.socketTimeout); + const timeout: number = this.config?.requestTimeout ?? DEFAULT_REQUEST_TIMEOUT; + req.setTimeout(timeout, () => { + req.destroy(); + reject(Object.assign(new Error(`Connection timed out after ${timeout} ms`), { name: "TimeoutError" })); + }); // wire-up abort logic if (abortSignal) { diff --git a/packages/node-http-handler/src/node-http2-connection-manager.ts b/packages/node-http-handler/src/node-http2-connection-manager.ts new file mode 100644 index 000000000000..a2dac8496350 --- /dev/null +++ b/packages/node-http-handler/src/node-http2-connection-manager.ts @@ -0,0 +1,125 @@ +import { RequestContext } from "@aws-sdk/types"; +import { ConnectConfiguration } from "@aws-sdk/types/src/connection/config"; +import { ConnectionManager, ConnectionManagerConfiguration } from "@aws-sdk/types/src/connection/manager"; +import http2, { ClientHttp2Session } from "http2"; + +import { NodeHttp2ConnectionPool } from "./node-http2-connection-pool"; + +export class NodeHttp2ConnectionManager implements ConnectionManager { + constructor(config: ConnectionManagerConfiguration) { + this.config = config; + + if (this.config.maxConcurrency && this.config.maxConcurrency <= 0) { + throw new RangeError("maxConcurrency must be greater than zero."); + } + } + + private config: ConnectionManagerConfiguration; + + private readonly sessionCache: Map = new Map(); + + public lease(requestContext: RequestContext, connectionConfiguration: ConnectConfiguration): ClientHttp2Session { + const url = this.getUrlString(requestContext); + + const existingPool = this.sessionCache.get(url); + + if (existingPool) { + const existingSession = existingPool.poll(); + if (existingSession && !this.config.disableConcurrency) { + return existingSession; + } + } + + const session = http2.connect(url); + + if (this.config.maxConcurrency) { + session.settings({ maxConcurrentStreams: this.config.maxConcurrency }, (err) => { + if (err) { + throw new Error( + "Fail to set maxConcurrentStreams to " + + this.config.maxConcurrency + + "when creating new session for " + + requestContext.destination.toString() + ); + } + }); + } + + // AWS SDK does not expect server push streams, don't keep node alive without a request. + session.unref(); + + const destroySessionCb = () => { + session.destroy(); + this.deleteSession(url, session); + }; + session.on("goaway", destroySessionCb); + session.on("error", destroySessionCb); + session.on("frameError", destroySessionCb); + session.on("close", () => this.deleteSession(url, session)); + + if (connectionConfiguration.requestTimeout) { + session.setTimeout(connectionConfiguration.requestTimeout, destroySessionCb); + } + + const connectionPool = this.sessionCache.get(url) || new NodeHttp2ConnectionPool(); + + connectionPool.offerLast(session); + + this.sessionCache.set(url, connectionPool); + + return session; + } + + /** + * Delete a session from the connection pool. + * @param authority The authority of the session to delete. + * @param session The session to delete. + */ + public deleteSession(authority: string, session: ClientHttp2Session): void { + const existingConnectionPool = this.sessionCache.get(authority); + + if (!existingConnectionPool) { + return; + } + + if (!existingConnectionPool.contains(session)) { + return; + } + + existingConnectionPool.remove(session); + + this.sessionCache.set(authority, existingConnectionPool); + } + + public release(requestContext: RequestContext, session: ClientHttp2Session): void { + const cacheKey = this.getUrlString(requestContext); + this.sessionCache.get(cacheKey)?.offerLast(session); + } + + public destroy(): void { + for (const [key, connectionPool] of this.sessionCache) { + for (const session of connectionPool) { + if (!session.destroyed) { + session.destroy(); + } + connectionPool.remove(session); + } + this.sessionCache.delete(key); + } + } + + public setMaxConcurrentStreams(maxConcurrentStreams: number) { + if (this.config.maxConcurrency && this.config.maxConcurrency <= 0) { + throw new RangeError("maxConcurrentStreams must be greater than zero."); + } + this.config.maxConcurrency = maxConcurrentStreams; + } + + public setDisableConcurrentStreams(disableConcurrentStreams: boolean) { + this.config.disableConcurrency = disableConcurrentStreams; + } + + private getUrlString(request: RequestContext): string { + return request.destination.toString(); + } +} diff --git a/packages/node-http-handler/src/node-http2-connection-pool.ts b/packages/node-http-handler/src/node-http2-connection-pool.ts new file mode 100644 index 000000000000..35b81154fc57 --- /dev/null +++ b/packages/node-http-handler/src/node-http2-connection-pool.ts @@ -0,0 +1,42 @@ +import { ConnectionPool } from "@aws-sdk/types/src/connection/pool"; +import { ClientHttp2Session } from "http2"; + +export class NodeHttp2ConnectionPool implements ConnectionPool { + private sessions: ClientHttp2Session[] = []; + + constructor(sessions?: ClientHttp2Session[]) { + this.sessions = sessions ?? []; + } + + public poll(): ClientHttp2Session | void { + if (this.sessions.length > 0) { + return this.sessions.shift(); + } + } + + public offerLast(session: ClientHttp2Session): void { + this.sessions.push(session); + } + + public contains(session: ClientHttp2Session): boolean { + return this.sessions.includes(session); + } + + public remove(session: ClientHttp2Session): void { + this.sessions = this.sessions.filter((s) => s !== session); + } + + public [Symbol.iterator]() { + return this.sessions[Symbol.iterator](); + } + + public destroy(connection: ClientHttp2Session): void { + for (const session of this.sessions) { + if (session === connection) { + if (!session.destroyed) { + session.destroy(); + } + } + } + } +} diff --git a/packages/node-http-handler/src/node-http2-handler.spec.ts b/packages/node-http-handler/src/node-http2-handler.spec.ts index 60626387d028..af29c1b23b86 100644 --- a/packages/node-http-handler/src/node-http2-handler.spec.ts +++ b/packages/node-http-handler/src/node-http2-handler.spec.ts @@ -1,11 +1,12 @@ import { AbortController } from "@aws-sdk/abort-controller"; import { HttpRequest, HttpResponse } from "@aws-sdk/protocol-http"; import { rejects } from "assert"; -import http2, { ClientHttp2Session, ClientHttp2Stream, constants, Http2Stream } from "http2"; +import http2, { ClientHttp2Session, ClientHttp2Stream, constants, Http2Server, Http2Stream } from "http2"; import { Duplex } from "stream"; import { promisify } from "util"; -import { NodeHttp2Handler } from "./node-http2-handler"; +import { NodeHttp2ConnectionPool } from "./node-http2-connection-pool"; +import { NodeHttp2Handler, NodeHttp2HandlerOptions } from "./node-http2-handler"; import { createMockHttp2Server, createResponseFunction, createResponseFunctionWithDelay } from "./server.mock"; describe(NodeHttp2Handler.name, () => { @@ -14,7 +15,10 @@ describe(NodeHttp2Handler.name, () => { const protocol = "http:"; const hostname = "localhost"; const port = 45321; - const mockH2Server = createMockHttp2Server().listen(port); + let mockH2Server = undefined; + let mockH2Servers: Record = {}; + + const authority = `${protocol}//${hostname}:${port}/`; const getMockReqOptions = () => ({ protocol, hostname, @@ -31,16 +35,23 @@ describe(NodeHttp2Handler.name, () => { }; beforeEach(() => { + mockH2Servers = { + 45321: createMockHttp2Server().listen(port), + 45322: createMockHttp2Server().listen(port + 1), + 45323: createMockHttp2Server().listen(port + 2), + 45324: createMockHttp2Server().listen(port + 3), + }; + mockH2Server = mockH2Servers[port]; mockH2Server.on("request", createResponseFunction(mockResponse)); }); afterEach(() => { mockH2Server.removeAllListeners("request"); jest.clearAllMocks(); - }); - - afterAll(() => { - mockH2Server.close(); + for (const p in mockH2Servers) { + mockH2Servers[p].removeAllListeners("request"); + mockH2Servers[p].close(); + } }); describe.each([ @@ -59,6 +70,7 @@ describe(NodeHttp2Handler.name, () => { const session = connectReal(...args); jest.spyOn(session, "ref"); jest.spyOn(session, "unref"); + jest.spyOn(session, "settings"); createdSessions.push(session); return session; }); @@ -102,7 +114,6 @@ describe(NodeHttp2Handler.name, () => { // Make single request. const { response } = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); - const authority = `${protocol}//${hostname}:${port}`; expect(connectSpy).toHaveBeenCalledTimes(1); expect(connectSpy).toHaveBeenCalledWith(authority); @@ -118,7 +129,6 @@ describe(NodeHttp2Handler.name, () => { const { response: response1 } = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); const { response: response2 } = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); - const authority = `${protocol}//${hostname}:${port}`; expect(connectSpy).toHaveBeenCalledTimes(1); expect(connectSpy).toHaveBeenCalledWith(authority); @@ -135,7 +145,7 @@ describe(NodeHttp2Handler.name, () => { const { response: response1 } = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); const port2 = port + 1; - const mockH2Server2 = createMockHttp2Server().listen(port2); + const mockH2Server2 = mockH2Servers[port2]; mockH2Server2.on("request", createResponseFunction(mockResponse)); // Make second request on URL with port2. @@ -146,8 +156,8 @@ describe(NodeHttp2Handler.name, () => { const authorityPrefix = `${protocol}//${hostname}`; expect(connectSpy).toHaveBeenCalledTimes(2); - expect(connectSpy).toHaveBeenNthCalledWith(1, `${authorityPrefix}:${port}`); - expect(connectSpy).toHaveBeenNthCalledWith(2, `${authorityPrefix}:${port2}`); + expect(connectSpy).toHaveBeenNthCalledWith(1, `${authorityPrefix}:${port}/`); + expect(connectSpy).toHaveBeenNthCalledWith(2, `${authorityPrefix}:${port2}/`); mockH2Server2.close(); expectSessionCreatedAndReferred(createdSessions[0]); @@ -163,7 +173,7 @@ describe(NodeHttp2Handler.name, () => { const UNEXPECTEDLY_CLOSED_REGEX = /closed|destroy|cancel|did not get a response/i; it("handles goaway frames", async () => { const port3 = port + 2; - const mockH2Server3 = createMockHttp2Server().listen(port3); + const mockH2Server3 = mockH2Servers[port3]; let establishedConnections = 0; let numRequests = 0; let shouldSendGoAway = true; @@ -243,7 +253,7 @@ describe(NodeHttp2Handler.name, () => { ["destroy", port + 2], ["close", port + 3], ])("handles servers calling connections %s", async (func, port) => { - const mockH2Server4 = createMockHttp2Server().listen(port); + const mockH2Server4 = mockH2Servers[port]; let establishedConnections = 0; let numRequests = 0; @@ -292,16 +302,15 @@ describe(NodeHttp2Handler.name, () => { it("destroys session and clears sessionCache", async () => { await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); - const authority = `${protocol}//${hostname}:${port}`; // @ts-ignore: access private property - const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0]; + const session: ClientHttp2Session = nodeH2Handler.connectionManager.sessionCache.get(authority).sessions[0]; // @ts-ignore: access private property - expect(nodeH2Handler.sessionCache.size).toBe(1); + expect(nodeH2Handler.connectionManager.sessionCache.size).toBe(1); expect(session.destroyed).toBe(false); nodeH2Handler.destroy(); // @ts-ignore: access private property - expect(nodeH2Handler.sessionCache.size).toBe(0); + expect(nodeH2Handler.connectionManager.sessionCache.size).toBe(0); expect(session.destroyed).toBe(true); }); }); @@ -309,7 +318,7 @@ describe(NodeHttp2Handler.name, () => { describe("abortSignal", () => { it("will not create session if request already aborted", async () => { // @ts-ignore: access private property - expect(nodeH2Handler.sessionCache.size).toBe(0); + expect(nodeH2Handler.connectionManager.sessionCache.size).toBe(0); await expect( nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), { abortSignal: { @@ -319,16 +328,15 @@ describe(NodeHttp2Handler.name, () => { }) ).rejects.toHaveProperty("name", "AbortError"); // @ts-ignore: access private property - expect(nodeH2Handler.sessionCache.size).toBe(0); + expect(nodeH2Handler.connectionManager.sessionCache.size).toBe(0); }); it("will not create request on session if request already aborted", async () => { // Create a session by sending a request. await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); - const authority = `${protocol}//${hostname}:${port}`; // @ts-ignore: access private property - const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0]; + const session: ClientHttp2Session = nodeH2Handler.connectionManager.sessionCache.get(authority).sessions[0]; const requestSpy = jest.spyOn(session, "request"); await expect( @@ -426,18 +434,17 @@ describe(NodeHttp2Handler.name, () => { ["static object", { sessionTimeout }], ])("disableConcurrentStreams: false (default) in constructor parameter of %s", async (_, options) => { nodeH2Handler = new NodeHttp2Handler(options); - await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); + await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), { requestTimeout: sessionTimeout }); - const authority = `${protocol}//${hostname}:${port}`; // @ts-ignore: access private property - const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0]; + const session: ClientHttp2Session = nodeH2Handler.connectionManager.sessionCache.get(authority).sessions[0]; expect(session.destroyed).toBe(false); // @ts-ignore: access private property - expect(nodeH2Handler.sessionCache.get(authority).length).toStrictEqual(1); + expect(nodeH2Handler.connectionManager.sessionCache.get(authority).sessions.length).toStrictEqual(1); await promisify(setTimeout)(sessionTimeout + 100); expect(session.destroyed).toBe(true); // @ts-ignore: access private property - expect(nodeH2Handler.sessionCache.get(authority).length).toStrictEqual(0); + expect(nodeH2Handler.connectionManager.sessionCache.get(authority).sessions.length).toStrictEqual(0); }); it.each([ @@ -445,14 +452,13 @@ describe(NodeHttp2Handler.name, () => { ["static object", { sessionTimeout, disableConcurrentStreams: true }], ])("disableConcurrentStreams: true in constructor parameter of %s", async (_, options) => { let session; - const authority = `${protocol}//${hostname}:${port}`; nodeH2Handler = new NodeHttp2Handler(options); mockH2Server.removeAllListeners("request"); mockH2Server.on("request", (request: any, response: any) => { // @ts-ignore: access private property - session = nodeH2Handler.sessionCache.get(authority)[0]; + session = nodeH2Handler.connectionManager.sessionCache.get(authority).sessions[0]; createResponseFunction(mockResponse)(request, response); }); await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); @@ -464,20 +470,55 @@ describe(NodeHttp2Handler.name, () => { }); }); + describe("maxConcurrency", () => { + it.each([ + ["static object", {}], + ["static object", { maxConcurrentStreams: 0 }], + ["static object", { maxConcurrentStreams: 1 }], + ["static object", { maxConcurrentStreams: 2 }], + ["static object", { maxConcurrentStreams: 3 }], + ])("verify session settings' maxConcurrentStreams", async (_, options: NodeHttp2HandlerOptions) => { + nodeH2Handler = new NodeHttp2Handler(options); + await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); + + // @ts-ignore: access private property + const session = nodeH2Handler.connectionManager.sessionCache.get(authority).sessions[0]; + + if (options.maxConcurrentStreams) { + expect(session.localSettings.maxConcurrentStreams).toBe(options.maxConcurrentStreams); + expect(session.settings).toHaveBeenCalled(); + } else { + expect(session.localSettings.maxConcurrentStreams).toBe(4294967295); + } + }); + + it("verify error thrown when maxConcurrentStreams is negative", async () => { + let error: Error | undefined = undefined; + try { + nodeH2Handler = new NodeHttp2Handler({ maxConcurrentStreams: -1 }); + await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); + } catch (e) { + error = e; + } + + expect(error).toBeDefined(); + expect(error!.message).toEqual('Invalid value for setting "maxConcurrentStreams": -1'); + }); + }); + it("will throw reasonable error when connection aborted abnormally", async () => { nodeH2Handler = new NodeHttp2Handler(); // Create a session by sending a request. await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); - const authority = `${protocol}//${hostname}:${port}`; // @ts-ignore: access private property - const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0]; + const session: ClientHttp2Session = nodeH2Handler.connectionManager.sessionCache.get(authority).sessions[0]; const fakeStream = new Duplex() as ClientHttp2Stream; const fakeRstCode = 1; // @ts-ignore: fake result code fakeStream.rstCode = fakeRstCode; jest.spyOn(session, "request").mockImplementation(() => fakeStream); // @ts-ignore: access private property - nodeH2Handler.sessionCache.set(`${protocol}//${hostname}:${port}`, [session]); + nodeH2Handler.connectionManager.sessionCache.set(authority, new NodeHttp2ConnectionPool([session])); // Delay response so that onabort is called earlier setTimeout(() => { fakeStream.emit("aborted"); @@ -493,13 +534,12 @@ describe(NodeHttp2Handler.name, () => { nodeH2Handler = new NodeHttp2Handler(); // Create a session by sending a request. await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); - const authority = `${protocol}//${hostname}:${port}`; // @ts-ignore: access private property - const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0]; + const session: ClientHttp2Session = nodeH2Handler.connectionManager.sessionCache.get(authority).sessions[0]; const fakeStream = new Duplex() as ClientHttp2Stream; jest.spyOn(session, "request").mockImplementation(() => fakeStream); // @ts-ignore: access private property - nodeH2Handler.sessionCache.set(`${protocol}//${hostname}:${port}`, [session]); + nodeH2Handler.connectionManager.sessionCache.set(authority, new NodeHttp2ConnectionPool([session])); // Delay response so that onabort is called earlier setTimeout(() => { fakeStream.emit("frameError", "TYPE", "CODE", "ID"); @@ -530,8 +570,6 @@ describe(NodeHttp2Handler.name, () => { // Make single request. await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); - - const authority = `${protocol}//${hostname}:${port}`; expect(connectSpy).toHaveBeenCalledTimes(1); expect(connectSpy).toHaveBeenCalledWith(authority); }); @@ -543,7 +581,6 @@ describe(NodeHttp2Handler.name, () => { await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); - const authority = `${protocol}//${hostname}:${port}`; expect(connectSpy).toHaveBeenCalledTimes(2); expect(connectSpy).toHaveBeenNthCalledWith(1, authority); expect(connectSpy).toHaveBeenNthCalledWith(2, authority); @@ -556,7 +593,7 @@ describe(NodeHttp2Handler.name, () => { await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); const port2 = port + 1; - const mockH2Server2 = createMockHttp2Server().listen(port2); + const mockH2Server2 = mockH2Servers[port2]; mockH2Server2.on("request", createResponseFunction(mockResponse)); // Make second request on URL with port2. @@ -564,8 +601,8 @@ describe(NodeHttp2Handler.name, () => { const authorityPrefix = `${protocol}//${hostname}`; expect(connectSpy).toHaveBeenCalledTimes(2); - expect(connectSpy).toHaveBeenNthCalledWith(1, `${authorityPrefix}:${port}`); - expect(connectSpy).toHaveBeenNthCalledWith(2, `${authorityPrefix}:${port2}`); + expect(connectSpy).toHaveBeenNthCalledWith(1, `${authorityPrefix}:${port}/`); + expect(connectSpy).toHaveBeenNthCalledWith(2, `${authorityPrefix}:${port2}/`); mockH2Server2.close(); }); }); @@ -575,10 +612,16 @@ describe(NodeHttp2Handler.name, () => { await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}); // @ts-ignore: access private property - expect(nodeH2Handler.sessionCache.size).toBe(1); + const session: ClientHttp2Session = nodeH2Handler.connectionManager.sessionCache.get(authority).sessions[0]; + + // @ts-ignore: access private property + expect(nodeH2Handler.connectionManager.sessionCache.size).toBe(1); + expect(session.destroyed).toBe(false); + nodeH2Handler.destroy(); // @ts-ignore: access private property - expect(nodeH2Handler.sessionCache.size).toBe(0); + expect(nodeH2Handler.connectionManager.sessionCache.size).toBe(0); + expect(session.destroyed).toBe(true); }); }); }); diff --git a/packages/node-http-handler/src/node-http2-handler.ts b/packages/node-http-handler/src/node-http2-handler.ts index 155f90a319d7..eac7679e9335 100644 --- a/packages/node-http-handler/src/node-http2-handler.ts +++ b/packages/node-http-handler/src/node-http2-handler.ts @@ -1,9 +1,11 @@ import { HttpHandler, HttpRequest, HttpResponse } from "@aws-sdk/protocol-http"; import { buildQueryString } from "@aws-sdk/querystring-builder"; -import { HttpHandlerOptions, Provider } from "@aws-sdk/types"; -import { ClientHttp2Session, connect, constants } from "http2"; +import { HttpHandlerOptions, Provider, RequestContext } from "@aws-sdk/types"; +import { ConnectConfiguration } from "@aws-sdk/types/dist-types/connection/config"; +import { ClientHttp2Session, constants } from "http2"; import { getTransformedHeaders } from "./get-transformed-headers"; +import { NodeHttp2ConnectionManager } from "./node-http2-connection-manager"; import { writeRequestBody } from "./write-request-body"; /** @@ -25,11 +27,19 @@ export interface NodeHttp2HandlerOptions { /** * Disables processing concurrent streams on a ClientHttp2Session instance. When set - * to true, the handler will create a new session instance for each request to a URL. + * to true, a new session instance is created for each request to a URL. * **Default:** false. * https://nodejs.org/api/http2.html#http2_class_clienthttp2session */ disableConcurrentStreams?: boolean; + + /** + * Maximum number of concurrent Http2Stream instances per ClientHttp2Session. Each session + * may have up to 2^31-1 Http2Stream instances over its lifetime. + * This value must be greater than or equal to 0. + * https://nodejs.org/api/http2.html#class-http2stream + */ + maxConcurrentStreams?: number; } export class NodeHttp2Handler implements HttpHandler { @@ -37,7 +47,8 @@ export class NodeHttp2Handler implements HttpHandler { private readonly configProvider: Promise; public readonly metadata = { handlerProtocol: "h2" }; - private sessionCache: Map; + + private readonly connectionManager: NodeHttp2ConnectionManager = new NodeHttp2ConnectionManager({}); constructor(options?: NodeHttp2HandlerOptions | Provider) { this.configProvider = new Promise((resolve, reject) => { @@ -51,19 +62,19 @@ export class NodeHttp2Handler implements HttpHandler { resolve(options || {}); } }); - this.sessionCache = new Map(); } destroy(): void { - for (const sessions of this.sessionCache.values()) { - sessions.forEach((session) => this.destroySession(session)); - } - this.sessionCache.clear(); + this.connectionManager.destroy(); } async handle(request: HttpRequest, { abortSignal }: HttpHandlerOptions = {}): Promise<{ response: HttpResponse }> { if (!this.config) { this.config = await this.configProvider; + this.connectionManager.setDisableConcurrentStreams(this.config.disableConcurrentStreams || false); + if (this.config.maxConcurrentStreams) { + this.connectionManager.setMaxConcurrentStreams(this.config.maxConcurrentStreams); + } } const { requestTimeout, disableConcurrentStreams } = this.config; return new Promise((resolve, rejectOriginal) => { @@ -82,7 +93,11 @@ export class NodeHttp2Handler implements HttpHandler { const { hostname, method, port, protocol, path, query } = request; const authority = `${protocol}//${hostname}${port ? `:${port}` : ""}`; - const session = this.getSession(authority, disableConcurrentStreams || false); + const requestContext = { destination: new URL(authority) } as RequestContext; + const session = this.connectionManager.lease(requestContext, { + requestTimeout: this.config?.sessionTimeout, + disableConcurrentStreams: disableConcurrentStreams || false, + } as ConnectConfiguration); const reject = (err: Error) => { if (disableConcurrentStreams) { @@ -115,7 +130,7 @@ export class NodeHttp2Handler implements HttpHandler { // Gracefully closes the Http2Session, allowing any existing streams to complete // on their own and preventing new Http2Stream instances from being created. session.close(); - this.deleteSessionFromCache(authority, session); + this.connectionManager.deleteSession(authority, session); } }); @@ -163,45 +178,6 @@ export class NodeHttp2Handler implements HttpHandler { }); } - /** - * Returns a session for the given URL. - * - * @param authority The URL to create a session for. - * @param disableConcurrentStreams If true, a new session will be created for each request. - * @returns A session for the given URL. - */ - private getSession(authority: string, disableConcurrentStreams: boolean): ClientHttp2Session { - const sessionCache = this.sessionCache; - - const existingSessions = sessionCache.get(authority) || []; - - // If concurrent streams are not disabled, we can use the existing session. - if (existingSessions.length > 0 && !disableConcurrentStreams) return existingSessions[0]; - - const newSession = connect(authority); - // AWS SDK does not expect server push streams, don't keep node alive without a request. - newSession.unref(); - - const destroySessionCb = () => { - this.destroySession(newSession); - this.deleteSessionFromCache(authority, newSession); - }; - newSession.on("goaway", destroySessionCb); - newSession.on("error", destroySessionCb); - newSession.on("frameError", destroySessionCb); - - newSession.on("close", () => this.deleteSessionFromCache(authority, newSession)); - - if (this.config?.sessionTimeout) { - newSession.setTimeout(this.config.sessionTimeout, destroySessionCb); - } - - existingSessions.push(newSession); - sessionCache.set(authority, existingSessions); - - return newSession; - } - /** * Destroys a session. * @param session The session to destroy. @@ -211,21 +187,4 @@ export class NodeHttp2Handler implements HttpHandler { session.destroy(); } } - - /** - * Delete a session from the connection pool. - * @param authority The authority of the session to delete. - * @param session The session to delete. - */ - private deleteSessionFromCache(authority: string, session: ClientHttp2Session): void { - const existingSessions = this.sessionCache.get(authority) || []; - if (!existingSessions.includes(session)) { - // If the session is not in the cache, it has already been deleted. - return; - } - this.sessionCache.set( - authority, - existingSessions.filter((s) => s !== session) - ); - } } diff --git a/packages/types/src/connection/config.ts b/packages/types/src/connection/config.ts new file mode 100644 index 000000000000..65567f914014 --- /dev/null +++ b/packages/types/src/connection/config.ts @@ -0,0 +1,7 @@ +export interface ConnectConfiguration { + /** + * The maximum time in milliseconds that the connection phase of a request + * may take before the connection attempt is abandoned. + */ + requestTimeout?: number; +} diff --git a/packages/types/src/connection/manager.ts b/packages/types/src/connection/manager.ts new file mode 100644 index 000000000000..d45ec907634b --- /dev/null +++ b/packages/types/src/connection/manager.ts @@ -0,0 +1,33 @@ +import { RequestContext } from "../transfer"; +import { ConnectConfiguration } from "./config"; + +export interface ConnectionManagerConfiguration { + /** + * Maximum number of allowed concurrent requests per connection. + */ + maxConcurrency?: number; + + /** + * Disables concurrent requests per connection. + */ + disableConcurrency?: boolean; +} + +export interface ConnectionManager { + /** + * Retrieves a connection from the connection pool if available, + * otherwise establish a new connection + */ + lease(requestContext: RequestContext, connectionConfiguration: ConnectConfiguration): T; + + /** + * Releases the connection back to the pool making it potentially + * re-usable by other requests. + */ + release(requestContext: RequestContext, connection: T): void; + + /** + * Destroys the connection manager. All connections will be closed. + */ + destroy(): void; +} diff --git a/packages/types/src/connection/pool.ts b/packages/types/src/connection/pool.ts new file mode 100644 index 000000000000..48295a91f0c3 --- /dev/null +++ b/packages/types/src/connection/pool.ts @@ -0,0 +1,28 @@ +export interface ConnectionPool { + /** + * Retrieve the first connection in the pool + */ + poll(): T | void; + + /** + * Release the connection back to the pool making it potentially + * re-usable by other requests. + */ + offerLast(connection: T): void; + + /** + * Removes the connection from the pool, and destroys it. + */ + destroy(connection: T): void; + + /** + * Implements the iterable protocol and allows arrays to be consumed + * by most syntaxes expecting iterables, such as the spread syntax + * and for...of loops + */ + [Symbol.iterator](): Iterator; +} + +export interface CacheKey { + destination: string; +} diff --git a/packages/types/src/http.ts b/packages/types/src/http.ts index ffbeaaa7bf2c..a5c31bdf0774 100644 --- a/packages/types/src/http.ts +++ b/packages/types/src/http.ts @@ -117,4 +117,10 @@ export interface ResolvedHttpResponse extends HttpResponse { */ export interface HttpHandlerOptions { abortSignal?: AbortSignal; + + /** + * The maximum time in milliseconds that the connection phase of a request + * may take before the connection attempt is abandoned. + */ + requestTimeout?: number; } diff --git a/packages/types/src/transfer.ts b/packages/types/src/transfer.ts index 596783b0a2d5..b1eefe81b8ef 100644 --- a/packages/types/src/transfer.ts +++ b/packages/types/src/transfer.ts @@ -21,7 +21,17 @@ export interface RequestHandler * @public */ export interface RequestHandlerMetadata { - // This infers request handler's protocol - // valid values are stated: https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids - handlerProtocol: string; + handlerProtocol: RequestHandlerProtocol | string; +} + +// Values from ALPN Protocol IDs +// https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids +export enum RequestHandlerProtocol { + HTTP_0_9 = "http/0.9", + HTTP_1_0 = "http/1.0", + TDS_8_0 = "tds/8.0", +} + +export interface RequestContext { + destination: URL; }