Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reconnect and cancelAll consumers #179

Merged
merged 3 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 105 additions & 4 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ interface SendToQueueMessage {
reject: (err: Error) => void;
}

interface ConsumerOptions extends amqplib.Options.Consume {
prefetch?: number
}

interface Consumer {
consumerTag: string | null;
queue: string;
onMessage: (msg: amqplib.ConsumeMessage) => void;
options: ConsumerOptions;
}

type Message = PublishMessage | SendToQueueMessage;

const IRRECOVERABLE_ERRORS = [
Expand Down Expand Up @@ -87,6 +98,8 @@ export default class ChannelWrapper extends EventEmitter {
private _unconfirmedMessages: Message[] = [];
/** Reason code during publish or sendtoqueue messages. */
private _irrecoverableCode: number | undefined;
/** Consumers which will be reconnected on channel errors etc. */
private _consumers: Consumer[] = [];

/**
* The currently connected channel. Note that not all setup functions
Expand Down Expand Up @@ -324,6 +337,8 @@ export default class ChannelWrapper extends EventEmitter {

// Array of setup functions to call.
this._setups = [];
this._consumers = [];

if (options.setup) {
this._setups.push(options.setup);
}
Expand Down Expand Up @@ -359,10 +374,13 @@ export default class ChannelWrapper extends EventEmitter {
this.emit('error', err, { name: this.name });
})
)
).then(() => {
this._settingUp = undefined;
});

)
.then(() => {
return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c)));
})
.then(() => {
this._settingUp = undefined;
});
await this._settingUp;

if (!this._channel) {
Expand Down Expand Up @@ -581,6 +599,89 @@ export default class ChannelWrapper extends EventEmitter {
}
}

/**
* Setup a consumer
* This consumer will be reconnected on cancellation and channel errors.
*/
async consume(
queue: string,
onMessage: Consumer['onMessage'],
options: ConsumerOptions = {}
): Promise<void> {
const consumer: Consumer = {
consumerTag: null,
queue,
onMessage,
options,
};
this._consumers.push(consumer);
await this._consume(consumer);
}

private async _consume(consumer: Consumer): Promise<void> {
if (!this._channel) {
return;
}

const { prefetch, ...options } = consumer.options;
if (typeof prefetch === 'number') {
this._channel.prefetch(prefetch, false);
}

const { consumerTag } = await this._channel.consume(
consumer.queue,
(msg) => {
if (!msg) {
consumer.consumerTag = null;
this._reconnectConsumer(consumer).catch((err) => {
if (err.isOperational && err.message.includes('BasicConsume; 404')) {
// Ignore errors caused by queue not declared. In
jwalton marked this conversation as resolved.
Show resolved Hide resolved
// those cases the connection will reconnect and
// then consumers reestablished. The full reconnect
// might be avoided if we assert the queue again
// before starting to consume.
return;
}
throw err;
});
return;
}
consumer.onMessage(msg);
},
options
);
consumer.consumerTag = consumerTag;
}

private async _reconnectConsumer(consumer: Consumer): Promise<void> {
if (!this._consumers.includes(consumer)) {
// Intentionally canceled
return;
}
await this._consume(consumer);
}

/**
* Cancel all consumers
*/
async cancelAll(): Promise<void> {
const consumers = this._consumers;
this._consumers = [];
if (!this._channel) {
return;
}

const channel = this._channel;
await Promise.all(
consumers.reduce<any[]>((acc, consumer) => {
if (consumer.consumerTag) {
acc.push(channel.cancel(consumer.consumerTag));
}
return acc;
}, [])
);
}

/** Send an `ack` to the underlying channel. */
ack(message: amqplib.Message, allUpTo?: boolean): void {
this._channel && this._channel.ack(message, allUpTo);
Expand Down
178 changes: 175 additions & 3 deletions test/ChannelWrapperTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,7 @@ describe('ChannelWrapper', function () {
});
});

it('should publish messages to the underlying channel with callbacks', function (done: (
err?: Error
) => void) {
it('should publish messages to the underlying channel with callbacks', function (done) {
connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager);
channelWrapper.waitForConnect(function (err) {
Expand Down Expand Up @@ -970,6 +968,180 @@ describe('ChannelWrapper', function () {
// Final message should have been published to the underlying queue.
expect(queue.length).to.equal(2);
});

it('should consume messages', async function () {
let onMessage: any = null;

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => {
onMessage = onMsg;
return Promise.resolve({ consumerTag: 'abc' });
});
},
});
await channelWrapper.waitForConnect();

const messages: any[] = [];
await channelWrapper.consume(
'queue',
(msg) => {
messages.push(msg);
},
{ noAck: true }
);

onMessage(1);
onMessage(2);
onMessage(3);
expect(messages).to.deep.equal([1, 2, 3]);
});

it('should reconnect consumer on consumer cancellation', async function () {
let onMessage: any = null;
let consumerTag = 0;

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => {
onMessage = onMsg;
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
},
});
await channelWrapper.waitForConnect();

const messages: any[] = [];
await channelWrapper.consume('queue', (msg) => {
messages.push(msg);
});

onMessage(1);
onMessage(null); // simulate consumer cancel
onMessage(2);
onMessage(null); // simulate second cancel
onMessage(3);

expect(messages).to.deep.equal([1, 2, 3]);
expect(consumerTag).to.equal(3);
});

