Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reconnect and cancelAll consumers #179

Merged
merged 3 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 104 additions & 4 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ interface SendToQueueMessage {
reject: (err: Error) => void;
}

interface Consumer {
consumerTag: string | null;
queue: string;
onMessage: (msg: amqplib.ConsumeMessage) => void;
consumeOptions?: amqplib.Options.Consume;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of merging these two sets of options objects into one?

Suggested change
consumeOptions?: amqplib.Options.Consume;
consumeOptions?: amqplib.Options.Consume & { prefetch?: number };

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might read better. I was a little bit torn about this because I couldn't decide if it was better to keep the ordinary consume options separated from the node-amqp-connection-manager specific options or not.

I can rewrite it so we get a feel for your alternative.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know what you mean, because this is definitely "cleaner" from how we call into amqplib, but the API is a little weird with two sets of options...

If you do something like:

const { prefetch, ...amqpLibOptions } = options;

you can cleanly extract our "custom" options, and anything extra will get passed down to amqplib, so if amqplib adds extra options we don't have to explicitly add them.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you read my mind =)

options?: { prefetch?: number };
}

type Message = PublishMessage | SendToQueueMessage;

const IRRECOVERABLE_ERRORS = [
Expand Down Expand Up @@ -87,6 +95,8 @@ export default class ChannelWrapper extends EventEmitter {
private _unconfirmedMessages: Message[] = [];
/** Reason code during publish or sendtoqueue messages. */
private _irrecoverableCode: number | undefined;
/** Consumers which will be reconnected on channel errors etc. */
private _consumers: Consumer[] = [];

/**
* The currently connected channel. Note that not all setup functions
Expand Down Expand Up @@ -324,6 +334,8 @@ export default class ChannelWrapper extends EventEmitter {

// Array of setup functions to call.
this._setups = [];
this._consumers = [];

if (options.setup) {
this._setups.push(options.setup);
}
Expand Down Expand Up @@ -359,10 +371,13 @@ export default class ChannelWrapper extends EventEmitter {
this.emit('error', err, { name: this.name });
})
)
).then(() => {
this._settingUp = undefined;
});

)
.then(() => {
return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c)));
})
.then(() => {
this._settingUp = undefined;
});
await this._settingUp;

if (!this._channel) {
Expand Down Expand Up @@ -581,6 +596,91 @@ export default class ChannelWrapper extends EventEmitter {
}
}

/**
* Setup a consumer
* This consumer will be reconnected on cancellation and channel errors.
*/
async consume(
queue: string,
onMessage: Consumer['onMessage'],
consumeOptions?: Consumer['consumeOptions'],
options?: Consumer['options']
): Promise<void> {
const consumer: Consumer = {
consumerTag: null,
queue,
onMessage,
consumeOptions,
options,
};
this._consumers.push(consumer);
await this._consume(consumer);
}

private async _consume(consumer: Consumer): Promise<void> {
if (!this._channel) {
return;
}

const options = consumer.options;
if (options?.prefetch) {
this._channel.prefetch(options.prefetch, false);
}

const { consumerTag } = await this._channel.consume(
consumer.queue,
(msg) => {
if (!msg) {
consumer.consumerTag = null;
this._reconnectConsumer(consumer).catch((err) => {
if (err.isOperational && err.message.includes('BasicConsume; 404')) {
// Ignore errors caused by queue not declared. In
jwalton marked this conversation as resolved.
Show resolved Hide resolved
// those cases the connection will reconnect and
// then consumers reestablished. The full reconnect
// might be avoided if we assert the queue again
// before starting to consume.
return;
}
throw err;
});
return;
}
consumer.onMessage(msg);
},
consumer.consumeOptions
);
consumer.consumerTag = consumerTag;
}

private async _reconnectConsumer(consumer: Consumer): Promise<void> {
if (!this._consumers.includes(consumer)) {
// Intentionally canceled
return;
}
await this._consume(consumer);
}

/**
* Cancel all consumers
*/
async cancelAll(): Promise<void> {
const consumers = this._consumers;
this._consumers = [];
if (!this._channel) {
return;
}

const channel = this._channel;
await Promise.all(
consumers.reduce<any[]>((acc, consumer) => {
if (consumer.consumerTag) {
acc.push(channel.cancel(consumer.consumerTag));
}
return acc;
}, [])
);
}

/** Send an `ack` to the underlying channel. */
ack(message: amqplib.Message, allUpTo?: boolean): void {
this._channel && this._channel.ack(message, allUpTo);
Expand Down
179 changes: 176 additions & 3 deletions test/ChannelWrapperTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,7 @@ describe('ChannelWrapper', function () {
});
});

it('should publish messages to the underlying channel with callbacks', function (done: (
err?: Error
) => void) {
it('should publish messages to the underlying channel with callbacks', function (done) {
connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager);
channelWrapper.waitForConnect(function (err) {
Expand Down Expand Up @@ -970,6 +968,181 @@ describe('ChannelWrapper', function () {
// Final message should have been published to the underlying queue.
expect(queue.length).to.equal(2);
});

it('should consume messages', async function () {
let onMessage: any = null;

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => {
onMessage = onMsg;
return Promise.resolve({ consumerTag: 'abc' });
});
},
});
await channelWrapper.waitForConnect();

const messages: any[] = [];
await channelWrapper.consume(
'queue',
(msg) => {
messages.push(msg);
},
{ noAck: true }
);

onMessage(1);
onMessage(2);
onMessage(3);
expect(messages).to.deep.equal([1, 2, 3]);
});

it('should reconnect consumer on consumer cancellation', async function () {
let onMessage: any = null;
let consumerTag = 0;

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => {
onMessage = onMsg;
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
},
});
await channelWrapper.waitForConnect();

const messages: any[] = [];
await channelWrapper.consume('queue', (msg) => {
messages.push(msg);
});

onMessage(1);
onMessage(null); // simulate consumer cancel
onMessage(2);
onMessage(null); // simulate second cancel
onMessage(3);

expect(messages).to.deep.equal([1, 2, 3]);
expect(consumerTag).to.equal(3);
});

