[v16.x backport] stream: add stream.compose #39563

Closed
doc/api/stream.md: 89 additions, 0 deletions
@@ -1918,6 +1918,95 @@ run().catch(console.error);
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors.

### `stream.compose(...streams)`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - `stream.compose` is experimental.

* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
* Returns: {stream.Duplex}

Combines two or more streams into a `Duplex` stream that writes to the
first stream and reads from the last. Each provided stream is piped into
the next using `stream.pipeline`. If any of the streams errors, then all
of them are destroyed, including the outer `Duplex` stream.
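
As a minimal sketch of this destroy-on-error behavior (the failing transform,
its error message, and the stream contents below are illustrative, not part of
the documented API):

```mjs
import { compose, Readable, Transform } from 'stream';

const source = Readable.from(['a', 'b', 'c']);

const failing = new Transform({
  transform(chunk, encoding, callback) {
    // Simulate a failure part-way through the chain.
    callback(new Error('boom'));
  }
});

const composed = compose(source, failing);

composed.on('error', (err) => {
  // The inner streams and the outer Duplex have all been destroyed.
  console.log(err.message, composed.destroyed); // prints 'boom true'
});
```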

Because `stream.compose` returns a new stream that in turn can (and
should) be piped into other streams, it enables composition. In contrast,
when passing streams to `stream.pipeline`, typically the first stream is
a readable stream and the last a writable stream, forming a closed
circuit.
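
Because of this, a composed stream can itself be used as one stage of a
`stream.pipeline` call. A rough sketch (the file names and the gzip step are
illustrative):

```mjs
import { compose, Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// A reusable "uppercase, then gzip" stage. Because compose() returns a
// Duplex, the stage can be dropped into the middle of a pipeline.
const uppercaseGzip = compose(
  new Transform({
    transform(chunk, encoding, callback) {
      callback(null, String(chunk).toUpperCase());
    }
  }),
  createGzip()
);

await pipeline(
  createReadStream('input.txt'),
  uppercaseGzip,
  createWriteStream('output.txt.gz')
);
```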

If passed a `Function`, it must be a factory method taking a `source`
`Iterable`.

```mjs
import { compose, Transform } from 'stream';

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''));
  }
});

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf;
}

console.log(res); // prints 'HELLOWORLD'
```

`stream.compose` can be used to convert async iterables, generators and
functions into streams.

* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
  `null`.
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
  Must take a source `AsyncIterable` as first parameter. Cannot yield
  `null`.
* `AsyncFunction` converts into a writable `Duplex`. Must return
  either `null` or `undefined`.

```mjs
import { compose } from 'stream';
import { finished } from 'stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
  yield 'Hello';
  yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
  for await (const chunk of source) {
    res += chunk;
  }
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD'
```

### `stream.Readable.from(iterable, [options])`
<!-- YAML
added:
lib/internal/streams/compose.js: 196 additions, 0 deletions
@@ -0,0 +1,196 @@
'use strict';

const pipeline = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');
const {
  isNodeStream,
  isReadable,
  isWritable,
} = require('internal/streams/utils');
const {
  AbortError,
  codes: {
    ERR_INVALID_ARG_VALUE,
    ERR_MISSING_ARGS,
  },
} = require('internal/errors');

// This is needed for pre node 17.
class ComposeDuplex extends Duplex {
  constructor(options) {
    super(options);

    // https://github.com/nodejs/node/pull/34385

    if (options?.readable === false) {
      this._readableState.readable = false;
      this._readableState.ended = true;
      this._readableState.endEmitted = true;
    }

    if (options?.writable === false) {
      this._writableState.writable = false;
      this._writableState.ending = true;
      this._writableState.ended = true;
      this._writableState.finished = true;
    }
  }
}

module.exports = function compose(...streams) {
  if (streams.length === 0) {
    throw new ERR_MISSING_ARGS('streams');
  }

  if (streams.length === 1) {
    return Duplex.from(streams[0]);
  }

  const orgStreams = [...streams];

  if (typeof streams[0] === 'function') {
    streams[0] = Duplex.from(streams[0]);
  }

  if (typeof streams[streams.length - 1] === 'function') {
    const idx = streams.length - 1;
    streams[idx] = Duplex.from(streams[idx]);
  }

  for (let n = 0; n < streams.length; ++n) {
    if (!isNodeStream(streams[n])) {
      // TODO(ronag): Add checks for non streams.
      continue;
    }
    if (n < streams.length - 1 && !isReadable(streams[n])) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${n}]`,
        orgStreams[n],
        'must be readable'
      );
    }
    if (n > 0 && !isWritable(streams[n])) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${n}]`,
        orgStreams[n],
        'must be writable'
      );
    }
  }

  let ondrain;
  let onfinish;
  let onreadable;
  let onclose;
  let d;

  function onfinished(err) {
    const cb = onclose;
    onclose = null;

    if (cb) {
      cb(err);
    } else if (err) {
      d.destroy(err);
    } else if (!readable && !writable) {
      d.destroy();
    }
  }

  const head = streams[0];
  const tail = pipeline(streams, onfinished);

  const writable = !!isWritable(head);
  const readable = !!isReadable(tail);

  // TODO(ronag): Avoid double buffering.
  // Implement Writable/Readable/Duplex traits.
  // See, https://github.com/nodejs/node/pull/33515.
  d = new ComposeDuplex({
    // TODO (ronag): highWaterMark?
    writableObjectMode: !!head?.writableObjectMode,
    readableObjectMode: !!tail?.writableObjectMode,
    writable,
    readable,
  });

  if (writable) {
    d._write = function(chunk, encoding, callback) {
      if (head.write(chunk, encoding)) {
        callback();
      } else {
        ondrain = callback;
      }
    };

    d._final = function(callback) {
      head.end();
      onfinish = callback;
    };

    head.on('drain', function() {
      if (ondrain) {
        const cb = ondrain;
        ondrain = null;
        cb();
      }
    });

    tail.on('finish', function() {
      if (onfinish) {
        const cb = onfinish;
        onfinish = null;
        cb();
      }
    });
  }

  if (readable) {
    tail.on('readable', function() {
      if (onreadable) {
        const cb = onreadable;
        onreadable = null;
        cb();
      }
    });

    tail.on('end', function() {
      d.push(null);
    });

    d._read = function() {
      while (true) {
        const buf = tail.read();

        if (buf === null) {
          onreadable = d._read;
          return;
        }

        if (!d.push(buf)) {
          return;
        }
      }
    };
  }

  d._destroy = function(err, callback) {
    if (!err && onclose !== null) {
      err = new AbortError();
    }

    onreadable = null;
    ondrain = null;
    onfinish = null;

    if (onclose === null) {
      callback(err);
    } else {
      onclose = callback;
      destroyer(tail, err);
    }
  };

  return d;
};
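
The readable/writable validation at the top of `compose()` surfaces through the
public API as `ERR_INVALID_ARG_VALUE`. A rough sketch of that behavior (the
stream names are illustrative):

```js
const { compose, PassThrough, Writable } = require('stream');

const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});

// A writable-only stream is not readable, so it may only appear in the
// last position; anywhere else, the validation loop above throws.
try {
  compose(sink, new PassThrough());
} catch (err) {
  console.log(err.code); // 'ERR_INVALID_ARG_VALUE'
}
```
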
lib/internal/streams/duplexify.js: 2 additions, 17 deletions
@@ -3,12 +3,11 @@
const {
  isIterable,
  isNodeStream,
  isDestroyed,
  isReadable,
  isReadableNodeStream,
  isWritable,
  isWritableNodeStream,
  isDuplexNodeStream,
  isReadableFinished,
  isWritableEnded,
} = require('internal/streams/utils');
const eos = require('internal/streams/end-of-stream');
const {
@@ -32,20 +31,6 @@ const {
  FunctionPrototypeCall
} = primordials;

function isReadable(stream) {
  const r = isReadableNodeStream(stream);
  if (r === null || typeof stream?.readable !== 'boolean') return null;
  if (isDestroyed(stream)) return false;
  return r && stream.readable && !isReadableFinished(stream);
}

function isWritable(stream) {
  const r = isWritableNodeStream(stream);
  if (r === null || typeof stream?.writable !== 'boolean') return null;
  if (isDestroyed(stream)) return false;
  return r && stream.writable && !isWritableEnded(stream);
}

// This is needed for pre node 17.
class Duplexify extends Duplex {
  constructor(options) {
lib/internal/streams/end-of-stream.js: 2 additions, 0 deletions
@@ -176,6 +176,8 @@ function eos(stream, options, callback) {
    (rState && rState.errorEmitted) ||
    (rState && stream.req && stream.aborted) ||
    (
      (!wState || !willEmitClose || typeof wState.closed !== 'boolean') &&
      (!rState || !willEmitClose || typeof rState.closed !== 'boolean') &&
      (!writable || (wState && wState.finished)) &&
      (!readable || (rState && rState.endEmitted))
    )
lib/internal/streams/pipeline.js: 7 additions, 10 deletions
@@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators');

const {
  isIterable,
  isReadable,
  isStream,
  isReadableNodeStream,
  isNodeStream,
} = require('internal/streams/utils');

let PassThrough;
@@ -87,7 +87,7 @@ function popCallback(streams) {
function makeAsyncIterable(val) {
  if (isIterable(val)) {
    return val;
  } else if (isReadable(val)) {
  } else if (isReadableNodeStream(val)) {
    // Legacy streams are not Iterable.
    return fromReadable(val);
  }
@@ -204,7 +204,7 @@ function pipeline(...streams) {
    const reading = i < streams.length - 1;
    const writing = i > 0;

    if (isStream(stream)) {
    if (isNodeStream(stream)) {
      finishCount++;
      destroys.push(destroyer(stream, reading, writing, finish));
    }
@@ -216,7 +216,7 @@
        throw new ERR_INVALID_RETURN_VALUE(
          'Iterable, AsyncIterable or Stream', 'source', ret);
      }
    } else if (isIterable(stream) || isReadable(stream)) {
    } else if (isIterable(stream) || isReadableNodeStream(stream)) {
      ret = stream;
    } else {
      ret = Duplex.from(stream);
@@ -269,8 +269,8 @@
        finishCount++;
        destroys.push(destroyer(ret, false, true, finish));
      }
    } else if (isStream(stream)) {
      if (isReadable(ret)) {
    } else if (isNodeStream(stream)) {
      if (isReadableNodeStream(ret)) {
        ret.pipe(stream);

        // Compat. Before node v10.12.0 stdio used to throw an error so
@@ -291,9 +291,6 @@
    }
  }

  // TODO(ronag): Consider returning a Duplex proxy if the first argument
  // is a writable. Would improve composability.
  // See, https://github.com/nodejs/node/issues/32020
  return ret;
}
