diff --git a/doc/api/stream.md b/doc/api/stream.md index 014b2f68b3359b..09ff2c02b1885b 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1340,14 +1340,14 @@ run().catch(console.error); rs.resume(); // drain the stream ``` -### stream.pipeline(...streams[, callback]) +### stream.pipeline(...streams, callback) * `...streams` {Stream} Two or more streams to pipe between. -* `callback` {Function} A callback function that takes an optional error - argument. +* `callback` {Function} Called when the pipeline is fully done. + * `err` {Error} A module method to pipe between streams forwarding errors and properly cleaning up and provide a callback when the pipeline is complete. diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 849b3d39dbe25b..caa4042339bd37 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -6,6 +6,7 @@ let eos; const { + ERR_INVALID_CALLBACK, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED } = require('internal/errors').codes; @@ -19,11 +20,6 @@ function once(callback) { }; } -function noop(err) { - // Rethrow the error if it exists to avoid swallowing it - if (err) throw err; -} - function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } @@ -66,8 +62,11 @@ function pipe(from, to) { } function popCallback(streams) { - if (!streams.length) return noop; - if (typeof streams[streams.length - 1] !== 'function') return noop; + // Streams should never be an empty array. It should always contain at least + // a single stream. Therefore optimize for the average case instead of + // checking for length === 0 as well. + if (typeof streams[streams.length - 1] !== 'function') + throw new ERR_INVALID_CALLBACK(); return streams.pop(); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 12733d88a7ac85..f735054e88e9b8 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -60,7 +60,7 @@ common.crashOnUnhandledRejection(); }, /ERR_MISSING_ARGS/); assert.throws(() => { pipeline(); - }, /ERR_MISSING_ARGS/); + }, /ERR_INVALID_CALLBACK/); } { @@ -493,17 +493,8 @@ common.crashOnUnhandledRejection(); } }); - read.on('close', common.mustCall()); - transform.on('close', common.mustCall()); - write.on('close', common.mustCall()); - - process.on('uncaughtException', common.mustCall((err) => { - assert.deepStrictEqual(err, new Error('kaboom')); - })); - - const dst = pipeline(read, transform, write); - - assert.strictEqual(dst, write); - - read.push('hello'); + assert.throws( + () => pipeline(read, transform, write), + { code: 'ERR_INVALID_CALLBACK' } + ); }