Skip to content

Commit

Permalink
fix: (#104) improve poison message handling
Browse files Browse the repository at this point in the history
  • Loading branch information
arobson committed Feb 18, 2018
1 parent 3233e0d commit 9d5ccc3
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 27 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ tmp/
*.swo
/Vagrantfile
package-lock.json
debug.log
31 changes: 31 additions & 0 deletions docs/receiving.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ rabbit.ignoreHandlerErrors();
```

### Late-bound Error Handling

Provide a strategy for handling errors to multiple handles or attach an error handler after the fact.

```javascript
Expand All @@ -91,35 +92,41 @@ handler.catch( function( err, msg ) {
Failure to handle errors will result in silent failures and lost messages.

## Unhandled Messages

The default behavior is that any message received that doesn't have any elligible handlers will get `nack`'d and sent back to the queue immediately.

> Caution: this can create churn on the client and server as the message will be redelivered indefinitely!
To avoid unhandled message churn, select one of the following mutually exclusive strategies:

### `rabbot.onUnhandled( handler )`

```javascript
rabbit.onUnhandled( function( message ) {
// handle the message here
} );
```

### `rabbot.nackUnhandled()` - default

Sends all unhandled messages back to the queue.
```javascript
rabbit.nackUnhandled();
```

### `rabbot.rejectUnhandled()`

Rejects unhandled messages so that will will _not_ be requeued. **DO NOT** use this unless there are dead letter exchanges for all queues.
```javascript
rabbit.rejectUnhandled();
```

## Returned Messages

Unroutable messages that were published with `mandatory: true` will be returned. These messages cannot be ack/nack'ed.

### `rabbot.onReturned( handler )`

```javascript
rabbit.onReturned( function( message ) {
// the returned message
Expand All @@ -138,6 +145,7 @@ Starts a consumer on the queue specified.
> Caution: using exclusive this way will allow your process to effectively "block" other processes from subscribing to a queue your process did not create. This can cause channel errors and closures on any other processes attempting to subscribe to the same queue. Make sure you know what you're doing.
## Message Format

The following structure shows and briefly explains the format of the message that is passed to the handle callback:

```javascript
Expand All @@ -163,6 +171,7 @@ The following structure shows and briefly explains the format of the message tha
content: { "type": "Buffer", "data": [ ... ] }, // raw buffer of message body
body: , // this could be an object, string, etc - whatever was published
type: "" // this also contains the type of the message published
quarantine: true|false // indicates the message arrived on a poison queue
}
```

Expand Down Expand Up @@ -248,14 +257,17 @@ rabbit.addConnection( {
```

## Custom Serializers

Serializers are objects with a `serialize` and `deserialize` method and get assigned to a specific content type. When a message is published or received with a specific `content-type`, rabbot will attempt to look up a serializer that matches. If one isn't found, an error will get thrown.

> Note: you can over-write rabbot's default serializers but probably shouldn't unless you know what you're doing.
### `rabbot.serialize( object )`

The serialize function takes the message content and must return a Buffer object encoded as "utf8".

### `rabbot.deserialize( bytes, encoding )`

The deserialize function takes both the raw bytes and the encoding sent. While "utf8" is the only supported encoding rabbot produces, the encoding is passed in case the message was produced by another library using a different encoding.

### `rabbot.addSerializer( contentType, serializer )`
Expand All @@ -272,3 +284,22 @@ rabbit.addSerializer( "application/yaml", {
}
} );
```

## Failed Serialization

Failed serialization is rejected without requeueing. If you want to catch this, you must:

* assign a deadletter exchange (DLX) to your queues
* bind the deadletter queue (DLQ) to the DLX
* mark the DLQ with `poison: true`
* handle one of the topic forms:
* `original.topic.#` - regular and quarantined messages
* `original.topic.*` - regular and quarantined messages
* `original.topic.quarantined` - one topic's quarantined messages
* `#.quarantined` - all quarantined messages

If your handler is getting both regular and quarantined messages, be sure to check the `quarantined` flag on the message to avoid trying to handle it like a usual message (since it will not be deserialized).

### Rationale

Without this approach, nacking a message body that cannot be processed causes the message to be continuously requeued and reprocessed indefinitely and can cause a queue to fill with garbage.
8 changes: 8 additions & 0 deletions docs/topology.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Options is a hash that can contain the following:
| **deadLetterRoutingKey** | string | the routing key to add to a dead-lettered message
| **maxPriority** | 2^8 | the highest priority this queue supports | |
| **unique** | `"hash", `"id", "consistent"` | creates a unique queue name by including the client id or hash in the name | |
| **poison** | boolean | indicates that this queue is specifically for poison / rejected messages| false |

### unique

Expand All @@ -106,6 +107,13 @@ You can specify unique queues by their friendly-name when handling and subscribi
const realQueueName = rabbot.getQueue('friendly-q-name').uniqueName;
```

### poison

If you want to capture instances where messages have no serializer or failed to deserialize properly, you can create a dead-letter exchange and bind it to a queue where you set `poison: true` so that in the event of further errors, rabbot will continue to deliver the message without deserialization.

* `body` will be set to the raw Buffer
* `quarantine` will be set to `true` as well

## `rabbot.bindExchange( sourceExchange, targetExchange, [routingKeys], [connectionName] )`

Binds the target exchange to the source exchange. Messages flow from source to target.
Expand Down
2 changes: 1 addition & 1 deletion spec/behavior/ackBatch.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ describe('Ack Batching', function () {
done();
});

batch.listenForSignal();
batch.addMessage({ tag: 101, status: 'ack' });
batch.addMessage({ tag: 102, status: 'ack' });
batch.addMessage({ tag: 103, status: 'ack' });
batch.addMessage({ tag: 104, status: 'ack' });
batch.addMessage({ tag: 105, status: 'ack' });
batch.firstAck = 101;
batch.listenForSignal();
signal.publish('go', {});
});

