From e6ce10c79230815e44cc0f27fa77957e3552ca8a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 14 Jun 2021 15:22:59 +0200 Subject: [PATCH] streams: add stream.pipe pipe is similar to pipeline however it supports stream composition. Refs: https://github.com/nodejs/node/issues/32020 --- lib/internal/streams/pipelinify.js | 102 +++++++++++++++++++++++++++++ lib/stream.js | 2 + 2 files changed, 104 insertions(+) create mode 100644 lib/internal/streams/pipelinify.js diff --git a/lib/internal/streams/pipelinify.js b/lib/internal/streams/pipelinify.js new file mode 100644 index 00000000000000..3131db8cc36014 --- /dev/null +++ b/lib/internal/streams/pipelinify.js @@ -0,0 +1,102 @@ +'use strict'; + +const pipeline = require('internal/streams/pipeline'); +const Duplex = require('internal/streams/duplex'); +const { destroyer } = require('internal/streams/destroy'); + +module.exports = function pipe(...streams) { + // TODO (ronag): Avoid double buffering. + // Implement Writable/Readable/Duplex traits. + + let ondrain; + let onfinish; + let onreadable; + let onclose; + let ret; + + const r = pipeline(streams, function(err) { + if (onclose) { + const cb = onclose; + onclose = null; + cb(err); + } else { + ret.destroy(err); + } + }); + const w = streams[0]; + + const writable = w.writable; + const readable = r.readable; + const objectMode = w.readableObjectMode; + + ret = new Duplex({ + writable, + readable, + objectMode, + highWaterMark: 1 + }); + + if (writable) { + ret._write = function(chunk, encoding, callback) { + if (w.write(chunk, encoding)) { + callback(); + } else { + ondrain = callback; + } + }; + + ret._final = function(callback) { + w.end(); + onfinish = callback; + }; + + ret.on('drain', function() { + if (ondrain) { + const cb = ondrain; + ondrain = null; + cb(); + } + }); + + ret.on('finish', function() { + if (onfinish) { + const cb = onfinish; + onfinish = null; + cb(); + } + }); + } + + if (readable) { + r.on('readable', function() { + if (onreadable) { + const cb = onreadable; + onreadable = null; + cb(); + } + }); + + ret._read = function() { + while (true) { + const buf = r.read(); + + if (buf === null) { + onreadable = ret._read; + return; + } + + if (!ret.push(buf)) { + return; + } + } + }; + } + + ret._destroy = function(err, callback) { + onclose = callback; + onreadable = null; + ondrain = null; + onfinish = null; + destroyer(r, err); + }; +}; diff --git a/lib/stream.js b/lib/stream.js index 85adda81b32f29..3b4a83868f84ac 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -30,6 +30,7 @@ const { } = require('internal/util'); const pipeline = require('internal/streams/pipeline'); +const pipelinify = require('internal/streams/pipelinify'); const eos = require('internal/streams/end-of-stream'); const internalBuffer = require('internal/buffer'); @@ -42,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); Stream.PassThrough = require('internal/streams/passthrough'); Stream.pipeline = pipeline; +Stream.pipelinify = pipelinify; const { addAbortSignal } = require('internal/streams/add-abort-signal'); Stream.addAbortSignal = addAbortSignal; Stream.finished = eos;