From fb0c00becc224ffedd28e810cbb314187d21efdb Mon Sep 17 00:00:00 2001 From: Mathias Lundell Date: Fri, 27 Aug 2021 10:34:48 +0200 Subject: [PATCH 1/3] feat: reconnect and cancelAll consumers Provided that they consumed with the channel wrapper function `consume` --- src/ChannelWrapper.ts | 108 ++++++++++++++++++++++- test/ChannelWrapperTest.ts | 175 +++++++++++++++++++++++++++++++++++++ test/fixtures.ts | 6 ++ 3 files changed, 285 insertions(+), 4 deletions(-) diff --git a/src/ChannelWrapper.ts b/src/ChannelWrapper.ts index 6a050f4..2bf4b4c 100644 --- a/src/ChannelWrapper.ts +++ b/src/ChannelWrapper.ts @@ -44,6 +44,14 @@ interface SendToQueueMessage { reject: (err: Error) => void; } +interface Consumer { + consumerTag: string | null; + queue: string; + onMessage: (msg: amqplib.ConsumeMessage) => void; + consumeOptions?: amqplib.Options.Consume; + options?: { prefetch?: number }; +} + type Message = PublishMessage | SendToQueueMessage; const IRRECOVERABLE_ERRORS = [ @@ -87,6 +95,8 @@ export default class ChannelWrapper extends EventEmitter { private _unconfirmedMessages: Message[] = []; /** Reason code during publish or sendtoqueue messages. */ private _irrecoverableCode: number | undefined; + /** Consumers which will be reconnected on channel errors etc. */ + private _consumers: Consumer[] = []; /** * The currently connected channel. Note that not all setup functions @@ -324,6 +334,8 @@ export default class ChannelWrapper extends EventEmitter { // Array of setup functions to call. this._setups = []; + this._consumers = []; + if (options.setup) { this._setups.push(options.setup); } @@ -359,10 +371,13 @@ export default class ChannelWrapper extends EventEmitter { this.emit('error', err, { name: this.name }); }) ) - ).then(() => { - this._settingUp = undefined; - }); - + ) + .then(() => { + return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c))); + }) + .then(() => { + this._settingUp = undefined; + }); await this._settingUp; if (!this._channel) { @@ -581,6 +596,91 @@ export default class ChannelWrapper extends EventEmitter { } } + /** + * Setup a consumer + * This consumer will be reconnected on cancellation and channel errors. + */ + async consume( + queue: string, + onMessage: Consumer['onMessage'], + consumeOptions?: Consumer['consumeOptions'], + options?: Consumer['options'] + ): Promise { + const consumer: Consumer = { + consumerTag: null, + queue, + onMessage, + consumeOptions, + options, + }; + this._consumers.push(consumer); + await this._consume(consumer); + } + + private async _consume(consumer: Consumer): Promise { + if (!this._channel) { + return; + } + + const options = consumer.options; + if (options?.prefetch) { + this._channel.prefetch(options.prefetch, false); + } + + const { consumerTag } = await this._channel.consume( + consumer.queue, + (msg) => { + if (!msg) { + consumer.consumerTag = null; + this._reconnectConsumer(consumer).catch((err) => { + if (err.isOperational && err.message.includes('BasicConsume; 404')) { + // Ignore errors caused by queue not declared. In + // those cases the connection will reconnect and + // then consumers reestablished. The full reconnect + // might be avoided if we assert the queue again + // before starting to consume. + return; + } + throw err; + }); + return; + } + consumer.onMessage(msg); + }, + consumer.consumeOptions + ); + consumer.consumerTag = consumerTag; + } + + private async _reconnectConsumer(consumer: Consumer): Promise { + if (!this._consumers.includes(consumer)) { + // Intentionally canceled + return; + } + await this._consume(consumer); + } + + /** + * Cancel all consumers + */ + async cancelAll(): Promise { + const consumers = this._consumers; + this._consumers = []; + if (!this._channel) { + return; + } + + const channel = this._channel; + await Promise.all( + consumers.reduce((acc, consumer) => { + if (consumer.consumerTag) { + acc.push(channel.cancel(consumer.consumerTag)); + } + return acc; + }, []) + ); + } + /** Send an `ack` to the underlying channel. */ ack(message: amqplib.Message, allUpTo?: boolean): void { this._channel && this._channel.ack(message, allUpTo); diff --git a/test/ChannelWrapperTest.ts b/test/ChannelWrapperTest.ts index 2d7d1ef..9c2bb08 100644 --- a/test/ChannelWrapperTest.ts +++ b/test/ChannelWrapperTest.ts @@ -970,6 +970,181 @@ describe('ChannelWrapper', function () { // Final message should have been published to the underlying queue. expect(queue.length).to.equal(2); }); + + it('should consume messages', async function () { + let onMessage: any = null; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => { + onMessage = onMsg; + return Promise.resolve({ consumerTag: 'abc' }); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const messages: any[] = []; + await channelWrapper.consume( + 'queue', + (msg) => { + messages.push(msg); + }, + { noAck: true } + ); + + onMessage(1); + onMessage(2); + onMessage(3); + expect(messages).to.deep.equal([1, 2, 3]); + }); + + it('should reconnect consumer on consumer cancellation', async function () { + let onMessage: any = null; + let consumerTag = 0; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => { + onMessage = onMsg; + return Promise.resolve({ consumerTag: `${consumerTag++}` }); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const messages: any[] = []; + await channelWrapper.consume('queue', (msg) => { + messages.push(msg); + }); + + onMessage(1); + onMessage(null); // simulate consumer cancel + onMessage(2); + onMessage(null); // simulate second cancel + onMessage(3); + + expect(messages).to.deep.equal([1, 2, 3]); + expect(consumerTag).to.equal(3); + }); + + it('should reconnect consumers on channel error', async function () { + let onQueue1: any = null; + let onQueue2: any = null; + let consumerTag = 0; + + // Define a prefetch function here, because it will otherwise be + // unique for each new channel + const prefetchFn = jest + .fn() + .mockImplementation((_prefetch: number, _isGlobal: boolean) => {}); + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.prefetch = prefetchFn; + channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => { + if (queue === 'queue1') { + onQueue1 = onMsg; + } else { + onQueue2 = onMsg; + } + return Promise.resolve({ consumerTag: `${consumerTag++}` }); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const queue1: any[] = []; + await channelWrapper.consume( + 'queue1', + (msg) => { + queue1.push(msg); + }, + { noAck: true }, + { prefetch: 10 } + ); + + const queue2: any[] = []; + await channelWrapper.consume('queue2', (msg) => { + queue2.push(msg); + }); + + onQueue1(1); + onQueue2(1); + + connectionManager.simulateDisconnect(); + connectionManager.simulateConnect(); + await channelWrapper.waitForConnect(); + + onQueue1(2); + onQueue2(2); + + expect(queue1).to.deep.equal([1, 2]); + expect(queue2).to.deep.equal([1, 2]); + expect(consumerTag).to.equal(4); + expect(prefetchFn).to.have.beenCalledTimes(2); + expect(prefetchFn).to.have.beenNthCalledWith(1, 10, false); + expect(prefetchFn).to.have.beenNthCalledWith(2, 10, false); + }); + + it('should be able to cancel all consumers', async function () { + let onQueue1: any = null; + let onQueue2: any = null; + let consumerTag = 0; + const canceledTags: number[] = []; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => { + if (queue === 'queue1') { + onQueue1 = onMsg; + } else { + onQueue2 = onMsg; + } + return Promise.resolve({ consumerTag: `${consumerTag++}` }); + }); + channel.cancel = jest.fn().mockImplementation((consumerTag) => { + canceledTags.push(consumerTag); + if (consumerTag === '0') { + onQueue1(null); + } else if (consumerTag === '1') { + onQueue2(null); + } + return Promise.resolve(); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const queue1: any[] = []; + await channelWrapper.consume('queue1', (msg) => { + queue1.push(msg); + }); + + const queue2: any[] = []; + await channelWrapper.consume('queue2', (msg) => { + queue2.push(msg); + }); + + onQueue1(1); + onQueue2(1); + + await channelWrapper.cancelAll(); + + // Consumers shouldn't be resumed after reconnect when canceled + connectionManager.simulateDisconnect(); + connectionManager.simulateConnect(); + await channelWrapper.waitForConnect(); + + expect(queue1).to.deep.equal([1]); + expect(queue2).to.deep.equal([1]); + expect(consumerTag).to.equal(2); + expect(canceledTags).to.deep.equal(['0', '1']); + }); }); /** Returns the arguments of the most recent call to this mock. */ diff --git a/test/fixtures.ts b/test/fixtures.ts index 9586068..e9e4ef6 100644 --- a/test/fixtures.ts +++ b/test/fixtures.ts @@ -146,6 +146,12 @@ export class FakeConfirmChannel extends EventEmitter { close = jest.fn().mockImplementation(async (): Promise => { this.emit('close'); }); + + consume = jest.fn().mockImplementation(async (): Promise => { + return { consumerTag: 'abc' }; + }); + + prefetch = jest.fn().mockImplementation((_prefetch: number, _isGlobal: boolean): void => {}); } export class FakeConnection extends EventEmitter { From 9eb027c513d0766b2bd2500c7924c33cece79e31 Mon Sep 17 00:00:00 2001 From: Mathias Lundell Date: Fri, 27 Aug 2021 11:29:27 +0200 Subject: [PATCH 2/3] test: typescript error on done with err: unknown --- test/ChannelWrapperTest.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/ChannelWrapperTest.ts b/test/ChannelWrapperTest.ts index 9c2bb08..7e5be65 100644 --- a/test/ChannelWrapperTest.ts +++ b/test/ChannelWrapperTest.ts @@ -266,9 +266,7 @@ describe('ChannelWrapper', function () { }); }); - it('should publish messages to the underlying channel with callbacks', function (done: ( - err?: Error - ) => void) { + it('should publish messages to the underlying channel with callbacks', function (done) { connectionManager.simulateConnect(); const channelWrapper = new ChannelWrapper(connectionManager); channelWrapper.waitForConnect(function (err) { From 9caf187b2f1c5c9add1df4bbdde88ac9a8e9b60c Mon Sep 17 00:00:00 2001 From: Mathias Lundell Date: Fri, 27 Aug 2021 16:48:11 +0200 Subject: [PATCH 3/3] refactor: merge prefetch with other consume options --- src/ChannelWrapper.ts | 19 ++++++++++--------- test/ChannelWrapperTest.ts | 3 +-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/ChannelWrapper.ts b/src/ChannelWrapper.ts index 2bf4b4c..953cab8 100644 --- a/src/ChannelWrapper.ts +++ b/src/ChannelWrapper.ts @@ -44,12 +44,15 @@ interface SendToQueueMessage { reject: (err: Error) => void; } +interface ConsumerOptions extends amqplib.Options.Consume { + prefetch?: number +} + interface Consumer { consumerTag: string | null; queue: string; onMessage: (msg: amqplib.ConsumeMessage) => void; - consumeOptions?: amqplib.Options.Consume; - options?: { prefetch?: number }; + options: ConsumerOptions; } type Message = PublishMessage | SendToQueueMessage; @@ -603,14 +606,12 @@ export default class ChannelWrapper extends EventEmitter { async consume( queue: string, onMessage: Consumer['onMessage'], - consumeOptions?: Consumer['consumeOptions'], - options?: Consumer['options'] + options: ConsumerOptions = {} ): Promise { const consumer: Consumer = { consumerTag: null, queue, onMessage, - consumeOptions, options, }; this._consumers.push(consumer); @@ -622,9 +623,9 @@ export default class ChannelWrapper extends EventEmitter { return; } - const options = consumer.options; - if (options?.prefetch) { - this._channel.prefetch(options.prefetch, false); + const { prefetch, ...options } = consumer.options; + if (typeof prefetch === 'number') { + this._channel.prefetch(prefetch, false); } const { consumerTag } = await this._channel.consume( @@ -647,7 +648,7 @@ export default class ChannelWrapper extends EventEmitter { } consumer.onMessage(msg); }, - consumer.consumeOptions + options ); consumer.consumerTag = consumerTag; } diff --git a/test/ChannelWrapperTest.ts b/test/ChannelWrapperTest.ts index 7e5be65..56cea05 100644 --- a/test/ChannelWrapperTest.ts +++ b/test/ChannelWrapperTest.ts @@ -1061,8 +1061,7 @@ describe('ChannelWrapper', function () { (msg) => { queue1.push(msg); }, - { noAck: true }, - { prefetch: 10 } + { noAck: true, prefetch: 10 }, ); const queue2: any[] = [];