Expand Down
4 changes: 4 additions & 0 deletions spec/integration/bulkPublish.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ require('../setup');
const rabbit = require('../../src/index.js');
const config = require('./configuration');

/*
Demonstrates how bulk publish API works
in both formats.
*/
describe('Bulk Publish', function () {
var harness;

Expand Down
88 changes: 88 additions & 0 deletions spec/integration/poisonMessages.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
require('../setup');
const rabbit = require('../../src/index.js');
const config = require('./configuration');

/*
When garbage is in the queue from a publisher
rabbot should reject the unprocessable/busted
message instead of melting down the process
*/
describe('Invalid Message Format', function () {
var harness;

before(function (done) {
rabbit.configure({
connection: config.connection,
exchanges: [
{
name: 'rabbot-ex.fanout',
type: 'fanout',
autoDelete: true
},
{
name: 'poison-ex',
type: 'fanout',
autoDelete: true
}
],
queues: [
{
name: 'rabbot-q.general1',
autoDelete: true,
subscribe: true,
deadletter: 'poison-ex'
},
{
name: 'rabbot-q.poison',
noAck: true,
autoDelete: true,
subscribe: true,
poison: true
}
],
bindings: [
{
exchange: 'rabbot-ex.fanout',
target: 'rabbot-q.general1',
keys: []
},
{
exchange: 'poison-ex',
target: 'rabbot-q.poison',
keys: []
}
]
}).then(() => {
rabbit.publish('rabbot-ex.fanout', {
type: 'yuck',
routingKey: '',
body: 'lol{":parse this',
contentType: 'application/json'
});
});

harness = harnessFactory(rabbit, done, 1);
harness.handle('yuck.quarantined');
});

it('should have quarantined messages in unhandled', function () {
const results = harness.received.map((m) => ({
body: m.body.toString(),
key: m.fields.routingKey,
quarantined: m.quarantined
}));
sortBy(results, 'body').should.eql(
[
{
key: '',
body: 'lol{":parse this',
quarantined: true
}
]
);
});

after(function () {
return harness.clean('default');
});
});
62 changes: 45 additions & 17 deletions src/ackBatch.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
const _ = require('lodash');
const postal = require('postal');
const Monologue = require('monologue.js');
const signal = postal.channel('rabbit.ack');
Expand Down Expand Up @@ -45,7 +44,7 @@ AckBatch.prototype._ackOrNackSequence = function () {
const call = calls[ firstStatus ];
if (firstStatus === 'pending') {
} else {
for (let i = 1; i < _.size(this.messages) - 1; i++) {
for (let i = 1; i < this.messages.length - 1; i++) {
if (this.messages[ i ].status !== firstStatus) {
break;
}
Expand All @@ -58,11 +57,30 @@ AckBatch.prototype._ackOrNackSequence = function () {
};

AckBatch.prototype._firstByStatus = function (status) {
return _.find(this.messages, { status: status });
for (var i = 0; i < this.messages.length; i++) {
if (this.messages[ i ].status === status) {
return this.messages[ i ];
}
}
return undefined;
};

AckBatch.prototype._findIndex = function (status) {
for (var i = 0; i < this.messages.length; i++) {
if (this.messages[ i ].status === status) {
return i;
}
}
return -1;
};

AckBatch.prototype._lastByStatus = function (status) {
return _.findLast(this.messages, { status: status });
for (var i = this.messages.length - 1; i >= 0; i--) {
if (this.messages[ i ].status === status) {
return this.messages[ i ];
}
}
return undefined;
};

AckBatch.prototype._nack = function (tag, inclusive) {
Expand All @@ -79,10 +97,10 @@ AckBatch.prototype._processBatch = function () {
this.acking = this.acking !== undefined ? this.acking : false;
if (!this.acking) {
this.acking = true;
const hasPending = (_.findIndex(this.messages, { status: 'pending' }) >= 0);
const hasAck = this.firstAck;
const hasNack = this.firstNack;
const hasReject = this.firstReject;
const hasPending = (this._findIndex('pending') >= 0);
const hasAck = this.firstAck !== undefined;
const hasNack = this.firstNack !== undefined;
const hasReject = this.firstReject !== undefined;
if (!hasPending && !hasNack && hasAck && !hasReject) {
// just acks
this._resolveAll('ack', 'firstAck', 'lastAck');
Expand Down Expand Up @@ -112,7 +130,7 @@ AckBatch.prototype._resolveAll = function (status, first, last) {
this.emit('empty');
}.bind(this), 10);
}.bind(this);
if (this.messages.length !== 0) {
if (this.messages.length > 0) {
const lastTag = this._lastByStatus(status).tag;
log.debug('%s ALL (%d) tags on %s up to %d - %s.',
status,
Expand Down Expand Up @@ -163,15 +181,25 @@ AckBatch.prototype._resolveTag = function (tag, operation, inclusive) {
};

AckBatch.prototype._removeByStatus = function (status) {
return _.remove(this.messages, function (message) {
return message.status === status;
});
this.messages = this.messages.reduce((acc, message) => {
if (message.status !== status) {
acc.push(message);
}
return acc;
}, []);
};

AckBatch.prototype._removeUpToTag = function (tag) {
return _.remove(this.messages, function (message) {
return message.tag <= tag;
});
let removed = 0;
this.messages = this.messages.reduce((acc, message) => {
if (message.tag > tag) {
acc.push(message);
} else {
removed++;
}
return acc;
}, []);
return removed;
};

AckBatch.prototype.addMessage = function (message) {
Expand Down Expand Up @@ -223,9 +251,9 @@ AckBatch.prototype.ignoreSignal = function () {

AckBatch.prototype.listenForSignal = function () {
if (!this.signalSubscription) {
this.signalSubscription = signal.subscribe('#', function () {
this.signalSubscription = signal.subscribe('#', () => {
this._processBatch();
}.bind(this));
});
}
};

Expand Down
Loading

0 comments on commit 9d5ccc3

Please sign in to comment.