Skip to content

Commit

Permalink
Merge branch 'release/2.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
nfantone committed Jul 6, 2017
2 parents d260f38 + 22227cb commit 9abdb51
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 43 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,22 @@ require('seneca')()
});
```

You may also pass in additional options for the `channel#publish` and `channel#consume` methods of [amqplib][3] under `publish` and `consume`, respectively.

```js
require('seneca')()
.use('seneca-amqp-transport')
.client({
type: 'amqp',
hostname: 'rabbitmq.host',
publish: {
persistent: true
}
});
```

> Read the offical [amqplib][3] docs for a list of available options for [`publish`](http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish) and [`consume`](http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume).
### Socket options
Additionally, you may pass in options to the `amqp.connect` method of [amqplib][3] as documented in [its API reference][4], using the `socketOptions` parameter.

Expand Down
2 changes: 1 addition & 1 deletion lib/client/client-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ function createClient(seneca, { ch, queue, exchange, options = {} }) {
const outmsg = utils.prepare_request(seneca, args, done);
const outstr = utils.stringifyJSON(seneca, `client-${options.type}`, outmsg);
const topic = amqputil.resolveClientTopic(args);
return pub.publish(outstr, exchange, topic);
return pub.publish(outstr, exchange, topic, options.publish);
}

