Skip to content

Commit

Permalink
feat: (#87) add support for scatter-gather pattern via request
Browse files Browse the repository at this point in the history
  • Loading branch information
arobson committed Feb 18, 2018
1 parent 50045a8 commit db23b48
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 22 deletions.
101 changes: 90 additions & 11 deletions docs/publishing.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,96 @@ This works just like a publish except that the promise returned provides the res

> Note: the default replyTimeout will be double the publish timeout or 1 second if no publish timeout was ever specified.
```javascript
// when multiple responses are provided, all but the last will be passed to an optional progress callback.
// the last/only reply will always be provided to the .then callback
rabbit.request( "request.exchange", {
// see publish example to see options for the outgoing message
}, function ( reply ) {
// if multiple replies are provided, all but the last will be sent to this callback
} )
.then( function( final ) {
// the last message in a series OR the only reply will be sent to this callback
} );
Request provides for two ways to get multiple responses; one is to allow a single replier to stream a set of responses back and the other is to send a request to multiple potential responders and wait until a specific number comes back.

### Expecting A Singe Reply

```js
// request side
const parts = [];
rabbit.request('request.ex', {
type: 'request',
body: id
})
.then( reply => {
// done - do something with all the data?
reply.ack();
});

// receiver sides
rabbit.handle('request', (req) => {
req.reply(database.get(req.id));
});
```

### Expecting A Stream

`reply` takes an additional hash argument where you can set `more` to `true` to indicate there are more messages incoming as part of the reply.

In this case, the third argument to the `request` function will get every message **except** the last.

```js
// request side
const parts = [];
rabbit.request('request.ex', {
type: 'request',
body: id
},
reply => {
parts.push(part);
part.ack();
})
.then( final => {
// done - do something with all the data?
final.ack();
});

// receiver side
rabbit.handle('request', (req) => {
const stream = data.getById(req.body);
stream.on('data', data => {
req.reply(data, { more: true });
});
stream.on('end', () => {
req.reply({ body: 'done' });
});
stream.on('error', (err) => {
req.reply({ body: { error: true, detail: err.message });
});
});
```
### Scatter-Gather
In scatter-gather: the recipients don't know how many of them there are and don't have to be aware that they are participating in scatter-gather/race-conditions.
They just reply. The limit is applied on the requesting side by setting a `expects` property on the outgoing message to let rabbot how many messages to collect before stopping and considering the request satisfied.
Normally this is done with mutliple responders on the other side of a topic or fanout exchange.
> !IMPORTANT! - messages beyond the limit are treated as unhandled. You'll need to have an unhandled message strategy in place or at least understand how rabbot deals with them by default.
```js
// request side
const parts = [];
rabbit.request('request.ex', {
type: 'request',
body: id,
limit: 3 // will stop after 3 even if many more reply
},
reply => {
parts.push(part);
part.ack();
})
.then( final => {
// done - do something with all the data?
final.ack();
});

// receiver sides
rabbit.handle('request', (req) => {
req.reply(database.get(req.id));
});
```
## `rabbot.bulkPublish( set, [connectionName] )`
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/queueSpecificHandle.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ describe('Queue Specific Handler', function () {
});

it('should only handle messages for the specified queue', function () {
const uniqueName = rabbit.getQueue('rabbot-q.general1').uniqueName
const uniqueName = rabbit.getQueue('rabbot-q.general1').uniqueName;
const results = harness.received.map((m) => ({
body: m.body,
queue: m.queue
Expand Down
97 changes: 90 additions & 7 deletions spec/integration/request.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,55 @@ describe('Request & Response', function () {
],
queues: [
{
name: 'rabbot-q.request',
name: 'rabbot-q.request-1',
autoDelete: true,
subscribe: true
},
{
name: 'rabbot-q.request-2',
autoDelete: true,
subscribe: true
},
{
name: 'rabbot-q.request-3',
autoDelete: true,
subscribe: true
},
{
name: 'rabbot-q.request-4',
autoDelete: true,
subscribe: true
},
{
name: 'rabbot-q.request-5',
autoDelete: true,
subscribe: true
}
],
bindings: [
{
exchange: 'rabbot-ex.request',
target: 'rabbot-q.request',
target: 'rabbot-q.request-1',
keys: []
},
{
exchange: 'rabbot-ex.request',
target: 'rabbot-q.request-2',
keys: []
},
{
exchange: 'rabbot-ex.request',
target: 'rabbot-q.request-3',
keys: []
},
{
exchange: 'rabbot-ex.request',
target: 'rabbot-q.request-4',
keys: []
},
{
exchange: 'rabbot-ex.request',
target: 'rabbot-q.request-5',
keys: []
}
]
Expand All @@ -38,22 +78,22 @@ describe('Request & Response', function () {

before(function (done) {
this.timeout(3000);
harness = harnessFactory(rabbit, done, 9);
harness = harnessFactory(rabbit, done, 21);

harness.handle('polite', (q) => {
q.reply(':D');
});
}, 'rabbot-q.request-1');

harness.handle('rude', (q) => {
q.reply('>:@');
});
}, 'rabbot-q.request-1');

harness.handle('silly', (q) => {
q.reply('...', { more: true });
q.reply('...', { more: true });
q.reply('...', { more: true });
setTimeout(() => q.reply('...'), 10);
});
}, 'rabbot-q.request-1');

rabbit.request('rabbot-ex.request', { type: 'polite', body: 'how are you?' })
.then((response) => {
Expand Down Expand Up @@ -111,11 +151,54 @@ describe('Request & Response', function () {
});
});

describe('when performing scatter-gather', function () {
const gather = [];
before(function (done) {
harness = harnessFactory(rabbit, () => {}, 4);
let index = 0;
harness.handle('scatter', (q) => {
q.reply(`number: ${++index}`);
});

function onReply (msg) {
gather.push(msg);
msg.ack();
done();
}

rabbit.request(
'rabbot-ex.request',
{ type: 'scatter', body: 'whatever', expect: 3 },
(msg) => {
gather.push(msg);
msg.ack();
}
).then(
onReply
);
});

it('should have gathered desired replies', function () {
gather.length.should.equal(3);
});

it('should have ignored responses past limit', function () {
harness.unhandled.length.should.equal(2);
});

after(function () {
harness.clean();
});
});

describe('when the request times out', function () {
var timeoutError;
const timeout = 100;
before(function () {
return rabbit.request('rabbot-ex.request', { type: 'polite', body: 'how are you?', replyTimeout: timeout })
return rabbit.request(
'rabbot-ex.request',
{ type: 'polite', body: 'how are you?', replyTimeout: timeout }
)
.then(null, (err) => {
timeoutError = err;
});
Expand Down
13 changes: 10 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,17 @@ Broker.prototype.request = function (exchangeName, options = {}, notify, connect
subscription.unsubscribe();
reject(new Error('No reply received within the configured timeout of ' + replyTimeout + ' ms'));
}, replyTimeout);
const subscription = responses.subscribe(requestId, function (message) {
if (message.properties.headers[ 'sequence_end' ]) { // jshint ignore:line
const scatter = options.expect;
let remaining = options.expect;
const subscription = responses.subscribe(requestId, message => {
const end = scatter
? --remaining <= 0
: message.properties.headers[ 'sequence_end' ];
if (end) {
clearTimeout(timeout);
resolve(message);
if (!scatter || remaining === 0) {
resolve(message);
}
subscription.unsubscribe();
} else if (notify) {
notify(message);
Expand Down

0 comments on commit db23b48

Please sign in to comment.