[v13.x backport] stream: support passing generator functions into pipeline() #31975

Closed
52 changes: 43 additions & 9 deletions doc/api/stream.md
@@ -1555,17 +1555,30 @@ const cleanup = finished(rs, (err) => {
});
```

### `stream.pipeline(source, ...transforms, destination, callback)`
<!-- YAML
added: v10.0.0
changes:
  - version: REPLACEME
    pr-url: https://github.com/nodejs/node/pull/31223
    description: Add support for async generators.
-->

* `source` {Stream|Iterable|AsyncIterable|Function}
  * Returns: {Iterable|AsyncIterable}
* `...transforms` {Stream|Function}
  * `source` {AsyncIterable}
  * Returns: {AsyncIterable}
* `destination` {Stream|Function}
  * `source` {AsyncIterable}
  * Returns: {AsyncIterable|Promise}
* `callback` {Function} Called when the pipeline is fully done.
  * `err` {Error}
  * `val` Resolved value of `Promise` returned by `destination`.
* Returns: {Stream}

A module method to pipe between streams and generators, forwarding errors and
properly cleaning up, and providing a callback when the pipeline is complete.

```js
const { pipeline } = require('stream');
```

@@ -1608,6 +1621,28 @@ async function run() {

```js
run().catch(console.error);
```

The `pipeline` API also supports async generators:

```js
const pipeline = util.promisify(stream.pipeline);
const fs = require('fs');

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source) {
      for await (const chunk of source) {
        yield String(chunk).toUpperCase();
      }
    },
    fs.createWriteStream('uppercase.txt')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);
```
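
When `destination` is a function that returns a `Promise`, its resolved value
is passed to the callback as `val` and becomes the resolution value of the
promisified form. A minimal sketch of that behavior — `words.txt` is a
placeholder file name:

```js
const { pipeline } = require('stream');
const util = require('util');
const fs = require('fs');

const pipelinePromise = util.promisify(pipeline);

pipelinePromise(
  fs.createReadStream('words.txt'),
  async function(source) {
    let total = 0;
    for await (const chunk of source) {
      total += chunk.length;
    }
    return `${total} bytes`; // forwarded to the callback as `val`
  }
).then(console.log).catch(console.error);
```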

`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
* `Readable` streams which have emitted `'end'` or `'close'`.
* `Writable` streams which have emitted `'finish'` or `'close'`.
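
For example, a failure in the middle of the chain destroys the streams on
either side of it and invokes the callback exactly once with that error; a
sketch with placeholder file names:

```js
const fs = require('fs');
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('archive.tar'),     // destroyed when the failure occurs
  async function* (source) {
    for await (const chunk of source) {
      throw new Error('boom');            // simulated mid-pipeline failure
    }
  },
  fs.createWriteStream('archive.tar.gz'), // destroyed as well
  (err) => {
    if (err) console.error('Pipeline failed.', err);
  }
);
```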
@@ -2707,8 +2742,7 @@

```js
const pipeline = util.promisify(stream.pipeline);
const writable = fs.createWriteStream('./file');

(async function() {
  await pipeline(iterable, writable);
})();
```

@@ -2843,7 +2877,7 @@ contain multi-byte characters.
[`stream.cork()`]: #stream_writable_cork
[`stream.finished()`]: #stream_stream_finished_stream_options_callback
[`stream.pipe()`]: #stream_readable_pipe_destination_options
[`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback
[`stream.uncork()`]: #stream_writable_uncork
[`stream.unpipe()`]: #stream_readable_unpipe_destination
[`stream.wrap()`]: #stream_readable_wrap_stream
230 changes: 196 additions & 34 deletions lib/internal/streams/pipeline.js
@@ -5,55 +5,57 @@

const {
  ArrayIsArray,
  SymbolAsyncIterator,
  SymbolIterator
} = primordials;

let eos;

const { once } = require('internal/util');
const {
  ERR_INVALID_ARG_TYPE,
  ERR_INVALID_RETURN_VALUE,
  ERR_INVALID_CALLBACK,
  ERR_MISSING_ARGS,
  ERR_STREAM_DESTROYED
} = require('internal/errors').codes;

let EE;
let PassThrough;
let createReadableStreamAsyncIterator;

function isRequest(stream) {
  return stream && stream.setHeader && typeof stream.abort === 'function';
}

function destroyStream(stream, err) {
  // request.destroy just does .end - .abort is what we want.
  if (isRequest(stream)) return stream.abort();
  if (isRequest(stream.req)) return stream.req.abort();
  if (typeof stream.destroy === 'function') return stream.destroy(err);
  if (typeof stream.close === 'function') return stream.close();
}

function destroyer(stream, reading, writing, callback) {
  callback = once(callback);

  let destroyed = false;

  if (eos === undefined) eos = require('internal/streams/end-of-stream');
  eos(stream, { readable: reading, writable: writing }, (err) => {
    if (destroyed) return;
    destroyed = true;
    destroyStream(stream, err);
    callback(err);
  });

  return (err) => {
    if (destroyed) return;
    destroyed = true;
    destroyStream(stream, err);
    callback(err || new ERR_STREAM_DESTROYED('pipe'));
  };
}

function popCallback(streams) {
  // Streams should never be an empty array. It should always contain at least
  // a single stream. Therefore optimize for the average case instead of
  // checking for length. The callback must be a valid function.
  if (typeof streams[streams.length - 1] !== 'function')
    throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]);
  return streams.pop();
}

function isPromise(obj) {
  return !!(obj && typeof obj.then === 'function');
}

function isReadable(obj) {
  return !!(obj && typeof obj.pipe === 'function');
}

function isWritable(obj) {
  return !!(obj && typeof obj.write === 'function');
}

function isStream(obj) {
  return isReadable(obj) || isWritable(obj);
}

function isIterable(obj, isAsync) {
  if (!obj) return false;
  if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
  if (isAsync === false) return typeof obj[SymbolIterator] === 'function';
  return typeof obj[SymbolAsyncIterator] === 'function' ||
    typeof obj[SymbolIterator] === 'function';
}

function makeAsyncIterable(val) {
  if (isIterable(val)) {
    return val;
  } else if (isReadable(val)) {
    // Legacy streams are not Iterable.
    return _fromReadable(val);
  } else {
    throw new ERR_INVALID_ARG_TYPE(
      'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
  }
}

async function* _fromReadable(val) {
  if (!createReadableStreamAsyncIterator) {
    createReadableStreamAsyncIterator =
      require('internal/streams/async_iterator');
  }

  try {
    if (typeof val.read !== 'function') {
      // createReadableStreamAsyncIterator does not support
      // v1 streams. Convert it into a v2 stream.

      if (!PassThrough) {
        PassThrough = require('_stream_passthrough');
      }

      const pt = new PassThrough();
      val
        .on('error', (err) => pt.destroy(err))
        .pipe(pt);
      yield* createReadableStreamAsyncIterator(pt);
    } else {
      yield* createReadableStreamAsyncIterator(val);
    }
  } finally {
    destroyStream(val);
  }
}

// Pumps an async iterable into a writable, honoring backpressure via
// 'drain' and reporting failures through finish().
async function pump(iterable, writable, finish) {
  if (!EE) {
    EE = require('events');
  }
  try {
    for await (const chunk of iterable) {
      if (!writable.write(chunk)) {
        if (writable.destroyed) return;
        await EE.once(writable, 'drain');
      }
    }
    writable.end();
  } catch (err) {
    finish(err);
  }
}

function pipeline(...streams) {
  const callback = once(popCallback(streams));

  if (ArrayIsArray(streams[0])) streams = streams[0];

  if (streams.length < 2) {
    throw new ERR_MISSING_ARGS('streams');
  }

  let error;
  let value;
  const destroys = [];

  function finish(err, final) {
    if (!error && err) {
      error = err;
    }

    if (error || final) {
      for (const destroy of destroys) {
        destroy(error);
      }
    }

    if (final) {
      callback(error, value);
    }
  }

  function wrap(stream, reading, writing, final) {
    destroys.push(destroyer(stream, reading, writing, (err) => {
      finish(err, final);
    }));
  }

  let ret;
  for (let i = 0; i < streams.length; i++) {
    const stream = streams[i];
    const reading = i < streams.length - 1;
    const writing = i > 0;

    if (isStream(stream)) {
      wrap(stream, reading, writing, !reading);
    }

    if (i === 0) {
      // Source: a stream, an (async) iterable, or a function returning one.
      if (typeof stream === 'function') {
        ret = stream();
        if (!isIterable(ret)) {
          throw new ERR_INVALID_RETURN_VALUE(
            'Iterable, AsyncIterable or Stream', 'source', ret);
        }
      } else if (isIterable(stream) || isReadable(stream)) {
        ret = stream;
      } else {
        throw new ERR_INVALID_ARG_TYPE(
          'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'],
          stream);
      }
    } else if (typeof stream === 'function') {
      // Transform or destination written as a function: feed it the
      // previous stage as an async iterable.
      ret = makeAsyncIterable(ret);
      ret = stream(ret);

      if (reading) {
        if (!isIterable(ret, true)) {
          throw new ERR_INVALID_RETURN_VALUE(
            'AsyncIterable', `transform[${i - 1}]`, ret);
        }
      } else {
        if (!PassThrough) {
          PassThrough = require('_stream_passthrough');
        }

        const pt = new PassThrough();
        if (isPromise(ret)) {
          ret
            .then((val) => {
              value = val;
              pt.end(val);
            }, (err) => {
              pt.destroy(err);
            });
        } else if (isIterable(ret, true)) {
          pump(ret, pt, finish);
        } else {
          throw new ERR_INVALID_RETURN_VALUE(
            'AsyncIterable or Promise', 'destination', ret);
        }

        ret = pt;
        wrap(ret, false, true, true);
      }
    } else if (isStream(stream)) {
      if (isReadable(ret)) {
        ret.pipe(stream);
      } else {
        ret = makeAsyncIterable(ret);
        pump(ret, stream, finish);
      }
      ret = stream;
    } else {
      const name = reading ? `transform[${i - 1}]` : 'destination';
      throw new ERR_INVALID_ARG_TYPE(
        name, ['Stream', 'Function'], ret);
    }
  }

  return ret;
}

module.exports = pipeline;
25 changes: 25 additions & 0 deletions test/parallel/test-stream-pipeline-uncaught.js
@@ -0,0 +1,25 @@
'use strict';

const common = require('../common');
const {
  pipeline,
  PassThrough
} = require('stream');
const assert = require('assert');

process.on('uncaughtException', common.mustCall((err) => {
  assert.strictEqual(err.message, 'error');
}));

// Ensure that a pipeline that ends with a Promise
// still propagates the error to uncaughtException.
const s = new PassThrough();
s.end('data');
pipeline(s, async function(source) {
  for await (const chunk of source) {
    chunk; // Consume and discard the data.
  }
}, common.mustCall((err) => {
  assert.ifError(err);
  throw new Error('error');
}));