it('should reconnect consumers on channel error', async function () {
let onQueue1: any = null;
let onQueue2: any = null;
let consumerTag = 0;

// Define a prefetch function here, because it will otherwise be
// unique for each new channel
const prefetchFn = jest
.fn()
.mockImplementation((_prefetch: number, _isGlobal: boolean) => {});

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.prefetch = prefetchFn;
channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => {
if (queue === 'queue1') {
onQueue1 = onMsg;
} else {
onQueue2 = onMsg;
}
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
},
});
await channelWrapper.waitForConnect();

const queue1: any[] = [];
await channelWrapper.consume(
'queue1',
(msg) => {
queue1.push(msg);
},
{ noAck: true, prefetch: 10 },
);

const queue2: any[] = [];
await channelWrapper.consume('queue2', (msg) => {
queue2.push(msg);
});

onQueue1(1);
onQueue2(1);

connectionManager.simulateDisconnect();
connectionManager.simulateConnect();
await channelWrapper.waitForConnect();

onQueue1(2);
onQueue2(2);

expect(queue1).to.deep.equal([1, 2]);
expect(queue2).to.deep.equal([1, 2]);
expect(consumerTag).to.equal(4);
expect(prefetchFn).to.have.beenCalledTimes(2);
expect(prefetchFn).to.have.beenNthCalledWith(1, 10, false);
expect(prefetchFn).to.have.beenNthCalledWith(2, 10, false);
});

it('should be able to cancel all consumers', async function () {
let onQueue1: any = null;
let onQueue2: any = null;
let consumerTag = 0;
const canceledTags: number[] = [];

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => {
if (queue === 'queue1') {
onQueue1 = onMsg;
} else {
onQueue2 = onMsg;
}
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
channel.cancel = jest.fn().mockImplementation((consumerTag) => {
canceledTags.push(consumerTag);
if (consumerTag === '0') {
onQueue1(null);
} else if (consumerTag === '1') {
onQueue2(null);
}
return Promise.resolve();
});
},
});
await channelWrapper.waitForConnect();

const queue1: any[] = [];
await channelWrapper.consume('queue1', (msg) => {
queue1.push(msg);
});

const queue2: any[] = [];
await channelWrapper.consume('queue2', (msg) => {
queue2.push(msg);
});

onQueue1(1);
onQueue2(1);

await channelWrapper.cancelAll();

// Consumers shouldn't be resumed after reconnect when canceled
connectionManager.simulateDisconnect();
connectionManager.simulateConnect();
await channelWrapper.waitForConnect();

expect(queue1).to.deep.equal([1]);
expect(queue2).to.deep.equal([1]);
expect(consumerTag).to.equal(2);
expect(canceledTags).to.deep.equal(['0', '1']);
});
});

/** Returns the arguments of the most recent call to this mock. */
Expand Down
6 changes: 6 additions & 0 deletions test/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ export class FakeConfirmChannel extends EventEmitter {
close = jest.fn().mockImplementation(async (): Promise<void> => {
this.emit('close');
});

consume = jest.fn().mockImplementation(async (): Promise<Replies.Consume> => {
return { consumerTag: 'abc' };
});

prefetch = jest.fn().mockImplementation((_prefetch: number, _isGlobal: boolean): void => {});
}

export class FakeConnection extends EventEmitter {
Expand Down