diff --git a/CHANGES.md b/CHANGES.md index 487663b..331e2da 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,7 @@ + - Add support for progress observers on readable stream `read` promise. + ## 2.0.2 - Fixed problems with hanging connections, and problems with early termination diff --git a/http.js b/http.js index 9f52915..d5473d9 100644 --- a/http.js +++ b/http.js @@ -177,12 +177,12 @@ exports.ServerRequest = function (_request, ssl) { path: request.path }); /*** A Q IO asynchronous text reader */ - request.body = NodeReader(_request); + request.body = NodeReader(_request, null, +request.headers["content-length"]); /*** {Object} HTTP headers (JSGI)*/ request.headers = _request.headers; /*** The underlying Node request */ request.node = _request; - request.nodeRequest = _request; // Deprecated + request.nodeRequest = _request; // XXX Deprecated /*** The underlying Node TCP connection */ request.nodeConnection = _request.connection; @@ -337,7 +337,7 @@ exports.ClientResponse = function (_response, charset) { response.node = _response; response.nodeResponse = _response; // Deprecated response.nodeConnection = _response.connection; // Deprecated - response.body = NodeReader(_response, charset); + response.body = NodeReader(_response, charset, +response.headers["content-length"]); return response; }; diff --git a/node/reader.js b/node/reader.js index 94abfe6..babc972 100644 --- a/node/reader.js +++ b/node/reader.js @@ -8,38 +8,29 @@ var version = process.versions.node.split('.'); var supportsReadable = version[0] >= 0 && version[1] >= 10; module.exports = Reader; -function Reader(_stream, charset) { +function Reader(_stream, charset, length) { if (charset && _stream.setEncoding) // TODO complain about inconsistency _stream.setEncoding(charset); var window = 0; + var index = 0; var drained = true; var queue = new Queue(); var output = new BufferStream({ get: function () { window++; if (window > 0) { - tick(); + flush(); } return queue.get(); }, put: queue.put }); - // this triggers a switch in StreamReader#read + // this triggers a switch in `read` output.charset = charset; - - function tick() { - if (window > 0 && !drained) { - var chunk = _stream.read(); - if (chunk === null) { - drained = true; - return; - } - window--; - output.yield(chunk); - } - } + // also used in `read` for progress estimates + output.length = length; _stream.on("error", function (error) { output.throw(error); @@ -49,16 +40,22 @@ function Reader(_stream, charset) { output.return(); }); + _stream.on("readable", function () { + drained = false; + flush(); + }); - if (supportsReadable) { - _stream.on("readable", function () { - drained = false; - tick(); - }); - } else { - _stream.on("data", function (chunk) { - output.yield(chunk); - }); + function flush() { + if (window > 0 && !drained) { + var chunk = _stream.read(); + if (chunk === null) { + drained = true; + return; + } + window--; + output.yield(chunk, index); + index += chunk.length; + } } return output; diff --git a/streams.js b/streams.js index f5ca107..07f1714 100644 --- a/streams.js +++ b/streams.js @@ -223,16 +223,27 @@ Readable.prototype.copy = function (output) { Readable.prototype.read = function () { var chunks = []; var self = this; - return this.forEach(function (chunk) { + var deferred = Q.defer(); + var index = 0; + var start = Date.now(); + this.forEach(function (chunk) { chunks.push(chunk); - }) - .then(function () { + index += chunk.length; + if (this.length) { + var now = Date.now(); // milliseconds + var speed = index / (now - start); // bytes per milliseconds + var remaining = this.length - index; // bytes + deferred.setEstimate(now + remaining / speed); // milliseconds + } + }, this) + .done(function () { if (self.charset) { - return chunks.join(""); + deferred.resolve(chunks.join("")); } else { - return Buffer.concat(chunks); + deferred.resolve(Buffer.concat(chunks)); } - }); + }, deferred.reject); + return deferred.promise; } Readable.prototype.cancel = function () {