From 313ecaabe5e82f48d3d8a71b234cefe5c577972c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 21 Feb 2020 12:50:34 +0100 Subject: [PATCH] stream: fix broken pipeline error propagation If the destination was an async function any error thrown from that function would be swallowed. Backport-PR-URL: https://github.com/nodejs/node/pull/31975 PR-URL: https://github.com/nodejs/node/pull/31835 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina Reviewed-By: Denys Otrishko --- lib/internal/streams/pipeline.js | 16 ++++++------ .../parallel/test-stream-pipeline-uncaught.js | 25 +++++++++++++++++++ test/parallel/test-stream-pipeline.js | 6 +---- 3 files changed, 34 insertions(+), 13 deletions(-) create mode 100644 test/parallel/test-stream-pipeline-uncaught.js diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index e0834171bfb8fc..fdc154c32edf5d 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -163,9 +163,10 @@ function pipeline(...streams) { } let error; + let value; const destroys = []; - function finish(err, val, final) { + function finish(err, final) { if (!error && err) { error = err; } @@ -177,13 +178,13 @@ function pipeline(...streams) { } if (final) { - callback(error, val); + callback(error, value); } } function wrap(stream, reading, writing, final) { destroys.push(destroyer(stream, reading, writing, (err) => { - finish(err, null, final); + finish(err, final); })); } @@ -229,11 +230,10 @@ function pipeline(...streams) { if (isPromise(ret)) { ret .then((val) => { + value = val; pt.end(val); - finish(null, val, true); - }) - .catch((err) => { - finish(err, null, true); + }, (err) => { + pt.destroy(err); }); } else if (isIterable(ret, true)) { pump(ret, pt, finish); @@ -243,7 +243,7 @@ function pipeline(...streams) { } ret = pt; - wrap(ret, true, false, true); + wrap(ret, false, true, true); } } else if (isStream(stream)) { if (isReadable(ret)) { diff --git a/test/parallel/test-stream-pipeline-uncaught.js b/test/parallel/test-stream-pipeline-uncaught.js new file mode 100644 index 00000000000000..90d141ec44fef1 --- /dev/null +++ b/test/parallel/test-stream-pipeline-uncaught.js @@ -0,0 +1,25 @@ +'use strict'; + +const common = require('../common'); +const { + pipeline, + PassThrough +} = require('stream'); +const assert = require('assert'); + +process.on('uncaughtException', common.mustCall((err) => { + assert.strictEqual(err.message, 'error'); +})); + +// Ensure that pipeline that ends with Promise +// still propagates error to uncaughtException. +const s = new PassThrough(); +s.end('data'); +pipeline(s, async function(source) { + for await (const chunk of source) { + chunk; + } +}, common.mustCall((err) => { + assert.ifError(err); + throw new Error('error'); +})); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 19fc246e2bf3cd..b3d4064c6a9783 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -613,11 +613,9 @@ const { promisify } = require('util'); yield 'hello'; yield 'world'; }, async function*(source) { - const ret = []; for await (const chunk of source) { - ret.push(chunk.toUpperCase()); + yield chunk.toUpperCase(); } - yield ret; }, async function(source) { let ret = ''; for await (const chunk of source) { @@ -754,7 +752,6 @@ const { promisify } = require('util'); }, common.mustCall((err) => { assert.strictEqual(err, undefined); assert.strictEqual(ret, 'asd'); - assert.strictEqual(s.destroyed, true); })); } @@ -775,7 +772,6 @@ const { promisify } = require('util'); }, common.mustCall((err) => { assert.strictEqual(err, undefined); assert.strictEqual(ret, 'asd'); - assert.strictEqual(s.destroyed, true); })); }