From 942de1f0b4868f0f6464b2e0702b621a3373c4ee Mon Sep 17 00:00:00 2001 From: Leibale Eidelman Date: Mon, 22 Aug 2022 17:59:45 -0400 Subject: [PATCH] Handle unhandled errors in `socket.reconnectStrategry` (#2226) * handle errors in reconnect strategy * add test * fix retries typo * fix #2237 - flush queues on reconnect strategy error * Update socket.ts * Update socket.ts --- packages/client/lib/client/socket.spec.ts | 56 ++++++++++++++++------- packages/client/lib/client/socket.ts | 27 ++++++++--- 2 files changed, 59 insertions(+), 24 deletions(-) diff --git a/packages/client/lib/client/socket.spec.ts b/packages/client/lib/client/socket.spec.ts index 54f84eb9fe0..1b2d050c012 100644 --- a/packages/client/lib/client/socket.spec.ts +++ b/packages/client/lib/client/socket.spec.ts @@ -1,41 +1,63 @@ import { strict as assert } from 'assert'; import { SinonFakeTimers, useFakeTimers, spy } from 'sinon'; -import RedisSocket from './socket'; +import RedisSocket, { RedisSocketOptions } from './socket'; describe('Socket', () => { + function createSocket(options: RedisSocketOptions): RedisSocket { + const socket = new RedisSocket( + () => Promise.resolve(), + options + ); + + socket.on('error', (err) => { + // ignore errors + console.log(err); + }); + + return socket; + } + describe('reconnectStrategy', () => { let clock: SinonFakeTimers; beforeEach(() => clock = useFakeTimers()); afterEach(() => clock.restore()); - it('custom strategy', () => { - const reconnectStrategy = spy((retries: number): number | Error => { + it('custom strategy', async () => { + const reconnectStrategy = spy((retries: number) => { assert.equal(retries + 1, reconnectStrategy.callCount); - if (retries === 50) { - return Error('50'); - } + if (retries === 50) return new Error('50'); const time = retries * 2; queueMicrotask(() => clock.tick(time)); return time; }); - const socket = new RedisSocket( - () => Promise.resolve(), - { - host: 'error', - reconnectStrategy - } - ); - - socket.on('error', () => { - // ignore errors + const socket = createSocket({ + host: 'error', + reconnectStrategy }); - return assert.rejects(socket.connect(), { + await assert.rejects(socket.connect(), { message: '50' }); + + assert.equal(socket.isOpen, false); + }); + + it('should handle errors', async () => { + const socket = createSocket({ + host: 'error', + reconnectStrategy(retries: number) { + if (retries === 1) return new Error('done'); + queueMicrotask(() => clock.tick(500)); + throw new Error(); + } + }); + + await assert.rejects(socket.connect()); + + assert.equal(socket.isOpen, false); }); }); }); diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 5fab5c5601b..2a955159323 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -44,10 +44,6 @@ export default class RedisSocket extends EventEmitter { return options; } - static #defaultReconnectStrategy(retries: number): number { - return Math.min(retries * 50, 500); - } - static #isTlsSocket(options: RedisSocketOptions): options is RedisTlsSocketOptions { return (options as RedisTlsSocketOptions).tls === true; } @@ -87,6 +83,23 @@ export default class RedisSocket extends EventEmitter { this.#options = RedisSocket.#initiateOptions(options); } + reconnectStrategy(retries: number): number | Error { + if (this.#options.reconnectStrategy) { + try { + const retryIn = this.#options.reconnectStrategy(retries); + if (typeof retryIn !== 'number' && !(retryIn instanceof Error)) { + throw new TypeError('Reconnect strategy should return `number | Error`'); + } + + return retryIn; + } catch (err) { + this.emit('error', err); + } + } + + return Math.min(retries * 50, 500); + } + async connect(): Promise { if (this.#isOpen) { throw new Error('Socket already opened'); @@ -116,14 +129,14 @@ export default class RedisSocket extends EventEmitter { this.#isReady = true; this.emit('ready'); } catch (err) { - this.emit('error', err); - - const retryIn = (this.#options?.reconnectStrategy ?? RedisSocket.#defaultReconnectStrategy)(retries); + const retryIn = this.reconnectStrategy(retries); if (retryIn instanceof Error) { this.#isOpen = false; + this.emit('error', err); throw new ReconnectStrategyError(retryIn, err); } + this.emit('error', err); await promiseTimeout(retryIn); return this.#connect(retries + 1); }