Skip to content

Commit

Permalink
Add RetryPolicy Removes IRetry
Browse files Browse the repository at this point in the history
This change adds the RetryPolicy type which represents the retry
policy to use as an union type of the different supported retry
policies.
This is an initial change that only replaces RetryPolicy with IRetry
and keeps the same current behavior. However, this change will enable
us to store the retry policy with the DurablePromise to be used in
the recovery path.
  • Loading branch information
avillega committed Jun 11, 2024
1 parent c3f1166 commit b300172
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 93 deletions.
10 changes: 5 additions & 5 deletions lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ResonatePromise } from "./core/future";
import { Invocation } from "./core/invocation";
import { ResonateOptions, Options, PartialOptions } from "./core/options";
import { DurablePromise } from "./core/promises/promises";
import { Retry } from "./core/retries/retry";
import * as retryPolicy from "./core/retry";
import * as schedules from "./core/schedules/schedules";
import { ResonateBase } from "./resonate";

Expand Down Expand Up @@ -269,7 +269,7 @@ export class Context {

// prettier-ignore
return this.run(() => Promise.all(values), this.options({
retry: Retry.never(),
retry: retryPolicy.never(),
...opts,
}));
}
Expand All @@ -294,7 +294,7 @@ export class Context {

// prettier-ignore
return this.run(() => Promise.any(values), this.options({
retry: Retry.never(),
retry: retryPolicy.never(),
...opts,
}));
}
Expand All @@ -319,7 +319,7 @@ export class Context {

// prettier-ignore
return this.run(() => Promise.race(values), this.options({
retry: Retry.never(),
retry: retryPolicy.never(),
...opts,
}));
}
Expand Down Expand Up @@ -347,7 +347,7 @@ export class Context {

// prettier-ignore
return this.run(() => Promise.allSettled(values), this.options({
retry: Retry.never(),
retry: retryPolicy.never(),
...opts,
}));
}
Expand Down
3 changes: 2 additions & 1 deletion lib/core/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { ErrorCodes, ResonateError } from "./errors";
import { Future, ResonatePromise } from "./future";
import { Invocation } from "./invocation";
import { DurablePromise } from "./promises/promises";
import { retryIterator } from "./retry";

