diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 8bbd4827b23a3a..4ad7b93337f633 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -36,8 +36,6 @@ function eos(stream, opts, callback) { callback = once(callback); - const ws = stream._writableState; - const rs = stream._readableState; let readable = opts.readable || (opts.readable !== false && stream.readable); let writable = opts.writable || (opts.writable !== false && stream.writable); @@ -45,13 +43,17 @@ function eos(stream, opts, callback) { if (!stream.writable) onfinish(); }; + var writableEnded = stream._writableState && stream._writableState.finished; const onfinish = () => { writable = false; + writableEnded = true; if (!readable) callback.call(stream); }; + var readableEnded = stream._readableState && stream._readableState.endEmitted; const onend = () => { readable = false; + readableEnded = true; if (!writable) callback.call(stream); }; @@ -60,11 +62,16 @@ function eos(stream, opts, callback) { }; const onclose = () => { - if (readable && !(rs && rs.ended)) { - return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + let err; + if (readable && !readableEnded) { + if (!stream._readableState || !stream._readableState.ended) + err = new ERR_STREAM_PREMATURE_CLOSE(); + return callback.call(stream, err); } - if (writable && !(ws && ws.ended)) { - return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + if (writable && !writableEnded) { + if (!stream._writableState || !stream._writableState.ended) + err = new ERR_STREAM_PREMATURE_CLOSE(); + return callback.call(stream, err); } }; @@ -77,7 +84,7 @@ function eos(stream, opts, callback) { stream.on('abort', onclose); if (stream.req) onrequest(); else stream.on('request', onrequest); - } else if (writable && !ws) { // legacy streams + } else if (writable && !stream._writableState) { // legacy streams stream.on('end', onlegacyfinish); stream.on('close', onlegacyfinish); } diff --git a/test/parallel/parallel.status b/test/parallel/parallel.status index cfc4e36e9a565f..b45e4448d97c06 100644 --- a/test/parallel/parallel.status +++ b/test/parallel/parallel.status @@ -12,8 +12,6 @@ test-net-connect-options-port: PASS,FLAKY test-http2-pipe: PASS,FLAKY test-worker-syntax-error: PASS,FLAKY test-worker-syntax-error-file: PASS,FLAKY -# https://github.com/nodejs/node/issues/24456 -test-stream-pipeline-http2: PASS,FLAKY [$system==linux] diff --git a/test/parallel/test-stream-pipeline-queued-end-in-destroy.js b/test/parallel/test-stream-pipeline-queued-end-in-destroy.js new file mode 100644 index 00000000000000..d5e399ddda531b --- /dev/null +++ b/test/parallel/test-stream-pipeline-queued-end-in-destroy.js @@ -0,0 +1,39 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { Readable, Duplex, pipeline } = require('stream'); + +// Test that the callback for pipeline() is called even when the ._destroy() +// method of the stream places an .end() request to itself that does not +// get processed before the destruction of the stream (i.e. the 'close' event). +// Refs: https://github.com/nodejs/node/issues/24456 + +const readable = new Readable({ + read: common.mustCall(() => {}) +}); + +const duplex = new Duplex({ + write(chunk, enc, cb) { + // Simulate messages queueing up. + }, + read() {}, + destroy(err, cb) { + // Call end() from inside the destroy() method, like HTTP/2 streams + // do at the time of writing. + this.end(); + cb(err); + } +}); + +duplex.on('finished', common.mustNotCall()); + +pipeline(readable, duplex, common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); +})); + +// Write one chunk of data, and destroy the stream later. +// That should trigger the pipeline destruction. +readable.push('foo'); +setImmediate(() => { + readable.destroy(); +});