Skip to content
This repository has been archived by the owner on Mar 8, 2023. It is now read-only.

Commit

Permalink
reply-buffer: upgrade, document & update affected plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
skenqbx committed Apr 24, 2015
1 parent 914a0d5 commit 8f8c0fb
Show file tree
Hide file tree
Showing 12 changed files with 327 additions and 97 deletions.
8 changes: 4 additions & 4 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ firefox coverage/lcov-report/index.html
### Coverage

```
Statements : 94.79% ( 637/672 )
Branches : 88.83% ( 342/385 )
Functions : 97.70% ( 85/87 )
Lines : 94.79% ( 637/672 )
Statements : 94.88% ( 667/703 )
Branches : 88.92% ( 353/397 )
Functions : 96.63% ( 86/89 )
Lines : 94.88% ( 667/703 )
```

[back to top](#table-of-contents)
51 changes: 50 additions & 1 deletion doc/api.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,20 @@
- [call.write(chunk, encoding, opt_callback)](#callwritechunk-encoding-opt_callback)
- [call.end(chunk, encoding, opt_callback)](#callendchunk-encoding-opt_callback)
- [Event: 'request'](#event-request)
- [Event: 'response'](#event-repsonse)
- [Event: 'response'](#event-response)
- [Event: 'warn'](#event-warn)
- [Event: 'error'](#event-error)
- [Class: ReplayBuffer](#class-replaybuffer)
- [new ReplayBuffer(opt_max)](#new-replaybufferopt_max)
- [replayBuffer.max](#replaybuffermax)
- [replayBuffer.length](#replaybufferlength)
- [replayBuffer.closed](#replaybufferclosed)
- [replayBuffer.bailout](#replaybufferbailout)
- [replayBuffer.out(writable, opt_callback)](#replaybufferoutwritable-opt_callback)
- [replayBuffer.push(chunk, opt_encoding)](#replaybufferpushchunk-opt_encoding)
- [replayBuffer.replay(writable, callback)](#replaybufferreplaywritable-callback)
- [replayBuffer.dump()](#replaybufferdump)
- [replayBuffer.close()](#replaybufferclose)

## Exports

Expand Down Expand Up @@ -116,6 +127,7 @@ The currently active `response` stream, if any.

### call.abort()
Immediately abort any request, free the send-buffer & prevent any further requests.
Internally, either `request.abort()` or `request.socket.destroy()` is called, depending on what is available.

_Note_: An `error` is very likely to be emitted after a call to `abort()`.

Expand All @@ -128,10 +140,12 @@ See [writable.end()](https://nodejs.org/api/stream.html#stream_writable_end_chun
Returns `this`.

### Event 'request'
Emitted after the request object has been created and the send-buffer has been flushed.

`function({Object} request)`

### Event 'response'
Emitted after the response headers have been received.

`function({Object} response)`

Expand All @@ -142,3 +156,38 @@ Returns `this`.
### Event 'error'

[back to top](#table-of-contents)

## Class: ReplayBuffer
The `ReplayBuffer` is used to buffer the request body in case of redirects, retries or other use-cases.

The plugin API offers [call.__buffer()](./plugin-api.markdown#call__buffer) to enable this buffer.

### new ReplayBuffer(opt_max)
Creates a new `ReplayBuffer` object.

#### replayBuffer.max
The maximum number of bytes allowed to buffer.

#### replayBuffer.length
The current number of bytes buffered.

#### replayBuffer.closed
A boolean indicating if the buffer accepts more data.

#### replayBuffer.bailout
A boolean indicating that the buffer size is exceeding the maximum allowed size.

### replayBuffer.out(writable, opt_callback)
Set a writable stream to receive all chunks, existing & new ones.

### replayBuffer.push(chunk, opt_encoding)
Push a new chunk to the buffer.

### replayBuffer.replay(writable, callback)
_Copy_ all buffered chunks to the writable stream.

### replayBuffer.dump()
Empties the buffer.

### replayBuffer.close()
Prevents further additon of chunks and clear the writable stream.
14 changes: 11 additions & 3 deletions doc/plugin-api.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,25 @@ Invokes the next pending interceptor or emits the event.
On each call to `__emit()` only one interceptor is invoked. This way plugins can _blackhole_ responses by not calling `__emit()`. Creating a new request is obligatory in these cases.

### call.\_\_intercept(event, interceptor)
Registers an interceptor `function({Call} call, {Object} options, {*} object)` for an event.
Registers an interceptor for an event.

### call.\_\_clear()
Removes all registered interceptors.

### Event: 'request'
Emitted after the request object has been created and the send-buffer has been flushed.

`function({Call} call, {Object} options, {Object} request)`

### Event: 'response'
Emitted after the response headers have been received.

`function({Call} call, {Object} options, {Object} response)`

### Event: 'error'
Emitted on an error in context of a request.
Emitted on an error.

`function({Error} err)`

[back to top](#table-of-contents)

Expand All @@ -116,9 +122,11 @@ Enables the `plugin-send-buffer` event using a `ReplayBuffer`.

Returns `true` on success or when buffering is already enabled, `false` otherwise.

### call.\_\_request()
### call.\_\_request(opt_callback)
Create a request object when no request is pending and a configuration is available. When no configuration is available, a _non-interceptable_ error is emitted.

- `{function({?Error} err, {?Object=} request)} opt_callback` Called after the request object has been created and the send-buffer has been flushed, a possible connect error is passed to the callback _(that error has already been emitted)_

Returns `true` when a request is pending, the newly created `request` object otherwise.

### call.\_\_abort(opt_reason)
Expand Down
6 changes: 5 additions & 1 deletion doc/plugins.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ When the body is empty its value is `null`, otherwise a `Buffer`.
- `{boolean} default` Enable buffering for all requests, defaults to `false`
- `{number} max` The maximum buffer size, defaults to `134217728` (128 MiB)

_Note_: When the maximum buffer size is reached, a _bailout_ is performed putting all buffered data back into the response stream and emitting the response.

**request options**

- `{boolean} buffer` En-/disable buffering
Expand Down Expand Up @@ -50,7 +52,9 @@ Uses the `buffer` plugin.
**options**

- `{boolean} auto` Enable auto-parsing when `Content-Type: application/json`
- `{number} max` The maximum buffer size, defaults to `1048576` (1 MiB)
- `{number} max` The maximum JSON size, defaults to `1048576` (1 MiB)

_Note_: When the JSON size exeeds the maximum size, it's not parsed. The `response.buffer` is still available for manual parsing though.

**request options**

Expand Down
77 changes: 45 additions & 32 deletions lib/call.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,23 @@ Call.prototype.__buffer = function() {

Call.prototype.__request = function(opt_callback) {
var self = this;
var request;
var request, err;
var options = this._stack[this._pointer];

opt_callback = opt_callback || function() {};

if (this.aborted) {
setImmediate(opt_callback);
return false;

} else if (this.request) {
setImmediate(opt_callback, null, this.request);
return true;

} else if (!options) {
return this.emit('error', new Error('No configuration available'));
} else if (!protocols[options.proto]) {
return this.emit('error',
new Error('Unknown protocol "' + options.proto + '"'));
err = new Error('No configuration available');
this.emit('error', err);
return setImmediate(opt_callback, err);
}

if (this._buffer) {
Expand Down Expand Up @@ -222,15 +225,15 @@ Call.prototype.__request = function(opt_callback) {
if (self.response) {
return;
}
opt_callback(err, self.request);

if (!err) {
opt_callback();
self.__emit('request', request); // interceptable event
}
}

if (this._buffer) {
this._buffer.replay(request, finish);
this._buffer.out(request, finish);
} else {
process.nextTick(finish);
}
Expand Down Expand Up @@ -313,6 +316,7 @@ Call.prototype.abort = function() {
Call.prototype._end = Call.prototype.end;
Call.prototype.end = function(chunk, encoding, opt_callback) {
var self = this;
var err;

if (typeof encoding === 'function') {
opt_callback = encoding;
Expand All @@ -323,38 +327,35 @@ Call.prototype.end = function(chunk, encoding, opt_callback) {
opt_callback = opt_callback || function() {};

if (this.ended) {
err = new Error('Trying to write after end');
if (chunk) {
this.emit('error', new Error('Trying to write after end'));
this.emit('error', err);
}
setImmediate(opt_callback);
setImmediate(opt_callback, err);
return this;
}

this.ended = true;

if (this._buffer || this.__request()) {
this._end(chunk, encoding, function() {
if (chunk) {
this.write(chunk, encoding);
}

this.__request(function(err2, request) {
if (err2) {
return opt_callback(err2);
} else if (!request) {
return opt_callback(new Error('Not connected'));
}

self._end(function() {
if (self._buffer) {
assert(!self.request);
self._buffer.closed = true;

self.__request(function() {
if (self.request) { // the request may already have errored
self.request.end();
}
opt_callback();
});

} else {
self.request.end();
setImmediate(opt_callback);
self._buffer.close();
}
request.end();
opt_callback(err2);
});

} else {
this.emit('error', new Error('Not connected'));
setImmediate(opt_callback);
}
});

return this;
};
Expand All @@ -364,21 +365,33 @@ Call.prototype._write = function(chunk, encoding, callback) {
var self = this;

if (this._buffer) {
if (!this._buffer.push(chunk, encoding)) {
this._buffer.push(chunk, encoding);

if (this._buffer.bailout) {
// the max buffer size is reached, bailout
this.__request(function() {
self._buffer.close();
self._buffer.dump();
self._buffer = null;
callback();
});
} else {
callback(); // ZALGO!
}
} else if (this.__request()) {

} else if (this.request) {
this.request.write(chunk, encoding, callback);

} else {
callback(new Error('Not connected')); // ZALGO!
this.__request(function(err, request) {
if (err) {
return callback(err);
} else if (!request) {
callback(new Error('Not connected'));
}
request.write(chunk, encoding);
callback();
});
}
};

Expand Down
28 changes: 10 additions & 18 deletions lib/plugins/redirect.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ RedirectPlugin.prototype._setup = function() {
});

this._interceptrequest = function(call, options, request) {
self._interceptRequest(call, options, request);
// blackhole
};

this._interceptresponse = function(call, options, response) {
Expand All @@ -61,13 +61,6 @@ RedirectPlugin.prototype._setup = function() {
};


RedirectPlugin.prototype._interceptRequest = function(call, options, request) {
// send the request made in `interceptResponse` below
request.end();
call.emit('redirect', options);
};


RedirectPlugin.prototype._interceptResponse = function(call, options, response) {
var self = this;
var p, config, url, proto1, proto2;
Expand Down Expand Up @@ -119,17 +112,16 @@ RedirectPlugin.prototype._interceptResponse = function(call, options, response)
}

function onEnd() {
var req;

call.__clear();
req = call.__request();

if (req && req !== true) {
call.__intercept('request', self._interceptrequest);
} else {
call.emit('warn', 'redirect', 'failed', 'could not create request');
call.__emit('response', response);
}
call.__request(function(err2, request) {
if (request) {
call.__intercept('request', self._interceptrequest);
request.end();
call.emit('redirect', options);
} else {
call.__emit('response', response);
}
});
}

// check if buffer plugin already handled the response body
Expand Down
Loading

0 comments on commit 8f8c0fb

Please sign in to comment.