diff --git a/README.markdown b/README.markdown index 7044375..352c06e 100644 --- a/README.markdown +++ b/README.markdown @@ -169,10 +169,10 @@ firefox coverage/lcov-report/index.html ### Coverage ``` -Statements : 94.79% ( 637/672 ) -Branches : 88.83% ( 342/385 ) -Functions : 97.70% ( 85/87 ) -Lines : 94.79% ( 637/672 ) +Statements : 94.88% ( 667/703 ) +Branches : 88.92% ( 353/397 ) +Functions : 96.63% ( 86/89 ) +Lines : 94.88% ( 667/703 ) ``` [back to top](#table-of-contents) diff --git a/doc/api.markdown b/doc/api.markdown index e04b0d4..ac371f4 100644 --- a/doc/api.markdown +++ b/doc/api.markdown @@ -23,9 +23,20 @@ - [call.write(chunk, encoding, opt_callback)](#callwritechunk-encoding-opt_callback) - [call.end(chunk, encoding, opt_callback)](#callendchunk-encoding-opt_callback) - [Event: 'request'](#event-request) - - [Event: 'response'](#event-repsonse) + - [Event: 'response'](#event-response) - [Event: 'warn'](#event-warn) - [Event: 'error'](#event-error) + - [Class: ReplayBuffer](#class-replaybuffer) + - [new ReplayBuffer(opt_max)](#new-replaybufferopt_max) + - [replayBuffer.max](#replaybuffermax) + - [replayBuffer.length](#replaybufferlength) + - [replayBuffer.closed](#replaybufferclosed) + - [replayBuffer.bailout](#replaybufferbailout) + - [replayBuffer.out(writable, opt_callback)](#replaybufferoutwritable-opt_callback) + - [replayBuffer.push(chunk, opt_encoding)](#replaybufferpushchunk-opt_encoding) + - [replayBuffer.replay(writable, callback)](#replaybufferreplaywritable-callback) + - [replayBuffer.dump()](#replaybufferdump) + - [replayBuffer.close()](#replaybufferclose) ## Exports @@ -116,6 +127,7 @@ The currently active `response` stream, if any. ### call.abort() Immediately abort any request, free the send-buffer & prevent any further requests. +Internally, either `request.abort()` or `request.socket.destroy()` is called, depending on what is available. _Note_: An `error` is very likely to be emitted after a call to `abort()`. @@ -128,10 +140,12 @@ See [writable.end()](https://nodejs.org/api/stream.html#stream_writable_end_chun Returns `this`. ### Event 'request' +Emitted after the request object has been created and the send-buffer has been flushed. `function({Object} request)` ### Event 'response' +Emitted after the response headers have been received. `function({Object} response)` @@ -142,3 +156,38 @@ Returns `this`. ### Event 'error' [back to top](#table-of-contents) + +## Class: ReplayBuffer +The `ReplayBuffer` is used to buffer the request body in case of redirects, retries or other use-cases. + +The plugin API offers [call.__buffer()](./plugin-api.markdown#call__buffer) to enable this buffer. + +### new ReplayBuffer(opt_max) +Creates a new `ReplayBuffer` object. + +#### replayBuffer.max +The maximum number of bytes allowed to buffer. + +#### replayBuffer.length +The current number of bytes buffered. + +#### replayBuffer.closed +A boolean indicating if the buffer accepts more data. + +#### replayBuffer.bailout +A boolean indicating that the buffer size is exceeding the maximum allowed size. + +### replayBuffer.out(writable, opt_callback) +Set a writable stream to receive all chunks, existing & new ones. + +### replayBuffer.push(chunk, opt_encoding) +Push a new chunk to the buffer. + +### replayBuffer.replay(writable, callback) +_Copy_ all buffered chunks to the writable stream. + +### replayBuffer.dump() +Empties the buffer. + +### replayBuffer.close() +Prevents further additon of chunks and clear the writable stream. diff --git a/doc/plugin-api.markdown b/doc/plugin-api.markdown index 3e732db..816c16b 100644 --- a/doc/plugin-api.markdown +++ b/doc/plugin-api.markdown @@ -87,7 +87,7 @@ Invokes the next pending interceptor or emits the event. On each call to `__emit()` only one interceptor is invoked. This way plugins can _blackhole_ responses by not calling `__emit()`. Creating a new request is obligatory in these cases. ### call.\_\_intercept(event, interceptor) -Registers an interceptor `function({Call} call, {Object} options, {*} object)` for an event. +Registers an interceptor for an event. ### call.\_\_clear() Removes all registered interceptors. @@ -95,11 +95,17 @@ Removes all registered interceptors. ### Event: 'request' Emitted after the request object has been created and the send-buffer has been flushed. +`function({Call} call, {Object} options, {Object} request)` + ### Event: 'response' Emitted after the response headers have been received. +`function({Call} call, {Object} options, {Object} response)` + ### Event: 'error' -Emitted on an error in context of a request. +Emitted on an error. + +`function({Error} err)` [back to top](#table-of-contents) @@ -116,9 +122,11 @@ Enables the `plugin-send-buffer` event using a `ReplayBuffer`. Returns `true` on success or when buffering is already enabled, `false` otherwise. -### call.\_\_request() +### call.\_\_request(opt_callback) Create a request object when no request is pending and a configuration is available. When no configuration is available, a _non-interceptable_ error is emitted. + - `{function({?Error} err, {?Object=} request)} opt_callback` Called after the request object has been created and the send-buffer has been flushed, a possible connect error is passed to the callback _(that error has already been emitted)_ + Returns `true` when a request is pending, the newly created `request` object otherwise. ### call.\_\_abort(opt_reason) diff --git a/doc/plugins.markdown b/doc/plugins.markdown index 68689d1..f99c079 100644 --- a/doc/plugins.markdown +++ b/doc/plugins.markdown @@ -19,6 +19,8 @@ When the body is empty its value is `null`, otherwise a `Buffer`. - `{boolean} default` Enable buffering for all requests, defaults to `false` - `{number} max` The maximum buffer size, defaults to `134217728` (128 MiB) +_Note_: When the maximum buffer size is reached, a _bailout_ is performed putting all buffered data back into the response stream and emitting the response. + **request options** - `{boolean} buffer` En-/disable buffering @@ -50,7 +52,9 @@ Uses the `buffer` plugin. **options** - `{boolean} auto` Enable auto-parsing when `Content-Type: application/json` - - `{number} max` The maximum buffer size, defaults to `1048576` (1 MiB) + - `{number} max` The maximum JSON size, defaults to `1048576` (1 MiB) + +_Note_: When the JSON size exeeds the maximum size, it's not parsed. The `response.buffer` is still available for manual parsing though. **request options** diff --git a/lib/call.js b/lib/call.js index 10c3913..7d7ac68 100644 --- a/lib/call.js +++ b/lib/call.js @@ -164,20 +164,23 @@ Call.prototype.__buffer = function() { Call.prototype.__request = function(opt_callback) { var self = this; - var request; + var request, err; var options = this._stack[this._pointer]; opt_callback = opt_callback || function() {}; if (this.aborted) { + setImmediate(opt_callback); return false; + } else if (this.request) { + setImmediate(opt_callback, null, this.request); return true; + } else if (!options) { - return this.emit('error', new Error('No configuration available')); - } else if (!protocols[options.proto]) { - return this.emit('error', - new Error('Unknown protocol "' + options.proto + '"')); + err = new Error('No configuration available'); + this.emit('error', err); + return setImmediate(opt_callback, err); } if (this._buffer) { @@ -222,15 +225,15 @@ Call.prototype.__request = function(opt_callback) { if (self.response) { return; } + opt_callback(err, self.request); if (!err) { - opt_callback(); self.__emit('request', request); // interceptable event } } if (this._buffer) { - this._buffer.replay(request, finish); + this._buffer.out(request, finish); } else { process.nextTick(finish); } @@ -313,6 +316,7 @@ Call.prototype.abort = function() { Call.prototype._end = Call.prototype.end; Call.prototype.end = function(chunk, encoding, opt_callback) { var self = this; + var err; if (typeof encoding === 'function') { opt_callback = encoding; @@ -323,38 +327,35 @@ Call.prototype.end = function(chunk, encoding, opt_callback) { opt_callback = opt_callback || function() {}; if (this.ended) { + err = new Error('Trying to write after end'); if (chunk) { - this.emit('error', new Error('Trying to write after end')); + this.emit('error', err); } - setImmediate(opt_callback); + setImmediate(opt_callback, err); return this; } this.ended = true; - if (this._buffer || this.__request()) { - this._end(chunk, encoding, function() { + if (chunk) { + this.write(chunk, encoding); + } + + this.__request(function(err2, request) { + if (err2) { + return opt_callback(err2); + } else if (!request) { + return opt_callback(new Error('Not connected')); + } + + self._end(function() { if (self._buffer) { - assert(!self.request); - self._buffer.closed = true; - - self.__request(function() { - if (self.request) { // the request may already have errored - self.request.end(); - } - opt_callback(); - }); - - } else { - self.request.end(); - setImmediate(opt_callback); + self._buffer.close(); } + request.end(); + opt_callback(err2); }); - - } else { - this.emit('error', new Error('Not connected')); - setImmediate(opt_callback); - } + }); return this; }; @@ -364,9 +365,12 @@ Call.prototype._write = function(chunk, encoding, callback) { var self = this; if (this._buffer) { - if (!this._buffer.push(chunk, encoding)) { + this._buffer.push(chunk, encoding); + + if (this._buffer.bailout) { // the max buffer size is reached, bailout this.__request(function() { + self._buffer.close(); self._buffer.dump(); self._buffer = null; callback(); @@ -374,11 +378,20 @@ Call.prototype._write = function(chunk, encoding, callback) { } else { callback(); // ZALGO! } - } else if (this.__request()) { + + } else if (this.request) { this.request.write(chunk, encoding, callback); } else { - callback(new Error('Not connected')); // ZALGO! + this.__request(function(err, request) { + if (err) { + return callback(err); + } else if (!request) { + callback(new Error('Not connected')); + } + request.write(chunk, encoding); + callback(); + }); } }; diff --git a/lib/plugins/redirect.js b/lib/plugins/redirect.js index c5be31c..1c3a736 100644 --- a/lib/plugins/redirect.js +++ b/lib/plugins/redirect.js @@ -52,7 +52,7 @@ RedirectPlugin.prototype._setup = function() { }); this._interceptrequest = function(call, options, request) { - self._interceptRequest(call, options, request); + // blackhole }; this._interceptresponse = function(call, options, response) { @@ -61,13 +61,6 @@ RedirectPlugin.prototype._setup = function() { }; -RedirectPlugin.prototype._interceptRequest = function(call, options, request) { - // send the request made in `interceptResponse` below - request.end(); - call.emit('redirect', options); -}; - - RedirectPlugin.prototype._interceptResponse = function(call, options, response) { var self = this; var p, config, url, proto1, proto2; @@ -119,17 +112,16 @@ RedirectPlugin.prototype._interceptResponse = function(call, options, response) } function onEnd() { - var req; - call.__clear(); - req = call.__request(); - - if (req && req !== true) { - call.__intercept('request', self._interceptrequest); - } else { - call.emit('warn', 'redirect', 'failed', 'could not create request'); - call.__emit('response', response); - } + call.__request(function(err2, request) { + if (request) { + call.__intercept('request', self._interceptrequest); + request.end(); + call.emit('redirect', options); + } else { + call.__emit('response', response); + } + }); } // check if buffer plugin already handled the response body diff --git a/lib/plugins/retry.js b/lib/plugins/retry.js index 46373b6..f565ba8 100644 --- a/lib/plugins/retry.js +++ b/lib/plugins/retry.js @@ -51,7 +51,7 @@ RetryPlugin.prototype._setup = function() { }; this._interceptrequest = function(call, options, request) { - self._interceptRequest(call, options, request); + // blackhole }; // this._interceptresponse = function(call, options, response) { @@ -62,7 +62,7 @@ RetryPlugin.prototype._setup = function() { RetryPlugin.prototype._interceptError = function(call, options, err) { var self = this; - var config, req; + var config; var syscall = err.syscall; if (syscall === 'connect' && this.connectErrors.indexOf(err.code) > -1) { @@ -79,8 +79,11 @@ RetryPlugin.prototype._interceptError = function(call, options, err) { setTimeout(function() { // create the request & flush send-buffer - call.__request(function() { - call.__intercept('request', self._interceptrequest); + call.__request(function(err2, request) { + if (request) { + call.__intercept('request', self._interceptrequest); + request.end(); + } }); }, options.retry.interval); @@ -90,11 +93,6 @@ RetryPlugin.prototype._interceptError = function(call, options, err) { }; -RetryPlugin.prototype._interceptRequest = function(call, options, request) { - request.end(); -}; - - // RetryPlugin.prototype._interceptResponse = function(call, options, response) { // call.__emit('response', response); // }; diff --git a/lib/replay-buffer.js b/lib/replay-buffer.js index 303eada..afff44a 100644 --- a/lib/replay-buffer.js +++ b/lib/replay-buffer.js @@ -7,33 +7,67 @@ * Buffers a readable stream & * replays chunks on as many writable stream as needed * - * @param {?Object=} opt_options + * @param {?number} opt_max * * @constructor */ -function ReplayBuffer(opt_options) { - opt_options = opt_options || {}; - - this.max = opt_options.max || 134217728; // 128 MiB +function ReplayBuffer(opt_max) { + this.max = opt_max || 134217728; // 128 MiB this.chunks = []; this.encodings = []; this.length = 0; // number of chunk bytes + // status flags this.closed = false; + this.bailout = false; + + // @type {stream.Writable} + this._writable = null; } module.exports = ReplayBuffer; -ReplayBuffer.prototype.push = function(chunk, encoding) { +ReplayBuffer.prototype.out = function(writable, opt_callback) { + var self = this; + + if (!this._writable) { + this.replay(writable, function() { + self._writable = writable; + + if (opt_callback) { + opt_callback(); + } + }); + return true; + } + return false; +}; + + +ReplayBuffer.prototype.push = function(chunk, opt_encoding) { if (this.closed) { return false; } + + if (this._writable) { + this._writable.write(chunk, opt_encoding); + } + this.chunks.push(chunk); this.length += chunk.length; - this.encodings.push(encoding); + this.encodings.push(opt_encoding); - if (this.length > this.max) { + if (this.bailout) { + return false; + + } else if (this.length >= this.max) { + this.bailout = true; + + // dump the buffer when there is a writable stream getting the data anyway + if (this._writable) { + this.dump(); + } return false; } @@ -42,43 +76,48 @@ ReplayBuffer.prototype.push = function(chunk, encoding) { ReplayBuffer.prototype.replay = function(writable, callback) { - var self = this; - var i = 0; + var chunks = this.chunks; + var encodings = this.encodings; + var offset = 0; + var n = chunks.length; - if (!this.chunks.length) { - return setImmediate(function() { - callback(); - }); + if (!n) { + return setImmediate(callback); } // async variation of _duff's device_ // https://en.wikipedia.org/wiki/Duff%27s_device - (function next() { + setImmediate(write); + + function write() { var mod = 0; - var n = self.chunks.length - i; + var more; if (n < 4) { mod = n % 4; } - // TODO: write callback! switch (mod) { case 0: - writable.write(self.chunks[i], self.encodings[i++]); /* falls through */ + writable.write(chunks[offset], encodings[offset++]); /* falls through */ case 3: - writable.write(self.chunks[i], self.encodings[i++]); /* falls through */ + writable.write(chunks[offset], encodings[offset++]); /* falls through */ case 2: - writable.write(self.chunks[i], self.encodings[i++]); /* falls through */ + writable.write(chunks[offset], encodings[offset++]); /* falls through */ case 1: - writable.write(self.chunks[i], self.encodings[i++]); /* falls through */ + more = writable.write(chunks[offset], encodings[offset++]); } - if (self.chunks[i]) { - setImmediate(next); - } else { - setImmediate(callback); + n = chunks.length - offset; + + if (n === 0) { + return callback(); + } else if (more) { + return setImmediate(write); } - })(/* auto-exec */); + + writable.once('drain', write); + } }; @@ -87,3 +126,9 @@ ReplayBuffer.prototype.dump = function() { this.encodings = []; this.length = 0; }; + + +ReplayBuffer.prototype.close = function() { + this.closed = true; + this._writable = null; +}; diff --git a/test/common.js b/test/common.js index 3942a19..921e651 100644 --- a/test/common.js +++ b/test/common.js @@ -8,3 +8,5 @@ exports.serverKey = fs.readFileSync(exports.fixtures + '/server.key'); exports.serverCert = fs.readFileSync(exports.fixtures + '/server.crt'); exports.port = 57647; + +exports.Writable = require('./fixtures/writable'); diff --git a/test/fixtures/writable.js b/test/fixtures/writable.js new file mode 100644 index 0000000..c265e89 --- /dev/null +++ b/test/fixtures/writable.js @@ -0,0 +1,23 @@ +'use strict'; +var util = require('util'); +var stream = require('stream'); + + + +function Writable() { + stream.Writable.call(this, { + highWaterMark: 1 + }); + + this.chunks = []; + this.encodings = []; +} +util.inherits(Writable, stream.Writable); +module.exports = Writable; + + +Writable.prototype._write = function(chunk, encoding, callback) { + this.chunks.push(chunk); + this.encodings.push(encoding); + callback(); +}; diff --git a/test/test-http-retry.js b/test/test-http-retry.js index 9f3fb0c..76cbec6 100644 --- a/test/test-http-retry.js +++ b/test/test-http-retry.js @@ -19,7 +19,10 @@ suite('http:retry', function() { suiteSetup(function(done) { - rail = new RAIL(); + rail = new RAIL({ + proto: 'http' + }); + rail.use('retry', { limit: 3, interval: 20 @@ -45,13 +48,12 @@ suite('http:retry', function() { }); - test('call', function(done) { + test('connect errors', function(done) { var retries = 0; var errors = 0; var ended = false; rail.call({ - proto: 'http', port: 55555 }).on('error', function(err) { assert(err); diff --git a/test/test-replay-buffer.js b/test/test-replay-buffer.js new file mode 100644 index 0000000..9e54e23 --- /dev/null +++ b/test/test-replay-buffer.js @@ -0,0 +1,94 @@ +'use strict'; +/* global suite: false, setup: false, test: false, + teardown: false, suiteSetup: false, suiteTeardown: false */ +var assert = require('assert'); +var crypto = require('crypto'); +var common = require('./common'); +var RAIL = require('../'); + + +suite('replay-buffer', function() { + + + test('push', function() { + var buffer = new RAIL.ReplayBuffer(); + + assert.strictEqual(buffer.length, 0); + assert.strictEqual(buffer.chunks.length, 0); + + var more = buffer.push(new Buffer('123')); + assert(more); + assert.strictEqual(buffer.length, 3); + assert.strictEqual(buffer.chunks.length, 1); + }); + + + test('push - bailout', function() { + var buffer = new RAIL.ReplayBuffer(5); + var more; + + more = buffer.push(new Buffer('123')); + assert(more); + assert.strictEqual(buffer.length, 3); + assert.strictEqual(buffer.chunks.length, 1); + assert.strictEqual(buffer.chunks[0].length, 3); + + more = buffer.push(new Buffer('456')); + assert(!more); + assert.strictEqual(buffer.length, 6); + assert.strictEqual(buffer.chunks.length, 2); + + more = buffer.push(new Buffer('789')); + assert(!more); + assert.strictEqual(buffer.length, 9); + assert.strictEqual(buffer.chunks.length, 3); + }); + + + test('push - bailout - writable', function(done) { + var buffer = new RAIL.ReplayBuffer(2055); + var writable = new common.Writable(); + + buffer.push(crypto.randomBytes(256)); + buffer.push(crypto.randomBytes(256)); + buffer.push(crypto.randomBytes(256)); + buffer.push(crypto.randomBytes(256)); + + buffer.push(crypto.randomBytes(256)); + buffer.push(crypto.randomBytes(256)); + buffer.push(crypto.randomBytes(256)); + buffer.push(crypto.randomBytes(256)); + + assert.strictEqual(buffer.length, 2048); + + buffer.out(writable, function() { + var more; + + assert.strictEqual(writable.chunks.length, 8); + assert.strictEqual(writable.chunks[0].length, 256); + + more = buffer.push(new Buffer('456')); + assert(more); + more = buffer.push(new Buffer('789')); + assert(more); + more = buffer.push(new Buffer('0ab')); + assert(!more); + + assert(buffer.bailout); + assert.strictEqual(buffer.length, 0); + assert.strictEqual(buffer.chunks.length, 0); + + done(); + }); + }); + + + test('push - closed', function() { + var buffer = new RAIL.ReplayBuffer(); + + buffer.close(); + + var more = buffer.push(new Buffer('123')); + assert(!more); + }); +});