function callback(spec, topic, sendDone) {
Expand Down
12 changes: 6 additions & 6 deletions lib/client/publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ module.exports = createPublisher;
*/
const JSON_CONTENT_TYPE = 'application/json';

function createPublisher(ch, { replyQueue, replyHandler, correlationId } = {}) {
function createPublisher(ch, {
replyQueue, replyHandler, correlationId = uuid.v4()
} = {}) {
if (!isObject(ch)) {
throw new TypeError('Channel parameter `ch` must be provided (got: [' +
typeof ch + '])');
}

correlationId = correlationId || uuid.v4();
const handleReply = isFunction(replyHandler) ? replyHandler
: Function.prototype;

function publish(message, exchange, rk) {
const opts = {
function publish(message, exchange, rk, options) {
const opts = Object.assign({}, options, {
replyTo: replyQueue,
contentType: JSON_CONTENT_TYPE,
correlationId: correlationId
};
});
return ch.publish(exchange, rk, Buffer.from(message), opts);
}

Expand Down
2 changes: 1 addition & 1 deletion lib/listener/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ function createConsumer(ch, { queue, messageHandler } = {}) {
}

const Consumer = {
consume: (q) => ch.consume(q || queue, onMessage)
consume: (q, options) => ch.consume(q || queue, onMessage, options)
};

return Object.create(Consumer);
Expand Down
4 changes: 2 additions & 2 deletions lib/listener/listener-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module.exports = createListener;
* @param {Object} options.options General plugin's options.
* @return {Listener} A ready AMQP RPC listener.
*/
function createListener(seneca, { ch, queue, options }) {
function createListener(seneca, { ch, queue, options = {} }) {
const utils = seneca.export('transport/utils');

function handleMessage(message, replyWith) {
Expand Down Expand Up @@ -62,7 +62,7 @@ function createListener(seneca, { ch, queue, options }) {
* messages.
*/
listen() {
return consumer.consume(queue)
return consumer.consume(queue, options.consume)
.then(() => {
this.started = true;
return this;
Expand Down
14 changes: 7 additions & 7 deletions test/lib/client/client-factory.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('On client-factory module', function() {
});

it('should create a Client object with a `start` method', function() {
var client = Client(seneca, options);
const client = Client(seneca, options);
client.should.be.an('object');
client.should.have.property('start').that.is.a('function');
});
Expand All @@ -62,12 +62,12 @@ describe('On client-factory module', function() {
it('should make a new Seneca client', sinon.test(function(done) {
// Create seneca.export('transport/utils') stub
// and spy on utils#make_client function
var makeClient = this.spy(transportUtils, 'make_client');
const makeClient = this.spy(transportUtils, 'make_client');
this.stub(seneca, 'export')
.withArgs('transport/utils').returns(transportUtils);

var callback = Function.prototype;
var client = Client(seneca, options);
const callback = Function.prototype;
const client = Client(seneca, options);
client.start(callback)
.then(() => {
makeClient.should.have.been.calledOnce();
Expand Down Expand Up @@ -107,9 +107,9 @@ describe('On client-factory module', function() {
.withArgs('transport/utils').returns(transportUtils);

// Spy on `channel#publish()` method
var publish = this.spy(options.ch, 'publish');
const publish = this.spy(options.ch, 'publish');

var client = Client(seneca, options);
const client = Client(seneca, options);
client.start(Function.prototype)
.then(() => publish.should.have.been.calledOnce())
.asCallback(done);
Expand Down Expand Up @@ -146,7 +146,7 @@ describe('On client-factory module', function() {
let opts = Object.assign({}, options);
opts.options.correlationId = reply.properties.correlationId;

var client = Client(seneca, opts);
const client = Client(seneca, opts);
client.start(Function.prototype)
.then(() => handleResponse.should.have.been.calledOnce())
.asCallback(done);
Expand Down
40 changes: 26 additions & 14 deletions test/lib/client/publisher.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('On publisher module', function() {
});

it('should create a new Publisher', function() {
var pub = Publisher({});
const pub = Publisher({});
pub.should.be.an('object');
pub.should.have.property('publish').that.is.a('function');
pub.should.have.property('awaitReply').that.is.a('function');
Expand Down Expand Up @@ -52,7 +52,7 @@ describe('On publisher module', function() {
const rk = 'foo.bar';

it('should publish a valid message to the channel', function() {
var pub = Publisher(channel);
const pub = Publisher(channel);
pub.publish(message, EXCHANGE, rk);

channel.publish.should.have.been.calledOnce();
Expand All @@ -61,15 +61,15 @@ describe('On publisher module', function() {
});

it('should publish to the given exchange and routing key', function() {
var pub = Publisher(channel);
const pub = Publisher(channel);
pub.publish(message, EXCHANGE, rk);

channel.publish.should.have.been.calledOnce();
channel.publish.should.have.been.calledWith(EXCHANGE, rk);
});

it('should set the `replyTo` option', function(done) {
var pub = Publisher(channel, { replyQueue: 'reply.queue' });
const pub = Publisher(channel, { replyQueue: 'reply.queue' });
pub.publish(message, EXCHANGE, rk)
.then(function() {
channel.publish.should.have.been.calledOnce();
Expand All @@ -81,7 +81,7 @@ describe('On publisher module', function() {
});

it('should set the `correlationId` option', function(done) {
var pub = Publisher(channel, { correlationId: CORRELATION_ID });
const pub = Publisher(channel, { correlationId: CORRELATION_ID });
pub.publish(message, EXCHANGE, rk)
.then(function() {
channel.publish.should.have.been.calledOnce();
Expand All @@ -91,6 +91,18 @@ describe('On publisher module', function() {
})
.asCallback(done);
});

it('should set channel#publish options', function(done) {
const pub = Publisher(channel, { correlationId: CORRELATION_ID });
pub.publish(message, EXCHANGE, rk, { persistent: true })
.then(function() {
channel.publish.should.have.been.calledOnce();
channel.publish.should.have.been.calledWith(sinon.match.string,
sinon.match.string, sinon.match.defined,
sinon.match.has('persistent', true));
})
.asCallback(done);
});
});

describe('the awaitReply() method', function() {
Expand All @@ -109,12 +121,12 @@ describe('On publisher module', function() {
});

it('should return a Promise', function() {
var pub = Publisher(channel);
const pub = Publisher(channel);
pub.awaitReply().should.be.instanceof(Promise);
});

it('should await for reply messages from the channel', function(done) {
var pub = Publisher(channel, { replyQueue: 'reply.queue' });
const pub = Publisher(channel, { replyQueue: 'reply.queue' });
pub.awaitReply()
.then(function() {
channel.consume.should.have.been.calledOnce();
Expand Down Expand Up @@ -148,8 +160,8 @@ describe('On publisher module', function() {
});

it('should call the `repyHandler` callback with a message', function() {
var replyHandler = sinon.spy();
var pub = Publisher(channel, {
const replyHandler = sinon.spy();
const pub = Publisher(channel, {
replyQueue: 'reply.queue',
replyHandler,
correlationId: CORRELATION_ID
Expand All @@ -161,8 +173,8 @@ describe('On publisher module', function() {
});

it('should ignore messages if `correlationId` does not match', function() {
var replyHandler = sinon.spy();
var pub = Publisher(channel, {
const replyHandler = sinon.spy();
const pub = Publisher(channel, {
replyQueue: 'reply.queue',
replyHandler,
correlationId: 'foo_' + CORRELATION_ID
Expand All @@ -173,7 +185,7 @@ describe('On publisher module', function() {
});

it('should handle reply messages with no content', function() {
var noContentChannel = {
const noContentChannel = {
consume: (queue, cb) => cb({ properties: reply.properties })
};

Expand All @@ -183,8 +195,8 @@ describe('On publisher module', function() {
properties: reply.properties
}));

var replyHandler = sinon.spy();
var pub = Publisher(noContentChannel, {
const replyHandler = sinon.spy();
const pub = Publisher(noContentChannel, {
replyQueue: 'reply.queue',
replyHandler,
correlationId: CORRELATION_ID
Expand Down
33 changes: 21 additions & 12 deletions test/lib/listener/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe('On consumer module', function() {
});

it('should create a new Consumer', function() {
var consumer = Consumer({});
const consumer = Consumer({});
consumer.should.be.an('object');
consumer.should.have.property('consume').that.is.a('function');
});
Expand All @@ -47,12 +47,21 @@ describe('On consumer module', function() {
});

it('should return a Promise', function() {
var consumer = Consumer(channel);
const consumer = Consumer(channel);
consumer.consume().should.be.instanceof(Promise);
});

it('should start consuming using the given options', function() {
const consumer = Consumer(channel);
consumer.consume(QUEUE, { noAck: true });

channel.consume.should.have.been.calledOnce();
channel.consume.should.have.been.calledWith(QUEUE,
sinon.match.func, sinon.match.has('noAck', true));
});

it('should start consuming from given queue on the channel', function() {
var consumer = Consumer(channel);
const consumer = Consumer(channel);
consumer.consume(QUEUE);

channel.consume.should.have.been.calledOnce();
Expand All @@ -61,7 +70,7 @@ describe('On consumer module', function() {

it('should consume from the queue given at creation time if not provided',
function() {
var consumer = Consumer(channel, {
const consumer = Consumer(channel, {
queue: QUEUE
});
consumer.consume();
Expand All @@ -88,8 +97,8 @@ describe('On consumer module', function() {
.then(() => handler(message))
};

var messageHandler = sinon.spy();
var consumer = Consumer(channel, { messageHandler });
const messageHandler = sinon.spy();
const consumer = Consumer(channel, { messageHandler });
consumer.consume()
.then(function() {
messageHandler.should.have.been.calledOnce();
Expand Down Expand Up @@ -119,8 +128,8 @@ describe('On consumer module', function() {
throw new Error();
};

var nack = sinon.spy(channel, 'nack');
var consumer = Consumer(channel, { messageHandler });
const nack = sinon.spy(channel, 'nack');
const consumer = Consumer(channel, { messageHandler });
consumer.consume()
.then(function() {
nack.should.have.been.calledOnce();
Expand All @@ -146,8 +155,8 @@ describe('On consumer module', function() {
// Create spies for channel methods
sinon.spy(channel, 'nack');

var messageHandler = sinon.spy();
var consumer = Consumer(channel, { messageHandler });
const messageHandler = sinon.spy();
const consumer = Consumer(channel, { messageHandler });
consumer.consume()
.then(function() {
messageHandler.should.not.have.been.called();
Expand All @@ -174,8 +183,8 @@ describe('On consumer module', function() {
// Create spies for channel methods
sinon.spy(channel, 'nack');

var messageHandler = sinon.spy();
var consumer = Consumer(channel, { messageHandler });
const messageHandler = sinon.spy();
const consumer = Consumer(channel, { messageHandler });
consumer.consume()
.then(function() {
messageHandler.should.not.have.been.called();
Expand Down

0 comments on commit 9abdb51

Please sign in to comment.