Skip to content
This repository has been archived by the owner on Mar 8, 2023. It is now read-only.

Commit

Permalink
replay-buffer: inherit EE plus end event and pipe & unpipe
Browse files Browse the repository at this point in the history
  • Loading branch information
skenqbx committed Apr 26, 2015
1 parent ce523fa commit 3cda5ae
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 104 deletions.
24 changes: 15 additions & 9 deletions doc/api.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
- [new ReplayBuffer(opt_max)](#new-replaybufferopt_max)
- [replayBuffer.max](#replaybuffermax)
- [replayBuffer.length](#replaybufferlength)
- [replayBuffer.closed](#replaybufferclosed)
- [replayBuffer.ended](#replaybufferended)
- [replayBuffer.bailout](#replaybufferbailout)
- [replayBuffer.out(writable, opt_callback)](#replaybufferoutwritable-opt_callback)
- [replayBuffer.pipe(writable, opt_callback)](#replaybufferpipewritable-opt_callback)
- [replayBuffer.unpipe(writable)](#replaybufferunpipewritable)
- [replayBuffer.push(chunk, opt_encoding)](#replaybufferpushchunk-opt_encoding)
- [replayBuffer.replay(writable, callback)](#replaybufferreplaywritable-callback)
- [replayBuffer.dump()](#replaybufferdump)
- [replayBuffer.close()](#replaybufferclose)
- [replayBuffer.end()](#replaybufferend)
- [Event: 'end'](#event-end)

## Exports

Expand Down Expand Up @@ -141,8 +143,8 @@ The currently active `request` stream, if any.
The currently active `response` stream, if any.

### call.abort()
Immediately abort any request, free the send-buffer & prevent any further requests.
Internally, either `request.abort()` or `request.socket.destroy()` is called, depending on what is available.
Immediately abort any request, free the `ReplayBuffer` and prevent any further requests.
Internally `request.abort()` is called.

_Note_: An `error` is very likely to be emitted after a call to `abort()`.

Expand All @@ -155,7 +157,7 @@ See [writable.end()](https://nodejs.org/api/stream.html#stream_writable_end_chun
Returns `this`.

### Event 'request'
Emitted after the request object has been created and the send-buffer has been flushed.
Emitted after the request object has been created and the `ReplayBuffer` has been flushed.

`function({Object} request)`

Expand Down Expand Up @@ -189,15 +191,17 @@ The maximum number of bytes allowed to buffer.
#### replayBuffer.length
The current number of bytes buffered.

#### replayBuffer.closed
#### replayBuffer.ended
A boolean indicating if the buffer accepts more data.

#### replayBuffer.bailout
A boolean indicating that the buffer size is exceeding the maximum allowed size.

### replayBuffer.out(writable, opt_callback)
### replayBuffer.pipe(writable, opt_callback)
Set a writable stream to receive all chunks, existing & new ones.

### replayBuffer.unpipe(writable)

### replayBuffer.push(chunk, opt_encoding)
Push a new chunk to the buffer.

Expand All @@ -207,5 +211,7 @@ _Copy_ all buffered chunks to the writable stream.
### replayBuffer.dump()
Empties the buffer.

### replayBuffer.close()
### replayBuffer.end()
Prevents further additon of chunks and clear the writable stream.

### Event: 'end'
14 changes: 8 additions & 6 deletions doc/plugin-api.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ Emitted after a new configuration has been pushed onto the stack.

`function({Call} call, {Object} options)`

### Event: 'plugin-send-buffer'
Emitted right before a request object is created and the buffer is _flushed_ to the `request` object.
### Event: 'plugin-replay-buffer'
Emitted when a `ReplayBuffer` has been created.

`function({Call} call, {Object} options, {ReplayBuffer} buffer)`

Expand Down Expand Up @@ -98,7 +98,7 @@ Registers an interceptor for an event.
Removes all registered interceptors.

### Event: 'request'
Emitted after the request object has been created and the send-buffer has been flushed.
Emitted after the request object has been created and the `ReplayBuffer` has been flushed.

`function({Call} call, {Object} options, {Object} request)`

Expand All @@ -125,14 +125,16 @@ The current configuration is always the default, meaning `options` only needs to
_Note_: Request options are _copied_, plugin options are _referenced_ when not primitive.

### call.\_\_buffer()
Enables the `plugin-send-buffer` event using a `ReplayBuffer`.
Enable request body buffering.

Returns `true` on success or when buffering is already enabled, `false` otherwise.
A `plugin-replay-buffer` event is emitted when the buffer is created.

Returns the current `ReplayBuffer`, `false` otherwise.

### call.\_\_request(opt_callback)
Create a request object when no request is pending and a configuration is available. When no configuration is available, a _non-interceptable_ error is emitted.

- `{function({?Error} err, {?Object=} request)} opt_callback` Called after the request object has been created and the send-buffer has been flushed, a possible connect error is passed to the callback _(that error has already been emitted)_
- `{function({?Error} err, {?Object=} request)} opt_callback` Called after the request object has been created and the `ReplayBuffer` has been flushed, a possible connect error is passed to the callback _(that error has already been emitted)_

Returns `true` when a request is pending, the newly created `request` object otherwise.

Expand Down
27 changes: 10 additions & 17 deletions lib/call.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,12 @@ Call.prototype.__configure = function(options) {
Call.prototype.__buffer = function() {
if (!this._buffer && !this.request) {
this._buffer = new ReplayBuffer();

this.rail.emit('plugin-replay-buffer', this,
this._stack[this._pointer], this._buffer);
}

return this._buffer ? true : false;
return this._buffer;
};


Expand All @@ -102,14 +105,7 @@ Call.prototype.__request = function(opt_callback) {
this.emit('error', err);
return setImmediate(opt_callback, err);
}

if (this._buffer) {
this.rail.emit('plugin-send-buffer', this,
this._stack[this._pointer], this._buffer);
}

request = protocols[options.proto].request(options.request);

this.rail.emit('plugin-request', this, options, request);

request.once('response', function(response) {
Expand Down Expand Up @@ -151,7 +147,7 @@ Call.prototype.__request = function(opt_callback) {
}

if (this._buffer) {
this._buffer.out(request, finish);
this._buffer.pipe(request, finish);
} else {
process.nextTick(finish);
}
Expand All @@ -163,7 +159,6 @@ Call.prototype.__request = function(opt_callback) {
Call.prototype.__emit = function(event, object) {
var listener;


if (this._interceptors[event] && this._interceptors[event].length) {
listener = this._interceptors[event].shift();
listener(this, this._stack[this._pointer], object);
Expand Down Expand Up @@ -228,7 +223,7 @@ Call.prototype.abort = function() {
this.__abort(true);

if (this._buffer) {
this._buffer.close();
this._buffer.end();
this._buffer.dump();
this._buffer = null;
}
Expand Down Expand Up @@ -267,18 +262,16 @@ Call.prototype.end = function(chunk, encoding, opt_callback) {
if (chunk) {
this.write(chunk, encoding);
}
if (this._buffer) {
this._buffer.end();
}

this.__request(function(err2, request) {
if (err2) {
return opt_callback(err2);
} else if (!request) {
return opt_callback(new Error('Not connected'));
}

if (self._buffer) {
self._buffer.close();
}

self._end(function() {
request.end();
opt_callback();
Expand All @@ -298,7 +291,7 @@ Call.prototype._write = function(chunk, encoding, callback) {
if (this._buffer.bailout) {
// the max buffer size is reached, bailout
this.__request(function() {
self._buffer.close();
self._buffer.end();
self._buffer.dump();
self._buffer = null;
callback();
Expand Down
5 changes: 3 additions & 2 deletions lib/plugins/retry.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ RetryPlugin.prototype._setup = function() {

if (options.retry.limit > 0) {
if (!call.__buffer()) {
return call.emit('warn', 'retry', 'error', 'failed to enable send-buffer');
return call.emit('warn', 'retry', 'error', 'failed to enable replay buffer');
}
call.__intercept('error', self._intercepterror);
}
Expand Down Expand Up @@ -75,8 +75,9 @@ RetryPlugin.prototype._retry = function(call, options) {
// create a new request configuration
var config = call.__configure(options);

// TODO: unref or clear on abort
setTimeout(function() {
// create the request & flush send-buffer
// create the request & flush replay buffer
call.__request(function(err2, request) {
if (request) {
call.__intercept('request', self._interceptrequest);
Expand Down
Loading

0 comments on commit 3cda5ae

Please sign in to comment.