Skip to content

Commit

Permalink
Add support for progress observer on stream read
Browse files Browse the repository at this point in the history
Drop support for former Node.js streams
  • Loading branch information
kriskowal committed May 30, 2014
1 parent dccc693 commit 2c515a2
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 33 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<!-- vim:ts=4:sts=4:sw=4:et:tw=60 -->

- Add support for progress observers on readable stream `read` promise.

## 2.0.2

- Fixed problems with hanging connections, and problems with early termination
Expand Down
6 changes: 3 additions & 3 deletions http.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,12 @@ exports.ServerRequest = function (_request, ssl) {
path: request.path
});
/*** A Q IO asynchronous text reader */
request.body = NodeReader(_request);
request.body = NodeReader(_request, null, +request.headers["content-length"]);
/*** {Object} HTTP headers (JSGI)*/
request.headers = _request.headers;
/*** The underlying Node request */
request.node = _request;
request.nodeRequest = _request; // Deprecated
request.nodeRequest = _request; // XXX Deprecated
/*** The underlying Node TCP connection */
request.nodeConnection = _request.connection;

Expand Down Expand Up @@ -337,7 +337,7 @@ exports.ClientResponse = function (_response, charset) {
response.node = _response;
response.nodeResponse = _response; // Deprecated
response.nodeConnection = _response.connection; // Deprecated
response.body = NodeReader(_response, charset);
response.body = NodeReader(_response, charset, +response.headers["content-length"]);
return response;
};

Expand Down
45 changes: 21 additions & 24 deletions node/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,29 @@ var version = process.versions.node.split('.');
var supportsReadable = version[0] >= 0 && version[1] >= 10;

module.exports = Reader;
function Reader(_stream, charset) {
function Reader(_stream, charset, length) {
if (charset && _stream.setEncoding) // TODO complain about inconsistency
_stream.setEncoding(charset);

var window = 0;
var index = 0;
var drained = true;
var queue = new Queue();
var output = new BufferStream({
get: function () {
window++;
if (window > 0) {
tick();
flush();
}
return queue.get();
},
put: queue.put
});

// this triggers a switch in StreamReader#read
// this triggers a switch in `read`
output.charset = charset;

function tick() {
if (window > 0 && !drained) {
var chunk = _stream.read();
if (chunk === null) {
drained = true;
return;
}
window--;
output.yield(chunk);
}
}
// also used in `read` for progress estimates
output.length = length;

_stream.on("error", function (error) {
output.throw(error);
Expand All @@ -49,16 +40,22 @@ function Reader(_stream, charset) {
output.return();
});

_stream.on("readable", function () {
drained = false;
flush();
});

if (supportsReadable) {
_stream.on("readable", function () {
drained = false;
tick();
});
} else {
_stream.on("data", function (chunk) {
output.yield(chunk);
});
function flush() {
if (window > 0 && !drained) {
var chunk = _stream.read();
if (chunk === null) {
drained = true;
return;
}
window--;
output.yield(chunk, index);
index += chunk.length;
}
}

return output;
Expand Down
23 changes: 17 additions & 6 deletions streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,27 @@ Readable.prototype.copy = function (output) {
Readable.prototype.read = function () {
var chunks = [];
var self = this;
return this.forEach(function (chunk) {
var deferred = Q.defer();
var index = 0;
var start = Date.now();
this.forEach(function (chunk) {
chunks.push(chunk);
})
.then(function () {
index += chunk.length;
if (this.length) {
var now = Date.now(); // milliseconds
var speed = index / (now - start); // bytes per milliseconds
var remaining = this.length - index; // bytes
deferred.setEstimate(now + remaining / speed); // milliseconds
}
}, this)
.done(function () {
if (self.charset) {
return chunks.join("");
deferred.resolve(chunks.join(""));
} else {
return Buffer.concat(chunks);
deferred.resolve(Buffer.concat(chunks));
}
});
}, deferred.reject);
return deferred.promise;
}

Readable.prototype.cancel = function () {
Expand Down

0 comments on commit 2c515a2

Please sign in to comment.