diff --git a/src/ChannelWrapper.ts b/src/ChannelWrapper.ts index 6a050f4..953cab8 100644 --- a/src/ChannelWrapper.ts +++ b/src/ChannelWrapper.ts @@ -44,6 +44,17 @@ interface SendToQueueMessage { reject: (err: Error) => void; } +interface ConsumerOptions extends amqplib.Options.Consume { + prefetch?: number +} + +interface Consumer { + consumerTag: string | null; + queue: string; + onMessage: (msg: amqplib.ConsumeMessage) => void; + options: ConsumerOptions; +} + type Message = PublishMessage | SendToQueueMessage; const IRRECOVERABLE_ERRORS = [ @@ -87,6 +98,8 @@ export default class ChannelWrapper extends EventEmitter { private _unconfirmedMessages: Message[] = []; /** Reason code during publish or sendtoqueue messages. */ private _irrecoverableCode: number | undefined; + /** Consumers which will be reconnected on channel errors etc. */ + private _consumers: Consumer[] = []; /** * The currently connected channel. Note that not all setup functions @@ -324,6 +337,8 @@ export default class ChannelWrapper extends EventEmitter { // Array of setup functions to call. this._setups = []; + this._consumers = []; + if (options.setup) { this._setups.push(options.setup); } @@ -359,10 +374,13 @@ export default class ChannelWrapper extends EventEmitter { this.emit('error', err, { name: this.name }); }) ) - ).then(() => { - this._settingUp = undefined; - }); - + ) + .then(() => { + return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c))); + }) + .then(() => { + this._settingUp = undefined; + }); await this._settingUp; if (!this._channel) { @@ -581,6 +599,89 @@ export default class ChannelWrapper extends EventEmitter { } } + /** + * Setup a consumer + * This consumer will be reconnected on cancellation and channel errors. + */ + async consume( + queue: string, + onMessage: Consumer['onMessage'], + options: ConsumerOptions = {} + ): Promise { + const consumer: Consumer = { + consumerTag: null, + queue, + onMessage, + options, + }; + this._consumers.push(consumer); + await this._consume(consumer); + } + + private async _consume(consumer: Consumer): Promise { + if (!this._channel) { + return; + } + + const { prefetch, ...options } = consumer.options; + if (typeof prefetch === 'number') { + this._channel.prefetch(prefetch, false); + } + + const { consumerTag } = await this._channel.consume( + consumer.queue, + (msg) => { + if (!msg) { + consumer.consumerTag = null; + this._reconnectConsumer(consumer).catch((err) => { + if (err.isOperational && err.message.includes('BasicConsume; 404')) { + // Ignore errors caused by queue not declared. In + // those cases the connection will reconnect and + // then consumers reestablished. The full reconnect + // might be avoided if we assert the queue again + // before starting to consume. + return; + } + throw err; + }); + return; + } + consumer.onMessage(msg); + }, + options + ); + consumer.consumerTag = consumerTag; + } + + private async _reconnectConsumer(consumer: Consumer): Promise { + if (!this._consumers.includes(consumer)) { + // Intentionally canceled + return; + } + await this._consume(consumer); + } + + /** + * Cancel all consumers + */ + async cancelAll(): Promise { + const consumers = this._consumers; + this._consumers = []; + if (!this._channel) { + return; + } + + const channel = this._channel; + await Promise.all( + consumers.reduce((acc, consumer) => { + if (consumer.consumerTag) { + acc.push(channel.cancel(consumer.consumerTag)); + } + return acc; + }, []) + ); + } + /** Send an `ack` to the underlying channel. */ ack(message: amqplib.Message, allUpTo?: boolean): void { this._channel && this._channel.ack(message, allUpTo); diff --git a/test/ChannelWrapperTest.ts b/test/ChannelWrapperTest.ts index 2d7d1ef..56cea05 100644 --- a/test/ChannelWrapperTest.ts +++ b/test/ChannelWrapperTest.ts @@ -266,9 +266,7 @@ describe('ChannelWrapper', function () { }); }); - it('should publish messages to the underlying channel with callbacks', function (done: ( - err?: Error - ) => void) { + it('should publish messages to the underlying channel with callbacks', function (done) { connectionManager.simulateConnect(); const channelWrapper = new ChannelWrapper(connectionManager); channelWrapper.waitForConnect(function (err) { @@ -970,6 +968,180 @@ describe('ChannelWrapper', function () { // Final message should have been published to the underlying queue. expect(queue.length).to.equal(2); }); + + it('should consume messages', async function () { + let onMessage: any = null; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => { + onMessage = onMsg; + return Promise.resolve({ consumerTag: 'abc' }); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const messages: any[] = []; + await channelWrapper.consume( + 'queue', + (msg) => { + messages.push(msg); + }, + { noAck: true } + ); + + onMessage(1); + onMessage(2); + onMessage(3); + expect(messages).to.deep.equal([1, 2, 3]); + }); + + it('should reconnect consumer on consumer cancellation', async function () { + let onMessage: any = null; + let consumerTag = 0; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => { + onMessage = onMsg; + return Promise.resolve({ consumerTag: `${consumerTag++}` }); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const messages: any[] = []; + await channelWrapper.consume('queue', (msg) => { + messages.push(msg); + }); + + onMessage(1); + onMessage(null); // simulate consumer cancel + onMessage(2); + onMessage(null); // simulate second cancel + onMessage(3); + + expect(messages).to.deep.equal([1, 2, 3]); + expect(consumerTag).to.equal(3); + }); + + it('should reconnect consumers on channel error', async function () { + let onQueue1: any = null; + let onQueue2: any = null; + let consumerTag = 0; + + // Define a prefetch function here, because it will otherwise be + // unique for each new channel + const prefetchFn = jest + .fn() + .mockImplementation((_prefetch: number, _isGlobal: boolean) => {}); + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.prefetch = prefetchFn; + channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => { + if (queue === 'queue1') { + onQueue1 = onMsg; + } else { + onQueue2 = onMsg; + } + return Promise.resolve({ consumerTag: `${consumerTag++}` }); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const queue1: any[] = []; + await channelWrapper.consume( + 'queue1', + (msg) => { + queue1.push(msg); + }, + { noAck: true, prefetch: 10 }, + ); + + const queue2: any[] = []; + await channelWrapper.consume('queue2', (msg) => { + queue2.push(msg); + }); + + onQueue1(1); + onQueue2(1); + + connectionManager.simulateDisconnect(); + connectionManager.simulateConnect(); + await channelWrapper.waitForConnect(); + + onQueue1(2); + onQueue2(2); + + expect(queue1).to.deep.equal([1, 2]); + expect(queue2).to.deep.equal([1, 2]); + expect(consumerTag).to.equal(4); + expect(prefetchFn).to.have.beenCalledTimes(2); + expect(prefetchFn).to.have.beenNthCalledWith(1, 10, false); + expect(prefetchFn).to.have.beenNthCalledWith(2, 10, false); + }); + + it('should be able to cancel all consumers', async function () { + let onQueue1: any = null; + let onQueue2: any = null; + let consumerTag = 0; + const canceledTags: number[] = []; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => { + if (queue === 'queue1') { + onQueue1 = onMsg; + } else { + onQueue2 = onMsg; + } + return Promise.resolve({ consumerTag: `${consumerTag++}` }); + }); + channel.cancel = jest.fn().mockImplementation((consumerTag) => { + canceledTags.push(consumerTag); + if (consumerTag === '0') { + onQueue1(null); + } else if (consumerTag === '1') { + onQueue2(null); + } + return Promise.resolve(); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const queue1: any[] = []; + await channelWrapper.consume('queue1', (msg) => { + queue1.push(msg); + }); + + const queue2: any[] = []; + await channelWrapper.consume('queue2', (msg) => { + queue2.push(msg); + }); + + onQueue1(1); + onQueue2(1); + + await channelWrapper.cancelAll(); + + // Consumers shouldn't be resumed after reconnect when canceled + connectionManager.simulateDisconnect(); + connectionManager.simulateConnect(); + await channelWrapper.waitForConnect(); + + expect(queue1).to.deep.equal([1]); + expect(queue2).to.deep.equal([1]); + expect(consumerTag).to.equal(2); + expect(canceledTags).to.deep.equal(['0', '1']); + }); }); /** Returns the arguments of the most recent call to this mock. */ diff --git a/test/fixtures.ts b/test/fixtures.ts index 9586068..e9e4ef6 100644 --- a/test/fixtures.ts +++ b/test/fixtures.ts @@ -146,6 +146,12 @@ export class FakeConfirmChannel extends EventEmitter { close = jest.fn().mockImplementation(async (): Promise => { this.emit('close'); }); + + consume = jest.fn().mockImplementation(async (): Promise => { + return { consumerTag: 'abc' }; + }); + + prefetch = jest.fn().mockImplementation((_prefetch: number, _isGlobal: boolean): void => {}); } export class FakeConnection extends EventEmitter {