diff --git a/doc/api/stream.md b/doc/api/stream.md index 92dfe1a42a39bb..1a2d14f865f5e5 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1555,17 +1555,30 @@ const cleanup = finished(rs, (err) => { }); ``` -### `stream.pipeline(...streams, callback)` +### `stream.pipeline(source, ...transforms, destination, callback)` - -* `...streams` {Stream} Two or more streams to pipe between. +changes: + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/31223 + description: Add support for async generators. +--> + +* `source` {Stream|Iterable|AsyncIterable|Function} + * Returns: {Iterable|AsyncIterable} +* `...transforms` {Stream|Function} + * `source` {AsyncIterable} + * Returns: {AsyncIterable} +* `destination` {Stream|Function} + * `source` {AsyncIterable} + * Returns: {AsyncIterable|Promise} * `callback` {Function} Called when the pipeline is fully done. * `err` {Error} + * `val` Resolved value of `Promise` returned by `destination`. +* Returns: {Stream} -A module method to pipe between streams forwarding errors and properly cleaning -up and provide a callback when the pipeline is complete. +A module method to pipe between streams and generators forwarding errors and +properly cleaning up and provide a callback when the pipeline is complete. ```js const { pipeline } = require('stream'); @@ -1608,6 +1621,28 @@ async function run() { run().catch(console.error); ``` +The `pipeline` API also supports async generators: + +```js +const pipeline = util.promisify(stream.pipeline); +const fs = require('fs').promises; + +async function run() { + await pipeline( + fs.createReadStream('lowercase.txt'), + async function* (source) { + for await (const chunk of source) { + yield String(chunk).toUpperCase(); + } + }, + fs.createWriteStream('uppercase.txt') + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + `stream.pipeline()` will call `stream.destroy(err)` on all streams except: * `Readable` streams which have emitted `'end'` or `'close'`. * `Writable` streams which have emitted `'finish'` or `'close'`. @@ -2707,8 +2742,7 @@ const pipeline = util.promisify(stream.pipeline); const writable = fs.createWriteStream('./file'); (async function() { - const readable = Readable.from(iterable); - await pipeline(readable, writable); + await pipeline(iterable, writable); })(); ``` @@ -2843,7 +2877,7 @@ contain multi-byte characters. [`stream.cork()`]: #stream_writable_cork [`stream.finished()`]: #stream_stream_finished_stream_options_callback [`stream.pipe()`]: #stream_readable_pipe_destination_options -[`stream.pipeline()`]: #stream_stream_pipeline_streams_callback +[`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback [`stream.uncork()`]: #stream_writable_uncork [`stream.unpipe()`]: #stream_readable_unpipe_destination [`stream.wrap()`]: #stream_readable_wrap_stream diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 92a91c30171af1..df0d7bc85d366c 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -5,55 +5,57 @@ const { ArrayIsArray, + SymbolAsyncIterator, + SymbolIterator } = primordials; let eos; const { once } = require('internal/util'); const { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_RETURN_VALUE, ERR_INVALID_CALLBACK, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED } = require('internal/errors').codes; +let EE; +let PassThrough; +let createReadableStreamAsyncIterator; + function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } +function destroyStream(stream, err) { + // request.destroy just do .end - .abort is what we want + if (isRequest(stream)) return stream.abort(); + if (isRequest(stream.req)) return stream.req.abort(); + if (typeof stream.destroy === 'function') return stream.destroy(err); + if (typeof stream.close === 'function') return stream.close(); +} + function destroyer(stream, reading, writing, callback) { callback = once(callback); - - let closed = false; - stream.on('close', () => { - closed = true; - }); + let destroyed = false; if (eos === undefined) eos = require('internal/streams/end-of-stream'); eos(stream, { readable: reading, writable: writing }, (err) => { - if (err) return callback(err); - closed = true; - callback(); + if (destroyed) return; + destroyed = true; + destroyStream(stream, err); + callback(err); }); - let destroyed = false; return (err) => { - if (closed) return; if (destroyed) return; destroyed = true; - - // request.destroy just do .end - .abort is what we want - if (isRequest(stream)) return stream.abort(); - if (isRequest(stream.req)) return stream.req.abort(); - if (typeof stream.destroy === 'function') return stream.destroy(err); - + destroyStream(stream, err); callback(err || new ERR_STREAM_DESTROYED('pipe')); }; } -function pipe(from, to) { - return from.pipe(to); -} - function popCallback(streams) { // Streams should never be an empty array. It should always contain at least // a single stream. Therefore optimize for the average case instead of @@ -63,8 +65,89 @@ function popCallback(streams) { return streams.pop(); } +function isPromise(obj) { + return !!(obj && typeof obj.then === 'function'); +} + +function isReadable(obj) { + return !!(obj && typeof obj.pipe === 'function'); +} + +function isWritable(obj) { + return !!(obj && typeof obj.write === 'function'); +} + +function isStream(obj) { + return isReadable(obj) || isWritable(obj); +} + +function isIterable(obj, isAsync) { + if (!obj) return false; + if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; + if (isAsync === false) return typeof obj[SymbolIterator] === 'function'; + return typeof obj[SymbolAsyncIterator] === 'function' || + typeof obj[SymbolIterator] === 'function'; +} + +function makeAsyncIterable(val) { + if (isIterable(val)) { + return val; + } else if (isReadable(val)) { + // Legacy streams are not Iterable. + return _fromReadable(val); + } else { + throw new ERR_INVALID_ARG_TYPE( + 'val', ['Readable', 'Iterable', 'AsyncIterable'], val); + } +} + +async function* _fromReadable(val) { + if (!createReadableStreamAsyncIterator) { + createReadableStreamAsyncIterator = + require('internal/streams/async_iterator'); + } + + try { + if (typeof val.read !== 'function') { + // createReadableStreamAsyncIterator does not support + // v1 streams. Convert it into a v2 stream. + + if (!PassThrough) { + PassThrough = require('_stream_passthrough'); + } + + const pt = new PassThrough(); + val + .on('error', (err) => pt.destroy(err)) + .pipe(pt); + yield* createReadableStreamAsyncIterator(pt); + } else { + yield* createReadableStreamAsyncIterator(val); + } + } finally { + destroyStream(val); + } +} + +async function pump(iterable, writable, finish) { + if (!EE) { + EE = require('events'); + } + try { + for await (const chunk of iterable) { + if (!writable.write(chunk)) { + if (writable.destroyed) return; + await EE.once(writable, 'drain'); + } + } + writable.end(); + } catch (err) { + finish(err); + } +} + function pipeline(...streams) { - const callback = popCallback(streams); + const callback = once(popCallback(streams)); if (ArrayIsArray(streams[0])) streams = streams[0]; @@ -73,25 +156,104 @@ function pipeline(...streams) { } let error; - const destroys = streams.map(function(stream, i) { + let value; + const destroys = []; + + function finish(err, final) { + if (!error && err) { + error = err; + } + + if (error || final) { + for (const destroy of destroys) { + destroy(error); + } + } + + if (final) { + callback(error, value); + } + } + + function wrap(stream, reading, writing, final) { + destroys.push(destroyer(stream, reading, writing, (err) => { + finish(err, final); + })); + } + + let ret; + for (let i = 0; i < streams.length; i++) { + const stream = streams[i]; const reading = i < streams.length - 1; const writing = i > 0; - return destroyer(stream, reading, writing, function(err) { - if (!error) error = err; - if (err) { - for (const destroy of destroys) { - destroy(err); + + if (isStream(stream)) { + wrap(stream, reading, writing, !reading); + } + + if (i === 0) { + if (typeof stream === 'function') { + ret = stream(); + if (!isIterable(ret)) { + throw new ERR_INVALID_RETURN_VALUE( + 'Iterable, AsyncIterable or Stream', 'source', ret); } + } else if (isIterable(stream) || isReadable(stream)) { + ret = stream; + } else { + throw new ERR_INVALID_ARG_TYPE( + 'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'], + stream); } - if (reading) return; - for (const destroy of destroys) { - destroy(); + } else if (typeof stream === 'function') { + ret = makeAsyncIterable(ret); + ret = stream(ret); + + if (reading) { + if (!isIterable(ret, true)) { + throw new ERR_INVALID_RETURN_VALUE( + 'AsyncIterable', `transform[${i - 1}]`, ret); + } + } else { + if (!PassThrough) { + PassThrough = require('_stream_passthrough'); + } + + const pt = new PassThrough(); + if (isPromise(ret)) { + ret + .then((val) => { + value = val; + pt.end(val); + }, (err) => { + pt.destroy(err); + }); + } else if (isIterable(ret, true)) { + pump(ret, pt, finish); + } else { + throw new ERR_INVALID_RETURN_VALUE( + 'AsyncIterable or Promise', 'destination', ret); + } + + ret = pt; + wrap(ret, false, true, true); } - callback(error); - }); - }); + } else if (isStream(stream)) { + if (isReadable(ret)) { + ret.pipe(stream); + } else { + ret = makeAsyncIterable(ret); + pump(ret, stream, finish); + } + ret = stream; + } else { + const name = reading ? `transform[${i - 1}]` : 'destination'; + throw new ERR_INVALID_ARG_TYPE( + name, ['Stream', 'Function'], ret); + } + } - return streams.reduce(pipe); + return ret; } module.exports = pipeline; diff --git a/test/parallel/test-stream-pipeline-uncaught.js b/test/parallel/test-stream-pipeline-uncaught.js new file mode 100644 index 00000000000000..90d141ec44fef1 --- /dev/null +++ b/test/parallel/test-stream-pipeline-uncaught.js @@ -0,0 +1,25 @@ +'use strict'; + +const common = require('../common'); +const { + pipeline, + PassThrough +} = require('stream'); +const assert = require('assert'); + +process.on('uncaughtException', common.mustCall((err) => { + assert.strictEqual(err.message, 'error'); +})); + +// Ensure that pipeline that ends with Promise +// still propagates error to uncaughtException. +const s = new PassThrough(); +s.end('data'); +pipeline(s, async function(source) { + for await (const chunk of source) { + chunk; + } +}, common.mustCall((err) => { + assert.ifError(err); + throw new Error('error'); +})); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index f6ee97ba43d053..6bfa1331834968 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -516,3 +516,409 @@ const { promisify } = require('util'); }).on('error', common.mustNotCall()); }); } + +{ + let res = ''; + const w = new Writable({ + write(chunk, encoding, callback) { + res += chunk; + callback(); + } + }); + pipeline(function*() { + yield 'hello'; + yield 'world'; + }(), w, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'helloworld'); + })); +} + +{ + let res = ''; + const w = new Writable({ + write(chunk, encoding, callback) { + res += chunk; + callback(); + } + }); + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }(), w, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'helloworld'); + })); +} + +{ + let res = ''; + const w = new Writable({ + write(chunk, encoding, callback) { + res += chunk; + callback(); + } + }); + pipeline(function*() { + yield 'hello'; + yield 'world'; + }, w, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'helloworld'); + })); +} + +{ + let res = ''; + const w = new Writable({ + write(chunk, encoding, callback) { + res += chunk; + callback(); + } + }); + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, w, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'helloworld'); + })); +} + +{ + let res = ''; + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, async function*(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + }, async function(source) { + for await (const chunk of source) { + res += chunk; + } + }, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'HELLOWORLD'); + })); +} + +{ + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, async function*(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + }, async function(source) { + let ret = ''; + for await (const chunk of source) { + ret += chunk; + } + return ret; + }, common.mustCall((err, val) => { + assert.ok(!err); + assert.strictEqual(val, 'HELLOWORLD'); + })); +} + +{ + // AsyncIterable destination is returned and finalizes. + + const ret = pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + }, async function*(source) { + for await (const chunk of source) { + chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err, undefined); + })); + ret.resume(); + assert.strictEqual(typeof ret.pipe, 'function'); +} + +{ + // AsyncFunction destination is not returned and error is + // propagated. + + const ret = pipeline(async function*() { + await Promise.resolve(); + throw new Error('kaboom'); + }, async function*(source) { + for await (const chunk of source) { + chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + ret.resume(); + assert.strictEqual(typeof ret.pipe, 'function'); +} + +{ + const s = new PassThrough(); + pipeline(async function*() { + throw new Error('kaboom'); + }, s, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + pipeline(async function*() { + throw new Error('kaboom'); + }(), s, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + pipeline(function*() { + throw new Error('kaboom'); + }, s, common.mustCall((err, val) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + pipeline(function*() { + throw new Error('kaboom'); + }(), s, common.mustCall((err, val) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, s, async function(source) { + for await (const chunk of source) { + chunk; + throw new Error('kaboom'); + } + }, common.mustCall((err, val) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + const ret = pipeline(function() { + return ['hello', 'world']; + }, s, async function*(source) { + for await (const chunk of source) { + chunk; + throw new Error('kaboom'); + } + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); + ret.resume(); + assert.strictEqual(typeof ret.pipe, 'function'); +} + +{ + // Legacy streams without async iterator. + + const s = new PassThrough(); + s.push('asd'); + s.push(null); + s[Symbol.asyncIterator] = null; + let ret = ''; + pipeline(s, async function(source) { + for await (const chunk of source) { + ret += chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err, undefined); + assert.strictEqual(ret, 'asd'); + })); +} + +{ + // v1 streams without read(). + + const s = new Stream(); + process.nextTick(() => { + s.emit('data', 'asd'); + s.emit('end'); + }); + // 'destroyer' can be called multiple times, + // once from stream wrapper and + // once from iterator wrapper. + s.close = common.mustCallAtLeast(1); + let ret = ''; + pipeline(s, async function(source) { + for await (const chunk of source) { + ret += chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err, undefined); + assert.strictEqual(ret, 'asd'); + })); +} + +{ + // v1 error streams without read(). + + const s = new Stream(); + process.nextTick(() => { + s.emit('error', new Error('kaboom')); + }); + s.destroy = common.mustCall(); + pipeline(s, async function(source) { + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); +} + +{ + const s = new PassThrough(); + assert.throws(() => { + pipeline(function(source) { + }, s, () => {}); + }, (err) => { + assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); + assert.strictEqual(s.destroyed, false); + return true; + }); +} + +{ + const s = new PassThrough(); + assert.throws(() => { + pipeline(s, function(source) { + }, s, () => {}); + }, (err) => { + assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); + assert.strictEqual(s.destroyed, false); + return true; + }); +} + +{ + const s = new PassThrough(); + assert.throws(() => { + pipeline(s, function(source) { + }, () => {}); + }, (err) => { + assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); + assert.strictEqual(s.destroyed, false); + return true; + }); +} + +{ + const s = new PassThrough(); + assert.throws(() => { + pipeline(s, function*(source) { + }, () => {}); + }, (err) => { + assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); + assert.strictEqual(s.destroyed, false); + return true; + }); +} + +{ + let res = ''; + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, new Transform({ + transform(chunk, encoding, cb) { + cb(new Error('kaboom')); + } + }), async function(source) { + for await (const chunk of source) { + res += chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(res, ''); + })); +} + +{ + let res = ''; + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, new Transform({ + transform(chunk, encoding, cb) { + process.nextTick(cb, new Error('kaboom')); + } + }), async function(source) { + for await (const chunk of source) { + res += chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(res, ''); + })); +} + +{ + let res = ''; + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, new Transform({ + decodeStrings: false, + transform(chunk, encoding, cb) { + cb(null, chunk.toUpperCase()); + } + }), async function(source) { + for await (const chunk of source) { + res += chunk; + } + }, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'HELLOWORLD'); + })); +} + +{ + // Ensure no unhandled rejection from async function. + + pipeline(async function*() { + yield 'hello'; + }, async function(source) { + throw new Error('kaboom'); + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); +} + +{ + const src = new PassThrough({ autoDestroy: false }); + const dst = new PassThrough({ autoDestroy: false }); + pipeline(src, dst, common.mustCall(() => { + assert.strictEqual(src.destroyed, true); + assert.strictEqual(dst.destroyed, true); + })); + src.end(); +} diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js index add331016c2204..02b59d37ffd278 100644 --- a/tools/doc/type-parser.js +++ b/tools/doc/type-parser.js @@ -28,6 +28,8 @@ const customTypesMap = { 'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface', + 'AsyncIterable': 'https://tc39.github.io/ecma262/#sec-asynciterable-interface', + 'bigint': `${jsDocPrefix}Reference/Global_Objects/BigInt`, 'Iterable':