From c6fb6196211e025212b0d9f6b5b99771e5dc801f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 2 Jul 2021 14:51:30 +0200 Subject: [PATCH 1/4] stream: finished should error on errored stream. Calling finished before or after a stream has errored or closed should end up with the same behavior. --- lib/_http_client.js | 3 ++ lib/_http_incoming.js | 3 ++ lib/internal/streams/end-of-stream.js | 57 ++++++++++++++++----------- test/parallel/test-stream-finished.js | 23 +++++++++++ 4 files changed, 63 insertions(+), 23 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index fde7fde86bbf25..2af1f9eb138d83 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -806,6 +806,9 @@ function onSocketNT(req, socket, err) { socket.emit('free'); } else { finished(socket.destroy(err || req[kError]), (er) => { + if (er.code === 'ERR_STREAM_PREMATURE_CLOSE') { + er = null; + } _destroy(req, er || err); }); return; diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index 3961b583de9ddc..05b8def95e3066 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -188,6 +188,9 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) { if (this.socket && !this.socket.destroyed && this.aborted) { this.socket.destroy(err); const cleanup = finished(this.socket, (e) => { + if (e.code === 'ERR_STREAM_PREMATURE_CLOSE') { + e = null; + } cleanup(); process.nextTick(onError, this, e || err, cb); }); diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 318ab4c2e6a8b7..eb1d70c4cecdef 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -128,7 +128,17 @@ function eos(stream, options, callback) { callback.call(stream, err); }; + let closed = false; + const onclose = () => { + closed = true; + + const errored = wState?.errored || rState?.errored; + + if (errored && typeof errored !== 'boolean') { + return callback.call(stream, errored); + } + if (readable && !readableEnded) { if (!isReadableEnded(stream)) return callback.call(stream, @@ -139,6 +149,7 @@ function eos(stream, options, callback) { return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); } + callback.call(stream); }; @@ -168,29 +179,29 @@ function eos(stream, options, callback) { if (options.error !== false) stream.on('error', onerror); stream.on('close', onclose); - // _closed is for OutgoingMessage which is not a proper Writable. - const closed = (!wState && !rState && stream._closed === true) || ( - (wState && wState.closed) || - (rState && rState.closed) || - (wState && wState.errorEmitted) || - (rState && rState.errorEmitted) || - (rState && stream.req && stream.aborted) || - ( - (!writable || (wState && wState.finished)) && - (!readable || (rState && rState.endEmitted)) - ) - ); - - if (closed) { - // TODO(ronag): Re-throw error if errorEmitted? - // TODO(ronag): Throw premature close as if finished was called? - // before being closed? i.e. if closed but not errored, ended or finished. - // TODO(ronag): Throw some kind of error? Does it make sense - // to call finished() on a "finished" stream? - // TODO(ronag): willEmitClose? - process.nextTick(() => { - callback(); - }); + if (closed || wState?.closed || rState?.closed) { + process.nextTick(onclose); + } else if (wState?.errorEmitted || rState?.errorEmitted) { + if (!willEmitClose) { + process.nextTick(onclose); + } + } else if ( + !readable && + (!willEmitClose || stream.readable) && + writableFinished + ) { + process.nextTick(onclose); + } else if ( + !writable && + (!willEmitClose || stream.writable) && + readableEnded + ) { + process.nextTick(onclose); + } else if (!wState && !rState && stream._closed === true) { + // _closed is for OutgoingMessage which is not a proper Writable. + process.nextTick(onclose); + } else if ((rState && stream.req && stream.aborted)) { + process.nextTick(onclose); } const cleanup = () => { diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 43b1e36a547402..8e371911698336 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -608,3 +608,26 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); assert.strictEqual(closed, true); })); } + +{ + const w = new Writable(); + const _err = new Error(); + w.destroy(_err); + finished(w, common.mustCall((err) => { + assert.strictEqual(_err, err); + finished(w, common.mustCall((err) => { + assert.strictEqual(_err, err); + })); + })); +} + +{ + const w = new Writable(); + w.destroy(); + finished(w, common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + finished(w, common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + })); + })); +} From 1d2286f43f44ccebb711629b2a4fd5bd8b822002 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 2 Jul 2021 15:02:23 +0200 Subject: [PATCH 2/4] fixup --- lib/internal/streams/end-of-stream.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index eb1d70c4cecdef..f920146d410c05 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -128,7 +128,7 @@ function eos(stream, options, callback) { callback.call(stream, err); }; - let closed = false; + let closed = wState?.closed || rState?.closed; const onclose = () => { closed = true; @@ -179,7 +179,7 @@ function eos(stream, options, callback) { if (options.error !== false) stream.on('error', onerror); stream.on('close', onclose); - if (closed || wState?.closed || rState?.closed) { + if (closed) { process.nextTick(onclose); } else if (wState?.errorEmitted || rState?.errorEmitted) { if (!willEmitClose) { From afc82fa8331b8b29a0c03a090890bb6a39e0bdfb Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 2 Jul 2021 15:05:53 +0200 Subject: [PATCH 3/4] fixup --- lib/internal/streams/end-of-stream.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index f920146d410c05..efc2441c51ee39 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -98,8 +98,7 @@ function eos(stream, options, callback) { isWritable(stream) === writable ); - let writableFinished = stream.writableFinished || - (wState && wState.finished); + let writableFinished = stream.writableFinished || wState?.finished; const onfinish = () => { writableFinished = true; // Stream should not be destroyed here. If it is that @@ -111,8 +110,7 @@ function eos(stream, options, callback) { if (!readable || readableEnded) callback.call(stream); }; - let readableEnded = stream.readableEnded || - (rState && rState.endEmitted); + let readableEnded = stream.readableEnded || rState?.endEmitted; const onend = () => { readableEnded = true; // Stream should not be destroyed here. If it is that From 04e1ac64c3339276f043de900529d28a3a1785e3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 2 Jul 2021 15:31:35 +0200 Subject: [PATCH 4/4] fixup --- lib/_http_client.js | 2 +- lib/_http_incoming.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index 2af1f9eb138d83..598b585bcfa383 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -806,7 +806,7 @@ function onSocketNT(req, socket, err) { socket.emit('free'); } else { finished(socket.destroy(err || req[kError]), (er) => { - if (er.code === 'ERR_STREAM_PREMATURE_CLOSE') { + if (er?.code === 'ERR_STREAM_PREMATURE_CLOSE') { er = null; } _destroy(req, er || err); diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index 05b8def95e3066..a92687ce37bfbc 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -188,7 +188,7 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) { if (this.socket && !this.socket.destroyed && this.aborted) { this.socket.destroy(err); const cleanup = finished(this.socket, (e) => { - if (e.code === 'ERR_STREAM_PREMATURE_CLOSE') { + if (e?.code === 'ERR_STREAM_PREMATURE_CLOSE') { e = null; } cleanup();