diff --git a/.gitignore b/.gitignore index 1b2f88c..16af96c 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ tmp/ *.swo /Vagrantfile package-lock.json +debug.log diff --git a/docs/receiving.md b/docs/receiving.md index 94282e1..d1d43b8 100644 --- a/docs/receiving.md +++ b/docs/receiving.md @@ -73,6 +73,7 @@ rabbit.ignoreHandlerErrors(); ``` ### Late-bound Error Handling + Provide a strategy for handling errors to multiple handles or attach an error handler after the fact. ```javascript @@ -91,6 +92,7 @@ handler.catch( function( err, msg ) { Failure to handle errors will result in silent failures and lost messages. ## Unhandled Messages + The default behavior is that any message received that doesn't have any elligible handlers will get `nack`'d and sent back to the queue immediately. > Caution: this can create churn on the client and server as the message will be redelivered indefinitely! @@ -98,6 +100,7 @@ The default behavior is that any message received that doesn't have any elligibl To avoid unhandled message churn, select one of the following mutually exclusive strategies: ### `rabbot.onUnhandled( handler )` + ```javascript rabbit.onUnhandled( function( message ) { // handle the message here @@ -105,21 +108,25 @@ rabbit.onUnhandled( function( message ) { ``` ### `rabbot.nackUnhandled()` - default + Sends all unhandled messages back to the queue. ```javascript rabbit.nackUnhandled(); ``` ### `rabbot.rejectUnhandled()` + Rejects unhandled messages so that will will _not_ be requeued. **DO NOT** use this unless there are dead letter exchanges for all queues. ```javascript rabbit.rejectUnhandled(); ``` ## Returned Messages + Unroutable messages that were published with `mandatory: true` will be returned. These messages cannot be ack/nack'ed. ### `rabbot.onReturned( handler )` + ```javascript rabbit.onReturned( function( message ) { // the returned message @@ -138,6 +145,7 @@ Starts a consumer on the queue specified. > Caution: using exclusive this way will allow your process to effectively "block" other processes from subscribing to a queue your process did not create. This can cause channel errors and closures on any other processes attempting to subscribe to the same queue. Make sure you know what you're doing. ## Message Format + The following structure shows and briefly explains the format of the message that is passed to the handle callback: ```javascript @@ -163,6 +171,7 @@ The following structure shows and briefly explains the format of the message tha content: { "type": "Buffer", "data": [ ... ] }, // raw buffer of message body body: , // this could be an object, string, etc - whatever was published type: "" // this also contains the type of the message published + quarantine: true|false // indicates the message arrived on a poison queue } ``` @@ -248,14 +257,17 @@ rabbit.addConnection( { ``` ## Custom Serializers + Serializers are objects with a `serialize` and `deserialize` method and get assigned to a specific content type. When a message is published or received with a specific `content-type`, rabbot will attempt to look up a serializer that matches. If one isn't found, an error will get thrown. > Note: you can over-write rabbot's default serializers but probably shouldn't unless you know what you're doing. ### `rabbot.serialize( object )` + The serialize function takes the message content and must return a Buffer object encoded as "utf8". ### `rabbot.deserialize( bytes, encoding )` + The deserialize function takes both the raw bytes and the encoding sent. While "utf8" is the only supported encoding rabbot produces, the encoding is passed in case the message was produced by another library using a different encoding. ### `rabbot.addSerializer( contentType, serializer )` @@ -272,3 +284,13 @@ rabbit.addSerializer( "application/yaml", { } } ); ``` + +## Failed Serialization + +Failed serialization is rejected without requeueing. If you want to catch this, you must use a deadletter exchange bound to a queue and mark it as `poison: true`. Without this, you're on your own. + +### Rationale + +Without this approach, nacking a message body that cannot be processed causes the message to be continuously requeued and reprocessed indefinitely and can cause a queue to fill with garbage. + +You are required to set up your DLX and DLQ explicitly. Rabbot uses the `poison: true` setting to as a way to know that it's ok to stop trying to deserialize and in the event of an error, it will mark the message as `quarantine: true` to make handler logic simpler. diff --git a/docs/topology.md b/docs/topology.md index add748c..ea82610 100644 --- a/docs/topology.md +++ b/docs/topology.md @@ -89,6 +89,7 @@ Options is a hash that can contain the following: | **deadLetterRoutingKey** | string | the routing key to add to a dead-lettered message | **maxPriority** | 2^8 | the highest priority this queue supports | | | **unique** | `"hash", `"id", "consistent"` | creates a unique queue name by including the client id or hash in the name | | +| **poison** | boolean | indicates that this queue is specifically for poison / rejected messages| false | ### unique @@ -106,6 +107,13 @@ You can specify unique queues by their friendly-name when handling and subscribi const realQueueName = rabbot.getQueue('friendly-q-name').uniqueName; ``` +### poison + +If you want to capture instances where messages have no serializer or failed to deserialize properly, you can create a dead-letter exchange and bind it to a queue where you set `poison: true` so that in the event of further errors, rabbot will continue to deliver the message without deserialization. + + * `body` will be set to the raw Buffer + * `quarantine` will be set to `true` as well + ## `rabbot.bindExchange( sourceExchange, targetExchange, [routingKeys], [connectionName] )` Binds the target exchange to the source exchange. Messages flow from source to target. diff --git a/spec/behavior/ackBatch.spec.js b/spec/behavior/ackBatch.spec.js index e929702..15d28e0 100644 --- a/spec/behavior/ackBatch.spec.js +++ b/spec/behavior/ackBatch.spec.js @@ -173,13 +173,13 @@ describe('Ack Batching', function () { done(); }); + batch.listenForSignal(); batch.addMessage({ tag: 101, status: 'ack' }); batch.addMessage({ tag: 102, status: 'ack' }); batch.addMessage({ tag: 103, status: 'ack' }); batch.addMessage({ tag: 104, status: 'ack' }); batch.addMessage({ tag: 105, status: 'ack' }); batch.firstAck = 101; - batch.listenForSignal(); signal.publish('go', {}); }); diff --git a/spec/integration/bulkPublish.spec.js b/spec/integration/bulkPublish.spec.js index a7c3516..5e0f4ba 100644 --- a/spec/integration/bulkPublish.spec.js +++ b/spec/integration/bulkPublish.spec.js @@ -2,6 +2,10 @@ require('../setup'); const rabbit = require('../../src/index.js'); const config = require('./configuration'); +/* + Demonstrates how bulk publish API works + in both formats. +*/ describe('Bulk Publish', function () { var harness; diff --git a/spec/integration/poisonMessages.spec.js b/spec/integration/poisonMessages.spec.js new file mode 100644 index 0000000..8a4544d --- /dev/null +++ b/spec/integration/poisonMessages.spec.js @@ -0,0 +1,89 @@ +require('../setup'); +const rabbit = require('../../src/index.js'); +const config = require('./configuration'); + +/* + When garbage is in the queue from a publisher + rabbot should reject the unprocessable/busted + message instead of melting down the process +*/ +describe('Invalid Message Format', function () { + var harness; + + before(function (done) { + this.timeout(100000); + rabbit.configure({ + connection: config.connection, + exchanges: [ + { + name: 'rabbot-ex.fanout', + type: 'fanout', + autoDelete: true + }, + { + name: 'poison-ex', + type: 'fanout', + autoDelete: true + } + ], + queues: [ + { + name: 'rabbot-q.general1', + autoDelete: true, + subscribe: true, + deadletter: 'poison-ex' + }, + { + name: 'rabbot-q.poison', + noAck: true, + autoDelete: true, + subscribe: true, + poison: true + } + ], + bindings: [ + { + exchange: 'rabbot-ex.fanout', + target: 'rabbot-q.general1', + keys: [] + }, + { + exchange: 'poison-ex', + target: 'rabbot-q.poison', + keys: [] + } + ] + }).then(() => { + rabbit.publish('rabbot-ex.fanout', { + type: 'yuck', + routingKey: '', + body: 'lol{":parse this', + contentType: 'application/json' + }); + }); + + harness = harnessFactory(rabbit, done, 1); + harness.handle('yuck'); + }); + + it('should have quarantined messages in unhandled', function () { + const results = harness.received.map((m) => ({ + body: m.body.toString(), + key: m.fields.routingKey, + quarantined: m.quarantined + })); + sortBy(results, 'body').should.eql( + [ + { + key: '', + body: 'lol{":parse this', + quarantined: true + } + ] + ); + }); + + after(function () { + return harness.clean('default'); + }); +}); diff --git a/src/ackBatch.js b/src/ackBatch.js index d8a0612..2e1a938 100644 --- a/src/ackBatch.js +++ b/src/ackBatch.js @@ -1,4 +1,3 @@ -const _ = require('lodash'); const postal = require('postal'); const Monologue = require('monologue.js'); const signal = postal.channel('rabbit.ack'); @@ -45,7 +44,7 @@ AckBatch.prototype._ackOrNackSequence = function () { const call = calls[ firstStatus ]; if (firstStatus === 'pending') { } else { - for (let i = 1; i < _.size(this.messages) - 1; i++) { + for (let i = 1; i < this.messages.length - 1; i++) { if (this.messages[ i ].status !== firstStatus) { break; } @@ -58,11 +57,30 @@ AckBatch.prototype._ackOrNackSequence = function () { }; AckBatch.prototype._firstByStatus = function (status) { - return _.find(this.messages, { status: status }); + for (var i = 0; i < this.messages.length; i++) { + if (this.messages[ i ].status === status) { + return this.messages[ i ]; + } + } + return undefined; +}; + +AckBatch.prototype._findIndex = function (status) { + for (var i = 0; i < this.messages.length; i++) { + if (this.messages[ i ].status === status) { + return i; + } + } + return -1; }; AckBatch.prototype._lastByStatus = function (status) { - return _.findLast(this.messages, { status: status }); + for (var i = this.messages.length - 1; i >= 0; i--) { + if (this.messages[ i ].status === status) { + return this.messages[ i ]; + } + } + return undefined; }; AckBatch.prototype._nack = function (tag, inclusive) { @@ -79,10 +97,10 @@ AckBatch.prototype._processBatch = function () { this.acking = this.acking !== undefined ? this.acking : false; if (!this.acking) { this.acking = true; - const hasPending = (_.findIndex(this.messages, { status: 'pending' }) >= 0); - const hasAck = this.firstAck; - const hasNack = this.firstNack; - const hasReject = this.firstReject; + const hasPending = (this._findIndex('pending') >= 0); + const hasAck = this.firstAck !== undefined; + const hasNack = this.firstNack !== undefined; + const hasReject = this.firstReject !== undefined; if (!hasPending && !hasNack && hasAck && !hasReject) { // just acks this._resolveAll('ack', 'firstAck', 'lastAck'); @@ -112,7 +130,7 @@ AckBatch.prototype._resolveAll = function (status, first, last) { this.emit('empty'); }.bind(this), 10); }.bind(this); - if (this.messages.length !== 0) { + if (this.messages.length > 0) { const lastTag = this._lastByStatus(status).tag; log.debug('%s ALL (%d) tags on %s up to %d - %s.', status, @@ -163,15 +181,25 @@ AckBatch.prototype._resolveTag = function (tag, operation, inclusive) { }; AckBatch.prototype._removeByStatus = function (status) { - return _.remove(this.messages, function (message) { - return message.status === status; - }); + this.messages = this.messages.reduce((acc, message) => { + if (message.status !== status) { + acc.push(message); + } + return acc; + }, []); }; AckBatch.prototype._removeUpToTag = function (tag) { - return _.remove(this.messages, function (message) { - return message.tag <= tag; - }); + let removed = 0; + this.messages = this.messages.reduce((acc, message) => { + if (message.tag > tag) { + acc.push(message); + } else { + removed++; + } + return acc; + }, []); + return removed; }; AckBatch.prototype.addMessage = function (message) { @@ -223,9 +251,9 @@ AckBatch.prototype.ignoreSignal = function () { AckBatch.prototype.listenForSignal = function () { if (!this.signalSubscription) { - this.signalSubscription = signal.subscribe('#', function () { + this.signalSubscription = signal.subscribe('#', () => { this._processBatch(); - }.bind(this)); + }); } }; diff --git a/src/amqp/queue.js b/src/amqp/queue.js index 5c9de09..2b498f9 100644 --- a/src/amqp/queue.js +++ b/src/amqp/queue.js @@ -105,7 +105,7 @@ function getNoBatchOps (channel, raw, messages, noAck) { }; reject = function () { log.debug("Rejecting tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName); - channel.reject({ fields: { deliveryTag: raw.fields.deliveryTag } }, false); + channel.reject({ fields: { deliveryTag: raw.fields.deliveryTag } }, false, false); }; } @@ -358,15 +358,36 @@ function subscribe (channelName, channel, topology, serializers, messages, optio var topic = parts.join('.'); var contentType = raw.properties.contentType || 'application/octet-stream'; var serializer = serializers[ contentType ]; + const track = () => { + if (shouldAck && shouldBatch) { + messages.addMessage(ops); + } + }; if (!serializer) { - log.error("Could not deserialize message id %s on queue '%s', connection '%s' - no serializer defined", - raw.properties.messageId, channelName, topology.connection.name); - ops.nack(); + if (options.poison) { + raw.body = raw.content; + raw.contentEncoding = raw.properties.contentEncoding; + raw.quarantined = true; + } else { + log.error("Could not deserialize message id %s on queue '%s', connection '%s' - no serializer defined", + raw.properties.messageId, channelName, topology.connection.name); + track(); + ops.reject(); + return; + } } else { try { raw.body = serializer.deserialize(raw.content, raw.properties.contentEncoding); } catch (err) { - ops.nack(); + if (options.poison) { + raw.quarantined = true; + raw.body = raw.content; + raw.contentEncoding = raw.properties.contentEncoding; + } else { + track(); + ops.reject(); + return; + } } } @@ -376,9 +397,7 @@ function subscribe (channelName, channel, topology, serializers, messages, optio if (data.activated) { handled = true; } - if (shouldAck && shouldBatch) { - messages.addMessage(ops); - } + track(); if (!handled) { unhandledLog.warn("Message of %s on queue '%s', connection '%s' was not processed by any registered handlers", diff --git a/src/index.js b/src/index.js index aa4da19..f01ee8d 100644 --- a/src/index.js +++ b/src/index.js @@ -33,7 +33,10 @@ const serializers = { return JSON.parse(bytes.toString(encoding || 'utf8')); }, serialize: (object) => { - return Buffer.from(JSON.stringify(object), 'utf8'); + const json = (typeof object === 'string') + ? object + : JSON.stringify(object); + return Buffer.from(json, 'utf8'); } }, 'application/octet-stream': {