Skip to content

Commit

Permalink
Set channel prefetch count (#57)
Browse files Browse the repository at this point in the history
* Add option to set channel prefetch count

* Set plugin tag

* Correct linting on gulpfile

* Rename and re-organize default properties

* Remove config object from README.md

* Change client queue default name -> .act

* Update default options schema

* Make simple example friendlier
  • Loading branch information
nfantone authored Oct 12, 2016
1 parent 9c67852 commit e2310a2
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 113 deletions.
51 changes: 9 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ setInterval(function() {
```

#### How it works
A client creates an [exclusive][6], randomly named response queue (something similar to `seneca.res.x42jK0l`) and starts consuming from it - much like a listener would do. On every `act`, the client publishes the message to the `seneca.topic` exchange using a routing key built from the _pin that matches the act pattern_. In the simple example above, the _pattern_ is `role:create` which equals the only declared pin. With that, the routing key `role.create` is inferred. An AMQP `replyTo` header is set to the name of the random queue, in an [RPC-schema][7] fashion.
A client creates an [exclusive][6], randomly named response queue (something similar to `seneca.act.x42jK0l`) and starts consuming from it - much like a listener would do. On every `act`, the client publishes the message to the `seneca.topic` exchange using a routing key built from the _pin that matches the act pattern_. In the simple example above, the _pattern_ is `role:create` which equals the only declared pin. With that, the routing key `role.create` is inferred. An AMQP `replyTo` header is set to the name of the random queue, in an [RPC-schema][7] fashion.

> Manual queue naming on a client (using the `name` parameter as seen in the listener configuration) is not supported. Client queues are deleted once the client disconnect and re-created each time.
Expand All @@ -81,55 +81,21 @@ As you can see, pins play an important role on routing messages on the broker, s
In the example, the following things are declared:

- A **topic** exchange named `seneca.topic`.
- An exclusive **queue** with a random alphanumeric name (like `seneca.res.x42jK0l`).
- An exclusive **queue** with a random alphanumeric name (like `seneca.act.x42jK0l`).

> Clients _do not_ declare the queue of their listener counterpart. So, if the message does not reach its destination and is discarded, the `seneca` instance will fail with a `TIMEOUT` error on the client side.
## Options
The following object describes the available options for this transport. These are applicable to both clients and listeners.

```json
{
"amqp": {
"type": "amqp",
"url": "amqp://localhost",
"exchange": {
"type": "topic",
"name": "seneca.topic",
"options": {
"durable": true,
"autoDelete": false
}
},
"queues": {
"action": {
"prefix": "seneca",
"separator": ".",
"options": {
"durable": true
}
},
"response": {
"prefix": "seneca.res",
"separator": ".",
"options": {
"autoDelete": true,
"exclusive": true
}
}
}
}
}
```
The JSON object in [`defaults.json`](./defaults.json) describes the available options for this transport. These are applicable to both clients and listeners.

To override this settings, pass them to the plugin's `.use` declaration:

```javascript
require('seneca')()
.use('seneca-amqp-transport', {
amqp: {
queues: {
action: {
client: {
queues: {
options: {
durable: false
}
Expand Down Expand Up @@ -212,9 +178,10 @@ AMQP_URL='amqp://guest:guest@dev.rabbitmq.com:5672' node listener.js
cd examples
AMQP_URL='amqp://guest:guest@dev.rabbitmq.com:5672' node client.js
{"kind":"notice","notice":"seneca started","level":"info","when":1476216473818}
{ pid: 7756, id: 99 }
{ pid: 7756, id: 63 }
{ pid: 7756, id: 94 }
{ id: 93,
message: 'Hello World!',
from: { pid: 4150, file: 'examples/listener.js' },
now: 1476306009801 }
# ...
```
Expand Down
16 changes: 12 additions & 4 deletions defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,24 @@
"autoDelete": false
}
},
"queues": {
"action": {
"listen": {
"channel": {
"prefetch": 1
},
"queues": {
"prefix": "seneca",
"separator": ".",
"options": {
"durable": true
}
}
},
"client": {
"channel": {
"prefetch": 1
},
"response": {
"prefix": "seneca.res",
"queues": {
"prefix": "seneca.act",
"separator": ".",
"options": {
"autoDelete": true,
Expand Down
5 changes: 3 additions & 2 deletions examples/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ var client = require('seneca')()
.use('..')
.client({
type: 'amqp',
pin: 'role:create',
pin: 'cmd:salute',
url: process.env.AMQP_URL
});

setInterval(function() {
client.act('role:create', {
client.act('cmd:salute', {
name: 'World',
max: 100,
min: 25
}, (err, res) => {
Expand Down
21 changes: 12 additions & 9 deletions examples/listener.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
#!/usr/bin/env node

'use strict';

const Path = require('path');

require('seneca')()
.use('..')
.add('role:create', function(message, done) {
.add('cmd:salute', function(message, done) {
return done(null, {
pid: process.pid,
id: Math.floor(Math.random() * (message.max - message.min + 1)) + message.min
id: Math.floor(Math.random() * (message.max - message.min + 1)) + message.min,
message: `Hello ${message.name}!`,
from: {
pid: process.pid,
file: Path.relative(process.cwd(), __filename)
},
now: Date.now()
});
})
.listen({
type: 'amqp',
pin: 'role:create',
url: process.env.AMQP_URL,
socketOptions: {
foo: 'bar'
}
pin: 'cmd:salute',
url: process.env.AMQP_URL
});
7 changes: 3 additions & 4 deletions gulpfile.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/* eslint-disable */

'use strict';

const $ = require('gulp-load-plugins')();
Expand All @@ -17,9 +16,9 @@ $.release.register(gulp);
*/
gulp.task('eslint', () =>
gulp.src([].concat(config.paths.src, config.paths.test))
.pipe($.eslint())
.pipe($.eslint.format())
.pipe($.if(config.eslint.failOnError, $.eslint.failAfterError()))
.pipe($.eslint())
.pipe($.eslint.format())
.pipe($.if(config.eslint.failOnError, $.eslint.failAfterError()))
);

/**
Expand Down
4 changes: 3 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
/**
* Plugin that allows seneca listeners
* Plugin that allows Seneca listeners
* and clients to communicate over AMQP 0-9-1.
*
* @module seneca-amqp-transport
Expand All @@ -10,6 +10,7 @@ const ClientHook = require('./lib/client-hook');
const ListenHook = require('./lib/listen-hook');

const PLUGIN_NAME = 'amqp-transport';
const PLUGIN_TAG = require('./package.json').version;
const TRANSPORT_TYPE = 'amqp';

module.exports = function(opts) {
Expand All @@ -30,6 +31,7 @@ module.exports = function(opts) {
}, client.hook(options));

return {
tag: PLUGIN_TAG,
name: PLUGIN_NAME
};
};
8 changes: 4 additions & 4 deletions lib/client-hook.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ module.exports =
.then((conn) => conn.createChannel())
.then((channel) => {
var ex = args.exchange;
var qres = args.queues.response;
var queueName = Amqputil.resolveClientQueue(qres);
channel.prefetch(1);
var qclient = args.client.queues;
var queueName = Amqputil.resolveClientQueue(qclient);
channel.prefetch(args.client.channel.prefetch);
return Promise.props({
channel,
exchange: channel.assertExchange(ex.name, ex.type, ex.options),
queue: channel.assertQueue(queueName, qres.options)
queue: channel.assertQueue(queueName, qclient.options)
}).then((transport) => {
return {
channel: transport.channel,
Expand Down
8 changes: 4 additions & 4 deletions lib/listen-hook.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module.exports =
.then((conn) => conn.createChannel())
.then((channel) => {
var ex = args.exchange;
channel.prefetch(1);
channel.prefetch(args.listen.channel.prefetch);
return Promise.all([
channel,
channel.assertExchange(ex.name, ex.type, ex.options),
Expand All @@ -25,9 +25,9 @@ module.exports =
})
.spread((channel, exchange, pins) => {
var topics = Amqputil.resolveListenTopics(pins);
var qact = args.queues.action;
var queue = _.trim(args.name) || Amqputil.resolveListenQueue(pins, qact);
return channel.assertQueue(queue, qact.options)
var qlisten = args.listen.queues;
var queue = _.trim(args.name) || Amqputil.resolveListenQueue(pins, qlisten);
return channel.assertQueue(queue, qlisten.options)
.then((q) => Promise.map(topics,
(topic) => channel.bindQueue(q.queue, exchange.exchange, topic))
)
Expand Down
101 changes: 64 additions & 37 deletions test/defaults.schema.json
Original file line number Diff line number Diff line change
@@ -1,45 +1,20 @@
{

"title": "Default options schema",
"type": "object",
"required": ["amqp"],
"type": "object",
"properties": {
"amqp": {
"required": ["client", "exchange", "listen", "type", "url"],
"type": "object",
"required": ["type", "url", "exchange", "queues"],
"properties": {
"type": {
"type": "string"
},
"url": {
"type": "string",
"format": "uri"
},
"exchange": {
"type": "object",
"required": ["type","name","options"],
"properties": {
"type": "string",
"name": "string",
"options": {
"type": "object",
"required": ["durable", "autoDelete"],
"properties": {
"durable": {
"type": "boolean"
},
"autoDelete": {
"type": "boolean"
}
}
}
}
"type": "string"
},
"queues": {
"listen": {
"required": ["channel", "queues"],
"type": "object",
"required": ["action","response"],
"properties": {
"action": {
"queues": {
"required": ["options", "prefix", "separator"],
"type": "object",
"properties": {
"prefix": {
Expand All @@ -49,8 +24,8 @@
"type": "string"
},
"options": {
"type": "object",
"required": ["durable"],
"type": "object",
"properties": {
"durable": {
"type": "boolean"
Expand All @@ -59,7 +34,23 @@
}
}
},
"response": {
"channel": {
"required": ["prefetch"],
"type": "object",
"properties": {
"prefetch": {
"type": "integer"
}
}
}
}
},
"client": {
"required": ["channel", "queues"],
"type": "object",
"properties": {
"queues": {
"required": ["options", "prefix", "separator"],
"type": "object",
"properties": {
"prefix": {
Expand All @@ -69,18 +60,54 @@
"type": "string"
},
"options": {
"type": "object",
"required": ["autoDelete", "exclusive"],
"type": "object",
"properties": {
"autoDelete": {
"exclusive": {
"type": "boolean"
},
"exclusive": {
"autoDelete": {
"type": "boolean"
}
}
}
}
},
"channel": {
"required": ["prefetch"],
"type": "object",
"properties": {
"prefetch": {
"type": "integer"
}
}
}
}
},
"type": {
"type": "string"
},
"exchange": {
"required": ["name", "options", "type"],
"type": "object",
"properties": {
"type": {
"type": "string"
},
"name": {
"type": "string"
},
"options": {
"required": ["autoDelete", "durable"],
"type": "object",
"properties": {
"durable": {
"type": "boolean"
},
"autoDelete": {
"type": "boolean"
}
}
}
}
}
Expand Down
Loading

0 comments on commit e2310a2

Please sign in to comment.