Skip to content

Commit

Permalink
fix: (#78) make it possible to handle messages from a unique queue us…
Browse files Browse the repository at this point in the history
…ing the alias/friendly name
  • Loading branch information
arobson committed Feb 18, 2018
1 parent d2df5ea commit 50045a8
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 61 deletions.
6 changes: 6 additions & 0 deletions docs/topology.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ The unique option has 3 different possible values, each with its own behavior:

> Note: the concept of queue recovery is that the same queue name will be generated in the event of a process restart. If using `hash` or `id`, the pid is used and a different queue name will be generated each time the process starts.
You can specify unique queues by their friendly-name when handling and subscribing. To get the actual assigned queue name (which you should not need), you can use:

```js
const realQueueName = rabbot.getQueue('friendly-q-name').uniqueName;
```

## `rabbot.bindExchange( sourceExchange, targetExchange, [routingKeys], [connectionName] )`

Binds the target exchange to the source exchange. Messages flow from source to target.
Expand Down
195 changes: 136 additions & 59 deletions spec/integration/queueSpecificHandle.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,71 +8,148 @@ const config = require('./configuration');
of the bound fanout queue's.
*/
describe('Queue Specific Handler', function () {
var harness;
describe('with standard queues', function () {
var harness;

before(function (done) {
rabbit.configure({
connection: config.connection,
exchanges: [
{
name: 'rabbot-ex.fanout',
type: 'fanout',
autoDelete: true
}
],
queues: [
{
name: 'rabbot-q.general1',
autoDelete: true,
subscribe: true
},
{
name: 'rabbot-q.general2',
noAck: true,
autoDelete: true,
subscribe: true
}
],
bindings: [
{
exchange: 'rabbot-ex.fanout',
target: 'rabbot-q.general1',
keys: []
},
{
exchange: 'rabbot-ex.fanout',
target: 'rabbot-q.general2',
keys: []
}
]
}).then(() => {
rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'one' });
rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'two' });
rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'three' });
before(function (done) {
rabbit.configure({
connection: config.connection,
exchanges: [
{
name: 'rabbot-ex.fanout',
type: 'fanout',
autoDelete: true
}
],
queues: [
{
name: 'rabbot-q.general1',
autoDelete: true,
subscribe: true
},
{
name: 'rabbot-q.general2',
noAck: true,
autoDelete: true,
subscribe: true
}
],
bindings: [
{
exchange: 'rabbot-ex.fanout',
target: 'rabbot-q.general1',
keys: []
},
{
exchange: 'rabbot-ex.fanout',
target: 'rabbot-q.general2',
keys: []
}
]
}).then(() => {
rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'one' });
rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'two' });
rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'three' });
});

harness = harnessFactory(rabbit, done, 6);
harness.handle('', undefined, 'rabbot-q.general1');
});

harness = harnessFactory(rabbit, done, 6);
harness.handle('', undefined, 'rabbot-q.general1');
});
it('should only handle messages for the specified queue', function () {
const results = harness.received.map((m) => ({
body: m.body,
queue: m.queue
}));
sortBy(results, 'body').should.eql(
[
{ body: 'one', queue: 'rabbot-q.general1' },
{ body: 'three', queue: 'rabbot-q.general1' },
{ body: 'two', queue: 'rabbot-q.general1' }
]);
});

it('should only handle messages for the specified queue', function () {
const results = harness.received.map((m) => ({
body: m.body,
queue: m.queue
}));
sortBy(results, 'body').should.eql(
[
{ body: 'one', queue: 'rabbot-q.general1' },
{ body: 'three', queue: 'rabbot-q.general1' },
{ body: 'two', queue: 'rabbot-q.general1' }
]);
});
it('should show the other messages as unhandled', function () {
harness.unhandled.length.should.eql(3);
});

it('should show the other messages as unhandled', function () {
harness.unhandled.length.should.eql(3);
after(function () {
return harness.clean('default');
});
});

after(function () {
return harness.clean('default');
describe('with unique queue', function () {
var harness;

before(function (done) {
rabbit.configure({
connection: config.connection,
exchanges: [
{
name: 'rabbot-ex.topic',
type: 'topic',
autoDelete: true
}
],
queues: [
{
name: 'rabbot-q.general1',
autoDelete: true,
unique: 'hash',
subscribe: true
},
{
name: 'rabbot-q.general2',
noAck: true,
autoDelete: true,
subscribe: true
}
],
bindings: [
{
exchange: 'rabbot-ex.topic',
target: 'rabbot-q.general1',
keys: ['a']
},
{
exchange: 'rabbot-ex.topic',
target: 'rabbot-q.general2',
keys: ['b']
}
]
}).then(() => {
rabbit.publish('rabbot-ex.topic', { type: 'a', body: 'one' });
rabbit.publish('rabbot-ex.topic', { type: 'b', body: 'two' });
rabbit.publish('rabbot-ex.topic', { type: 'a', body: 'three' });
rabbit.publish('rabbot-ex.topic', { type: 'b', body: 'four' });
rabbit.publish('rabbot-ex.topic', { type: 'a', body: 'five' });
rabbit.publish('rabbot-ex.topic', { type: 'b', body: 'six' });
});

harness = harnessFactory(rabbit, done, 6);
harness.handle('a', undefined, 'rabbot-q.general1');
});

it('should only handle messages for the specified queue', function () {
const uniqueName = rabbit.getQueue('rabbot-q.general1').uniqueName
const results = harness.received.map((m) => ({
body: m.body,
queue: m.queue
}));
sortBy(results, 'body').should.eql(
[
{ body: 'five', queue: uniqueName },
{ body: 'one', queue: uniqueName },
{ body: 'three', queue: uniqueName }
]);
});

it('should show the other messages as unhandled', function () {
harness.unhandled.length.should.eql(3);
});

after(function () {
return harness.clean('default');
});
});
});
2 changes: 1 addition & 1 deletion src/amqp/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ function subscribe (channelName, channel, topology, serializers, messages, optio
options.exclusive = true;
}
raw.queue = channelName;
var parts = [ channelName.replace(/[.]/g, '-') ];
var parts = [ options.name.replace(/[.]/g, '-') ];
if (raw.type) {
parts.push(raw.type);
}
Expand Down
2 changes: 1 addition & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ Broker.prototype.getExchange = function (name, connectionName = DEFAULT) {
};

Broker.prototype.getQueue = function (name, connectionName = DEFAULT) {
return this.connections[ connectionName ].channels[ 'queue:' + name ];
return this.connections[ connectionName ].channels[ `queue:${name}` ];
};

Broker.prototype.handle = function (messageType, handler, queueName, context) {
Expand Down
1 change: 1 addition & 0 deletions src/queueFsm.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var Factory = function (options, connection, topology, serializers, queueFn) {

var Fsm = machina.Fsm.extend({
name: options.name,
uniqueName: options.uniqueName,
responseSubscriptions: {},
signalSubscription: undefined,
subscribed: false,
Expand Down

0 comments on commit 50045a8

Please sign in to comment.