From 9d5ccc34612497a7dba5b99e1ab7a80fe5a67a34 Mon Sep 17 00:00:00 2001 From: Alex Robson Date: Sun, 18 Feb 2018 03:16:45 -0600 Subject: [PATCH] fix: (#104) improve poison message handling --- .gitignore | 1 + docs/receiving.md | 31 +++++++++ docs/topology.md | 8 +++ spec/behavior/ackBatch.spec.js | 2 +- spec/integration/bulkPublish.spec.js | 4 ++ spec/integration/poisonMessages.spec.js | 88 +++++++++++++++++++++++++ src/ackBatch.js | 62 ++++++++++++----- src/amqp/queue.js | 37 ++++++++--- src/index.js | 5 +- 9 files changed, 211 insertions(+), 27 deletions(-) create mode 100644 spec/integration/poisonMessages.spec.js diff --git a/.gitignore b/.gitignore index 1b2f88c..16af96c 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ tmp/ *.swo /Vagrantfile package-lock.json +debug.log diff --git a/docs/receiving.md b/docs/receiving.md index 94282e1..6a563f8 100644 --- a/docs/receiving.md +++ b/docs/receiving.md @@ -73,6 +73,7 @@ rabbit.ignoreHandlerErrors(); ``` ### Late-bound Error Handling + Provide a strategy for handling errors to multiple handles or attach an error handler after the fact. ```javascript @@ -91,6 +92,7 @@ handler.catch( function( err, msg ) { Failure to handle errors will result in silent failures and lost messages. ## Unhandled Messages + The default behavior is that any message received that doesn't have any elligible handlers will get `nack`'d and sent back to the queue immediately. > Caution: this can create churn on the client and server as the message will be redelivered indefinitely! 
@@ -98,6 +100,7 @@ The default behavior is that any message received that doesn't have any elligibl To avoid unhandled message churn, select one of the following mutually exclusive strategies: ### `rabbot.onUnhandled( handler )` + ```javascript rabbit.onUnhandled( function( message ) { // handle the message here @@ -105,21 +108,25 @@ rabbit.onUnhandled( function( message ) { ``` ### `rabbot.nackUnhandled()` - default + Sends all unhandled messages back to the queue. ```javascript rabbit.nackUnhandled(); ``` ### `rabbot.rejectUnhandled()` + Rejects unhandled messages so that will will _not_ be requeued. **DO NOT** use this unless there are dead letter exchanges for all queues. ```javascript rabbit.rejectUnhandled(); ``` ## Returned Messages + Unroutable messages that were published with `mandatory: true` will be returned. These messages cannot be ack/nack'ed. ### `rabbot.onReturned( handler )` + ```javascript rabbit.onReturned( function( message ) { // the returned message @@ -138,6 +145,7 @@ Starts a consumer on the queue specified. > Caution: using exclusive this way will allow your process to effectively "block" other processes from subscribing to a queue your process did not create. This can cause channel errors and closures on any other processes attempting to subscribe to the same queue. Make sure you know what you're doing. ## Message Format + The following structure shows and briefly explains the format of the message that is passed to the handle callback: ```javascript @@ -163,6 +171,7 @@ The following structure shows and briefly explains the format of the message tha content: { "type": "Buffer", "data": [ ... 
] }, // raw buffer of message body body: , // this could be an object, string, etc - whatever was published type: "" // this also contains the type of the message published + quarantined: true|false // indicates the message arrived on a poison queue } ``` @@ -248,14 +257,17 @@ rabbit.addConnection( { ``` ## Custom Serializers + Serializers are objects with a `serialize` and `deserialize` method and get assigned to a specific content type. When a message is published or received with a specific `content-type`, rabbot will attempt to look up a serializer that matches. If one isn't found, an error will get thrown. > Note: you can over-write rabbot's default serializers but probably shouldn't unless you know what you're doing. ### `rabbot.serialize( object )` + The serialize function takes the message content and must return a Buffer object encoded as "utf8". ### `rabbot.deserialize( bytes, encoding )` + The deserialize function takes both the raw bytes and the encoding sent. While "utf8" is the only supported encoding rabbot produces, the encoding is passed in case the message was produced by another library using a different encoding. ### `rabbot.addSerializer( contentType, serializer )` @@ -272,3 +284,22 @@ rabbit.addSerializer( "application/yaml", { } } ); ``` + +## Failed Serialization + +Failed serialization is rejected without requeueing. 
If you want to catch this, you must: + + * assign a deadletter exchange (DLX) to your queues + * bind the deadletter queue (DLQ) to the DLX + * mark the DLQ with `poison: true` + * handle one of the topic forms: + * `original.topic.#` - regular and quarantined messages + * `original.topic.*` - regular and quarantined messages + * `original.topic.quarantined` - one topic's quarantined messages + * `#.quarantined` - all quarantined messages + +If your handler is getting both regular and quarantined messages, be sure to check the `quarantined` flag on the message to avoid trying to handle it like a usual message (since it will not be deserialized). + +### Rationale + +Without this approach, nacking a message body that cannot be processed causes the message to be continuously requeued and reprocessed indefinitely and can cause a queue to fill with garbage. diff --git a/docs/topology.md b/docs/topology.md index add748c..ea82610 100644 --- a/docs/topology.md +++ b/docs/topology.md @@ -89,6 +89,7 @@ Options is a hash that can contain the following: | **deadLetterRoutingKey** | string | the routing key to add to a dead-lettered message | **maxPriority** | 2^8 | the highest priority this queue supports | | | **unique** | `"hash", `"id", "consistent"` | creates a unique queue name by including the client id or hash in the name | | +| **poison** | boolean | indicates that this queue is specifically for poison / rejected messages| false | ### unique @@ -106,6 +107,13 @@ You can specify unique queues by their friendly-name when handling and subscribi const realQueueName = rabbot.getQueue('friendly-q-name').uniqueName; ``` +### poison + +If you want to capture instances where messages have no serializer or failed to deserialize properly, you can create a dead-letter exchange and bind it to a queue where you set `poison: true` so that in the event of further errors, rabbot will continue to deliver the message without deserialization. 
+ + * `body` will be set to the raw Buffer + * `quarantined` will be set to `true` as well + ## `rabbot.bindExchange( sourceExchange, targetExchange, [routingKeys], [connectionName] )` Binds the target exchange to the source exchange. Messages flow from source to target. diff --git a/spec/behavior/ackBatch.spec.js b/spec/behavior/ackBatch.spec.js index e929702..15d28e0 100644 --- a/spec/behavior/ackBatch.spec.js +++ b/spec/behavior/ackBatch.spec.js @@ -173,13 +173,13 @@ describe('Ack Batching', function () { done(); }); + batch.listenForSignal(); batch.addMessage({ tag: 101, status: 'ack' }); batch.addMessage({ tag: 102, status: 'ack' }); batch.addMessage({ tag: 103, status: 'ack' }); batch.addMessage({ tag: 104, status: 'ack' }); batch.addMessage({ tag: 105, status: 'ack' }); batch.firstAck = 101; - batch.listenForSignal(); signal.publish('go', {}); }); diff --git a/spec/integration/bulkPublish.spec.js b/spec/integration/bulkPublish.spec.js index a7c3516..5e0f4ba 100644 --- a/spec/integration/bulkPublish.spec.js +++ b/spec/integration/bulkPublish.spec.js @@ -2,6 +2,10 @@ require('../setup'); const rabbit = require('../../src/index.js'); const config = require('./configuration'); +/* + Demonstrates how bulk publish API works + in both formats. 
+*/ describe('Bulk Publish', function () { var harness; diff --git a/spec/integration/poisonMessages.spec.js b/spec/integration/poisonMessages.spec.js new file mode 100644 index 0000000..8d5070a --- /dev/null +++ b/spec/integration/poisonMessages.spec.js @@ -0,0 +1,88 @@ +require('../setup'); +const rabbit = require('../../src/index.js'); +const config = require('./configuration'); + +/* + When garbage is in the queue from a publisher + rabbot should reject the unprocessable/busted + message instead of melting down the process +*/ +describe('Invalid Message Format', function () { + var harness; + + before(function (done) { + rabbit.configure({ + connection: config.connection, + exchanges: [ + { + name: 'rabbot-ex.fanout', + type: 'fanout', + autoDelete: true + }, + { + name: 'poison-ex', + type: 'fanout', + autoDelete: true + } + ], + queues: [ + { + name: 'rabbot-q.general1', + autoDelete: true, + subscribe: true, + deadletter: 'poison-ex' + }, + { + name: 'rabbot-q.poison', + noAck: true, + autoDelete: true, + subscribe: true, + poison: true + } + ], + bindings: [ + { + exchange: 'rabbot-ex.fanout', + target: 'rabbot-q.general1', + keys: [] + }, + { + exchange: 'poison-ex', + target: 'rabbot-q.poison', + keys: [] + } + ] + }).then(() => { + rabbit.publish('rabbot-ex.fanout', { + type: 'yuck', + routingKey: '', + body: 'lol{":parse this', + contentType: 'application/json' + }); + }); + + harness = harnessFactory(rabbit, done, 1); + harness.handle('yuck.quarantined'); + }); + + it('should have quarantined messages in unhandled', function () { + const results = harness.received.map((m) => ({ + body: m.body.toString(), + key: m.fields.routingKey, + quarantined: m.quarantined + })); + sortBy(results, 'body').should.eql( + [ + { + key: '', + body: 'lol{":parse this', + quarantined: true + } + ] + ); + }); + + after(function () { + return harness.clean('default'); + }); +}); diff --git a/src/ackBatch.js b/src/ackBatch.js index d8a0612..2e1a938 100644 --- 
a/src/ackBatch.js +++ b/src/ackBatch.js @@ -1,4 +1,3 @@ -const _ = require('lodash'); const postal = require('postal'); const Monologue = require('monologue.js'); const signal = postal.channel('rabbit.ack'); @@ -45,7 +44,7 @@ AckBatch.prototype._ackOrNackSequence = function () { const call = calls[ firstStatus ]; if (firstStatus === 'pending') { } else { - for (let i = 1; i < _.size(this.messages) - 1; i++) { + for (let i = 1; i < this.messages.length - 1; i++) { if (this.messages[ i ].status !== firstStatus) { break; } @@ -58,11 +57,30 @@ AckBatch.prototype._ackOrNackSequence = function () { }; AckBatch.prototype._firstByStatus = function (status) { - return _.find(this.messages, { status: status }); + for (var i = 0; i < this.messages.length; i++) { + if (this.messages[ i ].status === status) { + return this.messages[ i ]; + } + } + return undefined; +}; + +AckBatch.prototype._findIndex = function (status) { + for (var i = 0; i < this.messages.length; i++) { + if (this.messages[ i ].status === status) { + return i; + } + } + return -1; }; AckBatch.prototype._lastByStatus = function (status) { - return _.findLast(this.messages, { status: status }); + for (var i = this.messages.length - 1; i >= 0; i--) { + if (this.messages[ i ].status === status) { + return this.messages[ i ]; + } + } + return undefined; }; AckBatch.prototype._nack = function (tag, inclusive) { @@ -79,10 +97,10 @@ AckBatch.prototype._processBatch = function () { this.acking = this.acking !== undefined ? 
this.acking : false; if (!this.acking) { this.acking = true; - const hasPending = (_.findIndex(this.messages, { status: 'pending' }) >= 0); - const hasAck = this.firstAck; - const hasNack = this.firstNack; - const hasReject = this.firstReject; + const hasPending = (this._findIndex('pending') >= 0); + const hasAck = this.firstAck !== undefined; + const hasNack = this.firstNack !== undefined; + const hasReject = this.firstReject !== undefined; if (!hasPending && !hasNack && hasAck && !hasReject) { // just acks this._resolveAll('ack', 'firstAck', 'lastAck'); @@ -112,7 +130,7 @@ AckBatch.prototype._resolveAll = function (status, first, last) { this.emit('empty'); }.bind(this), 10); }.bind(this); - if (this.messages.length !== 0) { + if (this.messages.length > 0) { const lastTag = this._lastByStatus(status).tag; log.debug('%s ALL (%d) tags on %s up to %d - %s.', status, @@ -163,15 +181,25 @@ AckBatch.prototype._resolveTag = function (tag, operation, inclusive) { }; AckBatch.prototype._removeByStatus = function (status) { - return _.remove(this.messages, function (message) { - return message.status === status; - }); + this.messages = this.messages.reduce((acc, message) => { + if (message.status !== status) { + acc.push(message); + } + return acc; + }, []); }; AckBatch.prototype._removeUpToTag = function (tag) { - return _.remove(this.messages, function (message) { - return message.tag <= tag; - }); + let removed = 0; + this.messages = this.messages.reduce((acc, message) => { + if (message.tag > tag) { + acc.push(message); + } else { + removed++; + } + return acc; + }, []); + return removed; }; AckBatch.prototype.addMessage = function (message) { @@ -223,9 +251,9 @@ AckBatch.prototype.ignoreSignal = function () { AckBatch.prototype.listenForSignal = function () { if (!this.signalSubscription) { - this.signalSubscription = signal.subscribe('#', function () { + this.signalSubscription = signal.subscribe('#', () => { this._processBatch(); - }.bind(this)); + }); } }; diff 
--git a/src/amqp/queue.js b/src/amqp/queue.js index 5c9de09..bd9bc04 100644 --- a/src/amqp/queue.js +++ b/src/amqp/queue.js @@ -105,7 +105,7 @@ function getNoBatchOps (channel, raw, messages, noAck) { }; reject = function () { log.debug("Rejecting tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName); - channel.reject({ fields: { deliveryTag: raw.fields.deliveryTag } }, false); + channel.reject({ fields: { deliveryTag: raw.fields.deliveryTag } }, false, false); }; } @@ -358,15 +358,38 @@ function subscribe (channelName, channel, topology, serializers, messages, optio var topic = parts.join('.'); var contentType = raw.properties.contentType || 'application/octet-stream'; var serializer = serializers[ contentType ]; + const track = () => { + if (shouldAck && shouldBatch) { + messages.addMessage(ops); + } + }; if (!serializer) { - log.error("Could not deserialize message id %s on queue '%s', connection '%s' - no serializer defined", - raw.properties.messageId, channelName, topology.connection.name); - ops.nack(); + if (options.poison) { + raw.body = raw.content; + raw.contentEncoding = raw.properties.contentEncoding; + raw.quarantined = true; + topic = `${topic}.quarantined` + } else { + log.error("Could not deserialize message id %s on queue '%s', connection '%s' - no serializer defined", + raw.properties.messageId, channelName, topology.connection.name); + track(); + ops.reject(); + return; + } } else { try { raw.body = serializer.deserialize(raw.content, raw.properties.contentEncoding); } catch (err) { - ops.nack(); + if (options.poison) { + raw.quarantined = true; + raw.body = raw.content; + raw.contentEncoding = raw.properties.contentEncoding; + topic = `${topic}.quarantined` + } else { + track(); + ops.reject(); + return; + } } } @@ -376,9 +399,7 @@ function subscribe (channelName, channel, topology, serializers, messages, optio if (data.activated) { handled = true; } - if (shouldAck && shouldBatch) { - messages.addMessage(ops); 
- } + track(); if (!handled) { unhandledLog.warn("Message of %s on queue '%s', connection '%s' was not processed by any registered handlers", diff --git a/src/index.js b/src/index.js index aa4da19..f01ee8d 100644 --- a/src/index.js +++ b/src/index.js @@ -33,7 +33,10 @@ const serializers = { return JSON.parse(bytes.toString(encoding || 'utf8')); }, serialize: (object) => { - return Buffer.from(JSON.stringify(object), 'utf8'); + const json = (typeof object === 'string') + ? object + : JSON.stringify(object); + return Buffer.from(json, 'utf8'); } }, 'application/octet-stream': {