Skip to content

Commit

Permalink
feature: Improve re-try logic with exponential backoff strategy (#252)
Browse files Browse the repository at this point in the history
  • Loading branch information
skambalin authored Jun 18, 2024
1 parent 9c15746 commit cd5d373
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 30 deletions.
59 changes: 46 additions & 13 deletions packages/ddc/src/nodes/BalancedNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,22 @@ import {
CnsRecordGetOptions,
} from './NodeInterface';

/**
* Retry options for a single operation: every `RetryOptions` field except `retries`, plus an optional `attempts` count.
*
* The timeouts between retries are exponential, starting at `minTimeout` and increasing each time until `maxTimeout`.
*
* The formula for the timeout between retries is:
*
* ```typescript
* const timeout = Math.min(random * minTimeout * Math.pow(factor, attempt), maxTimeout);
* ```
*/
export type OpperationRetryOptions = Omit<RetryOptions, 'retries'> & {
attempts?: number;
};

export type BalancedNodeConfig = LoggerOptions & {
router: Router;
retries?: number;
retries?: number | OpperationRetryOptions;
};

/**
Expand All @@ -41,12 +54,24 @@ export class BalancedNode implements NodeInterface {

private router: Router;
private logger: Logger;
private retries: number;

private retryOptions: RetryOptions = {
minTimeout: 50, // Starting timeout from which to increase exponentially
factor: 2, // Exponential backoff
retries: RETRY_MAX_ATTEPTS,
};

constructor({ router, ...config }: BalancedNodeConfig) {
this.router = router;
this.logger = createLogger('BalancedNode', config);
this.retries = config.retries ?? RETRY_MAX_ATTEPTS;

if (typeof config.retries === 'number') {
this.retryOptions.retries = config.retries;
} else if (config.retries) {
const { attempts = RETRY_MAX_ATTEPTS, ...retryOptions } = config.retries;

this.retryOptions = { ...this.retryOptions, ...retryOptions, retries: attempts };
}
}

/**
Expand All @@ -61,21 +86,31 @@ export class BalancedNode implements NodeInterface {
bucketId: BucketId,
operation: RouterOperation,
body: (node: NodeInterface, bail: (e: Error) => void, attempt: number) => Promise<T>,
options: RetryOptions = {},
) {
let lastError: RpcError | undefined;
const exclude: string[] = [];
const exclude: NodeInterface[] = [];

return retry(
async (bail, attempt) => {
let node: NodeInterface;
let node: NodeInterface | undefined;

try {
node = await this.router.getNode(operation, bucketId, exclude);
node = await this.router.getNode(
operation,
bucketId,
exclude.map((node) => node.nodeId),
);

exclude.push(node.nodeId);
exclude.unshift(node);
} catch (error) {
return bail(lastError || (error as Error));
/**
* In case we fail to get a node, we retry with previous nodes that failed until the max attempts.
*/
node = exclude.pop() || node;
}

if (!node) {
throw lastError ?? new Error('No nodes available to handle the operation');
}

