Skip to content

Commit

Permalink
Merge pull request #181 from luddd3/feat/expose-amqpconnectionmanager
Browse files Browse the repository at this point in the history
feat: expose AmqpConnectionManagerClass
  • Loading branch information
jwalton authored Sep 21, 2021
2 parents b2c89ac + 835a81f commit 449f5db
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 24 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# [3.6.0](https://github.com/jwalton/node-amqp-connection-manager/compare/v3.5.2...v3.6.0) (2021-08-27)

### Features

- reconnect and cancelAll consumers ([fb0c00b](https://github.com/jwalton/node-amqp-connection-manager/commit/fb0c00becc224ffedd28e810cbb314187d21efdb))

## [3.5.2](https://github.com/jwalton/node-amqp-connection-manager/compare/v3.5.1...v3.5.2) (2021-08-26)

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "amqp-connection-manager",
"version": "3.5.2",
"version": "3.6.0",
"description": "Auto-reconnect and round robin support for amqplib.",
"module": "./dist/esm/index.js",
"main": "./dist/cjs/index.js",
Expand Down
41 changes: 40 additions & 1 deletion src/AmqpConnectionManager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import amqp, { Connection } from 'amqplib';
import { EventEmitter } from 'events';
import { EventEmitter, once } from 'events';
import { TcpSocketConnectOpts } from 'net';
import pb from 'promise-breaker';
import { ConnectionOptions } from 'tls';
Expand Down Expand Up @@ -82,6 +82,8 @@ export interface IAmqpConnectionManager {
addListener(event: 'unblocked', listener: () => void): this;
addListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this;

listeners(eventName: string | symbol): any;

on(event: string, listener: (...args: any[]) => void): this;
on(event: 'connect', listener: ConnectListener): this;
on(event: 'blocked', listener: (arg: { reason: string }) => void): this;
Expand All @@ -108,6 +110,8 @@ export interface IAmqpConnectionManager {

removeListener(event: string, listener: (...args: any[]) => void): this;

connect(options?: { timeout?: number }): Promise<void>;
reconnect(): void;
createChannel(options?: CreateChannelOpts): ChannelWrapper;
close(): Promise<void>;
isConnected(): boolean;
Expand Down Expand Up @@ -196,8 +200,43 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
this.setMaxListeners(0);

this._findServers = options.findServers || (() => Promise.resolve(urls));
}

/**
* Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the
* reconnect attempts will continue in the background.
* @param [options={}] -
* @param [options.timeout] - Time to wait for initial connect
*/
async connect({ timeout }: { timeout?: number } = {}): Promise<void> {
this._connect();

let reject: (reason?: any) => void;
const onDisconnect = ({ err }: { err: any }) => {
// Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials.
if (err.isOperational) {
reject(err);
}
};

try {
await Promise.race([
once(this, 'connect'),
new Promise((_resolve, innerReject) => {
reject = innerReject;
this.on('disconnect', onDisconnect);
}),
...(timeout
? [
wait(timeout).promise.then(() => {
throw new Error('amqp-connection-manager: connect timeout');
}),
]
: []),
]);
} finally {
this.removeListener('disconnect', onDisconnect);
}
}

// `options` here are any options that can be passed to ChannelWrapper.
Expand Down
109 changes: 105 additions & 4 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ interface SendToQueueMessage {
reject: (err: Error) => void;
}

interface ConsumerOptions extends amqplib.Options.Consume {
prefetch?: number
}

interface Consumer {
consumerTag: string | null;
queue: string;
onMessage: (msg: amqplib.ConsumeMessage) => void;
options: ConsumerOptions;
}

type Message = PublishMessage | SendToQueueMessage;

const IRRECOVERABLE_ERRORS = [
Expand Down Expand Up @@ -87,6 +98,8 @@ export default class ChannelWrapper extends EventEmitter {
private _unconfirmedMessages: Message[] = [];
/** Reason code during publish or sendtoqueue messages. */
private _irrecoverableCode: number | undefined;
/** Consumers which will be reconnected on channel errors etc. */
private _consumers: Consumer[] = [];

/**
* The currently connected channel. Note that not all setup functions
Expand Down Expand Up @@ -324,6 +337,8 @@ export default class ChannelWrapper extends EventEmitter {

// Array of setup functions to call.
this._setups = [];
this._consumers = [];

if (options.setup) {
this._setups.push(options.setup);
}
Expand Down Expand Up @@ -359,10 +374,13 @@ export default class ChannelWrapper extends EventEmitter {
this.emit('error', err, { name: this.name });
})
)
).then(() => {
this._settingUp = undefined;
});

)
.then(() => {
return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c)));
})
.then(() => {
this._settingUp = undefined;
});
await this._settingUp;

if (!this._channel) {
Expand Down Expand Up @@ -581,6 +599,89 @@ export default class ChannelWrapper extends EventEmitter {
}
}

/**
* Setup a consumer
* This consumer will be reconnected on cancellation and channel errors.
*/
async consume(
queue: string,
onMessage: Consumer['onMessage'],
options: ConsumerOptions = {}
): Promise<void> {
const consumer: Consumer = {
consumerTag: null,
queue,
onMessage,
options,
};
this._consumers.push(consumer);
await this._consume(consumer);
}

private async _consume(consumer: Consumer): Promise<void> {
if (!this._channel) {
return;
}

const { prefetch, ...options } = consumer.options;
if (typeof prefetch === 'number') {
this._channel.prefetch(prefetch, false);
}

const { consumerTag } = await this._channel.consume(
consumer.queue,
(msg) => {
if (!msg) {
consumer.consumerTag = null;
this._reconnectConsumer(consumer).catch((err) => {
if (err.isOperational && err.message.includes('BasicConsume; 404')) {
// Ignore errors caused by queue not declared. In
// those cases the connection will reconnect and
// then consumers reestablished. The full reconnect
// might be avoided if we assert the queue again
// before starting to consume.
return;
}
throw err;
});
return;
}
consumer.onMessage(msg);
},
options
);
consumer.consumerTag = consumerTag;
}

private async _reconnectConsumer(consumer: Consumer): Promise<void> {
if (!this._consumers.includes(consumer)) {
// Intentionally canceled
return;
}
await this._consume(consumer);
}

/**
* Cancel all consumers
*/
async cancelAll(): Promise<void> {
const consumers = this._consumers;
this._consumers = [];
if (!this._channel) {
return;
}

const channel = this._channel;
await Promise.all(
consumers.reduce<any[]>((acc, consumer) => {
if (consumer.consumerTag) {
acc.push(channel.cancel(consumer.consumerTag));
}
return acc;
}, [])
);
}

/** Send an `ack` to the underlying channel. */
ack(message: amqplib.Message, allUpTo?: boolean): void {
this._channel && this._channel.ack(message, allUpTo);
Expand Down
8 changes: 7 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@ export function connect(
urls: ConnectionUrl | ConnectionUrl[] | undefined | null,
options?: AmqpConnectionManagerOptions
): IAmqpConnectionManager {
return new AmqpConnectionManager(urls, options);
const conn = new AmqpConnectionManager(urls, options);
conn.connect().catch(() => {
/* noop */
});
return conn;
}

export { AmqpConnectionManager as AmqpConnectionManagerClass };

const amqp = { connect };

export default amqp;
Loading

0 comments on commit 449f5db

Please sign in to comment.