From 4f76b580c10be32391991e7fbbd446dbe54a6f36 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 9 Mar 2020 17:00:01 +0100 Subject: [PATCH 1/2] stream: make pipeline try to wait for 'close' Pipeline uses eos which will invoke the callback on 'finish' and 'end' before all streams have been fully destroyed. Fixes: https://github.com/nodejs/node/issues/32032 --- lib/internal/streams/end-of-stream.js | 25 ++++++++++++++++ test/parallel/test-stream-pipeline.js | 43 ++++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index aad43823f9afb9..6e590401f1e56e 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -63,15 +63,33 @@ function eos(stream, opts, callback) { const wState = stream._writableState; const rState = stream._readableState; + const state = wState || rState; const onlegacyfinish = () => { if (!stream.writable) onfinish(); }; + // TODO (ronag): Improve soft detection to include core modules and + // common ecosystem modules that do properly emit 'close' but fail + // this generic check. + const willEmitClose = ( + state && + state.autoDestroy && + state.emitClose && + state.closed === false + ); + let writableFinished = stream.writableFinished || (wState && wState.finished); const onfinish = () => { writableFinished = true; + if (willEmitClose && (!stream.readable || readable)) { + // TODO(ronag): Duplex won't autoDestroy if `stream.readable` was + // explicitly set to false. + if (!rState || rState.readable !== false) { + return; + } + } if (!readable || readableEnded) callback.call(stream); }; @@ -79,6 +97,13 @@ function eos(stream, opts, callback) { (rState && rState.endEmitted); const onend = () => { readableEnded = true; + if (willEmitClose && (!stream.writable || writable)) { + // TODO(ronag): Duplex won't autoDestroy if `stream.writable` was + // explicitly set to false. + if (!wState || wState.writable !== false) { + return; + } + } if (!writable || writableFinished) callback.call(stream); }; diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index c2b454e80b38af..b273fddfa3b613 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -7,7 +7,8 @@ const { Readable, Transform, pipeline, - PassThrough + PassThrough, + Duplex } = require('stream'); const assert = require('assert'); const http = require('http'); @@ -1077,3 +1078,43 @@ const { promisify } = require('util'); assert.ifError(err); })); } + +{ + let closed = false; + const src = new Readable({ + read() {}, + destroy(err, cb) { + process.nextTick(cb); + } + }); + const dst = new Writable({ + write(chunk, encoding, callback) { + callback(); + } + }); + src.on('close', () => { + closed = true; + }); + src.push(null); + pipeline(src, dst, common.mustCall((err) => { + assert.strictEqual(closed, true); + })); +} + +{ + let closed = false; + const src = new Readable({ + read() {}, + destroy(err, cb) { + process.nextTick(cb); + } + }); + const dst = new Duplex({}); + src.on('close', common.mustCall(() => { + closed = true; + })); + src.push(null); + pipeline(src, dst, common.mustCall((err) => { + assert.strictEqual(closed, true); + })); +} From c8020c4b438f44064ab1df51ec79b45e772c5397 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 26 Mar 2020 01:31:02 +0100 Subject: [PATCH 2/2] fixup: remove no longer necessary check --- lib/internal/streams/end-of-stream.js | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 6e590401f1e56e..4742391fd71a7a 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -83,13 +83,7 @@ function eos(stream, opts, callback) { (wState && wState.finished); const onfinish = () => { writableFinished = true; - if (willEmitClose && (!stream.readable || readable)) { - // TODO(ronag): Duplex won't autoDestroy if `stream.readable` was - // explicitly set to false. - if (!rState || rState.readable !== false) { - return; - } - } + if (willEmitClose && (!stream.readable || readable)) return; if (!readable || readableEnded) callback.call(stream); }; @@ -97,13 +91,7 @@ function eos(stream, opts, callback) { (rState && rState.endEmitted); const onend = () => { readableEnded = true; - if (willEmitClose && (!stream.writable || writable)) { - // TODO(ronag): Duplex won't autoDestroy if `stream.writable` was - // explicitly set to false. - if (!wState || wState.writable !== false) { - return; - } - } + if (willEmitClose && (!stream.writable || writable)) return; if (!writable || writableFinished) callback.call(stream); };