From 4d04a5ed88c21eaa3661dcc33f814e2511c37743 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 22 Jan 2023 19:20:59 +0530 Subject: [PATCH 01/26] stream: add pipeline() for webstreams Refs: https://github.com/nodejs/node/issues/39316 --- lib/internal/streams/pipeline.js | 40 ++- lib/internal/streams/utils.js | 10 + test/parallel/test-webstreams-pipeline.js | 306 ++++++++++++++++++++++ 3 files changed, 350 insertions(+), 6 deletions(-) create mode 100644 test/parallel/test-webstreams-pipeline.js diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index b8a756330536c5..5610ce1bb489ca 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -35,11 +35,29 @@ const { isReadable, isReadableNodeStream, isNodeStream, + isReadableStream, + isWritableStream, + isTransformStream, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); let PassThrough; let Readable; +let Writable; + +function lazyloadReadable() { + if (!Readable) { + Readable = require('internal/streams/readable'); + } + return Readable; +} + +function lazyloadWritable() { + if (!Writable) { + Writable = require('internal/streams/writable'); + } + return Writable; +} function destroyer(stream, reading, writing) { let finished = false; @@ -81,11 +99,7 @@ function makeAsyncIterable(val) { } async function* fromReadable(val) { - if (!Readable) { - Readable = require('internal/streams/readable'); - } - - yield* Readable.prototype[SymbolAsyncIterator].call(val); + yield* lazyloadReadable().prototype[SymbolAsyncIterator].call(val); } async function pump(iterable, writable, finish, { end }) { @@ -147,6 +161,20 @@ async function pump(iterable, writable, finish, { end }) { } } +function convertToNodeStreamIfWebstream(stream) { + if (isReadableStream(stream)) { + return lazyloadReadable().fromWeb(stream); + } else if (isWritableStream(stream)) { + return lazyloadWritable().fromWeb(stream); + } else if (isTransformStream(stream)) { + return Duplex.from({ + writable: stream.writable, + readable: stream.readable + }); + } + return stream; +} + function pipeline(...streams) { return pipelineImpl(streams, once(popCallback(streams))); } @@ -212,7 +240,7 @@ function pipelineImpl(streams, callback, opts) { let ret; for (let i = 0; i < streams.length; i++) { - const stream = streams[i]; + const stream = convertToNodeStreamIfWebstream(streams[i]); const reading = i < streams.length - 1; const writing = i > 0; const end = reading || opts?.end !== false; diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 9d08af6f31a280..1471b8b7eb07d7 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -77,6 +77,15 @@ function isWritableStream(obj) { ); } +function isTransformStream(obj) { + return !!( + obj && + !isNodeStream(obj) && + typeof obj.readable === 'object' && + typeof obj.writable === 'object' + ); +} + function isIterable(obj, isAsync) { if (obj == null) return false; if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; @@ -312,4 +321,5 @@ module.exports = { isServerRequest, isServerResponse, willEmitClose, + isTransformStream, }; diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js new file mode 100644 index 00000000000000..e8448065d32cb8 --- /dev/null +++ b/test/parallel/test-webstreams-pipeline.js @@ -0,0 +1,306 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { Readable, Writable, Transform, pipeline } = require('stream'); +const { pipeline: pipelinePromise } = require('stream/promises'); +const { ReadableStream, WritableStream, TransformStream } = require('stream/web'); +const http = require('http'); + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipeline(rs, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write() { } + }); + + pipeline(rs, ws, common.mustCall((err) => { + assert.strictEqual(err?.message, 'kaboom'); + })); + + c.error(new Error('kaboom')); +} + +{ + let c; + const values = []; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ts = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString().toUpperCase()); + } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipeline(rs, ts, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['HELLO', 'WORLD']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + function makeTransformStream() { + return new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString()); + } + }); + } + + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write() { } + }); + + pipeline(rs, + makeTransformStream(), + makeTransformStream(), + makeTransformStream(), + makeTransformStream(), + ws, + common.mustCall((err) => { + assert.strictEqual(err?.message, 'kaboom'); + })); + + c.error(new Error('kaboom')); +} + +{ + const values = []; + + const r = new Readable({ + read() { } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipeline(r, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + r.push('hello'); + r.push('world'); + r.push(null); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const w = new Writable({ + write(chunk, encoding, callback) { + values.push(chunk?.toString()); + callback(); + } + }); + + pipeline(rs, w, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + const t = new Transform({ + transform(chunk, encoding, callback) { + callback(null, chunk?.toString().toUpperCase()); + } + }); + + pipeline(rs, t, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['HELLO', 'WORLD']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + const server = http.createServer((req, res) => { + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('hello'); + controller.enqueue('world'); + controller.close(); + } + }); + pipeline(rs, res, common.mustSucceed(() => {})); + }); + + server.listen(0, common.mustCall(() => { + const req = http.request({ + port: server.address().port + }); + req.end(); + const values = []; + req.on('response', (res) => { + res.on('data', (chunk) => { + values.push(chunk?.toString()); + }); + res.on('end', common.mustCall(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + server.close(); + })); + }); + })); +} + +{ + const values = []; + const server = http.createServer((req, res) => { + const ts = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString().toUpperCase()); + } + }); + pipeline(req, ts, res, common.mustSucceed()); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + let c; + + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + c.enqueue('hello'); + c.close(); + + pipeline(rs, req, common.mustSucceed(() => { + server.close(); + })); + + req.on('response', (res) => { + res.on('data', (chunk) => { + values.push(chunk?.toString()); + } + ); + res.on('end', common.mustCall(() => { + assert.deepStrictEqual(values, ['HELLO']); + server.close(); + })); + }); + }); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipelinePromise(rs, ws).then(common.mustCall(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write() { } + }); + + pipelinePromise(rs, ws).then(common.mustNotCall()).catch(common.mustCall((err) => { + assert.strictEqual(err?.message, 'kaboom'); + })); + + c.error(new Error('kaboom')); +} From 06b8fff99fc01c17f9cc931413c07d5a3fd8b6c1 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 26 Jan 2023 01:27:27 +0530 Subject: [PATCH 02/26] fixup! stop making everything nodestream --- lib/internal/streams/pipeline.js | 98 +++++++++++++++++++++++--------- lib/internal/streams/utils.js | 5 ++ lib/stream/promises.js | 3 +- 3 files changed, 77 insertions(+), 29 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 5610ce1bb489ca..c94a1aa7d75257 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -35,15 +35,14 @@ const { isReadable, isReadableNodeStream, isNodeStream, - isReadableStream, isWritableStream, isTransformStream, + isWebStream, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); let PassThrough; let Readable; -let Writable; function lazyloadReadable() { if (!Readable) { @@ -52,13 +51,6 @@ function lazyloadReadable() { return Readable; } -function lazyloadWritable() { - if (!Writable) { - Writable = require('internal/streams/writable'); - } - return Writable; -} - function destroyer(stream, reading, writing) { let finished = false; stream.on('close', () => { @@ -106,6 +98,10 @@ async function pump(iterable, writable, finish, { end }) { let error; let onresolve = null; + if (isTransformStream(writable)) { + writable = writable.writable; + } + const resume = (err) => { if (err) { error = err; @@ -132,7 +128,9 @@ async function pump(iterable, writable, finish, { end }) { } }); - writable.on('drain', resume); + if (typeof writable.on === 'function') { + writable.on('drain', resume); + } const cleanup = eos(writable, { readable: false }, resume); try { @@ -140,14 +138,23 @@ async function pump(iterable, writable, finish, { end }) { await wait(); } + let writer = writable; + let endFn = () => {}; + if (isWritableStream(writable)) { + writer = writable.getWriter(); + endFn = writer.close.bind(writer); + } else { + endFn = writer.end.bind(writer); + } + for await (const chunk of iterable) { - if (!writable.write(chunk)) { + if (!writer.write(chunk)) { await wait(); } } if (end) { - writable.end(); + endFn(); } await wait(); @@ -157,22 +164,10 @@ async function pump(iterable, writable, finish, { end }) { finish(error !== err ? aggregateTwoErrors(error, err) : err); } finally { cleanup(); - writable.off('drain', resume); - } -} - -function convertToNodeStreamIfWebstream(stream) { - if (isReadableStream(stream)) { - return lazyloadReadable().fromWeb(stream); - } else if (isWritableStream(stream)) { - return lazyloadWritable().fromWeb(stream); - } else if (isTransformStream(stream)) { - return Duplex.from({ - writable: stream.writable, - readable: stream.readable - }); + if (typeof writable.off === 'function') { + writable.off('drain', resume); + } } - return stream; } function pipeline(...streams) { @@ -240,7 +235,7 @@ function pipelineImpl(streams, callback, opts) { let ret; for (let i = 0; i < streams.length; i++) { - const stream = convertToNodeStreamIfWebstream(streams[i]); + const stream = streams[i]; const reading = i < streams.length - 1; const writing = i > 0; const end = reading || opts?.end !== false; @@ -360,6 +355,22 @@ function pipelineImpl(streams, callback, opts) { 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); } ret = stream; + } else if (isWebStream(stream)) { + if (isReadableNodeStream(ret)) { + finishCount += 2; + pipeNodeToWeb(ret, stream, finish, { end }); + } else if (isIterable(ret)) { + finishCount++; + pump(ret, stream, finish, { end }); + } else { + throw new ERR_INVALID_ARG_TYPE( + 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); + } + if (isTransformStream(stream)) { + ret = stream.readable; + } else { + ret = stream; + } } else { ret = Duplex.from(stream); } @@ -420,4 +431,35 @@ function pipe(src, dst, finish, { end }) { return eos(dst, { readable: false, writable: true }, finish); } +function pipeNodeToWeb(src, dst, finish, { end }) { + let writable = dst; + if (isTransformStream(dst)) { + writable = dst.writable; + } + const writer = writable.getWriter(); + + if (end) { + src.once('end', () => { + writer.close(); + }); + } else { + finish(); + } + eos(src, { readable: true, writable: false }, (err) => { + const rState = src._readableState; + if ( + err && + err.code === 'ERR_STREAM_PREMATURE_CLOSE' && + (rState && rState.ended && !rState.errored && !rState.errorEmitted) + ) { + src + .once('end', finish) + .once('error', finish); + } else { + finish(err); + } + }); + return eos(dst, finish); +} + module.exports = { pipelineImpl, pipeline }; diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 1471b8b7eb07d7..74faca5fe9bb2a 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -86,6 +86,10 @@ function isTransformStream(obj) { ); } +function isWebStream(obj) { + return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj); +} + function isIterable(obj, isAsync) { if (obj == null) return false; if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; @@ -312,6 +316,7 @@ module.exports = { isReadableFinished, isReadableErrored, isNodeStream, + isWebStream, isWritable, isWritableNodeStream, isWritableStream, diff --git a/lib/stream/promises.js b/lib/stream/promises.js index 512012860f4a7a..a8b65d62b0961b 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -8,6 +8,7 @@ const { const { isIterable, isNodeStream, + isWebStream, } = require('internal/streams/utils'); const { pipelineImpl: pl } = require('internal/streams/pipeline'); @@ -21,7 +22,7 @@ function pipeline(...streams) { let end; const lastArg = streams[streams.length - 1]; if (lastArg && typeof lastArg === 'object' && - !isNodeStream(lastArg) && !isIterable(lastArg)) { + !isNodeStream(lastArg) && !isIterable(lastArg) && !isWebStream(lastArg)) { const options = ArrayPrototypePop(streams); signal = options.signal; end = options.end; From 08c333a8800a9b006126e47abac654129807cce5 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 26 Jan 2023 20:13:33 +0530 Subject: [PATCH 03/26] fixup! final fixups for transform streams --- lib/internal/streams/pipeline.js | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index c94a1aa7d75257..995197ab805881 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -282,8 +282,13 @@ function pipelineImpl(streams, callback, opts) { ret = Duplex.from(stream); } } else if (typeof stream === 'function') { - ret = makeAsyncIterable(ret); - ret = stream(ret, { signal }); + if (isTransformStream(ret)) { + ret = makeAsyncIterable(ret?.readable); + ret = stream(ret, { signal }); + } else { + ret = makeAsyncIterable(ret); + ret = stream(ret, { signal }); + } if (reading) { if (!isIterable(ret, true)) { @@ -350,6 +355,9 @@ function pipelineImpl(streams, callback, opts) { } else if (isIterable(ret)) { finishCount++; pump(ret, stream, finish, { end }); + } else if (isTransformStream(ret)) { + finishCount++; + pump(ret.readable, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); @@ -362,15 +370,14 @@ function pipelineImpl(streams, callback, opts) { } else if (isIterable(ret)) { finishCount++; pump(ret, stream, finish, { end }); + } else if (isTransformStream(ret)) { + // finishCount++; + pump(ret.readable, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); } - if (isTransformStream(stream)) { - ret = stream.readable; - } else { - ret = stream; - } + ret = stream; } else { ret = Duplex.from(stream); } @@ -438,6 +445,10 @@ function pipeNodeToWeb(src, dst, finish, { end }) { } const writer = writable.getWriter(); + src.on('data', (chunk) => { + writer.write(chunk); + }); + if (end) { src.once('end', () => { writer.close(); From cb7de04e8a756e04e0c4e171b2a37419a2d3d383 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 26 Jan 2023 20:18:07 +0530 Subject: [PATCH 04/26] fixup! remove redundant --- lib/internal/streams/pipeline.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 995197ab805881..1b5f0a7c959a15 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -371,7 +371,6 @@ function pipelineImpl(streams, callback, opts) { finishCount++; pump(ret, stream, finish, { end }); } else if (isTransformStream(ret)) { - // finishCount++; pump(ret.readable, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( From 22ee7472f64ac155c17a598b3880196ebf7fbe08 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 26 Jan 2023 21:35:37 +0530 Subject: [PATCH 05/26] fixup! use proper eos and request type --- lib/internal/streams/pipeline.js | 5 +++-- test/parallel/test-webstreams-pipeline.js | 14 +++++--------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 1b5f0a7c959a15..187ee31888bcbf 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -148,7 +148,8 @@ async function pump(iterable, writable, finish, { end }) { } for await (const chunk of iterable) { - if (!writer.write(chunk)) { + const written = writer.write(chunk); + if (!written) { await wait(); } } @@ -469,7 +470,7 @@ function pipeNodeToWeb(src, dst, finish, { end }) { finish(err); } }); - return eos(dst, finish); + return eos(writable, finish); } module.exports = { pipelineImpl, pipeline }; diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js index e8448065d32cb8..9e797b8badd5ba 100644 --- a/test/parallel/test-webstreams-pipeline.js +++ b/test/parallel/test-webstreams-pipeline.js @@ -232,23 +232,19 @@ const http = require('http'); server.listen(0, () => { const req = http.request({ - port: server.address().port + port: server.address().port, + method: 'POST', }); - let c; const rs = new ReadableStream({ start(controller) { - c = controller; + controller.enqueue('hello'); + controller.close(); } }); - c.enqueue('hello'); - c.close(); - - pipeline(rs, req, common.mustSucceed(() => { - server.close(); - })); + pipeline(rs, req, common.mustSucceed()); req.on('response', (res) => { res.on('data', (chunk) => { From 2ec9a448a565900799368aeb3706960f26e01a91 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 26 Jan 2023 22:40:29 +0530 Subject: [PATCH 06/26] fixup! separate out pump function --- lib/internal/streams/pipeline.js | 74 +++++++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 187ee31888bcbf..1a34141ca5ce93 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -38,6 +38,7 @@ const { isWritableStream, isTransformStream, isWebStream, + isReadableStream, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); @@ -98,6 +99,65 @@ async function pump(iterable, writable, finish, { end }) { let error; let onresolve = null; + const resume = (err) => { + if (err) { + error = err; + } + + if (onresolve) { + const callback = onresolve; + onresolve = null; + callback(); + } + }; + + const wait = () => new Promise((resolve, reject) => { + if (error) { + reject(error); + } else { + onresolve = () => { + if (error) { + reject(error); + } else { + resolve(); + } + }; + } + }); + + writable.on('drain', resume); + const cleanup = eos(writable, { readable: false }, resume); + + try { + if (writable.writableNeedDrain) { + await wait(); + } + + for await (const chunk of iterable) { + if (!writable.write(chunk)) { + await wait(); + } + } + + if (end) { + writable.end(); + } + + await wait(); + + finish(); + } catch (err) { + finish(error !== err ? aggregateTwoErrors(error, err) : err); + } finally { + cleanup(); + writable.off('drain', resume); + } +} + +async function pumpWeb(iterable, writable, finish, { end }) { + let error; + let onresolve = null; + if (isTransformStream(writable)) { writable = writable.writable; } @@ -353,12 +413,13 @@ function pipelineImpl(streams, callback, opts) { if (isReadable(stream) && isLastStream) { lastStreamCleanup.push(cleanup); } + } else if (isTransformStream(ret) || isReadableStream(ret)) { + const toRead = ret.readable || ret; + finishCount++; + pumpWeb(toRead, stream, finish, { end }); } else if (isIterable(ret)) { finishCount++; pump(ret, stream, finish, { end }); - } else if (isTransformStream(ret)) { - finishCount++; - pump(ret.readable, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); @@ -368,11 +429,14 @@ function pipelineImpl(streams, callback, opts) { if (isReadableNodeStream(ret)) { finishCount += 2; pipeNodeToWeb(ret, stream, finish, { end }); + } else if (isReadableStream(ret)) { + finishCount++; + pumpWeb(ret, stream, finish, { end }); + } else if (isTransformStream(ret)) { + pumpWeb(ret.readable, stream, finish, { end }); } else if (isIterable(ret)) { finishCount++; pump(ret, stream, finish, { end }); - } else if (isTransformStream(ret)) { - pump(ret.readable, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); From d1919d9b346f8b2b3680c38dc425e49de23ea38b Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 26 Jan 2023 23:32:14 +0530 Subject: [PATCH 07/26] fixup! generator functions and related tests --- lib/internal/streams/pipeline.js | 15 ++-- test/parallel/test-webstreams-pipeline.js | 90 +++++++++++++++++++++++ 2 files changed, 97 insertions(+), 8 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 1a34141ca5ce93..285f8d79204609 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -154,7 +154,7 @@ async function pump(iterable, writable, finish, { end }) { } } -async function pumpWeb(iterable, writable, finish, { end }) { +async function pumpWeb(readable, writable, finish, { end }) { let error; let onresolve = null; @@ -207,7 +207,7 @@ async function pumpWeb(iterable, writable, finish, { end }) { endFn = writer.end.bind(writer); } - for await (const chunk of iterable) { + for await (const chunk of readable) { const written = writer.write(chunk); if (!written) { await wait(); @@ -345,11 +345,10 @@ function pipelineImpl(streams, callback, opts) { } else if (typeof stream === 'function') { if (isTransformStream(ret)) { ret = makeAsyncIterable(ret?.readable); - ret = stream(ret, { signal }); } else { ret = makeAsyncIterable(ret); - ret = stream(ret, { signal }); } + ret = stream(ret, { signal }); if (reading) { if (!isIterable(ret, true)) { @@ -390,6 +389,9 @@ function pipelineImpl(streams, callback, opts) { process.nextTick(finish, err); }, ); + } else if (isReadableStream(ret) || isTransformStream(ret)) { + finishCount++; + pumpWeb(ret, pt, finish, { end }); } else if (isIterable(ret, true)) { finishCount++; pump(ret, pt, finish, { end }); @@ -429,14 +431,11 @@ function pipelineImpl(streams, callback, opts) { if (isReadableNodeStream(ret)) { finishCount += 2; pipeNodeToWeb(ret, stream, finish, { end }); - } else if (isReadableStream(ret)) { + } else if (isReadableStream(ret) || isIterable(ret)) { finishCount++; pumpWeb(ret, stream, finish, { end }); } else if (isTransformStream(ret)) { pumpWeb(ret.readable, stream, finish, { end }); - } else if (isIterable(ret)) { - finishCount++; - pump(ret, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js index 9e797b8badd5ba..66bdc80f53604f 100644 --- a/test/parallel/test-webstreams-pipeline.js +++ b/test/parallel/test-webstreams-pipeline.js @@ -300,3 +300,93 @@ const http = require('http'); c.error(new Error('kaboom')); } + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + pipeline(rs, async function(source) { + for await (const chunk of source) { + values.push(chunk?.toString()); + } + }, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + const rs = new ReadableStream({ + start() {} + }); + + pipeline(rs, async function(source) { + throw new Error('kaboom'); + }, (err) => { + assert.strictEqual(err?.message, 'kaboom'); + }); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ts = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString().toUpperCase()); + } + }); + + pipeline(rs, ts, async function(source) { + for await (const chunk of source) { + values.push(chunk?.toString()); + } + }, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['HELLO', 'WORLD']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipeline(rs, async function* (source) { + for await (const chunk of source) { + yield chunk?.toString().toUpperCase(); + } + }, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['HELLO', 'WORLD']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} From 6c168d8693a0fcdf27d3bb7efa7b937d5b69336b Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 01:27:42 +0530 Subject: [PATCH 08/26] fixup! refactor pump functions --- lib/internal/streams/pipeline.js | 48 ++++++++++---------------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 285f8d79204609..209642662b4595 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -35,7 +35,6 @@ const { isReadable, isReadableNodeStream, isNodeStream, - isWritableStream, isTransformStream, isWebStream, isReadableStream, @@ -95,7 +94,7 @@ async function* fromReadable(val) { yield* lazyloadReadable().prototype[SymbolAsyncIterator].call(val); } -async function pump(iterable, writable, finish, { end }) { +async function pumpToNode(iterable, writable, finish, { end }) { let error; let onresolve = null; @@ -154,7 +153,7 @@ async function pump(iterable, writable, finish, { end }) { } } -async function pumpWeb(readable, writable, finish, { end }) { +async function pumpToWeb(readable, writable, finish, { end }) { let error; let onresolve = null; @@ -188,25 +187,10 @@ async function pumpWeb(readable, writable, finish, { end }) { } }); - if (typeof writable.on === 'function') { - writable.on('drain', resume); - } const cleanup = eos(writable, { readable: false }, resume); try { - if (writable.writableNeedDrain) { - await wait(); - } - - let writer = writable; - let endFn = () => {}; - if (isWritableStream(writable)) { - writer = writable.getWriter(); - endFn = writer.close.bind(writer); - } else { - endFn = writer.end.bind(writer); - } - + const writer = writable.getWriter(); for await (const chunk of readable) { const written = writer.write(chunk); if (!written) { @@ -215,7 +199,7 @@ async function pumpWeb(readable, writable, finish, { end }) { } if (end) { - endFn(); + writer.close(); } await wait(); @@ -225,9 +209,6 @@ async function pumpWeb(readable, writable, finish, { end }) { finish(error !== err ? aggregateTwoErrors(error, err) : err); } finally { cleanup(); - if (typeof writable.off === 'function') { - writable.off('drain', resume); - } } } @@ -389,12 +370,13 @@ function pipelineImpl(streams, callback, opts) { process.nextTick(finish, err); }, ); - } else if (isReadableStream(ret) || isTransformStream(ret)) { - finishCount++; - pumpWeb(ret, pt, finish, { end }); } else if (isIterable(ret, true)) { finishCount++; - pump(ret, pt, finish, { end }); + pumpToNode(ret, pt, finish, { end }); + } else if (isReadableStream(ret) || isTransformStream(ret)) { + const toRead = ret.readable || ret; + finishCount++; + pumpToNode(toRead, pt, finish, { end }); } else { throw new ERR_INVALID_RETURN_VALUE( 'AsyncIterable or Promise', 'destination', ret); @@ -418,13 +400,13 @@ function pipelineImpl(streams, callback, opts) { } else if (isTransformStream(ret) || isReadableStream(ret)) { const toRead = ret.readable || ret; finishCount++; - pumpWeb(toRead, stream, finish, { end }); + pumpToNode(toRead, stream, finish, { end }); } else if (isIterable(ret)) { finishCount++; - pump(ret, stream, finish, { end }); + pumpToNode(ret, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( - 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); + 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); } ret = stream; } else if (isWebStream(stream)) { @@ -433,12 +415,12 @@ function pipelineImpl(streams, callback, opts) { pipeNodeToWeb(ret, stream, finish, { end }); } else if (isReadableStream(ret) || isIterable(ret)) { finishCount++; - pumpWeb(ret, stream, finish, { end }); + pumpToWeb(ret, stream, finish, { end }); } else if (isTransformStream(ret)) { - pumpWeb(ret.readable, stream, finish, { end }); + pumpToWeb(ret.readable, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( - 'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); + 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); } ret = stream; } else { From 68c5c7522e4b033b8be16fd88d15a6d04263dcd4 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 01:30:24 +0530 Subject: [PATCH 09/26] fixup! writable stream require no cleanup --- lib/internal/streams/pipeline.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 209642662b4595..86b2d4c34a455d 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -187,7 +187,7 @@ async function pumpToWeb(readable, writable, finish, { end }) { } }); - const cleanup = eos(writable, { readable: false }, resume); + eos(writable, resume); try { const writer = writable.getWriter(); @@ -207,8 +207,6 @@ async function pumpToWeb(readable, writable, finish, { end }) { finish(); } catch (err) { finish(error !== err ? aggregateTwoErrors(error, err) : err); - } finally { - cleanup(); } } From 4eac7e339c41a74ccee1ea723ed429f2fab73c24 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 14:28:44 +0530 Subject: [PATCH 10/26] fixup! use pumpWeb Co-authored-by: Robert Nagy --- lib/internal/streams/pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 86b2d4c34a455d..4f701d1c5d3f96 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -410,7 +410,7 @@ function pipelineImpl(streams, callback, opts) { } else if (isWebStream(stream)) { if (isReadableNodeStream(ret)) { finishCount += 2; - pipeNodeToWeb(ret, stream, finish, { end }); + pumpToWeb(ret, stream, finish, { end }); } else if (isReadableStream(ret) || isIterable(ret)) { finishCount++; pumpToWeb(ret, stream, finish, { end }); From 4548998330b71f8161aef542122f4baf46f67e3c Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 15:21:11 +0530 Subject: [PATCH 11/26] fixup! Co-authored-by: Robert Nagy --- lib/internal/streams/pipeline.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 4f701d1c5d3f96..9e23ccb290a017 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -192,9 +192,9 @@ async function pumpToWeb(readable, writable, finish, { end }) { try { const writer = writable.getWriter(); for await (const chunk of readable) { - const written = writer.write(chunk); + const written = await writer.write(chunk); if (!written) { - await wait(); + // TODO: What does this mean? } } From 86add319a05d66bd729982381f761daea4dd2393 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 15:21:41 +0530 Subject: [PATCH 12/26] fixup! Co-authored-by: Robert Nagy --- lib/internal/streams/pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 9e23ccb290a017..e3587233058b15 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -199,7 +199,7 @@ async function pumpToWeb(readable, writable, finish, { end }) { } if (end) { - writer.close(); + await writer.close(); } await wait(); From 78f0cf3be700573cbf2589478bf99985f6328dad Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 15:22:15 +0530 Subject: [PATCH 13/26] fixup! Co-authored-by: Robert Nagy --- lib/internal/streams/pipeline.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index e3587233058b15..0a22b8807d6d36 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -187,7 +187,6 @@ async function pumpToWeb(readable, writable, finish, { end }) { } }); - eos(writable, resume); try { const writer = writable.getWriter(); From b5879c38b2d1bef2ae9c098c5e71a3aabf35f6b4 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 15:22:45 +0530 Subject: [PATCH 14/26] fixup! Co-authored-by: Robert Nagy --- lib/internal/streams/pipeline.js | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 0a22b8807d6d36..90bc03a2e01dfa 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -161,17 +161,6 @@ async function pumpToWeb(readable, writable, finish, { end }) { writable = writable.writable; } - const resume = (err) => { - if (err) { - error = err; - } - - if (onresolve) { - const callback = onresolve; - onresolve = null; - callback(); - } - }; const wait = () => new Promise((resolve, reject) => { if (error) { From 6190f6b924842b4ae1c7289378699b5c1efb409f Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 15:23:17 +0530 Subject: [PATCH 15/26] fixup! Co-authored-by: Robert Nagy --- lib/internal/streams/pipeline.js | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 90bc03a2e01dfa..c930c4262f1d42 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -162,19 +162,6 @@ async function pumpToWeb(readable, writable, finish, { end }) { } - const wait = () => new Promise((resolve, reject) => { - if (error) { - reject(error); - } else { - onresolve = () => { - if (error) { - reject(error); - } else { - resolve(); - } - }; - } - }); try { From edfffa6650f0d0580a4783402dde057cf000e002 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 15:23:54 +0530 Subject: [PATCH 16/26] fixup! Co-authored-by: Robert Nagy --- lib/internal/streams/pipeline.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index c930c4262f1d42..24dc26d32f2ae4 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -177,7 +177,6 @@ async function pumpToWeb(readable, writable, finish, { end }) { await writer.close(); } - await wait(); finish(); } catch (err) { From 37df3642a6e602f0aa0b37273ee122ff0981b5b7 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 15:30:02 +0530 Subject: [PATCH 17/26] fixup --- lib/internal/streams/pipeline.js | 61 ++++---------------------------- 1 file changed, 7 insertions(+), 54 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 24dc26d32f2ae4..d2d7e349fc7594 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -44,13 +44,6 @@ const { AbortController } = require('internal/abort_controller'); let PassThrough; let Readable; -function lazyloadReadable() { - if (!Readable) { - Readable = require('internal/streams/readable'); - } - return Readable; -} - function destroyer(stream, reading, writing) { let finished = false; stream.on('close', () => { @@ -91,7 +84,11 @@ function makeAsyncIterable(val) { } async function* fromReadable(val) { - yield* lazyloadReadable().prototype[SymbolAsyncIterator].call(val); + if (!Readable) { + Readable = require('internal/streams/readable'); + } + + yield* Readable.prototype[SymbolAsyncIterator].call(val); } async function pumpToNode(iterable, writable, finish, { end }) { @@ -154,33 +151,24 @@ async function pumpToNode(iterable, writable, finish, { end }) { } async function pumpToWeb(readable, writable, finish, { end }) { - let error; - let onresolve = null; if (isTransformStream(writable)) { writable = writable.writable; } - - - try { const writer = writable.getWriter(); for await (const chunk of readable) { - const written = await writer.write(chunk); - if (!written) { - // TODO: What does this mean? - } + await writer.write(chunk); } if (end) { await writer.close(); } - finish(); } catch (err) { - finish(error !== err ? aggregateTwoErrors(error, err) : err); + finish(err); } } @@ -455,39 +443,4 @@ function pipe(src, dst, finish, { end }) { return eos(dst, { readable: false, writable: true }, finish); } -function pipeNodeToWeb(src, dst, finish, { end }) { - let writable = dst; - if (isTransformStream(dst)) { - writable = dst.writable; - } - const writer = writable.getWriter(); - - src.on('data', (chunk) => { - writer.write(chunk); - }); - - if (end) { - src.once('end', () => { - writer.close(); - }); - } else { - finish(); - } - eos(src, { readable: true, writable: false }, (err) => { - const rState = src._readableState; - if ( - err && - err.code === 'ERR_STREAM_PREMATURE_CLOSE' && - (rState && rState.ended && !rState.errored && !rState.errorEmitted) - ) { - src - .once('end', finish) - .once('error', finish); - } else { - finish(err); - } - }); - return eos(writable, finish); -} - module.exports = { pipelineImpl, pipeline }; From 120c4fc9937116c63463b3d161f4b580617c525b Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 18:51:18 +0530 Subject: [PATCH 18/26] fixup! replace with pump to web --- lib/internal/streams/pipeline.js | 4 ++-- test/parallel/test-webstreams-pipeline.js | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index d2d7e349fc7594..d56dde6fb73d85 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -371,8 +371,8 @@ function pipelineImpl(streams, callback, opts) { ret = stream; } else if (isWebStream(stream)) { if (isReadableNodeStream(ret)) { - finishCount += 2; - pumpToWeb(ret, stream, finish, { end }); + finishCount++; + pumpToWeb(makeAsyncIterable(ret), stream, finish, { end }); } else if (isReadableStream(ret) || isIterable(ret)) { finishCount++; pumpToWeb(ret, stream, finish, { end }); diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js index 66bdc80f53604f..fb69b29590d62e 100644 --- a/test/parallel/test-webstreams-pipeline.js +++ b/test/parallel/test-webstreams-pipeline.js @@ -126,7 +126,7 @@ const http = require('http'); }); pipeline(r, ws, common.mustSucceed(() => { - assert.deepStrictEqual(values, ['hello', 'world']); + assert.deepStrictEqual(values, ['helloworld']); })); r.push('hello'); @@ -181,7 +181,7 @@ const http = require('http'); }); pipeline(rs, t, ws, common.mustSucceed(() => { - assert.deepStrictEqual(values, ['HELLO', 'WORLD']); + assert.deepStrictEqual(values, ['HELLOWORLD']); })); c.enqueue('hello'); From cd2a64090da60b7e9bcfa64acd671f576e3fd111 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 19:27:37 +0530 Subject: [PATCH 19/26] fixup! lint --- lib/internal/streams/pipeline.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index d56dde6fb73d85..8c9f7b11efbf83 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -151,7 +151,6 @@ async function pumpToNode(iterable, writable, finish, { end }) { } async function pumpToWeb(readable, writable, finish, { end }) { - if (isTransformStream(writable)) { writable = writable.writable; } From 7d8414f9b72cb565b32e273c9d6b044c4d51fcdd Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 27 Jan 2023 20:05:38 +0530 Subject: [PATCH 20/26] fixup! possible backpressure fix --- lib/internal/streams/pipeline.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 8c9f7b11efbf83..30afde39e700a6 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -158,6 +158,7 @@ async function pumpToWeb(readable, writable, finish, { end }) { try { const writer = writable.getWriter(); for await (const chunk of readable) { + await writer.ready; await writer.write(chunk); } From e3c139a58f43c8253649ec3cae6862b14a0e1721 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 28 Jan 2023 10:00:39 +0530 Subject: [PATCH 21/26] fixup! possible test for backpressure --- test/parallel/test-webstreams-pipeline.js | 31 +++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js index fb69b29590d62e..2a88c1a935aee6 100644 --- a/test/parallel/test-webstreams-pipeline.js +++ b/test/parallel/test-webstreams-pipeline.js @@ -1,3 +1,4 @@ +// Flags: --expose-internals --no-warnings 'use strict'; const common = require('../common'); @@ -7,6 +8,10 @@ const { pipeline: pipelinePromise } = require('stream/promises'); const { ReadableStream, WritableStream, TransformStream } = require('stream/web'); const http = require('http'); +const { + kState, +} = require('internal/webstreams/util'); + { const values = []; let c; @@ -390,3 +395,29 @@ const http = require('http'); c.enqueue('world'); c.close(); } + +{ + let c; + let lastValue; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write(chunk) { + lastValue = chunk?.toString(); + } + }, { highWaterMark: 1 }); + + pipeline(rs, ws, common.mustSucceed(() => { + assert.strictEqual(lastValue, '9'); + })); + + for (let i = 0; i < 10; i++) { + c.enqueue(`${i}`); + assert.strictEqual(ws[kState].writer.desiredSize, 1); + } + c.close(); +} From c1e0241589958cd749ac185cb7166f06dd44cf1d Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 28 Jan 2023 16:32:54 +0530 Subject: [PATCH 22/26] fixup! update highWatermark test --- test/parallel/test-webstreams-pipeline.js | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js index 2a88c1a935aee6..38ec9b10f04531 100644 --- a/test/parallel/test-webstreams-pipeline.js +++ b/test/parallel/test-webstreams-pipeline.js @@ -1,4 +1,3 @@ -// Flags: --expose-internals --no-warnings 'use strict'; const common = require('../common'); @@ -8,10 +7,6 @@ const { pipeline: pipelinePromise } = require('stream/promises'); const { ReadableStream, WritableStream, TransformStream } = require('stream/web'); const http = require('http'); -const { - kState, -} = require('internal/webstreams/util'); - { const values = []; let c; @@ -398,7 +393,6 @@ const { { let c; - let lastValue; const rs = new ReadableStream({ start(controller) { c = controller; @@ -406,18 +400,13 @@ const { }); const ws = new WritableStream({ - write(chunk) { - lastValue = chunk?.toString(); - } - }, { highWaterMark: 1 }); + write(chunk) { } + }, { highWaterMark: 0 }); - pipeline(rs, ws, common.mustSucceed(() => { - assert.strictEqual(lastValue, '9'); - })); + pipeline(rs, ws, common.mustNotCall()); for (let i = 0; i < 10; i++) { c.enqueue(`${i}`); - assert.strictEqual(ws[kState].writer.desiredSize, 1); } c.close(); } From e37052bd98d57b426c8b850059c277fce6cec7d3 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 29 Jan 2023 01:47:01 +0530 Subject: [PATCH 23/26] Update lib/internal/streams/pipeline.js Co-authored-by: Robert Nagy --- lib/internal/streams/pipeline.js | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 30afde39e700a6..cb558d3ad4772d 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -155,20 +155,28 @@ async function pumpToWeb(readable, writable, finish, { end }) { writable = writable.writable; } + // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure + const writer = writable.getWriter(); try { - const writer = writable.getWriter(); for await (const chunk of readable) { await writer.ready; - await writer.write(chunk); + writer.write(chunk).catch(() => (}); } + await writer.ready; + if (end) { await writer.close(); } finish(); } catch (err) { - finish(err); + try { + await writer.abort(err); + finish(err); + } catch (err) { + finish(err); + } } } From d2fdac41de76afd4480c97bbe8804a27b0c8ed2c Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 29 Jan 2023 09:53:10 +0530 Subject: [PATCH 24/26] fixup --- lib/internal/streams/pipeline.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index cb558d3ad4772d..44c0e06ee30557 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -154,17 +154,16 @@ async function pumpToWeb(readable, writable, finish, { end }) { if (isTransformStream(writable)) { writable = writable.writable; } - - // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure + // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure const writer = writable.getWriter(); try { for await (const chunk of readable) { await writer.ready; - writer.write(chunk).catch(() => (}); + writer.write(chunk).catch(() => {}); } await writer.ready; - + if (end) { await writer.close(); } From 4ac3dc30f9704a02e3ffaeea599dba86ff381892 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 29 Jan 2023 15:35:21 +0530 Subject: [PATCH 25/26] fixup! add doc --- doc/api/stream.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 4dbb84a7a6bb1f..a21c87cb8cf207 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2693,6 +2693,9 @@ const cleanup = finished(rs, (err) => { -* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} -* `source` {Stream|Iterable|AsyncIterable|Function} +* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]| + ReadableStream\[]|WritableStream\[]|TransformStream\[]} +* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream} * Returns: {Iterable|AsyncIterable} -* `...transforms` {Stream|Function} +* `...transforms` {Stream|Function|TransformStream} * `source` {AsyncIterable} * Returns: {AsyncIterable} -* `destination` {Stream|Function} +* `destination` {Stream|Function|WritableStream} * `source` {AsyncIterable} * Returns: {AsyncIterable|Promise} * `callback` {Function} Called when the pipeline is fully done. From 4c2c2eb0fc2013fbedc2eed41fc4dba5dc58ae49 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 30 Jan 2023 11:34:22 +0530 Subject: [PATCH 26/26] fixup! nit on test --- test/parallel/test-webstreams-pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js index 38ec9b10f04531..46bdf8718ea97a 100644 --- a/test/parallel/test-webstreams-pipeline.js +++ b/test/parallel/test-webstreams-pipeline.js @@ -17,7 +17,7 @@ const http = require('http'); }); const ws = new WritableStream({ write(chunk) { - values.push(chunk?.toString()); + values.push(chunk); } });