try {
Expand All @@ -94,11 +129,9 @@ export class BalancedNode implements NodeInterface {
}
},
{
minTimeout: 0,
retries: this.retries,
...options,
...this.retryOptions,
onRetry: (err, attempt) => {
options.onRetry?.(err, attempt);
this.retryOptions.onRetry?.(err, attempt);

this.logger.warn({ err, attempt }, 'Retrying operation');
},
Expand Down
8 changes: 6 additions & 2 deletions packages/ddc/src/routing/NodeTypeStrategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import { BaseStrategy } from './BaseStrategy.web';
type ModePriorityMap = Partial<Record<Mode, number>>;
type OperationPriorityMap = Record<Operation, ModePriorityMap>;

/**
* The `priorityMap` defines the priority of the operation for each mode.
* The lower the number, the higher the priority.
*/
const priorityMap: OperationPriorityMap = {
[Operation.READ_DAG_NODE]: {
[Mode.Full]: 1,
Expand Down Expand Up @@ -55,8 +59,8 @@ export abstract class NodeTypeStrategy extends BaseStrategy {
const operationNodes = nodes.filter(({ mode }) => opertaionPriorityMap[mode] !== undefined);

return operationNodes.sort((a, b) => {
const aPriority = a.priority || opertaionPriorityMap[a.mode] || 0;
const bPriority = a.priority || opertaionPriorityMap[b.mode] || 0;
const aPriority = a.priority ?? opertaionPriorityMap[a.mode] ?? 0;
const bPriority = b.priority ?? opertaionPriorityMap[b.mode] ?? 0;

return aPriority - bPriority;
});
Expand Down
26 changes: 14 additions & 12 deletions packages/ddc/src/routing/PingStrategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,16 @@ export abstract class PingStrategy extends NodeTypeStrategy {
}

async marshalNodes(operation: RouterOperation, allNodes: RouterNode[]): Promise<RouterNode[]> {
const allNodesShuffled = shuffle([...allNodes]);
const pingedNodes = await super.marshalNodes(operation, this.getPingedNodes());
const allOperationNodes = await super.marshalNodes(operation, allNodes);
const allOperationNodes = await super.marshalNodes(operation, allNodesShuffled);
const notPingedNodes = allOperationNodes.filter((node) => !this.nodesMap.has(node.httpUrl));
const toPingSync = notPingedNodes.splice(0, Math.max(0, PING_THRESHOLD - pingedNodes.length));
const toPingAsync = notPingedNodes.splice(0, PING_THRESHOLD_INC);
const syncPings = [...this.getPingedNodes(), ...toPingSync].map((node) => this.enqueuePing(node));
const toPingSync = notPingedNodes.slice(0, Math.max(0, PING_THRESHOLD - pingedNodes.length));
const toPingAsync = notPingedNodes.slice(0, PING_THRESHOLD_INC);
const syncPings = [
...pingedNodes, // Include already pinged nodes to make sure they are settled
...toPingSync,
].map((node) => this.enqueuePing(node));

/**
* Wait for all sync pings to complete
Expand All @@ -95,22 +99,20 @@ export abstract class PingStrategy extends NodeTypeStrategy {
setTimeout(() => toPingAsync.forEach((node) => this.enqueuePing(node)), PING_BACKGROUND_DELAY);

/**
* Shuffle nodes and sort by latency levels
* Sort operation nodes by latency
*/
const latencySortedNodes = shuffle(allOperationNodes).sort((a, b) => {
const pingA = this.nodesMap.get(a.httpUrl);
const pingB = this.nodesMap.get(b.httpUrl);
return allOperationNodes.sort((a, b) => {
const latencyA = this.nodesMap.get(a.httpUrl)?.latency;
const latencyB = this.nodesMap.get(b.httpUrl)?.latency;

/**
* Group latency by PING_LATENCY_GROUP ms levels to avoid sorting by small differences.
* Keep nodes without ping at the end for fallback scenarios when all pings fail.
*/
const levelA = pingA ? Math.ceil(pingA.latency! / PING_LATENCY_GROUP) : Infinity;
const levelB = pingB ? Math.ceil(pingB.latency! / PING_LATENCY_GROUP) : Infinity;
const levelA = latencyA ? Math.ceil(latencyA / PING_LATENCY_GROUP) : Infinity;
const levelB = latencyB ? Math.ceil(latencyB / PING_LATENCY_GROUP) : Infinity;

return levelA - levelB;
});

return super.marshalNodes(operation, latencySortedNodes);
}
}
5 changes: 2 additions & 3 deletions packages/file-storage/src/FileStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ import {
BalancedNode,
withChunkSize,
streamConsumers,
BalancedNodeConfig,
} from '@cere-ddc-sdk/ddc';

import { File, FileResponse } from './File';
import { DEFAULT_BUFFER_SIZE, MAX_BUFFER_SIZE, MIN_BUFFER_SIZE } from './constants';

type Config = LoggerOptions & {
retries?: number;
};
type Config = LoggerOptions & Pick<BalancedNodeConfig, 'retries'>;

export type FileStorageConfig = Config &
Omit<ConfigPreset, 'blockchain'> & {
Expand Down

0 comments on commit cd5d373

Please sign in to comment.