it('should reconnect consumers on channel error', async function () {
let onQueue1: any = null;
let onQueue2: any = null;
let consumerTag = 0;

// Define a prefetch function here, because it will otherwise be
// unique for each new channel
const prefetchFn = jest
.fn()
.mockImplementation((_prefetch: number, _isGlobal: boolean) => {});

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.prefetch = prefetchFn;
channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => {
if (queue === 'queue1') {
onQueue1 = onMsg;
} else {
onQueue2 = onMsg;
}
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
},
});
await channelWrapper.waitForConnect();

const queue1: any[] = [];
await channelWrapper.consume(
'queue1',
(msg) => {
queue1.push(msg);
},
{ noAck: true },
{ prefetch: 10 }
);

const queue2: any[] = [];
await channelWrapper.consume('queue2', (msg) => {
queue2.push(msg);
});

onQueue1(1);
onQueue2(1);

connectionManager.simulateDisconnect();
connectionManager.simulateConnect();
await channelWrapper.waitForConnect();

onQueue1(2);
onQueue2(2);

expect(queue1).to.deep.equal([1, 2]);
expect(queue2).to.deep.equal([1, 2]);
expect(consumerTag).to.equal(4);
expect(prefetchFn).to.have.beenCalledTimes(2);
expect(prefetchFn).to.have.beenNthCalledWith(1, 10, false);
expect(prefetchFn).to.have.beenNthCalledWith(2, 10, false);
});

it('should be able to cancel all consumers', async function () {
let onQueue1: any = null;
let onQueue2: any = null;
let consumerTag = 0;
const canceledTags: number[] = [];

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => {
if (queue === 'queue1') {
onQueue1 = onMsg;
} else {
onQueue2 = onMsg;
}
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
channel.cancel = jest.fn().mockImplementation((consumerTag) => {
canceledTags.push(consumerTag);
if (consumerTag === '0') {
onQueue1(null);
} else if (consumerTag === '1') {
onQueue2(null);
}
return Promise.resolve();
});
},
});
await channelWrapper.waitForConnect();

const queue1: any[] = [];
await channelWrapper.consume('queue1', (msg) => {
queue1.push(msg);
});

const queue2: any[] = [];
await channelWrapper.consume('queue2', (msg) => {
queue2.push(msg);
});

onQueue1(1);
onQueue2(1);

await channelWrapper.cancelAll();

// Consumers shouldn't be resumed after reconnect when canceled
connectionManager.simulateDisconnect();
connectionManager.simulateConnect();
await channelWrapper.waitForConnect();

expect(queue1).to.deep.equal([1]);
expect(queue2).to.deep.equal([1]);
expect(consumerTag).to.equal(2);
expect(canceledTags).to.deep.equal(['0', '1']);
});
});

/** Returns the arguments of the most recent call to this mock. */
Expand Down
6 changes: 6 additions & 0 deletions test/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ export class FakeConfirmChannel extends EventEmitter {
close = jest.fn().mockImplementation(async (): Promise<void> => {
this.emit('close');
});

consume = jest.fn().mockImplementation(async (): Promise<Replies.Consume> => {
return { consumerTag: 'abc' };
});

prefetch = jest.fn().mockImplementation((_prefetch: number, _isGlobal: boolean): void => {});
}

export class FakeConnection extends EventEmitter {
Expand Down