From b30017263350f21cfdec2e190d9c5b528ad38ed2 Mon Sep 17 00:00:00 2001 From: Andres V Date: Mon, 10 Jun 2024 17:22:30 -0700 Subject: [PATCH] Add RetryPolicy Removes IRetry This change adds the RetryPolicy type which represents the retry policy to use as an union type of the different supported retry policies. This is an initial change that only replaces RetryPolicy with IRetry and keeps the same current behavior. However, this change will enable us to store the retry policy with the DurablePromise to be used in the recovery path. --- lib/async.ts | 10 ++-- lib/core/execution.ts | 3 +- lib/core/invocation.ts | 4 ++ lib/core/options.ts | 8 +-- lib/core/retries/retry.ts | 50 ---------------- lib/core/retry.ts | 119 +++++++++++++++++++++++++++++++++----- lib/index.ts | 5 +- lib/resonate.ts | 7 +-- test/async.test.ts | 6 +- test/combinators.test.ts | 4 +- test/generator.test.ts | 6 +- test/options.test.ts | 6 +- 12 files changed, 135 insertions(+), 93 deletions(-) delete mode 100644 lib/core/retries/retry.ts diff --git a/lib/async.ts b/lib/async.ts index 3432660c..cf81f6ba 100644 --- a/lib/async.ts +++ b/lib/async.ts @@ -3,7 +3,7 @@ import { ResonatePromise } from "./core/future"; import { Invocation } from "./core/invocation"; import { ResonateOptions, Options, PartialOptions } from "./core/options"; import { DurablePromise } from "./core/promises/promises"; -import { Retry } from "./core/retries/retry"; +import * as retryPolicy from "./core/retry"; import * as schedules from "./core/schedules/schedules"; import { ResonateBase } from "./resonate"; @@ -269,7 +269,7 @@ export class Context { // prettier-ignore return this.run(() => Promise.all(values), this.options({ - retry: Retry.never(), + retry: retryPolicy.never(), ...opts, })); } @@ -294,7 +294,7 @@ export class Context { // prettier-ignore return this.run(() => Promise.any(values), this.options({ - retry: Retry.never(), + retry: retryPolicy.never(), ...opts, })); } @@ -319,7 +319,7 @@ export class Context { // prettier-ignore return this.run(() => Promise.race(values), this.options({ - retry: Retry.never(), + retry: retryPolicy.never(), ...opts, })); } @@ -347,7 +347,7 @@ export class Context { // prettier-ignore return this.run(() => Promise.allSettled(values), this.options({ - retry: Retry.never(), + retry: retryPolicy.never(), ...opts, })); } diff --git a/lib/core/execution.ts b/lib/core/execution.ts index ec3a53f9..3389573c 100644 --- a/lib/core/execution.ts +++ b/lib/core/execution.ts @@ -3,6 +3,7 @@ import { ErrorCodes, ResonateError } from "./errors"; import { Future, ResonatePromise } from "./future"; import { Invocation } from "./invocation"; import { DurablePromise } from "./promises/promises"; +import { retryIterator } from "./retry"; ///////////////////////////////////////////////////////////////////// // Execution @@ -186,7 +187,7 @@ export class OrdinaryExecution extends Execution { let error; // invoke the function according to the retry policy - for (const delay of this.invocation.opts.retry.iterator(this.invocation)) { + for (const delay of retryIterator(this.invocation)) { await new Promise((resolve) => setTimeout(resolve, delay)); try { diff --git a/lib/core/invocation.ts b/lib/core/invocation.ts index 092f23ec..435fc4b0 100644 --- a/lib/core/invocation.ts +++ b/lib/core/invocation.ts @@ -1,5 +1,6 @@ import { Future } from "./future"; import { Options, PartialOptions, isOptions } from "./options"; +import { RetryPolicy, exponential } from "./retry"; ///////////////////////////////////////////////////////////////////// // Invocation @@ -19,6 +20,7 @@ export class Invocation { createdOn: number = Date.now(); counter: number = 0; attempt: number = 0; + retryPolicy: RetryPolicy = exponential(); awaited: Future[] = []; blocked: Future | null = null; @@ -69,6 +71,8 @@ export class Invocation { // - the current time plus the user provided relative time // - the parent timeout this.timeout = Math.min(this.createdOn + this.opts.timeout, this.parent?.timeout ?? Infinity); + + this.retryPolicy = this.opts.retry; } addChild(child: Invocation) { diff --git a/lib/core/options.ts b/lib/core/options.ts index f236f4ff..d1689f9f 100644 --- a/lib/core/options.ts +++ b/lib/core/options.ts @@ -1,6 +1,6 @@ import { IEncoder } from "./encoder"; import { ILogger } from "./logger"; -import { IRetry } from "./retry"; +import { RetryPolicy } from "./retry"; import { IStore } from "./store"; /** @@ -43,9 +43,9 @@ export type ResonateOptions = { logger: ILogger; /** - * A retry instance, defaults to exponential backoff. + * A retry policy, defaults to exponential backoff. */ - retry: IRetry; + retry: RetryPolicy; /** * Tags to add to all durable promises. @@ -110,7 +110,7 @@ export type Options = { /** * Overrides the default retry policy. */ - retry: IRetry; + retry: RetryPolicy; /** * Additional tags to add to the durable promise. diff --git a/lib/core/retries/retry.ts b/lib/core/retries/retry.ts deleted file mode 100644 index 21617317..00000000 --- a/lib/core/retries/retry.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { IRetry, IterableRetry } from "../retry"; - -export class Retry extends IterableRetry implements IRetry { - constructor( - private initialDelay: number, - private backoffFactor: number, - private maxAttempts: number, - private maxDelay: number, - ) { - super(); - } - - static exponential( - initialDelay: number = 100, - backoffFactor: number = 2, - maxAttempts: number = Infinity, - maxDelay: number = 60000, // 1 minute - ): Retry { - return new Retry(initialDelay, backoffFactor, maxAttempts, maxDelay); - } - - static linear( - delay: number = 1000, // 1 second - maxAttempts: number = Infinity, - ): Retry { - return new Retry(delay, 1, maxAttempts, delay); - } - - static never(): Retry { - return new Retry(0, 0, 1, 0); - } - - next(ctx: T): { done: boolean; delay?: number } { - // attempt 0: 0ms delay - // attampt n: {initial * factor^(attempt-1)}ms delay (or max delay) - const delay = Math.min( - Math.min(ctx.attempt, 1) * this.initialDelay * Math.pow(this.backoffFactor, ctx.attempt - 1), - this.maxDelay, - ); - - if (Date.now() + delay >= ctx.timeout || ctx.attempt >= this.maxAttempts) { - return { done: true }; - } - - return { - done: false, - delay: delay, - }; - } -} diff --git a/lib/core/retry.ts b/lib/core/retry.ts index acdeeabb..f7d758fd 100644 --- a/lib/core/retry.ts +++ b/lib/core/retry.ts @@ -1,24 +1,111 @@ -export interface IRetry { - next(ctx: T): { done: boolean; delay?: number }; - iterator(ctx: T): IterableIterator; +export type RetryPolicy = Exponential | Linear | Never; + +export type Exponential = { + kind: "exponential"; + initialDelayMs: number; + backoffFactor: number; + maxAttempts: number; + maxDelayMs: number; +}; + +export type Linear = { + kind: "linear"; + delayMs: number; + maxAttempts: number; +}; + +export type Never = { + kind: "never"; +}; + +export function exponential( + initialDelayMs: number = 100, + backoffFactor: number = 2, + maxAttempts: number = Infinity, + maxDelayMs: number = 60000, +): Exponential { + return { + kind: "exponential", + initialDelayMs, + backoffFactor, + maxAttempts, + maxDelayMs, + }; } -export class IterableRetry implements IRetry { - next(ctx: T): { done: boolean; delay?: number } { - throw new Error("Method not implemented"); - } +export function linear(delayMs: number = 1000, maxAttempts: number = Infinity): Linear { + return { + kind: "linear", + delayMs, + maxAttempts, + }; +} - iterator(ctx: T): IterableIterator { - const self = this; // eslint-disable-line @typescript-eslint/no-this-alias +export function never(): Never { + return { kind: "never" }; +} + +export function retryIterator( + ctx: T, +): IterableIterator { + const { initialDelay, backoffFactor, maxAttempts, maxDelay } = retryDefaults(ctx.retryPolicy); + + const __next = (itCtx: { attempt: number; timeout: number }): { done: boolean; delay?: number } => { + // attempt 0: 0ms delay + // attampt n: {initial * factor^(attempt-1)}ms delay (or max delay) + const delay = Math.min( + Math.min(itCtx.attempt, 1) * initialDelay * Math.pow(backoffFactor, itCtx.attempt - 1), + maxDelay, + ); + + if (Date.now() + delay >= itCtx.timeout || itCtx.attempt >= maxAttempts) { + return { done: true }; + } return { - next() { - const { done, delay } = self.next(ctx); - return { done, value: delay || 0 }; - }, - [Symbol.iterator]() { - return this; - }, + done: false, + delay: delay, }; + }; + + return { + next() { + const { done, delay } = __next(ctx); + return { done, value: delay || 0 }; + }, + [Symbol.iterator]() { + return this; + }, + }; +} + +function retryDefaults(retryPolicy: RetryPolicy): { + initialDelay: number; + backoffFactor: number; + maxAttempts: number; + maxDelay: number; +} { + switch (retryPolicy.kind) { + case "exponential": + return { + initialDelay: retryPolicy.initialDelayMs, + backoffFactor: retryPolicy.backoffFactor, + maxAttempts: retryPolicy.maxAttempts, + maxDelay: retryPolicy.maxDelayMs, + }; + case "linear": + return { + initialDelay: retryPolicy.delayMs, + backoffFactor: 1, + maxAttempts: retryPolicy.maxAttempts, + maxDelay: retryPolicy.delayMs, + }; + case "never": + return { + initialDelay: 0, + backoffFactor: 0, + maxAttempts: 1, + maxDelay: 0, + }; } } diff --git a/lib/index.ts b/lib/index.ts index 725e6c6e..8da3049a 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -23,10 +23,12 @@ export * as promises from "./core/promises/promises"; // schedules export * as schedules from "./core/schedules/schedules"; +// retry policies +export * from "./core/retry"; + // interfaces export * from "./core/encoder"; export * from "./core/logger"; -export * from "./core/retry"; export * from "./core/storage"; export * from "./core/store"; @@ -34,7 +36,6 @@ export * from "./core/store"; export * from "./core/encoders/base64"; export * from "./core/encoders/json"; export * from "./core/loggers/logger"; -export * from "./core/retries/retry"; export * from "./core/storages/memory"; export * from "./core/storages/withTimeout"; export * from "./core/stores/local"; diff --git a/lib/resonate.ts b/lib/resonate.ts index 65a34ed9..723a743b 100644 --- a/lib/resonate.ts +++ b/lib/resonate.ts @@ -5,8 +5,7 @@ import { ILogger } from "./core/logger"; import { Logger } from "./core/loggers/logger"; import { ResonateOptions, Options, PartialOptions, isOptions } from "./core/options"; import * as promises from "./core/promises/promises"; -import { Retry } from "./core/retries/retry"; -import { IRetry } from "./core/retry"; +import * as retryPolicy from "./core/retry"; import * as schedules from "./core/schedules/schedules"; import { IStore } from "./core/store"; import { LocalStore } from "./core/stores/local"; @@ -36,7 +35,7 @@ export abstract class ResonateBase { public readonly encoder: IEncoder; public readonly logger: ILogger; - public readonly retry: IRetry; + public readonly retry: retryPolicy.RetryPolicy; public readonly store: IStore; private interval: NodeJS.Timeout | undefined; @@ -48,7 +47,7 @@ export abstract class ResonateBase { logger = new Logger(), pid = utils.randomId(), poll = 5000, // 5s - retry = Retry.exponential(), + retry = retryPolicy.exponential(), store = undefined, tags = {}, timeout = 10000, // 10s diff --git a/test/async.test.ts b/test/async.test.ts index 87542be6..f0e05add 100644 --- a/test/async.test.ts +++ b/test/async.test.ts @@ -1,6 +1,6 @@ import { describe, test, expect, jest } from "@jest/globals"; import { Resonate, Context } from "../lib/async"; -import { Retry } from "../lib/core/retries/retry"; +import * as retry from "../lib/core/retry"; import * as utils from "../lib/core/utils"; jest.setTimeout(10000); @@ -37,7 +37,7 @@ describe("Functions: async", () => { test("success", async () => { const resonate = new Resonate({ timeout: 1000, - retry: Retry.exponential( + retry: retry.exponential( 100, // initial delay (in ms) 2, // backoff factor Infinity, // max attempts @@ -71,7 +71,7 @@ describe("Functions: async", () => { test("failure", async () => { const resonate = new Resonate({ timeout: 1000, - retry: Retry.linear(0, 3), + retry: retry.linear(0, 3), }); resonate.register("run", run); diff --git a/test/combinators.test.ts b/test/combinators.test.ts index 93c006b8..6ebb1f56 100644 --- a/test/combinators.test.ts +++ b/test/combinators.test.ts @@ -1,6 +1,6 @@ import { describe, test, expect, jest } from "@jest/globals"; import { Resonate, Context } from "../lib/async"; -import { Retry } from "../lib/core/retries/retry"; +import * as retry from "../lib/core/retry"; jest.setTimeout(10000); @@ -14,7 +14,7 @@ async function throwOrReturn(v: any) { describe("Combinators", () => { const resonate = new Resonate({ - retry: Retry.never(), + retry: retry.never(), }); describe("all", () => { diff --git a/test/generator.test.ts b/test/generator.test.ts index 832a5b5a..9dd531d4 100644 --- a/test/generator.test.ts +++ b/test/generator.test.ts @@ -1,5 +1,5 @@ import { describe, test, expect, jest } from "@jest/globals"; -import { Retry } from "../lib/core/retries/retry"; +import * as retry from "../lib/core/retry"; import * as utils from "../lib/core/utils"; import { Resonate, Context } from "../lib/generator"; @@ -53,7 +53,7 @@ describe("Functions: generator", () => { test("success", async () => { const resonate = new Resonate({ timeout: 1000, - retry: Retry.linear(0, 3), + retry: retry.linear(0, 3), }); resonate.register("run", run); @@ -91,7 +91,7 @@ describe("Functions: generator", () => { test("failure", async () => { const resonate = new Resonate({ timeout: 1000, - retry: Retry.linear(0, 3), + retry: retry.linear(0, 3), }); resonate.register("run", run); diff --git a/test/options.test.ts b/test/options.test.ts index 1e81e6d1..ef234803 100644 --- a/test/options.test.ts +++ b/test/options.test.ts @@ -3,7 +3,7 @@ import * as a from "../lib/async"; import { Base64Encoder } from "../lib/core/encoders/base64"; import { JSONEncoder } from "../lib/core/encoders/json"; import { Options } from "../lib/core/options"; -import { Retry } from "../lib/core/retries/retry"; +import * as retry from "../lib/core/retry"; import * as utils from "../lib/core/utils"; import * as g from "../lib/generator"; @@ -36,7 +36,7 @@ describe("Options", () => { const resonateOpts = { encoder: new JSONEncoder(), poll: 1000, - retry: Retry.exponential(), + retry: retry.exponential(), tags: { a: "a", b: "b", c: "c" }, timeout: 1000, }; @@ -48,7 +48,7 @@ describe("Options", () => { idempotencyKey: "idempotencyKey", lock: false, poll: 2000, - retry: Retry.linear(), + retry: retry.linear(), tags: { c: "x", d: "d", e: "e" }, timeout: 2000, version: 2,