diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 8f7e1141221f4..94228ab744dba 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -14,6 +14,7 @@ export * from "./middleware"; export * from "./pagination"; export * from "./profile"; export * from "./response"; +export * from "./retry"; export * from "./serde"; export * from "./shapes"; export * from "./signature"; diff --git a/packages/types/src/retry.ts b/packages/types/src/retry.ts new file mode 100644 index 0000000000000..bc3dc40e1c2e4 --- /dev/null +++ b/packages/types/src/retry.ts @@ -0,0 +1,134 @@ +export type RetryErrorType = + /** + * This is a connection level error such as a socket timeout, socket connect + * error, tls negotiation timeout etc... + * Typically these should never be applied for non-idempotent request types + * since in this scenario, it's impossible to know whether the operation had + * a side effect on the server. + */ + | "TRANSIENT" + + /** + * This is an error where the server explicitly told the client to back off, + * such as a 429 or 503 Http error. + */ + | "THROTTLING" + + /** + * This is a server error that isn't explicitly throttling but is considered + * by the client to be something that should be retried. + */ + | "SERVER_ERROR" + + /** + * Doesn't count against any budgets. This could be something like a 401 + * challenge in Http. + */ + | "CLIENT_ERROR"; + +export interface RetryErrorInfo { + errorType: RetryErrorType; + + /** + * Protocol hint. This could come from Http's 'retry-after' header or + * something from MQTT or any other protocol that has the ability to convey + * retry info from a peer. + * + * @returns the Date after which a retry should be attempted. + */ + retryAfterHint?: Date; +} + +export interface RetryBackoffStrategy { + /** + * @returns the number of milliseconds to wait before retrying an action. + */ + computeNextBackoffDelay(retryAttempt: number): number; +} + +export interface StandardRetryBackoffStrategy extends RetryBackoffStrategy { + /** + * Sets the delayBase used to compute backoff delays. + * @param delayBase + */ + setDelayBase(delayBase: number): void; +} + +export interface RetryStrategyOptions { + backoffStrategy: RetryBackoffStrategy; + + maxRetriesBase: number; +} + +export interface RetryToken { + /** + * @returns the current count of retry. + */ + getRetryCount(): number; + + /** + * @returns the number of milliseconds to wait before retrying an action. + */ + getRetryDelay(): number; +} + +export interface StandardRetryToken extends RetryToken { + /** + * @returns wheather token has remaining tokens. + */ + hasRetryTokens(errorType: RetryErrorType): boolean; + + /** + * @returns the number of available tokens. + */ + retrieveRetryTokens(errorInfo: RetryErrorInfo): number; + + /** + * @returns the cost of the last retry attemp. + */ + getLastRetryCost(): number | undefined; + + /** + * Releases a number of tokens. + * + * @param amount of tokens to release. + */ + releaseRetryTokens(amount?: number): void; +} + +export interface RetryStrategyV2 { + /** + * Called before any retries (for the first call to the operation). It either + * returns a retry token or an error upon the failure to acquire a token prior. + * + * tokenScope is arbitrary and out of scope for this component. However, + * adding it here offers us a lot of future flexibility for outage detection. + * For example, it could be "us-east-1" on a shared retry strategy, or + * "us-west-2-c:dynamodb". + */ + acquireInitialRetryToken(retryTokenScope: string): Promise; + + /** + * After a failed operation call, this function is invoked to refresh the + * retryToken returned by acquireInitialRetryToken(). This function can + * either choose to allow another retry and send a new or updated token, + * or reject the retry attempt and report the error either in an exception + * or returning an error. + */ + refreshRetryTokenForRetry(tokenToRenew: RetryToken, errorInfo: RetryErrorInfo): Promise; + + /** + * Upon successful completion of the operation, a user calls this function + * to record that the operation was successful. + */ + recordSuccess(token: RetryToken): void; +} + +export type ExponentialBackoffJitterType = "DEFAULT" | "NONE" | "FULL" | "DECORRELATED"; + +export interface ExponentialBackoffStrategyOptions { + jitterType: ExponentialBackoffJitterType; + + /* Scaling factor to add for the backoff in milliseconds. Default is 25ms */ + backoffScaleValue?: number; +} diff --git a/packages/util-retry/CHANGELOG.md b/packages/util-retry/CHANGELOG.md new file mode 100644 index 0000000000000..e9fb6ecf59301 --- /dev/null +++ b/packages/util-retry/CHANGELOG.md @@ -0,0 +1,4 @@ +# Change Log + +All notable changes to this project will be documented in this file. +See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. \ No newline at end of file diff --git a/packages/util-retry/LICENSE b/packages/util-retry/LICENSE new file mode 100644 index 0000000000000..a1895fac30dc7 --- /dev/null +++ b/packages/util-retry/LICENSE @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/packages/util-retry/README.md b/packages/util-retry/README.md new file mode 100644 index 0000000000000..b35e76c1089fe --- /dev/null +++ b/packages/util-retry/README.md @@ -0,0 +1,12 @@ +# @aws-sdk/util-retry + +[![NPM version](https://img.shields.io/npm/v/@aws-sdk/util-retry/latest.svg)](https://www.npmjs.com/package/@aws-sdk/util-retry) +[![NPM downloads](https://img.shields.io/npm/dm/@aws-sdk/util-retry.svg)](https://www.npmjs.com/package/@aws-sdk/util-retry) + +> An internal package + +This package provides shared utilities for retries. + +## Usage + +You probably shouldn't, at least directly. diff --git a/packages/util-retry/jest.config.js b/packages/util-retry/jest.config.js new file mode 100644 index 0000000000000..a8d1c2e499123 --- /dev/null +++ b/packages/util-retry/jest.config.js @@ -0,0 +1,5 @@ +const base = require("../../jest.config.base.js"); + +module.exports = { + ...base, +}; diff --git a/packages/util-retry/package.json b/packages/util-retry/package.json new file mode 100644 index 0000000000000..be8a74954f8ff --- /dev/null +++ b/packages/util-retry/package.json @@ -0,0 +1,60 @@ +{ + "name": "@aws-sdk/util-retry", + "version": "3.0.0", + "description": "Shared retry utilities to be used in middleware packages.", + "main": "./dist-cjs/index.js", + "module": "./dist-es/index.js", + "scripts": { + "build": "concurrently 'yarn:build:cjs' 'yarn:build:es' 'yarn:build:types'", + "build:cjs": "tsc -p tsconfig.cjs.json", + "build:es": "tsc -p tsconfig.es.json", + "build:include:deps": "lerna run --scope $npm_package_name --include-dependencies build", + "build:types": "tsc -p tsconfig.types.json", + "build:types:downlevel": "downlevel-dts dist-types dist-types/ts3.4", + "clean": "rimraf ./dist-* && rimraf *.tsbuildinfo", + "test": "jest" + }, + "keywords": [ + "aws", + "retry" + ], + "author": { + "name": "AWS SDK for JavaScript Team", + "url": "https://aws.amazon.com/javascript/" + }, + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/service-error-classification": "*", + "tslib": "^2.3.1" + }, + "devDependencies": { + "@aws-sdk/types": "*", + "@tsconfig/recommended": "1.0.1", + "@types/node": "^14.14.31", + "concurrently": "7.0.0", + "downlevel-dts": "0.10.1", + "rimraf": "3.0.2", + "typedoc": "0.19.2", + "typescript": "~4.6.2" + }, + "types": "./dist-types/index.d.ts", + "engines": { + "node": ">= 14.0.0" + }, + "typesVersions": { + "<4.0": { + "types/*": [ + "types/ts3.4/*" + ] + } + }, + "files": [ + "dist-*" + ], + "homepage": "https://github.com/aws/aws-sdk-js-v3/tree/master/packages/util-retry", + "repository": { + "type": "git", + "url": "https://github.com/aws/aws-sdk-js-v3.git", + "directory": "packages/util-retry" + } +} diff --git a/packages/util-retry/src/AdaptiveRetryStrategy.spec.ts b/packages/util-retry/src/AdaptiveRetryStrategy.spec.ts new file mode 100644 index 0000000000000..0cbf056c431f2 --- /dev/null +++ b/packages/util-retry/src/AdaptiveRetryStrategy.spec.ts @@ -0,0 +1,105 @@ +import { RetryErrorInfo, RetryErrorType, StandardRetryToken } from "@aws-sdk/types"; + +import { AdaptiveRetryStrategy } from "./AdaptiveRetryStrategy"; +import { RETRY_MODES } from "./config"; +import { DefaultRateLimiter } from "./DefaultRateLimiter"; +import { StandardRetryStrategy } from "./StandardRetryStrategy"; +import { RateLimiter } from "./types"; + +jest.mock("./StandardRetryStrategy"); +jest.mock("./DefaultRateLimiter"); + +describe(AdaptiveRetryStrategy.name, () => { + const maxAttemptsProvider = jest.fn(); + const retryTokenScope = "scope"; + const mockDefaultRateLimiter = { + getSendToken: jest.fn(), + updateClientSendingRate: jest.fn(), + }; + const mockRetryToken: StandardRetryToken = { + hasRetryTokens: (errorType: RetryErrorType) => true, + getLastRetryCost: () => 1, + retrieveRetryTokens: (errorInfo: RetryErrorInfo) => 1, + releaseRetryTokens: (amount: number) => {}, + getRetryCount: () => 1, + getRetryDelay: () => 1, + }; + const errorInfo = { + errorType: "TRANSIENT", + } as RetryErrorInfo; + + beforeEach(() => { + (DefaultRateLimiter as jest.Mock).mockReturnValue(mockDefaultRateLimiter); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + it(`sets mode=${RETRY_MODES.ADAPTIVE}`, () => { + const retryStrategy = new AdaptiveRetryStrategy(maxAttemptsProvider); + expect(retryStrategy.mode).toStrictEqual(RETRY_MODES.ADAPTIVE); + }); + + describe("rateLimiter init", () => { + it("sets getDefaultrateLimiter if options is undefined", () => { + const retryStrategy = new AdaptiveRetryStrategy(maxAttemptsProvider); + expect(retryStrategy["rateLimiter"]).toBe(mockDefaultRateLimiter); + }); + + it("sets DefaultRateLimiter if options.rateLimiter undefined", () => { + const retryStrategy = new AdaptiveRetryStrategy(maxAttemptsProvider, {}); + expect(retryStrategy["rateLimiter"]).toBe(mockDefaultRateLimiter); + }); + + it("sets options.rateLimiter if defined", () => { + const rateLimiter = {} as RateLimiter; + const retryStrategy = new AdaptiveRetryStrategy(maxAttemptsProvider, { + rateLimiter, + }); + expect(retryStrategy["rateLimiter"]).toBe(rateLimiter); + }); + }); + + describe("acquireInitialRetryToken", () => { + it("calls rateLimiter.getSendToken and returns initial retry token ", async () => { + const mockedStandardRetryStrategy = jest.spyOn(StandardRetryStrategy.prototype, "acquireInitialRetryToken"); + mockedStandardRetryStrategy.mockResolvedValue(mockRetryToken); + const retryStrategy = new AdaptiveRetryStrategy(maxAttemptsProvider, { + rateLimiter: mockDefaultRateLimiter, + }); + const token = await retryStrategy.acquireInitialRetryToken(retryTokenScope); + expect(mockDefaultRateLimiter.getSendToken).toHaveBeenCalledTimes(1); + expect(mockedStandardRetryStrategy).toHaveBeenCalledTimes(1); + expect(token).toStrictEqual(mockRetryToken); + }); + }); + describe("refreshRetryTokenForRetry", () => { + it("calls rateLimiter.updateCientSendingRate and refreshes retry token", async () => { + const mockedStandardRetryStrategy = jest.spyOn(StandardRetryStrategy.prototype, "refreshRetryTokenForRetry"); + mockedStandardRetryStrategy.mockResolvedValue(mockRetryToken); + const retryStrategy = new AdaptiveRetryStrategy(maxAttemptsProvider, { + rateLimiter: mockDefaultRateLimiter, + }); + const token = await retryStrategy.refreshRetryTokenForRetry(mockRetryToken, errorInfo); + expect(mockDefaultRateLimiter.updateClientSendingRate).toHaveBeenCalledTimes(1); + expect(mockDefaultRateLimiter.updateClientSendingRate).toHaveBeenCalledWith(errorInfo); + expect(mockedStandardRetryStrategy).toHaveBeenCalledTimes(1); + expect(mockedStandardRetryStrategy).toHaveBeenCalledWith(mockRetryToken, errorInfo); + expect(token).toStrictEqual(mockRetryToken); + }); + }); + describe("recordSuccess", () => { + it("rateLimiter.updateCientSendingRate and records success on token", async () => { + const mockedStandardRetryStrategy = jest.spyOn(StandardRetryStrategy.prototype, "recordSuccess"); + const retryStrategy = new AdaptiveRetryStrategy(maxAttemptsProvider, { + rateLimiter: mockDefaultRateLimiter, + }); + retryStrategy.recordSuccess(mockRetryToken); + expect(mockDefaultRateLimiter.updateClientSendingRate).toHaveBeenCalledTimes(1); + expect(mockDefaultRateLimiter.updateClientSendingRate).toHaveBeenCalledWith({}); + expect(mockedStandardRetryStrategy).toHaveBeenCalledTimes(1); + expect(mockedStandardRetryStrategy).toHaveBeenCalledWith(mockRetryToken); + }); + }); +}); diff --git a/packages/util-retry/src/AdaptiveRetryStrategy.ts b/packages/util-retry/src/AdaptiveRetryStrategy.ts new file mode 100644 index 0000000000000..8e27d5d20dbe4 --- /dev/null +++ b/packages/util-retry/src/AdaptiveRetryStrategy.ts @@ -0,0 +1,54 @@ +import { Provider, RetryErrorInfo, RetryStrategyV2, RetryToken, StandardRetryToken } from "@aws-sdk/types"; + +import { RETRY_MODES } from "./config"; +import { DefaultRateLimiter } from "./DefaultRateLimiter"; +import { StandardRetryStrategy } from "./StandardRetryStrategy"; +import { RateLimiter } from "./types"; + +/** + * Strategy options to be passed to AdaptiveRetryStrategy + */ +export interface AdaptiveRetryStrategyOptions { + rateLimiter?: RateLimiter; +} + +/** + * The AdaptiveRetryStrategy is a retry strategy for executing against a very + * resource constrained set of resources. Care should be taken when using this + * retry strategy. By default, it uses a dynamic backoff delay based on load + * currently perceived against the downstream resource and performs circuit + * breaking to disable retries in the event of high downstream failures using + * the DefaultRateLimiter. + * + * @see {@link StandardRetryStrategy} + * @see {@link DefaultRateLimiter } + */ +export class AdaptiveRetryStrategy implements RetryStrategyV2 { + private rateLimiter: RateLimiter; + private standardRetryStrategy: StandardRetryStrategy; + public readonly mode: string = RETRY_MODES.ADAPTIVE; + + constructor(private readonly maxAttemptsProvider: Provider, options?: AdaptiveRetryStrategyOptions) { + const { rateLimiter } = options ?? {}; + this.rateLimiter = rateLimiter ?? new DefaultRateLimiter(); + this.standardRetryStrategy = new StandardRetryStrategy(maxAttemptsProvider); + } + + public async acquireInitialRetryToken(retryTokenScope: string): Promise { + await this.rateLimiter.getSendToken(); + return this.standardRetryStrategy.acquireInitialRetryToken(retryTokenScope); + } + + public async refreshRetryTokenForRetry( + tokenToRenew: StandardRetryToken, + errorInfo: RetryErrorInfo + ): Promise { + this.rateLimiter.updateClientSendingRate(errorInfo); + return this.standardRetryStrategy.refreshRetryTokenForRetry(tokenToRenew, errorInfo); + } + + public recordSuccess(token: StandardRetryToken): void { + this.rateLimiter.updateClientSendingRate({}); + this.standardRetryStrategy.recordSuccess(token); + } +} diff --git a/packages/util-retry/src/DefaultRateLimiter.spec.ts b/packages/util-retry/src/DefaultRateLimiter.spec.ts new file mode 100644 index 0000000000000..a548c48c3ac77 --- /dev/null +++ b/packages/util-retry/src/DefaultRateLimiter.spec.ts @@ -0,0 +1,120 @@ +import { isThrottlingError } from "@aws-sdk/service-error-classification"; + +import { DefaultRateLimiter } from "./DefaultRateLimiter"; + +jest.mock("@aws-sdk/service-error-classification"); + +describe(DefaultRateLimiter.name, () => { + beforeEach(() => { + (isThrottlingError as jest.Mock).mockReturnValue(false); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe("getSendToken", () => { + beforeEach(() => { + jest.useFakeTimers({ legacyFakeTimers: true }); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it.each([ + [0.5, 892.8571428571428], + [1, 1785.7142857142856], + [2, 2000], + ])("timestamp: %d, delay: %d", async (timestamp, delay) => { + jest.spyOn(Date, "now").mockImplementation(() => 0); + const rateLimiter = new DefaultRateLimiter(); + + (isThrottlingError as jest.Mock).mockReturnValueOnce(true); + jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000); + rateLimiter.updateClientSendingRate({}); + + rateLimiter.getSendToken(); + jest.runAllTimers(); + expect(setTimeout).toHaveBeenLastCalledWith(expect.any(Function), delay); + }); + }); + + describe("cubicSuccess", () => { + it.each([ + [5, 7], + [6, 9.64893601], + [7, 10.00003085], + [8, 10.45328452], + [9, 13.40869703], + [10, 21.26626836], + [11, 36.42599853], + ])("timestamp: %d, calculatedRate: %d", (timestamp, calculatedRate) => { + jest.spyOn(Date, "now").mockImplementation(() => 0); + const rateLimiter = new DefaultRateLimiter(); + rateLimiter["lastMaxRate"] = 10; + rateLimiter["lastThrottleTime"] = 5; + + jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000); + + const cubicSuccessSpy = jest.spyOn(DefaultRateLimiter.prototype as any, "cubicSuccess"); + rateLimiter.updateClientSendingRate({}); + expect(cubicSuccessSpy).toHaveLastReturnedWith(calculatedRate); + }); + }); + + describe("cubicThrottle", () => { + it.each([ + [5, 0.112], + [6, 0.09333333], + [7, 0.08], + [8, 0.07], + [9, 0.06222222], + ])("timestamp: %d, calculatedRate: %d", (timestamp, calculatedRate) => { + jest.spyOn(Date, "now").mockImplementation(() => 0); + const rateLimiter = new DefaultRateLimiter(); + rateLimiter["lastMaxRate"] = 10; + rateLimiter["lastThrottleTime"] = 5; + + (isThrottlingError as jest.Mock).mockReturnValueOnce(true); + jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000); + const cubicThrottleSpy = jest.spyOn(DefaultRateLimiter.prototype as any, "cubicThrottle"); + rateLimiter.updateClientSendingRate({}); + expect(cubicThrottleSpy).toHaveLastReturnedWith(calculatedRate); + }); + }); + + it("updateClientSendingRate", () => { + jest.spyOn(Date, "now").mockImplementation(() => 0); + const rateLimiter = new DefaultRateLimiter(); + + const testCases: [boolean, number, number, number][] = [ + [false, 0.2, 0, 0.5], + [false, 0.4, 0, 0.5], + [false, 0.6, 4.8, 0.5], + [false, 0.8, 4.8, 0.5], + [false, 1, 4.16, 0.5], + [false, 1.2, 4.16, 0.6912], + [false, 1.4, 4.16, 1.0976], + [false, 1.6, 5.632, 1.6384], + [false, 1.8, 5.632, 2.3328], + [true, 2, 4.3264, 3.02848], + [false, 2.2, 4.3264, 3.486639], + [false, 2.4, 4.3264, 3.821874], + [false, 2.6, 5.66528, 4.053386], + [false, 2.8, 5.66528, 4.200373], + [false, 3.0, 4.333056, 4.282037], + [true, 3.2, 4.333056, 2.997426], + [false, 3.4, 4.333056, 3.452226], + ]; + + testCases.forEach(([isThrottlingErrorReturn, timestamp, measuredTxRate, fillRate]) => { + (isThrottlingError as jest.Mock).mockReturnValue(isThrottlingErrorReturn); + jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000); + + rateLimiter.updateClientSendingRate({}); + expect(rateLimiter["measuredTxRate"]).toEqual(measuredTxRate); + expect(parseFloat(rateLimiter["fillRate"].toFixed(6))).toEqual(fillRate); + }); + }); +}); diff --git a/packages/util-retry/src/DefaultRateLimiter.ts b/packages/util-retry/src/DefaultRateLimiter.ts new file mode 100644 index 0000000000000..4167b4aa70d1b --- /dev/null +++ b/packages/util-retry/src/DefaultRateLimiter.ts @@ -0,0 +1,150 @@ +import { isThrottlingError } from "@aws-sdk/service-error-classification"; + +import { RateLimiter } from "./types"; + +export interface DefaultRateLimiterOptions { + beta?: number; + minCapacity?: number; + minFillRate?: number; + scaleConstant?: number; + smooth?: number; +} + +export class DefaultRateLimiter implements RateLimiter { + // User configurable constants + private beta: number; + private minCapacity: number; + private minFillRate: number; + private scaleConstant: number; + private smooth: number; + + // Pre-set state variables + private currentCapacity = 0; + private enabled = false; + private lastMaxRate = 0; + private measuredTxRate = 0; + private requestCount = 0; + + // Other state variables + private fillRate: number; + private lastThrottleTime: number; + private lastTimestamp = 0; + private lastTxRateBucket: number; + private maxCapacity: number; + private timeWindow = 0; + + constructor(options?: DefaultRateLimiterOptions) { + this.beta = options?.beta ?? 0.7; + this.minCapacity = options?.minCapacity ?? 1; + this.minFillRate = options?.minFillRate ?? 0.5; + this.scaleConstant = options?.scaleConstant ?? 0.4; + this.smooth = options?.smooth ?? 0.8; + + const currentTimeInSeconds = this.getCurrentTimeInSeconds(); + this.lastThrottleTime = currentTimeInSeconds; + this.lastTxRateBucket = Math.floor(this.getCurrentTimeInSeconds()); + + this.fillRate = this.minFillRate; + this.maxCapacity = this.minCapacity; + } + + private getCurrentTimeInSeconds() { + return Date.now() / 1000; + } + + public async getSendToken() { + return this.acquireTokenBucket(1); + } + + private async acquireTokenBucket(amount: number) { + // Client side throttling is not enabled until we see a throttling error. + if (!this.enabled) { + return; + } + + this.refillTokenBucket(); + if (amount > this.currentCapacity) { + const delay = ((amount - this.currentCapacity) / this.fillRate) * 1000; + await new Promise((resolve) => setTimeout(resolve, delay)); + } + this.currentCapacity = this.currentCapacity - amount; + } + + private refillTokenBucket() { + const timestamp = this.getCurrentTimeInSeconds(); + if (!this.lastTimestamp) { + this.lastTimestamp = timestamp; + return; + } + + const fillAmount = (timestamp - this.lastTimestamp) * this.fillRate; + this.currentCapacity = Math.min(this.maxCapacity, this.currentCapacity + fillAmount); + this.lastTimestamp = timestamp; + } + + public updateClientSendingRate(response: any) { + let calculatedRate: number; + this.updateMeasuredRate(); + + if (isThrottlingError(response)) { + const rateToUse = !this.enabled ? this.measuredTxRate : Math.min(this.measuredTxRate, this.fillRate); + this.lastMaxRate = rateToUse; + this.calculateTimeWindow(); + this.lastThrottleTime = this.getCurrentTimeInSeconds(); + calculatedRate = this.cubicThrottle(rateToUse); + this.enableTokenBucket(); + } else { + this.calculateTimeWindow(); + calculatedRate = this.cubicSuccess(this.getCurrentTimeInSeconds()); + } + + const newRate = Math.min(calculatedRate, 2 * this.measuredTxRate); + this.updateTokenBucketRate(newRate); + } + + private calculateTimeWindow() { + this.timeWindow = this.getPrecise(Math.pow((this.lastMaxRate * (1 - this.beta)) / this.scaleConstant, 1 / 3)); + } + + private cubicThrottle(rateToUse: number) { + return this.getPrecise(rateToUse * this.beta); + } + + private cubicSuccess(timestamp: number) { + return this.getPrecise( + this.scaleConstant * Math.pow(timestamp - this.lastThrottleTime - this.timeWindow, 3) + this.lastMaxRate + ); + } + + private enableTokenBucket() { + this.enabled = true; + } + + private updateTokenBucketRate(newRate: number) { + // Refill based on our current rate before we update to the new fill rate. + this.refillTokenBucket(); + + this.fillRate = Math.max(newRate, this.minFillRate); + this.maxCapacity = Math.max(newRate, this.minCapacity); + + // When we scale down we can't have a current capacity that exceeds our maxCapacity. + this.currentCapacity = Math.min(this.currentCapacity, this.maxCapacity); + } + + private updateMeasuredRate() { + const t = this.getCurrentTimeInSeconds(); + const timeBucket = Math.floor(t * 2) / 2; + this.requestCount++; + + if (timeBucket > this.lastTxRateBucket) { + const currentRate = this.requestCount / (timeBucket - this.lastTxRateBucket); + this.measuredTxRate = this.getPrecise(currentRate * this.smooth + this.measuredTxRate * (1 - this.smooth)); + this.requestCount = 0; + this.lastTxRateBucket = timeBucket; + } + } + + private getPrecise(num: number) { + return parseFloat(num.toFixed(8)); + } +} diff --git a/packages/util-retry/src/StandardRetryStrategy.spec.ts b/packages/util-retry/src/StandardRetryStrategy.spec.ts new file mode 100644 index 0000000000000..9fb1891298691 --- /dev/null +++ b/packages/util-retry/src/StandardRetryStrategy.spec.ts @@ -0,0 +1,160 @@ +import { RetryErrorInfo, RetryErrorType } from "@aws-sdk/types"; + +import { RETRY_MODES } from "./config"; +import { DEFAULT_RETRY_DELAY_BASE, INITIAL_RETRY_TOKENS } from "./constants"; +import { getDefaultRetryToken } from "./defaultRetryToken"; +import { StandardRetryStrategy } from "./StandardRetryStrategy"; + +jest.mock("./defaultRetryToken"); + +describe(StandardRetryStrategy.name, () => { + const maxAttempts = 3; + const retryTokenScope = "scope"; + const mockRetryToken = { + getRetryCount: () => 1, + retrieveRetryTokens: (errorInfo: any) => 1, + }; + const noRetryTokenAvailableError = new Error("No retry token available"); + const errorInfo = { errorType: "TRANSIENT" } as RetryErrorInfo; + + beforeEach(() => { + (getDefaultRetryToken as jest.Mock).mockReturnValue(mockRetryToken); + }); + + afterEach(() => { + jest.clearAllMocks; + }); + + it("sets maxAttemptsProvider as a class member variable", () => { + [1, 2, 3].forEach((maxAttempts) => { + const retryStrategy = new StandardRetryStrategy(() => Promise.resolve(maxAttempts)); + expect(retryStrategy["maxAttemptsProvider"]()).resolves.toBe(maxAttempts); + }); + }); + + it(`sets mode=${RETRY_MODES.STANDARD}`, () => { + const retryStrategy = new StandardRetryStrategy(() => Promise.resolve(maxAttempts)); + expect(retryStrategy.mode).toStrictEqual(RETRY_MODES.STANDARD); + }); + + describe("retryToken init", () => { + it("sets retryToken", () => { + const retryStrategy = new StandardRetryStrategy(() => Promise.resolve(maxAttempts)); + expect(retryStrategy["retryToken"]).toBe(getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE)); + }); + }); + + describe("acquireInitialRetryToken", () => { + it("returns default retryToken", async () => { + const retryStrategy = new StandardRetryStrategy(() => Promise.resolve(maxAttempts)); + const retryToken = await retryStrategy.acquireInitialRetryToken(retryTokenScope); + expect(retryToken).toEqual(getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE)); + }); + }); + + describe("refreshRetryTokenForRetry", () => { + it("refreshes the token", async () => { + const retrieveRetryTokens = jest.fn().mockReturnValue(1); + const getRetryCount = jest.fn().mockReturnValue(0); + const hasRetryTokens = jest.fn().mockReturnValue(true); + const mockRetryToken = { + getRetryCount, + retrieveRetryTokens, + hasRetryTokens, + }; + (getDefaultRetryToken as jest.Mock).mockReturnValue(mockRetryToken); + const retryStrategy = new StandardRetryStrategy(() => Promise.resolve(maxAttempts)); + const token = await retryStrategy.acquireInitialRetryToken(retryTokenScope); + const refreshedToken = await retryStrategy.refreshRetryTokenForRetry(token, errorInfo); + expect(retrieveRetryTokens).toHaveBeenCalledTimes(1); + expect(retrieveRetryTokens).toHaveBeenCalledWith(errorInfo); + expect(getRetryCount).toHaveBeenCalledTimes(1); + expect(hasRetryTokens).toHaveBeenCalledTimes(1); + expect(hasRetryTokens).toHaveBeenCalledWith(errorInfo.errorType); + }); + + it("throws when attempts exceeds maxAttempts", async () => { + const mockRetryToken = { + getRetryCount: () => 2, + retrieveRetryTokens: (errorInfo: any) => 1, + }; + (getDefaultRetryToken as jest.Mock).mockReturnValue(mockRetryToken); + const retryStrategy = new StandardRetryStrategy(() => Promise.resolve(1)); + const token = await retryStrategy.acquireInitialRetryToken(retryTokenScope); + try { + await retryStrategy.refreshRetryTokenForRetry(token, errorInfo); + } catch (error) { + expect(error).toStrictEqual(noRetryTokenAvailableError); + } + }); + + it("throws when attempts exceeds default max attempts (3)", async () => { + const mockRetryToken = { + getRetryCount: () => 5, + retrieveRetryTokens: (errorInfo: any) => 1, + }; + (getDefaultRetryToken as jest.Mock).mockReturnValue(mockRetryToken); + const retryStrategy = new StandardRetryStrategy(() => Promise.resolve(5)); + const token = await retryStrategy.acquireInitialRetryToken(retryTokenScope); + try { + await retryStrategy.refreshRetryTokenForRetry(token, errorInfo); + } catch (error) { + expect(error).toStrictEqual(noRetryTokenAvailableError); + } + }); + + it("throws when no tokens are available", async () => { + const mockRetryToken = { + getRetryCount: () => 0, + retrieveRetryTokens: (errorInfo: any) => 1, + hasRetryTokens: (errorType: RetryErrorType) => false, + }; + (getDefaultRetryToken as jest.Mock).mockReturnValue(mockRetryToken); + const retryStrategy = new StandardRetryStrategy(() => Promise.resolve(maxAttempts)); + const token = await retryStrategy.acquireInitialRetryToken(retryTokenScope); + try { + await retryStrategy.refreshRetryTokenForRetry(token, errorInfo); + } catch (error) { + expect(error).toStrictEqual(noRetryTokenAvailableError); + } + }); + + it("throws when error is non-retryable", async () => { + const mockRetryToken = { + getRetryCount: () => 0, + retrieveRetryTokens: (errorInfo: any) => 1, + hasRetryTokens: (errorType: RetryErrorType) => true, + }; + (getDefaultRetryToken as jest.Mock).mockReturnValue(mockRetryToken); + const retryStrategy = new StandardRetryStrategy(() => Promise.resolve(maxAttempts)); + const token = await retryStrategy.acquireInitialRetryToken(retryTokenScope); + const errorInfo = { + errorType: "CLIENT_ERROR", + } as RetryErrorInfo; + try { + await retryStrategy.refreshRetryTokenForRetry(token, errorInfo); + } catch (error) { + expect(error).toStrictEqual(noRetryTokenAvailableError); + } + }); + + describe("recordSuccess", () => { + it("releases tokens", async () => { + const retryCost = 1; + const releaseRetryTokens = jest.fn(); + const getLastRetryCost = jest.fn().mockReturnValue(retryCost); + const mockRetryToken = { + releaseRetryTokens, + getLastRetryCost, + }; + (getDefaultRetryToken as jest.Mock).mockReturnValue(mockRetryToken); + const retryStrategy = new StandardRetryStrategy(() => Promise.resolve(maxAttempts)); + const token = await retryStrategy.acquireInitialRetryToken(retryTokenScope); + retryStrategy.recordSuccess(token); + expect(releaseRetryTokens).toHaveBeenCalledTimes(1); + expect(releaseRetryTokens).toHaveBeenCalledWith(retryCost); + expect(getLastRetryCost).toHaveBeenCalledTimes(1); + }); + }); + }); +}); diff --git a/packages/util-retry/src/StandardRetryStrategy.ts b/packages/util-retry/src/StandardRetryStrategy.ts new file mode 100644 index 0000000000000..92bcb38efc49a --- /dev/null +++ b/packages/util-retry/src/StandardRetryStrategy.ts @@ -0,0 +1,59 @@ +import { Provider, RetryErrorInfo, RetryErrorType, RetryStrategyV2, StandardRetryToken } from "@aws-sdk/types"; + +import { DEFAULT_MAX_ATTEMPTS, RETRY_MODES } from "./config"; +import { DEFAULT_RETRY_DELAY_BASE, INITIAL_RETRY_TOKENS } from "./constants"; +import { getDefaultRetryToken } from "./defaultRetryToken"; + +export class StandardRetryStrategy implements RetryStrategyV2 { + private retryToken: StandardRetryToken; + public readonly mode: string = RETRY_MODES.STANDARD; + + constructor(private readonly maxAttemptsProvider: Provider) { + this.retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + this.maxAttemptsProvider = maxAttemptsProvider; + } + + public async acquireInitialRetryToken(retryTokenScope: string): Promise { + return this.retryToken; + } + + public async refreshRetryTokenForRetry( + tokenToRenew: StandardRetryToken, + errorInfo: RetryErrorInfo + ): Promise { + const maxAttempts = await this.getMaxAttempts(); + + if (this.shouldRetry(tokenToRenew, errorInfo, maxAttempts)) { + tokenToRenew.retrieveRetryTokens(errorInfo); + return tokenToRenew; + } + throw new Error("No retry token available"); + } + + public recordSuccess(token: StandardRetryToken): void { + this.retryToken.releaseRetryTokens(token.getLastRetryCost()); + } + + private async getMaxAttempts() { + let maxAttempts: number; + try { + return await this.maxAttemptsProvider(); + } catch (error) { + console.warn(`Max attempts provider could not resolve. Using default of ${DEFAULT_MAX_ATTEMPTS}`); + return DEFAULT_MAX_ATTEMPTS; + } + } + + private shouldRetry(tokenToRenew: StandardRetryToken, errorInfo: RetryErrorInfo, maxAttempts: number): boolean { + const attempts = tokenToRenew.getRetryCount(); + return ( + attempts < maxAttempts && + tokenToRenew.hasRetryTokens(errorInfo.errorType) && + this.isRetryableError(errorInfo.errorType) + ); + } + + private isRetryableError(errorType: RetryErrorType): boolean { + return errorType === "THROTTLING" || errorType === "TRANSIENT"; + } +} diff --git a/packages/util-retry/src/config.ts b/packages/util-retry/src/config.ts new file mode 100644 index 0000000000000..2f58e8634959a --- /dev/null +++ b/packages/util-retry/src/config.ts @@ -0,0 +1,15 @@ +export enum RETRY_MODES { + STANDARD = "standard", + ADAPTIVE = "adaptive", +} + +/** + * The default value for how many HTTP requests an SDK should make for a + * single SDK operation invocation before giving up + */ +export const DEFAULT_MAX_ATTEMPTS = 3; + +/** + * The default retry algorithm to use. + */ +export const DEFAULT_RETRY_MODE = "STANDARD" as RETRY_MODES; diff --git a/packages/util-retry/src/constants.ts b/packages/util-retry/src/constants.ts new file mode 100644 index 0000000000000..2df12ce6fa9e0 --- /dev/null +++ b/packages/util-retry/src/constants.ts @@ -0,0 +1,49 @@ +/** + * The base number of milliseconds to use in calculating a suitable cool-down + * time when a retryable error is encountered. + */ +export const DEFAULT_RETRY_DELAY_BASE = 100; + +/** + * The maximum amount of time (in milliseconds) that will be used as a delay + * between retry attempts. + */ +export const MAXIMUM_RETRY_DELAY = 20 * 1000; + +/** + * The retry delay base (in milliseconds) to use when a throttling error is + * encountered. + */ +export const THROTTLING_RETRY_DELAY_BASE = 500; + +/** + * Initial number of retry tokens in Retry Quota + */ +export const INITIAL_RETRY_TOKENS = 500; + +/** + * The total amount of retry tokens to be decremented from retry token balance. + */ +export const RETRY_COST = 5; + +/** + * The total amount of retry tokens to be decremented from retry token balance + * when a throttling error is encountered. + */ +export const TIMEOUT_RETRY_COST = 10; + +/** + * The total amount of retry token to be incremented from retry token balance + * if an SDK operation invocation succeeds without requiring a retry request. + */ +export const NO_RETRY_INCREMENT = 1; + +/** + * Header name for SDK invocation ID + */ +export const INVOCATION_ID_HEADER = "amz-sdk-invocation-id"; + +/** + * Header name for request retry information. + */ +export const REQUEST_HEADER = "amz-sdk-request"; diff --git a/packages/util-retry/src/defaultRetryBackoffStrategy.spec.ts b/packages/util-retry/src/defaultRetryBackoffStrategy.spec.ts new file mode 100644 index 0000000000000..dc0af34aed983 --- /dev/null +++ b/packages/util-retry/src/defaultRetryBackoffStrategy.spec.ts @@ -0,0 +1,67 @@ +import { DEFAULT_RETRY_DELAY_BASE, MAXIMUM_RETRY_DELAY } from "./constants"; +import { getDefaultRetryBackoffStrategy } from "./defaultRetryBackoffStrategy"; + +describe("defaultRetryBackoffStrategy", () => { + const mathDotRandom = Math.random; + + beforeEach(() => { + Math.random = jest.fn().mockReturnValue(1); + }); + + afterEach(() => { + Math.random = mathDotRandom; + }); + + describe(`uses ${DEFAULT_RETRY_DELAY_BASE} by default`, () => { + [0, 1, 2, 3].forEach((attempts) => { + const expectedDelay = Math.floor(2 ** attempts * DEFAULT_RETRY_DELAY_BASE); + const retryBackoffStrategy = getDefaultRetryBackoffStrategy(); + it(`(${attempts}) returns ${expectedDelay}`, () => { + expect(retryBackoffStrategy.computeNextBackoffDelay(attempts)).toBe(expectedDelay); + }); + }); + }); + + describe("retry delay increases exponentially with attempt number", () => { + [0, 1, 2, 3].forEach((attempts) => { + const mockDelayBase = 50; + const expectedDelay = Math.floor(2 ** attempts * mockDelayBase); + const retryBackoffStrategy = getDefaultRetryBackoffStrategy(); + retryBackoffStrategy.setDelayBase(mockDelayBase); + it(`(${attempts}) returns ${expectedDelay}`, () => { + expect(retryBackoffStrategy.computeNextBackoffDelay(attempts)).toBe(expectedDelay); + }); + }); + }); + + describe(`caps retry delay at ${MAXIMUM_RETRY_DELAY / 1000} seconds`, () => { + const retryBackoffStrategy = getDefaultRetryBackoffStrategy(); + it("when value exceeded because of high delayBase", () => { + retryBackoffStrategy.setDelayBase(MAXIMUM_RETRY_DELAY + 1); + expect(retryBackoffStrategy.computeNextBackoffDelay(0)).toBe(MAXIMUM_RETRY_DELAY); + retryBackoffStrategy.setDelayBase(MAXIMUM_RETRY_DELAY + 2); + expect(retryBackoffStrategy.computeNextBackoffDelay(0)).toBe(MAXIMUM_RETRY_DELAY); + }); + + it("when value exceeded because of high attempts number", () => { + const largeAttemptsNumber = Math.ceil(Math.log2(MAXIMUM_RETRY_DELAY)); + retryBackoffStrategy.setDelayBase(1); + expect(retryBackoffStrategy.computeNextBackoffDelay(largeAttemptsNumber)).toBe(MAXIMUM_RETRY_DELAY); + expect(retryBackoffStrategy.computeNextBackoffDelay(largeAttemptsNumber + 1)).toBe(MAXIMUM_RETRY_DELAY); + }); + }); + + describe("randomizes the retry delay value", () => { + const retryBackoffStrategy = getDefaultRetryBackoffStrategy(); + Array.from({ length: 3 }, () => Math.random()).forEach((mockRandomValue) => { + const attempts = 0; + const delayBase = 100; + const expectedDelay = Math.floor(mockRandomValue * 2 ** attempts * delayBase); + retryBackoffStrategy.setDelayBase(delayBase); + it(`(${delayBase}, ${attempts}) with mock Math.random=${mockRandomValue} returns ${expectedDelay}`, () => { + Math.random = jest.fn().mockReturnValue(mockRandomValue); + expect(retryBackoffStrategy.computeNextBackoffDelay(attempts)).toBe(expectedDelay); + }); + }); + }); +}); diff --git a/packages/util-retry/src/defaultRetryBackoffStrategy.ts b/packages/util-retry/src/defaultRetryBackoffStrategy.ts new file mode 100644 index 0000000000000..b712276ca1a89 --- /dev/null +++ b/packages/util-retry/src/defaultRetryBackoffStrategy.ts @@ -0,0 +1,20 @@ +import { StandardRetryBackoffStrategy } from "@aws-sdk/types"; + +import { DEFAULT_RETRY_DELAY_BASE, MAXIMUM_RETRY_DELAY } from "./constants"; + +export const getDefaultRetryBackoffStrategy = (): StandardRetryBackoffStrategy => { + let delayBase: number = DEFAULT_RETRY_DELAY_BASE; + + const computeNextBackoffDelay = (attempts: number) => { + return Math.floor(Math.min(MAXIMUM_RETRY_DELAY, Math.random() * 2 ** attempts * delayBase)); + }; + + const setDelayBase = (delay: number) => { + delayBase = delay; + }; + + return { + computeNextBackoffDelay, + setDelayBase, + }; +}; diff --git a/packages/util-retry/src/defaultRetryToken.spec.ts b/packages/util-retry/src/defaultRetryToken.spec.ts new file mode 100644 index 0000000000000..0e53534ab0c68 --- /dev/null +++ b/packages/util-retry/src/defaultRetryToken.spec.ts @@ -0,0 +1,304 @@ +import { RetryErrorInfo, RetryErrorType, SdkError } from "@aws-sdk/types"; + +import { + DEFAULT_RETRY_DELAY_BASE, + INITIAL_RETRY_TOKENS, + MAXIMUM_RETRY_DELAY, + NO_RETRY_INCREMENT, + RETRY_COST, + TIMEOUT_RETRY_COST, +} from "./constants"; +import { getDefaultRetryBackoffStrategy } from "./defaultRetryBackoffStrategy"; +import { getDefaultRetryToken } from "./defaultRetryToken"; + +jest.mock("./defaultRetryBackoffStrategy"); + +describe("defaultRetryToken", () => { + const transientErrorType = "TRANSIENT" as RetryErrorType; + const nonTransientErrorType = "THROTTLING" as RetryErrorType; + + const getDrainedRetryToken = ( + targetCapacity: number, + error: RetryErrorInfo, + initialRetryTokens: number = INITIAL_RETRY_TOKENS + ) => { + const retryToken = getDefaultRetryToken(initialRetryTokens, DEFAULT_RETRY_DELAY_BASE); + let availableCapacity = initialRetryTokens; + while (availableCapacity >= targetCapacity) { + retryToken.retrieveRetryTokens(error); + availableCapacity -= targetCapacity; + } + return retryToken; + }; + const mathDotRandom = Math.random; + const setDelayBase = jest.fn(); + const mockRetryBackoffStrategy = { + computeNextBackoffDelay: (attempts: number) => 100, + setDelayBase, + }; + + beforeEach(() => { + Math.random = jest.fn().mockReturnValue(1); + (getDefaultRetryBackoffStrategy as jest.Mock).mockReturnValue(mockRetryBackoffStrategy); + }); + + afterEach(() => { + Math.random = mathDotRandom; + }); + + describe("custom initial retry tokens", () => { + it("hasRetryTokens returns false if capacity is not available", () => { + const customRetryTokens = 5; + const retryToken = getDefaultRetryToken(customRetryTokens, DEFAULT_RETRY_DELAY_BASE); + expect(retryToken.hasRetryTokens(transientErrorType)).toBe(false); + }); + + it("retrieveRetryToken throws error if retry tokens not available", () => { + const customRetryTokens = 5; + const retryToken = getDefaultRetryToken(customRetryTokens, DEFAULT_RETRY_DELAY_BASE); + expect(() => { + retryToken.retrieveRetryTokens({ errorType: transientErrorType }); + }).toThrowError(new Error("No retry token available")); + }); + }); + + describe("hasRetryTokens", () => { + describe("returns true if capacity is available", () => { + it("when it's transient error", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + expect(retryToken.hasRetryTokens(transientErrorType)).toBe(true); + }); + + it("when it's not transient error", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + expect(retryToken.hasRetryTokens(nonTransientErrorType)).toBe(true); + }); + }); + + describe("returns false if capacity is not available", () => { + it("when it's transient error", () => { + const retryToken = getDrainedRetryToken(TIMEOUT_RETRY_COST, { errorType: transientErrorType }); + expect(retryToken.hasRetryTokens(transientErrorType)).toBe(false); + }); + + it("when it's not transient error", () => { + const retryToken = getDrainedRetryToken(RETRY_COST, { errorType: nonTransientErrorType }); + expect(retryToken.hasRetryTokens(nonTransientErrorType)).toBe(false); + }); + }); + }); + + describe("retrieveRetryToken", () => { + describe("returns retry tokens amount if available", () => { + it("when it's transient error", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + expect(retryToken.retrieveRetryTokens({ errorType: transientErrorType })).toBe(TIMEOUT_RETRY_COST); + expect(retryToken.getLastRetryCost()).toBe(TIMEOUT_RETRY_COST); + }); + + it("when it's not transient error", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + expect(retryToken.retrieveRetryTokens({ errorType: nonTransientErrorType })).toBe(RETRY_COST); + expect(retryToken.getLastRetryCost()).toBe(RETRY_COST); + }); + }); + + describe("throws error if retry tokens not available", () => { + it("when it's transient error", () => { + const retryToken = getDrainedRetryToken(TIMEOUT_RETRY_COST, { errorType: transientErrorType }); + expect(() => { + retryToken.retrieveRetryTokens({ errorType: transientErrorType }); + }).toThrowError(new Error("No retry token available")); + }); + + it("when it's not transient error", () => { + const retryToken = getDrainedRetryToken(RETRY_COST, { errorType: nonTransientErrorType }); + expect(() => { + retryToken.retrieveRetryTokens({ errorType: nonTransientErrorType }); + }).toThrowError(new Error("No retry token available")); + }); + }); + }); + + describe("getLastRetryCost", () => { + it("is undefined before an error is encountered", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + expect(retryToken.getLastRetryCost()).toBeUndefined(); + }); + + it("is updated with successive errors", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + retryToken.retrieveRetryTokens({ errorType: transientErrorType }); + expect(retryToken.getLastRetryCost()).toBe(TIMEOUT_RETRY_COST); + retryToken.retrieveRetryTokens({ errorType: nonTransientErrorType }); + expect(retryToken.getLastRetryCost()).toBe(RETRY_COST); + }); + }); + + describe("getRetryCount", () => { + it("returns 0 when count is not set", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + expect(retryToken.getRetryCount()).toBe(0); + }); + + it("returns amount set when token is created", () => { + const retryCount = 3; + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE, retryCount); + expect(retryToken.getRetryCount()).toBe(retryCount); + }); + + it("increments when retries occur", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE, 1); + expect(retryToken.getRetryCount()).toBe(1); + retryToken.retrieveRetryTokens({ errorType: transientErrorType }); + expect(retryToken.getRetryCount()).toBe(2); + retryToken.retrieveRetryTokens({ errorType: nonTransientErrorType }); + expect(retryToken.getRetryCount()).toBe(3); + }); + }); + + describe("getRetryDelay", () => { + it("returns initial delay", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + expect(retryToken.getRetryDelay()).toBe(DEFAULT_RETRY_DELAY_BASE); + }); + + describe("retry delay increases exponentially with attempt number for non-throttling error", () => { + const computeNextBackoffDelay = jest + .fn() + .mockReturnValueOnce(100) + .mockReturnValueOnce(200) + .mockReturnValueOnce(400) + .mockReturnValueOnce(800); + const mockRetryBackoffStrategy = { + computeNextBackoffDelay, + setDelayBase, + }; + (getDefaultRetryBackoffStrategy as jest.Mock).mockReturnValue(mockRetryBackoffStrategy); + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + [0, 1, 2, 3].forEach((attempts) => { + const mockDelayBase = 100; + const expectedDelay = Math.floor(2 ** attempts * mockDelayBase); + it(`(${mockDelayBase}, ${attempts}) returns ${expectedDelay}`, () => { + retryToken.retrieveRetryTokens({ errorType: transientErrorType }); + expect(retryToken.getRetryDelay()).toBe(expectedDelay); + expect(computeNextBackoffDelay).toHaveBeenCalledTimes(attempts + 1); + }); + }); + }); + + describe("retry delay increases exponentially with attempt number for throttling error", () => { + const computeNextBackoffDelay = jest + .fn() + .mockReturnValueOnce(500) + .mockReturnValueOnce(1000) + .mockReturnValueOnce(2000) + .mockReturnValueOnce(4000); + const mockRetryBackoffStrategy = { + computeNextBackoffDelay, + setDelayBase, + }; + (getDefaultRetryBackoffStrategy as jest.Mock).mockReturnValue(mockRetryBackoffStrategy); + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + [0, 1, 2, 3].forEach((attempts) => { + const mockDelayBase = 500; + const expectedDelay = Math.floor(2 ** attempts * mockDelayBase); + it(`(${mockDelayBase}, ${attempts}) returns ${expectedDelay}`, () => { + retryToken.retrieveRetryTokens({ errorType: nonTransientErrorType }); + expect(retryToken.getRetryDelay()).toBe(expectedDelay); + expect(computeNextBackoffDelay).toHaveBeenCalledTimes(attempts + 1); + }); + }); + }); + + describe(`caps retry delay at ${MAXIMUM_RETRY_DELAY / 1000} seconds`, () => { + it("when value exceeded because of high delayBase", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE * 1000); + expect(retryToken.getRetryDelay()).toBe(MAXIMUM_RETRY_DELAY); + }); + + it("when value exceeded because of high attempts number", () => { + const computeNextBackoffDelay = jest.fn().mockReturnValue(MAXIMUM_RETRY_DELAY); + const setDelayBase = jest.fn(); + const mockRetryBackoffStrategy = { + computeNextBackoffDelay, + setDelayBase, + }; + (getDefaultRetryBackoffStrategy as jest.Mock).mockReturnValue(mockRetryBackoffStrategy); + const largeAttemptsNumber = Math.ceil(Math.log2(MAXIMUM_RETRY_DELAY)); + const retryToken = getDefaultRetryToken( + INITIAL_RETRY_TOKENS * largeAttemptsNumber, + DEFAULT_RETRY_DELAY_BASE, + largeAttemptsNumber + ); + retryToken.retrieveRetryTokens({ errorType: transientErrorType }); + expect(retryToken.getRetryDelay()).toBe(MAXIMUM_RETRY_DELAY); + }); + }); + + it("uses retry-after hint", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + // 5 minutes, greater than maximum allowed for normal retry. + const expectedDelay = 5 * 60 * 1000; + const retryAfterHint = new Date(Date.now() + expectedDelay); + const errorInfo: RetryErrorInfo = { + errorType: "TRANSIENT", + retryAfterHint, + }; + retryToken.retrieveRetryTokens(errorInfo); + // Subtract small offset on expectedDelay to account for delta to when + // Date.now() is invoked. + expect(retryToken.getRetryDelay()).toBeGreaterThan(expectedDelay - 50); + }); + }); + + describe("releaseRetryToken", () => { + it("adds capacityReleaseAmount if passed", () => { + const { errorType } = { errorType: nonTransientErrorType }; + const retryToken = getDrainedRetryToken(RETRY_COST, { errorType: nonTransientErrorType }); + + // Ensure that retry tokens are not available. + expect(retryToken.hasRetryTokens(errorType)).toBe(false); + + // Release RETRY_COST tokens. + retryToken.releaseRetryTokens(RETRY_COST); + expect(retryToken.hasRetryTokens(errorType)).toBe(true); + expect(retryToken.retrieveRetryTokens({ errorType: nonTransientErrorType })).toBe(RETRY_COST); + expect(retryToken.hasRetryTokens(errorType)).toBe(false); + }); + + it("adds NO_RETRY_INCREMENT if capacityReleaseAmount not passed", () => { + const { errorType } = { errorType: nonTransientErrorType }; + const retryToken = getDrainedRetryToken(RETRY_COST, { errorType: nonTransientErrorType }); + + // retry tokens will not be available till NO_RETRY_INCREMENT is added + // till it's equal to RETRY_COST - (INITIAL_RETRY_TOKENS % RETRY_COST) + let tokensReleased = 0; + const tokensToBeReleased = RETRY_COST - (INITIAL_RETRY_TOKENS % RETRY_COST); + while (tokensReleased < tokensToBeReleased) { + expect(retryToken.hasRetryTokens(errorType)).toBe(false); + retryToken.releaseRetryTokens(); + tokensReleased += NO_RETRY_INCREMENT; + } + expect(retryToken.hasRetryTokens(errorType)).toBe(true); + }); + + it("ensures availableCapacity is maxed at INITIAL_RETRY_TOKENS", () => { + const retryToken = getDefaultRetryToken(INITIAL_RETRY_TOKENS, DEFAULT_RETRY_DELAY_BASE); + const { errorType } = { errorType: nonTransientErrorType }; + + // release 100 tokens. + [...Array(100).keys()].forEach(() => { + retryToken.releaseRetryTokens(); + }); + + // availableCapacity is still maxed at INITIAL_RETRY_TOKENS + // hasRetryTokens would be true only till INITIAL_RETRY_TOKENS/RETRY_COST times + [...Array(Math.floor(INITIAL_RETRY_TOKENS / RETRY_COST)).keys()].forEach(() => { + expect(retryToken.hasRetryTokens(errorType)).toBe(true); + retryToken.retrieveRetryTokens({ errorType: nonTransientErrorType }); + }); + expect(retryToken.hasRetryTokens(errorType)).toBe(false); + }); + }); +}); diff --git a/packages/util-retry/src/defaultRetryToken.ts b/packages/util-retry/src/defaultRetryToken.ts new file mode 100644 index 0000000000000..952f8233311d7 --- /dev/null +++ b/packages/util-retry/src/defaultRetryToken.ts @@ -0,0 +1,91 @@ +import { RetryErrorInfo, RetryErrorType, StandardRetryBackoffStrategy, StandardRetryToken } from "@aws-sdk/types"; + +import { + DEFAULT_RETRY_DELAY_BASE, + MAXIMUM_RETRY_DELAY, + NO_RETRY_INCREMENT, + RETRY_COST, + THROTTLING_RETRY_DELAY_BASE, + TIMEOUT_RETRY_COST, +} from "./constants"; +import { getDefaultRetryBackoffStrategy } from "./defaultRetryBackoffStrategy"; + +export interface DefaultRetryTokenOptions { + /** + * The total amount of retry tokens to be decremented from retry token balance. + */ + retryCost?: number; + + /** + * The total amount of retry tokens to be decremented from retry token balance + * when a throttling error is encountered. + */ + timeoutRetryCost?: number; + + /** + * + */ + retryBackoffStrategy?: StandardRetryBackoffStrategy; +} + +export const getDefaultRetryToken = ( + initialRetryTokens: number, + initialRetryDelay: number, + initialRetryCount?: number, + options?: DefaultRetryTokenOptions +): StandardRetryToken => { + const MAX_CAPACITY = initialRetryTokens; + const retryCost = options?.retryCost ?? RETRY_COST; + const timeoutRetryCost = options?.timeoutRetryCost ?? TIMEOUT_RETRY_COST; + const retryBackoffStrategy = options?.retryBackoffStrategy ?? getDefaultRetryBackoffStrategy(); + + let availableCapacity = initialRetryTokens; + let retryDelay = Math.min(MAXIMUM_RETRY_DELAY, initialRetryDelay); + let lastRetryCost: number | undefined = undefined; + let retryCount = initialRetryCount ?? 0; + + const getCapacityAmount = (errorType: RetryErrorType) => (errorType === "TRANSIENT" ? timeoutRetryCost : retryCost); + + const getRetryCount = (): number => retryCount; + + const getRetryDelay = (): number => retryDelay; + + const getLastRetryCost = (): number | undefined => lastRetryCost; + + const hasRetryTokens = (errorType: RetryErrorType): boolean => getCapacityAmount(errorType) <= availableCapacity; + + const retrieveRetryTokens = (errorInfo: RetryErrorInfo) => { + const errorType = errorInfo.errorType; + if (!hasRetryTokens(errorType)) { + throw new Error("No retry token available"); + } + const capacityAmount = getCapacityAmount(errorType); + const delayBase = errorType === "THROTTLING" ? THROTTLING_RETRY_DELAY_BASE : DEFAULT_RETRY_DELAY_BASE; + retryBackoffStrategy.setDelayBase(delayBase); + const delayFromErrorType = retryBackoffStrategy.computeNextBackoffDelay(retryCount); + if (errorInfo.retryAfterHint) { + const delayFromRetryAfterHint = errorInfo.retryAfterHint.getTime() - Date.now(); + retryDelay = Math.max(delayFromRetryAfterHint || 0, delayFromErrorType); + } else { + retryDelay = delayFromErrorType; + } + retryCount++; + lastRetryCost = capacityAmount; + availableCapacity -= capacityAmount; + return capacityAmount; + }; + + const releaseRetryTokens = (releaseAmount?: number) => { + availableCapacity += releaseAmount ?? NO_RETRY_INCREMENT; + availableCapacity = Math.min(availableCapacity, MAX_CAPACITY); + }; + + return { + getRetryCount, + getRetryDelay, + getLastRetryCost, + hasRetryTokens, + retrieveRetryTokens, + releaseRetryTokens, + }; +}; diff --git a/packages/util-retry/src/index.ts b/packages/util-retry/src/index.ts new file mode 100644 index 0000000000000..ad2af069770c2 --- /dev/null +++ b/packages/util-retry/src/index.ts @@ -0,0 +1,6 @@ +export * from "./AdaptiveRetryStrategy"; +export * from "./DefaultRateLimiter"; +export * from "./StandardRetryStrategy"; +export * from "./config"; +export * from "./constants"; +export * from "./types"; diff --git a/packages/util-retry/src/types.ts b/packages/util-retry/src/types.ts new file mode 100644 index 0000000000000..e103444fb95d7 --- /dev/null +++ b/packages/util-retry/src/types.ts @@ -0,0 +1,17 @@ +export interface RateLimiter { + /** + * If there is sufficient capacity (tokens) available, it immediately returns. + * If there is not sufficient capacity, it will either sleep a certain amount + * of time until the rate limiter can retrieve a token from its token bucket + * or raise an exception indicating there is insufficient capacity. + */ + getSendToken: () => Promise; + + /** + * Updates the client sending rate based on response. + * If the response was successful, the capacity and fill rate are increased. + * If the response was a throttling response, the capacity and fill rate are + * decreased. Transient errors do not affect the rate limiter. + */ + updateClientSendingRate: (response: any) => void; +} diff --git a/packages/util-retry/tsconfig.cjs.json b/packages/util-retry/tsconfig.cjs.json new file mode 100644 index 0000000000000..96198be81644a --- /dev/null +++ b/packages/util-retry/tsconfig.cjs.json @@ -0,0 +1,9 @@ +{ + "compilerOptions": { + "baseUrl": ".", + "outDir": "dist-cjs", + "rootDir": "src" + }, + "extends": "../../tsconfig.cjs.json", + "include": ["src/"] +} diff --git a/packages/util-retry/tsconfig.es.json b/packages/util-retry/tsconfig.es.json new file mode 100644 index 0000000000000..7f162b266e26c --- /dev/null +++ b/packages/util-retry/tsconfig.es.json @@ -0,0 +1,10 @@ +{ + "compilerOptions": { + "baseUrl": ".", + "lib": [], + "outDir": "dist-es", + "rootDir": "src" + }, + "extends": "../../tsconfig.es.json", + "include": ["src/"] +} diff --git a/packages/util-retry/tsconfig.types.json b/packages/util-retry/tsconfig.types.json new file mode 100644 index 0000000000000..6cdf9f52ea065 --- /dev/null +++ b/packages/util-retry/tsconfig.types.json @@ -0,0 +1,9 @@ +{ + "compilerOptions": { + "baseUrl": ".", + "declarationDir": "dist-types", + "rootDir": "src" + }, + "extends": "../../tsconfig.types.json", + "include": ["src/"] +}