Skip to content

Commit

Permalink
fix: (#109) randomize initial connection index based on pid or hostname
Browse files Browse the repository at this point in the history
  • Loading branch information
arobson committed Feb 17, 2018
1 parent fce8aaa commit 2d6f372
Showing 1 changed file with 46 additions and 30 deletions.
76 changes: 46 additions & 30 deletions src/amqp/connection.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
var amqp = require('amqplib');
var fs = require('fs');
var AmqpConnection = require('amqplib/lib/callback_model').CallbackModel;
var monad = require('./iomonad');
var log = require('../log')('rabbot.connection');
var info = require('../info');
var url = require('url');
const amqp = require('amqplib');
const fs = require('fs');
const AmqpConnection = require('amqplib/lib/callback_model').CallbackModel;
const monad = require('./iomonad');
const log = require('../log')('rabbot.connection');
const info = require('../info');
const url = require('url');
const crypto = require('crypto');
const os = require('os');

/* log
* `rabbot.amqp-connection`
Expand All @@ -18,11 +20,25 @@ var url = require('url');
*/

function getArgs (fn) {
var fnString = fn.toString();
var argList = /[(]([^)]*)[)]/.exec(fnString)[ 1 ].split(',');
const fnString = fn.toString();
const argList = /[(]([^)]*)[)]/.exec(fnString)[ 1 ].split(',');
return argList.map(String.prototype.trim);
}

function getInitialIndex (limit) {
const pid = process.pid;
let index = 0;
if (pid <= limit) {
const sha = crypto.createHash('sha1');
sha.write(os.hostname());
const buffer = sha.digest();
index = Math.abs(buffer.readInt32LE()) % limit;
} else {
index = pid % limit;
}
return index;
}

function getOption (opts, key, alt) {
if (opts.get && supportsDefaults(opts.get)) {
return opts.get(key, alt);
Expand Down Expand Up @@ -76,31 +92,31 @@ function trim (x) {
return x.trim(' ');
}

var Adapter = function (parameters) {
const Adapter = function (parameters) {
var uriOpts = parseUri(parameters.uri);
Object.assign(parameters, uriOpts);
var hosts = getOption(parameters, 'host');
var servers = getOption(parameters, 'server');
var brokers = getOption(parameters, 'RABBIT_BROKER');
var serverList = brokers || hosts || servers || 'localhost';
var portList = getOption(parameters, 'RABBIT_PORT') || getOption(parameters, 'port', 5672);
const hosts = getOption(parameters, 'host');
const servers = getOption(parameters, 'server');
const brokers = getOption(parameters, 'RABBIT_BROKER');
const serverList = brokers || hosts || servers || 'localhost';
const portList = getOption(parameters, 'RABBIT_PORT') || getOption(parameters, 'port', 5672);

this.name = parameters ? (parameters.name || 'default') : 'default';
this.connectionIndex = 0;
this.servers = split(serverList);
this.connectionIndex = getInitialIndex(this.servers.length);
this.ports = split(portList);
this.heartbeat = getOption(parameters, 'RABBIT_HEARTBEAT') || getOption(parameters, 'heartbeat', 30);
this.protocol = getOption(parameters, 'RABBIT_PROTOCOL') || getOption(parameters, 'protocol', 'amqp://');
this.pass = getOption(parameters, 'RABBIT_PASSWORD') || getOption(parameters, 'pass', 'guest');
this.user = getOption(parameters, 'RABBIT_USER') || getOption(parameters, 'user', 'guest');
this.vhost = getOption(parameters, 'RABBIT_VHOST') || getOption(parameters, 'vhost', '%2f');
var timeout = getOption(parameters, 'RABBIT_TIMEOUT') || getOption(parameters, 'timeout', 2000);
var certPath = getOption(parameters, 'RABBIT_CERT') || getOption(parameters, 'certPath');
var keyPath = getOption(parameters, 'RABBIT_KEY') || getOption(parameters, 'keyPath');
var caPaths = getOption(parameters, 'RABBIT_CA') || getOption(parameters, 'caPath');
var passphrase = getOption(parameters, 'RABBIT_PASSPHRASE') || getOption(parameters, 'passphrase');
var pfxPath = getOption(parameters, 'RABBIT_PFX') || getOption(parameters, 'pfxPath');
var useSSL = certPath || keyPath || passphrase || caPaths || pfxPath || parameters.useSSL;
const timeout = getOption(parameters, 'RABBIT_TIMEOUT') || getOption(parameters, 'timeout', 2000);
const certPath = getOption(parameters, 'RABBIT_CERT') || getOption(parameters, 'certPath');
const keyPath = getOption(parameters, 'RABBIT_KEY') || getOption(parameters, 'keyPath');
const caPaths = getOption(parameters, 'RABBIT_CA') || getOption(parameters, 'caPath');
const passphrase = getOption(parameters, 'RABBIT_PASSPHRASE') || getOption(parameters, 'passphrase');
const pfxPath = getOption(parameters, 'RABBIT_PFX') || getOption(parameters, 'pfxPath');
const useSSL = certPath || keyPath || passphrase || caPaths || pfxPath || parameters.useSSL;
this.options = { noDelay: true };
if (timeout) {
this.options.timeout = timeout;
Expand All @@ -118,7 +134,7 @@ var Adapter = function (parameters) {
this.options.pfx = fs.existsSync(pfxPath) ? fs.readFileSync(pfxPath) : pfxPath;
}
if (caPaths) {
var list = caPaths.split(',');
const list = caPaths.split(',');
this.options.ca = list.map((caPath) => {
fs.existsSync(caPath) ? fs.readFileSync(caPath) : caPath; // eslint-disable-line no-unused-expressions
});
Expand All @@ -136,7 +152,7 @@ var Adapter = function (parameters) {

Adapter.prototype.connect = function () {
return new Promise(function (resolve, reject) {
var attempted = [];
const attempted = [];
var attempt;
attempt = function () {
var nextUri = this.getNextUri();
Expand Down Expand Up @@ -178,9 +194,9 @@ Adapter.prototype.bumpIndex = function () {
};

Adapter.prototype.getNextUri = function () {
var server = this.getNext(this.servers);
var port = this.getNext(this.ports);
var uri = getUri(this.protocol, this.user, escape(this.pass), server, port, this.vhost, this.heartbeat);
const server = this.getNext(this.servers);
const port = this.getNext(this.ports);
const uri = getUri(this.protocol, this.user, escape(this.pass), server, port, this.vhost, this.heartbeat);
return uri;
};

Expand All @@ -193,7 +209,7 @@ Adapter.prototype.getNext = function (list) {
};

module.exports = function (options) {
var close = function (connection) {
const close = function (connection) {
connection.close()
.then(null, function (err) {
// for some reason calling close always gets a rejected promise
Expand All @@ -202,6 +218,6 @@ module.exports = function (options) {
log.debug(`Error was reported during close of connection '${options.name}' - '${err}'`);
});
};
var adapter = new Adapter(options);
const adapter = new Adapter(options);
return monad(options, 'connection', adapter.connect.bind(adapter), AmqpConnection, close);
};

0 comments on commit 2d6f372

Please sign in to comment.