diff --git a/doc/api/stream.md b/doc/api/stream.md index 603e0407a8c60c..843f59c51d17fe 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1859,6 +1859,93 @@ run().catch(console.error); after the `callback` has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors. +### `stream.compose(...streams)` + + +* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]} +* Returns: {stream.Duplex} + +Combines two or more streams into a `Duplex` stream that writes to the +first stream and reads from the last. Each provided stream is piped into +the next, using `stream.pipeline`. If any of the streams error then all +are destroyed, including the outer `Duplex` stream. + +Because `stream.compose` returns a new stream that in turn can (and +should) be piped into other streams, it enables composition. In contrast, +when passing streams to `stream.pipeline`, typically the first stream is +a readable stream and the last a writable stream, forming a closed +circuit. + +If passed a `Function` is must be a factory method taking a `source` +`Iterable`. + +```mjs +import { compose, Transform } from 'stream'; + +const removeSpaces = new Transform({ + transform(chunk, encoding, callback) { + callback(null, String(chunk).replace(' ', '')); + } +}); + +async function* toUpper(source) { + for await (const chunk of source) { + yield String(chunk).toUpperCase(); + } +} + +let res = ''; +for await (const buf of compose(removeSpaces, toUpper).end('hello world')) { + res += buf; +} + +console.log(res); // prints 'HELLOWORLD' +``` + +`stream.compose` can be used to convert async iterables, generators and +functions into streams. + +* `AsyncIterable` converts into a readable `Duplex`. Cannot yield + `null`. +* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`. + Must take a source `AsyncIterable` as first parameter. Cannot yield + `null`. +* `AsyncFunction` converts into a writable `Duplex`. Must return + either `null` or `undefined`. + +```mjs +import { compose } from 'stream'; +import { finished } from 'stream/promises'; + +// Convert AsyncIterable into readable Duplex. +const s1 = compose(async function*() { + yield 'Hello'; + yield 'World'; +}()); + +// Convert AsyncGenerator into transform Duplex. +const s2 = compose(async function*(source) { + for await (const chunk of source) { + yield String(chunk).toUpperCase(); + } +}); + +let res = ''; + +// Convert AsyncFunction into writable Duplex. +const s3 = compose(async function(source) { + for await (const chunk of source) { + res += chunk; + } +}); + +await finished(compose(s1, s2, s3)); + +console.log(res); // prints 'HELLOWORLD' +``` + ### `stream.Readable.from(iterable, [options])`