diff --git a/.changeset/good-coins-smoke.md b/.changeset/good-coins-smoke.md new file mode 100644 index 0000000000..5196725189 --- /dev/null +++ b/.changeset/good-coins-smoke.md @@ -0,0 +1,5 @@ +--- +"viem": patch +--- + +Added reconnect functionality to `webSocket` & `ipc` transports. diff --git a/site/pages/docs/clients/transports/ipc.md b/site/pages/docs/clients/transports/ipc.md index adc4710686..b99b67ff58 100644 --- a/site/pages/docs/clients/transports/ipc.md +++ b/site/pages/docs/clients/transports/ipc.md @@ -65,6 +65,55 @@ const transport = ipc('/tmp/reth.ipc', { }) ``` +### reconnect (optional) + +- **Type:** `boolean | { maxAttempts?: number, delay?: number }` +- **Default:** `true` + +Whether or not to attempt to reconnect on socket failure. + +```ts twoslash +import { ipc } from 'viem/node' +// ---cut--- +const transport = ipc('/tmp/reth.ipc', { + reconnect: false, // [!code focus] +}) +``` + +#### reconnect.attempts (optional) + +- **Type:** `number` +- **Default:** `5` + +The max number of times to attempt to reconnect. + +```ts twoslash +import { ipc } from 'viem/node' +// ---cut--- +const transport = ipc('/tmp/reth.ipc', { + reconnect: { + attempts: 10, // [!code focus] + } +}) +``` + +#### reconnect.delay (optional) + +- **Type:** `number` +- **Default:** `2_000` + +Retry delay (in ms) between reconnect attempts. + +```ts twoslash +import { ipc } from 'viem/node' +// ---cut--- +const transport = ipc('/tmp/reth.ipc', { + reconnect: { + delay: 1_000, // [!code focus] + } +}) +``` + ### retryCount (optional) - **Type:** `number` diff --git a/site/pages/docs/clients/transports/websocket.md b/site/pages/docs/clients/transports/websocket.md index 800d73c7cf..62be1380b3 100644 --- a/site/pages/docs/clients/transports/websocket.md +++ b/site/pages/docs/clients/transports/websocket.md @@ -68,6 +68,55 @@ const transport = webSocket('wss://eth-mainnet.g.alchemy.com/v2/...', { }) ``` +### reconnect (optional) + +- **Type:** `boolean | { maxAttempts?: number, delay?: number }` +- **Default:** `true` + +Whether or not to attempt to reconnect on socket failure. + +```ts twoslash +import { webSocket } from 'viem' +// ---cut--- +const transport = webSocket('wss://eth-mainnet.g.alchemy.com/v2/...', { + reconnect: false, // [!code focus] +}) +``` + +#### reconnect.attempts (optional) + +- **Type:** `number` +- **Default:** `5` + +The max number of times to attempt to reconnect. + +```ts twoslash +import { webSocket } from 'viem' +// ---cut--- +const transport = webSocket('wss://eth-mainnet.g.alchemy.com/v2/...', { + reconnect: { + attempts: 10, // [!code focus] + } +}) +``` + +#### reconnect.delay (optional) + +- **Type:** `number` +- **Default:** `2_000` + +Retry delay (in ms) between reconnect attempts. + +```ts twoslash +import { webSocket } from 'viem' +// ---cut--- +const transport = webSocket('wss://eth-mainnet.g.alchemy.com/v2/...', { + reconnect: { + delay: 1_000, // [!code focus] + } +}) +``` + ### retryCount (optional) - **Type:** `number` diff --git a/src/clients/transports/ipc.ts b/src/clients/transports/ipc.ts index e2157b75f1..a6f46d9376 100644 --- a/src/clients/transports/ipc.ts +++ b/src/clients/transports/ipc.ts @@ -3,7 +3,11 @@ import { type UrlRequiredErrorType } from '../../errors/transport.js' import type { ErrorType } from '../../errors/utils.js' import type { Hash } from '../../types/misc.js' import type { RpcResponse } from '../../types/rpc.js' -import { type IpcRpcClient, getIpcRpcClient } from '../../utils/rpc/ipc.js' +import { + type GetIpcRpcClientOptions, + type IpcRpcClient, + getIpcRpcClient, +} from '../../utils/rpc/ipc.js' import { type CreateTransportErrorType, type Transport, @@ -38,6 +42,11 @@ export type IpcTransportConfig = { key?: TransportConfig['key'] | undefined /** The name of the Ipc transport. */ name?: TransportConfig['name'] | undefined + /** + * Whether or not to attempt to reconnect on socket failure. + * @default true + */ + reconnect?: GetIpcRpcClientOptions['reconnect'] | undefined /** The max number of times to retry. */ retryCount?: TransportConfig['retryCount'] | undefined /** The base delay (in ms) between retries. */ @@ -66,7 +75,7 @@ export function ipc( path: string, config: IpcTransportConfig = {}, ): IpcTransport { - const { key = 'ipc', name = 'IPC JSON-RPC', retryDelay } = config + const { key = 'ipc', name = 'IPC JSON-RPC', reconnect, retryDelay } = config return ({ retryCount: retryCount_, timeout: timeout_ }) => { const retryCount = config.retryCount ?? retryCount_ const timeout = timeout_ ?? config.timeout ?? 10_000 @@ -76,7 +85,7 @@ export function ipc( name, async request({ method, params }) { const body = { method, params } - const rpcClient = await getIpcRpcClient(path) + const rpcClient = await getIpcRpcClient(path, { reconnect }) const { error, result } = await rpcClient.requestAsync({ body, timeout, diff --git a/src/clients/transports/webSocket.ts b/src/clients/transports/webSocket.ts index 78bddc6e11..372a032da0 100644 --- a/src/clients/transports/webSocket.ts +++ b/src/clients/transports/webSocket.ts @@ -8,7 +8,10 @@ import type { Hash } from '../../types/misc.js' import type { RpcResponse } from '../../types/rpc.js' import { getSocket } from '../../utils/rpc/compat.js' import type { SocketRpcClient } from '../../utils/rpc/socket.js' -import { getWebSocketRpcClient } from '../../utils/rpc/webSocket.js' +import { + type GetWebSocketRpcClientOptions, + getWebSocketRpcClient, +} from '../../utils/rpc/webSocket.js' import { type CreateTransportErrorType, type Transport, @@ -43,6 +46,11 @@ export type WebSocketTransportConfig = { key?: TransportConfig['key'] | undefined /** The name of the WebSocket transport. */ name?: TransportConfig['name'] | undefined + /** + * Whether or not to attempt to reconnect on socket failure. + * @default true + */ + reconnect?: GetWebSocketRpcClientOptions['reconnect'] | undefined /** The max number of times to retry. */ retryCount?: TransportConfig['retryCount'] | undefined /** The base delay (in ms) between retries. */ @@ -76,7 +84,12 @@ export function webSocket( url?: string, config: WebSocketTransportConfig = {}, ): WebSocketTransport { - const { key = 'webSocket', name = 'WebSocket JSON-RPC', retryDelay } = config + const { + key = 'webSocket', + name = 'WebSocket JSON-RPC', + reconnect, + retryDelay, + } = config return ({ chain, retryCount: retryCount_, timeout: timeout_ }) => { const retryCount = config.retryCount ?? retryCount_ const timeout = timeout_ ?? config.timeout ?? 10_000 @@ -88,7 +101,7 @@ export function webSocket( name, async request({ method, params }) { const body = { method, params } - const rpcClient = await getWebSocketRpcClient(url_) + const rpcClient = await getWebSocketRpcClient(url_, { reconnect }) const { error, result } = await rpcClient.requestAsync({ body, timeout, diff --git a/src/utils/rpc/ipc.ts b/src/utils/rpc/ipc.ts index 26be3a1bb9..5f8c231c67 100644 --- a/src/utils/rpc/ipc.ts +++ b/src/utils/rpc/ipc.ts @@ -1,11 +1,17 @@ import { type Socket as NetSocket, connect } from 'node:net' import { WebSocketRequestError } from '../../index.js' import { + type GetSocketRpcClientParameters, type Socket, type SocketRpcClient, getSocketRpcClient, } from './socket.js' +export type GetIpcRpcClientOptions = Pick< + GetSocketRpcClientParameters, + 'reconnect' +> + const openingBrace = '{'.charCodeAt(0) const closingBrace = '}'.charCodeAt(0) @@ -33,14 +39,21 @@ export function extractMessages(buffer: Buffer): [Buffer[], Buffer] { export type IpcRpcClient = SocketRpcClient -export async function getIpcRpcClient(path: string): Promise { +export async function getIpcRpcClient( + path: string, + options: GetIpcRpcClientOptions = {}, +): Promise { + const { reconnect } = options + return getSocketRpcClient({ - async getSocket({ onResponse }) { + async getSocket({ onError, onOpen, onResponse }) { const socket = connect(path) function onClose() { socket.off('close', onClose) socket.off('message', onData) + socket.off('error', onError) + socket.off('connect', onOpen) } let lastRemaining = Buffer.alloc(0) @@ -57,6 +70,8 @@ export async function getIpcRpcClient(path: string): Promise { socket.on('close', onClose) socket.on('data', onData) + socket.on('error', onError) + socket.on('connect', onOpen) // Wait for the socket to open. await new Promise((resolve, reject) => { @@ -84,6 +99,7 @@ export async function getIpcRpcClient(path: string): Promise { }, } as Socket<{}>) }, + reconnect, url: path, }) } diff --git a/src/utils/rpc/socket.test.ts b/src/utils/rpc/socket.test.ts index edccdb3ee1..43591e99b5 100644 --- a/src/utils/rpc/socket.test.ts +++ b/src/utils/rpc/socket.test.ts @@ -1,5 +1,6 @@ import { expect, test } from 'vitest' import { localWsUrl } from '../../../test/src/constants.js' +import { wait } from '../wait.js' import { getSocketRpcClient } from './socket.js' test('default', async () => { @@ -130,6 +131,116 @@ test('request', async () => { socketClient.close() }) +test('reconnect', async () => { + let active = true + let count = -1 + const socketClient = await getSocketRpcClient({ + async getSocket({ onError, onOpen, onResponse }) { + count++ + + // reopen on 3rd attempt + if (active || count === 3) { + onOpen() + active = true + } else { + onError(new Error('connection failed.')) + active = false + } + + return { + close() {}, + request({ body }) { + wait(100).then(() => { + if (!active) return + onResponse({ id: body.id ?? 0, jsonrpc: '2.0', result: body }) + + wait(100).then(() => { + if (count === 0) onError(new Error('connection failed.')) + active = false + }) + }) + }, + } + }, + reconnect: { + delay: 200, + attempts: 5, + }, + url: localWsUrl, + }) + + await wait(200) + + expect( + await new Promise((res, rej) => { + socketClient.request({ + body: { method: 'test' }, + onResponse(data) { + res(data) + }, + onError(error) { + rej(error) + }, + }) + }), + ).toMatchInlineSnapshot(` + { + "id": 1, + "jsonrpc": "2.0", + "result": { + "id": 1, + "jsonrpc": "2.0", + "method": "test", + }, + } + `) + + await wait(200) + + await expect( + () => + new Promise((res, rej) => { + socketClient.request({ + body: { method: 'test' }, + onResponse(data) { + res(data) + }, + onError(error) { + rej(error) + }, + }) + }), + ).rejects.toThrowErrorMatchingInlineSnapshot('[Error: connection failed.]') + + await wait(1000) + + expect( + await new Promise((res, rej) => { + socketClient.request({ + body: { method: 'test' }, + onResponse(data) { + res(data) + }, + onError(error) { + rej(error) + }, + }) + }), + ).toMatchInlineSnapshot(` + { + "id": 3, + "jsonrpc": "2.0", + "result": { + "id": 3, + "jsonrpc": "2.0", + "method": "test", + }, + } + `) + + socketClient.close() +}) + test('request (eth_subscribe)', async () => { const socketClient = await getSocketRpcClient({ async getSocket({ onResponse }) { @@ -153,7 +264,7 @@ test('request (eth_subscribe)', async () => { }) expect(response).toMatchInlineSnapshot(` { - "id": 1, + "id": 4, "jsonrpc": "2.0", "result": "0xabc", } @@ -164,6 +275,108 @@ test('request (eth_subscribe)', async () => { socketClient.close() }) +test('reconnect (eth_subscribe)', async () => { + let active = true + let count = -1 + const socketClient = await getSocketRpcClient({ + async getSocket({ onError, onOpen, onResponse }) { + count++ + + // reopen on 3rd attempt + if (active || count === 3) { + onOpen() + active = true + } else { + onError(new Error('connection failed.')) + active = false + } + + return { + close() {}, + request({ body }) { + wait(100).then(() => { + if (!active) return + onResponse({ id: body.id ?? 0, jsonrpc: '2.0', result: '0xabc' }) + + wait(100).then(() => { + if (count === 0) onError(new Error('connection failed.')) + active = false + }) + }) + }, + } + }, + reconnect: { + delay: 200, + attempts: 5, + }, + url: localWsUrl, + }) + + await wait(200) + + expect( + await new Promise((res, rej) => { + socketClient.request({ + body: { method: 'eth_subscribe' }, + onResponse(data) { + res(data) + }, + onError(error) { + rej(error) + }, + }) + }), + ).toMatchInlineSnapshot(` + { + "id": 5, + "jsonrpc": "2.0", + "result": "0xabc", + } + `) + + await wait(200) + + await expect( + () => + new Promise((res, rej) => { + socketClient.request({ + body: { method: 'eth_subscribe' }, + onResponse(data) { + res(data) + }, + onError(error) { + rej(error) + }, + }) + }), + ).rejects.toThrowErrorMatchingInlineSnapshot('[Error: connection failed.]') + + await wait(1000) + + expect( + await new Promise((res, rej) => { + socketClient.request({ + body: { method: 'eth_subscribe' }, + onResponse(data) { + res(data) + }, + onError(error) { + rej(error) + }, + }) + }), + ).toMatchInlineSnapshot(` + { + "id": 7, + "jsonrpc": "2.0", + "result": "0xabc", + } + `) + + socketClient.close() +}) + test('request (eth_unsubscribe)', async () => { const socketClient = await getSocketRpcClient({ async getSocket({ onResponse }) { @@ -187,7 +400,7 @@ test('request (eth_unsubscribe)', async () => { }) expect(response).toMatchInlineSnapshot(` { - "id": 2, + "id": 8, "jsonrpc": "2.0", "result": "0xabc", } @@ -231,7 +444,7 @@ test('request (eth_subscription)', async () => { }) expect(response).toMatchInlineSnapshot(` { - "id": 3, + "id": 9, "jsonrpc": "2.0", "method": "eth_subscription", "params": { @@ -291,10 +504,10 @@ test('requestAsync', async () => { }) expect(response).toMatchInlineSnapshot(` { - "id": 5, + "id": 11, "jsonrpc": "2.0", "result": { - "id": 5, + "id": 11, "jsonrpc": "2.0", "method": "test", }, diff --git a/src/utils/rpc/socket.ts b/src/utils/rpc/socket.ts index b324815ee7..6be00c4d81 100644 --- a/src/utils/rpc/socket.ts +++ b/src/utils/rpc/socket.ts @@ -9,10 +9,15 @@ import { withTimeout } from '../promise/withTimeout.js' import { idCache } from './id.js' type Id = string | number -type CallbackFn = (message: any) => void +type CallbackFn = { + onResponse: (message: any) => void + onError?: ((error?: Error | Event | undefined) => void) | undefined +} type CallbackMap = Map export type GetSocketParameters = { + onError: (error?: Error | Event | undefined) => void + onOpen: () => void onResponse: (data: RpcResponse) => void } @@ -28,7 +33,7 @@ export type SocketRpcClient = { socket: Socket request(params: { body: RpcRequest - onError?: ((error: Error) => void) | undefined + onError?: ((error?: Error | Event | undefined) => void) | undefined onResponse: (message: RpcResponse) => void }): void requestAsync(params: { @@ -40,9 +45,28 @@ export type SocketRpcClient = { url: string } -export type GetSocketRpcClientParameters = { - url: string +export type GetSocketRpcClientParameters = { getSocket(params: GetSocketParameters): Promise> + /** + * Whether or not to attempt to reconnect on socket failure. + * @default true + */ + reconnect?: + | boolean + | { + /** + * The maximum number of reconnection attempts. + * @default 5 + */ + attempts?: number | undefined + /** + * The delay (in ms) between reconnection attempts. + * @default 2_000 + */ + delay?: number | undefined + } + | undefined + url: string } export type GetSocketRpcClientErrorType = @@ -57,13 +81,16 @@ export const socketClientCache = /*#__PURE__*/ new Map< export async function getSocketRpcClient( params: GetSocketRpcClientParameters, ): Promise> { - const { getSocket, url } = params + const { getSocket, reconnect = true, url } = params + const { attempts = 5, delay = 2_000 } = + typeof reconnect === 'object' ? reconnect : {} let socketClient = socketClientCache.get(url) // If the socket already exists, return it. if (socketClient) return socketClient as {} as SocketRpcClient + let reconnectCount = 0 const { schedule } = createBatchScheduler< undefined, [SocketRpcClient] @@ -76,17 +103,46 @@ export async function getSocketRpcClient( // Set up a cache for subscriptions (eth_subscribe). const subscriptions = new Map() + let error: Error | Event | undefined + let socket: Socket // Set up socket implementation. - const socket = await getSocket({ - onResponse(data) { - const isSubscription = data.method === 'eth_subscription' - const id = isSubscription ? data.params.subscription : data.id - const cache = isSubscription ? subscriptions : requests - const callback = cache.get(id) - if (callback) callback(data) - if (!isSubscription) cache.delete(id) - }, - }) + async function setup() { + return getSocket({ + onError(error_) { + error = error_ + + // Notify all requests and subscriptions of the error. + for (const request of requests.values()) request.onError?.(error) + for (const subscription of subscriptions.values()) + subscription.onError?.(error) + + // Clear all requests and subscriptions. + requests.clear() + subscriptions.clear() + + // Attempt to reconnect. + if (reconnect && reconnectCount < attempts) + setTimeout(async () => { + reconnectCount++ + socket = await setup().catch(console.error) + }, delay) + }, + onOpen() { + error = undefined + reconnectCount = 0 + }, + onResponse(data) { + const isSubscription = data.method === 'eth_subscription' + const id = isSubscription ? data.params.subscription : data.id + const cache = isSubscription ? subscriptions : requests + const callback = cache.get(id) + if (callback) callback.onResponse(data) + if (!isSubscription) cache.delete(id) + }, + }) + } + socket = await setup() + error = undefined // Create a new socket instance. socketClient = { @@ -96,6 +152,8 @@ export async function getSocketRpcClient( }, socket, request({ body, onError, onResponse }) { + if (error && onError) onError(error) + const id = body.id ?? idCache.take() const callback = (response: RpcResponse) => { @@ -107,18 +165,19 @@ export async function getSocketRpcClient( body.method === 'eth_subscribe' && typeof response.result === 'string' ) - subscriptions.set(response.result, callback) + subscriptions.set(response.result, { + onResponse: callback, + onError, + }) // If we are unsubscribing from a topic, we want to remove the listener. if (body.method === 'eth_unsubscribe') subscriptions.delete(body.params?.[0]) onResponse(response) - - // TODO: delete request? } - requests.set(id, callback) + requests.set(id, { onResponse: callback, onError }) try { socket.request({ body: { diff --git a/src/utils/rpc/webSocket.ts b/src/utils/rpc/webSocket.ts index 82a729bf95..e172d71192 100644 --- a/src/utils/rpc/webSocket.ts +++ b/src/utils/rpc/webSocket.ts @@ -2,22 +2,33 @@ import type { MessageEvent } from 'isows' import { WebSocketRequestError } from '../../errors/request.js' import { + type GetSocketRpcClientParameters, type Socket, type SocketRpcClient, getSocketRpcClient, } from './socket.js' +export type GetWebSocketRpcClientOptions = Pick< + GetSocketRpcClientParameters, + 'reconnect' +> + export async function getWebSocketRpcClient( url: string, + options: GetWebSocketRpcClientOptions | undefined = {}, ): Promise> { + const { reconnect } = options + return getSocketRpcClient({ - async getSocket({ onResponse }) { + async getSocket({ onError, onOpen, onResponse }) { const WebSocket = await import('isows').then((module) => module.WebSocket) const socket = new WebSocket(url) function onClose() { socket.removeEventListener('close', onClose) socket.removeEventListener('message', onMessage) + socket.removeEventListener('error', onError) + socket.removeEventListener('open', onOpen) } function onMessage({ data }: MessageEvent) { onResponse(JSON.parse(data)) @@ -26,6 +37,8 @@ export async function getWebSocketRpcClient( // Setup event listeners for RPC & subscription responses. socket.addEventListener('close', onClose) socket.addEventListener('message', onMessage) + socket.addEventListener('error', onError) + socket.addEventListener('open', onOpen) // Wait for the socket to open. if (socket.readyState === WebSocket.CONNECTING) { @@ -58,6 +71,7 @@ export async function getWebSocketRpcClient( }, } as Socket) }, + reconnect, url, }) }