Skip to content

Commit

Permalink
feat: socket reconnect (#2073)
Browse files Browse the repository at this point in the history
* feat: socket reconnect

Co-Authored-By: Sk Sohab <SKSHOAIBPERSONAL@GMAIL.COM>

* docs

---------

Co-authored-by: Sk Sohab <SKSHOAIBPERSONAL@GMAIL.COM>
  • Loading branch information
jxom and Sk Sohab authored Apr 7, 2024
1 parent e35c48d commit 212eab2
Show file tree
Hide file tree
Showing 9 changed files with 460 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/good-coins-smoke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"viem": patch
---

Added reconnect functionality to `webSocket` & `ipc` transports.
49 changes: 49 additions & 0 deletions site/pages/docs/clients/transports/ipc.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,55 @@ const transport = ipc('/tmp/reth.ipc', {
})
```

### reconnect (optional)

- **Type:** `boolean | { maxAttempts?: number, delay?: number }`
- **Default:** `true`

Whether or not to attempt to reconnect on socket failure.

```ts twoslash
import { ipc } from 'viem/node'
// ---cut---
const transport = ipc('/tmp/reth.ipc', {
reconnect: false, // [!code focus]
})
```

#### reconnect.attempts (optional)

- **Type:** `number`
- **Default:** `5`

The max number of times to attempt to reconnect.

```ts twoslash
import { ipc } from 'viem/node'
// ---cut---
const transport = ipc('/tmp/reth.ipc', {
reconnect: {
attempts: 10, // [!code focus]
}
})
```

#### reconnect.delay (optional)

- **Type:** `number`
- **Default:** `2_000`

Retry delay (in ms) between reconnect attempts.

```ts twoslash
import { ipc } from 'viem/node'
// ---cut---
const transport = ipc('/tmp/reth.ipc', {
reconnect: {
delay: 1_000, // [!code focus]
}
})
```

### retryCount (optional)

- **Type:** `number`
Expand Down
49 changes: 49 additions & 0 deletions site/pages/docs/clients/transports/websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,55 @@ const transport = webSocket('wss://eth-mainnet.g.alchemy.com/v2/...', {
})
```

### reconnect (optional)

- **Type:** `boolean | { maxAttempts?: number, delay?: number }`
- **Default:** `true`

Whether or not to attempt to reconnect on socket failure.

```ts twoslash
import { webSocket } from 'viem'
// ---cut---
const transport = webSocket('wss://eth-mainnet.g.alchemy.com/v2/...', {
reconnect: false, // [!code focus]
})
```

#### reconnect.attempts (optional)

- **Type:** `number`
- **Default:** `5`

The max number of times to attempt to reconnect.

```ts twoslash
import { webSocket } from 'viem'
// ---cut---
const transport = webSocket('wss://eth-mainnet.g.alchemy.com/v2/...', {
reconnect: {
attempts: 10, // [!code focus]
}
})
```

#### reconnect.delay (optional)

- **Type:** `number`
- **Default:** `2_000`

Retry delay (in ms) between reconnect attempts.

```ts twoslash
import { webSocket } from 'viem'
// ---cut---
const transport = webSocket('wss://eth-mainnet.g.alchemy.com/v2/...', {
reconnect: {
delay: 1_000, // [!code focus]
}
})
```

### retryCount (optional)

- **Type:** `number`
Expand Down
15 changes: 12 additions & 3 deletions src/clients/transports/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ import { type UrlRequiredErrorType } from '../../errors/transport.js'
import type { ErrorType } from '../../errors/utils.js'
import type { Hash } from '../../types/misc.js'
import type { RpcResponse } from '../../types/rpc.js'
import { type IpcRpcClient, getIpcRpcClient } from '../../utils/rpc/ipc.js'
import {
type GetIpcRpcClientOptions,
type IpcRpcClient,
getIpcRpcClient,
} from '../../utils/rpc/ipc.js'
import {
type CreateTransportErrorType,
type Transport,
Expand Down Expand Up @@ -38,6 +42,11 @@ export type IpcTransportConfig = {
key?: TransportConfig['key'] | undefined
/** The name of the Ipc transport. */
name?: TransportConfig['name'] | undefined
/**
* Whether or not to attempt to reconnect on socket failure.
* @default true
*/
reconnect?: GetIpcRpcClientOptions['reconnect'] | undefined
/** The max number of times to retry. */
retryCount?: TransportConfig['retryCount'] | undefined
/** The base delay (in ms) between retries. */
Expand Down Expand Up @@ -66,7 +75,7 @@ export function ipc(
path: string,
config: IpcTransportConfig = {},
): IpcTransport {
const { key = 'ipc', name = 'IPC JSON-RPC', retryDelay } = config
const { key = 'ipc', name = 'IPC JSON-RPC', reconnect, retryDelay } = config
return ({ retryCount: retryCount_, timeout: timeout_ }) => {
const retryCount = config.retryCount ?? retryCount_
const timeout = timeout_ ?? config.timeout ?? 10_000
Expand All @@ -76,7 +85,7 @@ export function ipc(
name,
async request({ method, params }) {
const body = { method, params }
const rpcClient = await getIpcRpcClient(path)
const rpcClient = await getIpcRpcClient(path, { reconnect })
const { error, result } = await rpcClient.requestAsync({
body,
timeout,
Expand Down
19 changes: 16 additions & 3 deletions src/clients/transports/webSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import type { Hash } from '../../types/misc.js'
import type { RpcResponse } from '../../types/rpc.js'
import { getSocket } from '../../utils/rpc/compat.js'
import type { SocketRpcClient } from '../../utils/rpc/socket.js'
import { getWebSocketRpcClient } from '../../utils/rpc/webSocket.js'
import {
type GetWebSocketRpcClientOptions,
getWebSocketRpcClient,
} from '../../utils/rpc/webSocket.js'
import {
type CreateTransportErrorType,
type Transport,
Expand Down Expand Up @@ -43,6 +46,11 @@ export type WebSocketTransportConfig = {
key?: TransportConfig['key'] | undefined
/** The name of the WebSocket transport. */
name?: TransportConfig['name'] | undefined
/**
* Whether or not to attempt to reconnect on socket failure.
* @default true
*/
reconnect?: GetWebSocketRpcClientOptions['reconnect'] | undefined
/** The max number of times to retry. */
retryCount?: TransportConfig['retryCount'] | undefined
/** The base delay (in ms) between retries. */
Expand Down Expand Up @@ -76,7 +84,12 @@ export function webSocket(
url?: string,
config: WebSocketTransportConfig = {},
): WebSocketTransport {
const { key = 'webSocket', name = 'WebSocket JSON-RPC', retryDelay } = config
const {
key = 'webSocket',
name = 'WebSocket JSON-RPC',
reconnect,
retryDelay,
} = config
return ({ chain, retryCount: retryCount_, timeout: timeout_ }) => {
const retryCount = config.retryCount ?? retryCount_
const timeout = timeout_ ?? config.timeout ?? 10_000
Expand All @@ -88,7 +101,7 @@ export function webSocket(
name,
async request({ method, params }) {
const body = { method, params }
const rpcClient = await getWebSocketRpcClient(url_)
const rpcClient = await getWebSocketRpcClient(url_, { reconnect })
const { error, result } = await rpcClient.requestAsync({
body,
timeout,
Expand Down
20 changes: 18 additions & 2 deletions src/utils/rpc/ipc.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import { type Socket as NetSocket, connect } from 'node:net'
import { WebSocketRequestError } from '../../index.js'
import {
type GetSocketRpcClientParameters,
type Socket,
type SocketRpcClient,
getSocketRpcClient,
} from './socket.js'

export type GetIpcRpcClientOptions = Pick<
GetSocketRpcClientParameters,
'reconnect'
>

const openingBrace = '{'.charCodeAt(0)
const closingBrace = '}'.charCodeAt(0)

Expand Down Expand Up @@ -33,14 +39,21 @@ export function extractMessages(buffer: Buffer): [Buffer[], Buffer] {

export type IpcRpcClient = SocketRpcClient<NetSocket>

export async function getIpcRpcClient(path: string): Promise<IpcRpcClient> {
export async function getIpcRpcClient(
path: string,
options: GetIpcRpcClientOptions = {},
): Promise<IpcRpcClient> {
const { reconnect } = options

return getSocketRpcClient({
async getSocket({ onResponse }) {
async getSocket({ onError, onOpen, onResponse }) {
const socket = connect(path)

function onClose() {
socket.off('close', onClose)
socket.off('message', onData)
socket.off('error', onError)
socket.off('connect', onOpen)
}

let lastRemaining = Buffer.alloc(0)
Expand All @@ -57,6 +70,8 @@ export async function getIpcRpcClient(path: string): Promise<IpcRpcClient> {

socket.on('close', onClose)
socket.on('data', onData)
socket.on('error', onError)
socket.on('connect', onOpen)

// Wait for the socket to open.
await new Promise<void>((resolve, reject) => {
Expand Down Expand Up @@ -84,6 +99,7 @@ export async function getIpcRpcClient(path: string): Promise<IpcRpcClient> {
},
} as Socket<{}>)
},
reconnect,
url: path,
})
}
Loading

0 comments on commit 212eab2

Please sign in to comment.