/////////////////////////////////////////////////////////////////////
// Execution
Expand Down Expand Up @@ -186,7 +187,7 @@ export class OrdinaryExecution<T> extends Execution<T> {
let error;

// invoke the function according to the retry policy
for (const delay of this.invocation.opts.retry.iterator(this.invocation)) {
for (const delay of retryIterator(this.invocation)) {
await new Promise((resolve) => setTimeout(resolve, delay));

try {
Expand Down
4 changes: 4 additions & 0 deletions lib/core/invocation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Future } from "./future";
import { Options, PartialOptions, isOptions } from "./options";
import { RetryPolicy, exponential } from "./retry";

/////////////////////////////////////////////////////////////////////
// Invocation
Expand All @@ -19,6 +20,7 @@ export class Invocation<T> {
createdOn: number = Date.now();
counter: number = 0;
attempt: number = 0;
retryPolicy: RetryPolicy = exponential();

awaited: Future<any>[] = [];
blocked: Future<any> | null = null;
Expand Down Expand Up @@ -69,6 +71,8 @@ export class Invocation<T> {
// - the current time plus the user provided relative time
// - the parent timeout
this.timeout = Math.min(this.createdOn + this.opts.timeout, this.parent?.timeout ?? Infinity);

this.retryPolicy = this.opts.retry;
}

addChild(child: Invocation<any>) {
Expand Down
8 changes: 4 additions & 4 deletions lib/core/options.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IEncoder } from "./encoder";
import { ILogger } from "./logger";
import { IRetry } from "./retry";
import { RetryPolicy } from "./retry";
import { IStore } from "./store";

/**
Expand Down Expand Up @@ -43,9 +43,9 @@ export type ResonateOptions = {
logger: ILogger;

/**
* A retry instance, defaults to exponential backoff.
* A retry policy, defaults to exponential backoff.
*/
retry: IRetry;
retry: RetryPolicy;

/**
* Tags to add to all durable promises.
Expand Down Expand Up @@ -110,7 +110,7 @@ export type Options = {
/**
* Overrides the default retry policy.
*/
retry: IRetry;
retry: RetryPolicy;

/**
* Additional tags to add to the durable promise.
Expand Down
50 changes: 0 additions & 50 deletions lib/core/retries/retry.ts

This file was deleted.

119 changes: 103 additions & 16 deletions lib/core/retry.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,111 @@
export interface IRetry {
next<T extends { attempt: number; timeout: number }>(ctx: T): { done: boolean; delay?: number };
iterator<T extends { attempt: number; timeout: number }>(ctx: T): IterableIterator<number>;
export type RetryPolicy = Exponential | Linear | Never;

export type Exponential = {
kind: "exponential";
initialDelayMs: number;
backoffFactor: number;
maxAttempts: number;
maxDelayMs: number;
};

export type Linear = {
kind: "linear";
delayMs: number;
maxAttempts: number;
};

export type Never = {
kind: "never";
};

export function exponential(
initialDelayMs: number = 100,
backoffFactor: number = 2,
maxAttempts: number = Infinity,
maxDelayMs: number = 60000,
): Exponential {
return {
kind: "exponential",
initialDelayMs,
backoffFactor,
maxAttempts,
maxDelayMs,
};
}

export class IterableRetry implements IRetry {
next<T extends { attempt: number; timeout: number }>(ctx: T): { done: boolean; delay?: number } {
throw new Error("Method not implemented");
}
export function linear(delayMs: number = 1000, maxAttempts: number = Infinity): Linear {
return {
kind: "linear",
delayMs,
maxAttempts,
};
}

iterator<T extends { attempt: number; timeout: number }>(ctx: T): IterableIterator<number> {
const self = this; // eslint-disable-line @typescript-eslint/no-this-alias
export function never(): Never {
return { kind: "never" };
}

export function retryIterator<T extends { retryPolicy: RetryPolicy; attempt: number; timeout: number }>(
ctx: T,
): IterableIterator<number> {
const { initialDelay, backoffFactor, maxAttempts, maxDelay } = retryDefaults(ctx.retryPolicy);

const __next = (itCtx: { attempt: number; timeout: number }): { done: boolean; delay?: number } => {
// attempt 0: 0ms delay
// attampt n: {initial * factor^(attempt-1)}ms delay (or max delay)
const delay = Math.min(
Math.min(itCtx.attempt, 1) * initialDelay * Math.pow(backoffFactor, itCtx.attempt - 1),
maxDelay,
);

if (Date.now() + delay >= itCtx.timeout || itCtx.attempt >= maxAttempts) {
return { done: true };
}

return {
next() {
const { done, delay } = self.next(ctx);
return { done, value: delay || 0 };
},
[Symbol.iterator]() {
return this;
},
done: false,
delay: delay,
};
};

return {
next() {
const { done, delay } = __next(ctx);
return { done, value: delay || 0 };
},
[Symbol.iterator]() {
return this;
},
};
}

function retryDefaults(retryPolicy: RetryPolicy): {
initialDelay: number;
backoffFactor: number;
maxAttempts: number;
maxDelay: number;
} {
switch (retryPolicy.kind) {
case "exponential":
return {
initialDelay: retryPolicy.initialDelayMs,
backoffFactor: retryPolicy.backoffFactor,
maxAttempts: retryPolicy.maxAttempts,
maxDelay: retryPolicy.maxDelayMs,
};
case "linear":
return {
initialDelay: retryPolicy.delayMs,
backoffFactor: 1,
maxAttempts: retryPolicy.maxAttempts,
maxDelay: retryPolicy.delayMs,
};
case "never":
return {
initialDelay: 0,
backoffFactor: 0,
maxAttempts: 1,
maxDelay: 0,
};
}
}
5 changes: 3 additions & 2 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ export * as promises from "./core/promises/promises";
// schedules
export * as schedules from "./core/schedules/schedules";

// retry policies
export * from "./core/retry";

// interfaces
export * from "./core/encoder";
export * from "./core/logger";
export * from "./core/retry";
export * from "./core/storage";
export * from "./core/store";

// implementations
export * from "./core/encoders/base64";
export * from "./core/encoders/json";
export * from "./core/loggers/logger";
export * from "./core/retries/retry";
export * from "./core/storages/memory";
export * from "./core/storages/withTimeout";
export * from "./core/stores/local";
Expand Down
7 changes: 3 additions & 4 deletions lib/resonate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { ILogger } from "./core/logger";
import { Logger } from "./core/loggers/logger";
import { ResonateOptions, Options, PartialOptions, isOptions } from "./core/options";
import * as promises from "./core/promises/promises";
import { Retry } from "./core/retries/retry";
import { IRetry } from "./core/retry";
import * as retryPolicy from "./core/retry";
import * as schedules from "./core/schedules/schedules";
import { IStore } from "./core/store";
import { LocalStore } from "./core/stores/local";
Expand Down Expand Up @@ -36,7 +35,7 @@ export abstract class ResonateBase {

public readonly encoder: IEncoder<unknown, string | undefined>;
public readonly logger: ILogger;
public readonly retry: IRetry;
public readonly retry: retryPolicy.RetryPolicy;
public readonly store: IStore;

private interval: NodeJS.Timeout | undefined;
Expand All @@ -48,7 +47,7 @@ export abstract class ResonateBase {
logger = new Logger(),
pid = utils.randomId(),
poll = 5000, // 5s
retry = Retry.exponential(),
retry = retryPolicy.exponential(),
store = undefined,
tags = {},
timeout = 10000, // 10s
Expand Down
6 changes: 3 additions & 3 deletions test/async.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, test, expect, jest } from "@jest/globals";
import { Resonate, Context } from "../lib/async";
import { Retry } from "../lib/core/retries/retry";
import * as retry from "../lib/core/retry";
import * as utils from "../lib/core/utils";

jest.setTimeout(10000);
Expand Down Expand Up @@ -37,7 +37,7 @@ describe("Functions: async", () => {
test("success", async () => {
const resonate = new Resonate({
timeout: 1000,
retry: Retry.exponential(
retry: retry.exponential(
100, // initial delay (in ms)
2, // backoff factor
Infinity, // max attempts
Expand Down Expand Up @@ -71,7 +71,7 @@ describe("Functions: async", () => {
test("failure", async () => {
const resonate = new Resonate({
timeout: 1000,
retry: Retry.linear(0, 3),
retry: retry.linear(0, 3),
});

resonate.register("run", run);
Expand Down
4 changes: 2 additions & 2 deletions test/combinators.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, test, expect, jest } from "@jest/globals";
import { Resonate, Context } from "../lib/async";
import { Retry } from "../lib/core/retries/retry";
import * as retry from "../lib/core/retry";

jest.setTimeout(10000);

Expand All @@ -14,7 +14,7 @@ async function throwOrReturn(v: any) {

describe("Combinators", () => {
const resonate = new Resonate({
retry: Retry.never(),
retry: retry.never(),
});

describe("all", () => {
Expand Down
Loading

0 comments on commit b300172

Please sign in to comment.