From d9378db951d126db226a0b06b126504672b161d1 Mon Sep 17 00:00:00 2001 From: Xuguang Mei Date: Thu, 3 Mar 2022 12:53:59 +0800 Subject: [PATCH] stream: use .chunk when calling adapters's writev Fix: https://github.com/nodejs/node/issues/42157 PR-URL: https://github.com/nodejs/node/pull/42161 Fixes: https://github.com/nodejs/node/issues/42157 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina --- lib/internal/webstreams/adapters.js | 10 +- ...twg-webstreams-adapters-to-streamduplex.js | 166 +++++++++++++ ...g-webstreams-adapters-to-streamwritable.js | 234 ++++++++++++++++++ 3 files changed, 406 insertions(+), 4 deletions(-) create mode 100644 test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js create mode 100644 test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index a81c173e4714e9..1058c1c0356ead 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -228,8 +228,9 @@ function newStreamWritableFromWritableStream(writableStream, options = {}) { writev(chunks, callback) { function done(error) { + error = error.filter((e) => e); try { - callback(error); + callback(error.length === 0 ? undefined : error); } catch (error) { // In a next tick because this is happening within // a promise context, and if there are any errors @@ -247,7 +248,7 @@ function newStreamWritableFromWritableStream(writableStream, options = {}) { PromiseAll( ArrayPrototypeMap( chunks, - (chunk) => writer.write(chunk))), + (data) => writer.write(data.chunk))), done, done); }, @@ -633,8 +634,9 @@ function newStreamDuplexFromReadableWritablePair(pair = {}, options = {}) { writev(chunks, callback) { function done(error) { + error = error.filter((e) => e); try { - callback(error); + callback(error.length === 0 ? undefined : error); } catch (error) { // In a next tick because this is happening within // a promise context, and if there are any errors @@ -652,7 +654,7 @@ function newStreamDuplexFromReadableWritablePair(pair = {}, options = {}) { PromiseAll( ArrayPrototypeMap( chunks, - (chunk) => writer.write(chunk))), + (data) => writer.write(data.chunk))), done, done); }, diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js b/test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js new file mode 100644 index 00000000000000..15ac9f832714e9 --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js @@ -0,0 +1,166 @@ +// Flags: --no-warnings --expose-internals +'use strict'; + +const common = require('../common'); + +const assert = require('assert'); + +const { + TransformStream, +} = require('stream/web'); + +const { + newStreamDuplexFromReadableWritablePair, +} = require('internal/webstreams/adapters'); + +const { + finished, + pipeline, + Readable, + Writable, +} = require('stream'); + +const { + kState, +} = require('internal/webstreams/util'); + +{ + const transform = new TransformStream(); + const duplex = newStreamDuplexFromReadableWritablePair(transform); + + assert(transform.readable.locked); + assert(transform.writable.locked); + + duplex.destroy(); + + duplex.on('close', common.mustCall(() => { + assert.strictEqual(transform.readable[kState].state, 'closed'); + assert.strictEqual(transform.writable[kState].state, 'errored'); + })); +} + +{ + const error = new Error('boom'); + const transform = new TransformStream(); + const duplex = newStreamDuplexFromReadableWritablePair(transform); + + assert(transform.readable.locked); + assert(transform.writable.locked); + + duplex.destroy(error); + duplex.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + + duplex.on('close', common.mustCall(() => { + assert.strictEqual(transform.readable[kState].state, 'closed'); + assert.strictEqual(transform.writable[kState].state, 'errored'); + assert.strictEqual(transform.writable[kState].storedError, error); + })); +} + +{ + const transform = new TransformStream(); + const duplex = new newStreamDuplexFromReadableWritablePair(transform); + + duplex.end(); + duplex.resume(); + + duplex.on('close', common.mustCall(() => { + assert.strictEqual(transform.readable[kState].state, 'closed'); + assert.strictEqual(transform.writable[kState].state, 'closed'); + })); +} + +{ + const ec = new TextEncoder(); + const dc = new TextDecoder(); + const transform = new TransformStream({ + transform(chunk, controller) { + const text = dc.decode(chunk); + controller.enqueue(ec.encode(text.toUpperCase())); + } + }); + const duplex = new newStreamDuplexFromReadableWritablePair(transform, { + encoding: 'utf8', + }); + + duplex.end('hello'); + duplex.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk, 'HELLO'); + })); + duplex.on('end', common.mustCall()); + + duplex.on('close', common.mustCall(() => { + assert.strictEqual(transform.readable[kState].state, 'closed'); + assert.strictEqual(transform.writable[kState].state, 'closed'); + })); +} + +{ + const ec = new TextEncoder(); + const dc = new TextDecoder(); + const transform = new TransformStream({ + transform: common.mustCall((chunk, controller) => { + const text = dc.decode(chunk); + controller.enqueue(ec.encode(text.toUpperCase())); + }) + }); + const duplex = new newStreamDuplexFromReadableWritablePair(transform, { + encoding: 'utf8', + }); + + finished(duplex, common.mustCall()); + + duplex.end('hello'); + duplex.resume(); +} + +{ + const ec = new TextEncoder(); + const dc = new TextDecoder(); + const transform = new TransformStream({ + transform: common.mustCall((chunk, controller) => { + const text = dc.decode(chunk); + controller.enqueue(ec.encode(text.toUpperCase())); + }) + }); + const duplex = new newStreamDuplexFromReadableWritablePair(transform, { + encoding: 'utf8', + }); + + const readable = new Readable({ + read() { + readable.push(Buffer.from('hello')); + readable.push(null); + } + }); + + const writable = new Writable({ + write: common.mustCall((chunk, encoding, callback) => { + assert.strictEqual(dc.decode(chunk), 'HELLO'); + assert.strictEqual(encoding, 'buffer'); + callback(); + }) + }); + + finished(duplex, common.mustCall()); + pipeline(readable, duplex, writable, common.mustCall()); +} + +{ + const transform = new TransformStream(); + const duplex = newStreamDuplexFromReadableWritablePair(transform); + duplex.setEncoding('utf-8'); + duplex.on('data', common.mustCall((data) => { + assert.strictEqual(data, 'hello'); + }, 5)); + + duplex.write(Buffer.from('hello')); + duplex.write(Buffer.from('hello')); + duplex.write(Buffer.from('hello')); + duplex.write(Buffer.from('hello')); + duplex.write(Buffer.from('hello')); + + duplex.end(); +} diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js b/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js new file mode 100644 index 00000000000000..495eef73f79272 --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js @@ -0,0 +1,234 @@ +// Flags: --no-warnings --expose-internals +'use strict'; + +const common = require('../common'); + +const assert = require('assert'); + +const { + WritableStream, +} = require('stream/web'); + +const { + newStreamWritableFromWritableStream, +} = require('internal/webstreams/adapters'); + +const { + finished, + pipeline, + Readable, +} = require('stream'); + +const { + kState, +} = require('internal/webstreams/util'); + +class TestSource { + constructor() { + this.chunks = []; + } + + start(c) { + this.controller = c; + this.started = true; + } + + write(chunk) { + this.chunks.push(chunk); + } + + close() { + this.closed = true; + } + + abort(reason) { + this.abortReason = reason; + } +} + +[1, {}, false, []].forEach((arg) => { + assert.throws(() => newStreamWritableFromWritableStream(arg), { + code: 'ERR_INVALID_ARG_TYPE', + }); +}); + +{ + // Ending the stream.Writable should close the writableStream + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = newStreamWritableFromWritableStream(writableStream); + + assert(writableStream.locked); + + writable.end('chunk'); + + writable.on('close', common.mustCall(() => { + assert(writableStream.locked); + assert.strictEqual(writableStream[kState].state, 'closed'); + assert.strictEqual(source.chunks.length, 1); + assert.deepStrictEqual(source.chunks[0], Buffer.from('chunk')); + })); +} + +{ + // Destroying the stream.Writable without an error should close + // the writableStream with no error. + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = newStreamWritableFromWritableStream(writableStream); + + assert(writableStream.locked); + + writable.destroy(); + + writable.on('close', common.mustCall(() => { + assert(writableStream.locked); + assert.strictEqual(writableStream[kState].state, 'closed'); + assert.strictEqual(source.chunks.length, 0); + })); +} + +{ + // Destroying the stream.Writable with an error should error + // the writableStream + const error = new Error('boom'); + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = newStreamWritableFromWritableStream(writableStream); + + assert(writableStream.locked); + + writable.destroy(error); + + writable.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + + writable.on('close', common.mustCall(() => { + assert(writableStream.locked); + assert.strictEqual(writableStream[kState].state, 'errored'); + assert.strictEqual(writableStream[kState].storedError, error); + assert.strictEqual(source.chunks.length, 0); + })); +} + +{ + // Attempting to close, abort, or getWriter on writableStream + // should fail because it is locked. An internal error in + // writableStream should error the writable. + const error = new Error('boom'); + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = newStreamWritableFromWritableStream(writableStream); + + assert(writableStream.locked); + + assert.rejects(writableStream.close(), { + code: 'ERR_INVALID_STATE', + }); + + assert.rejects(writableStream.abort(), { + code: 'ERR_INVALID_STATE', + }); + + assert.throws(() => writableStream.getWriter(), { + code: 'ERR_INVALID_STATE', + }); + + writable.on('error', common.mustCall((reason) => { + assert.strictEqual(error, reason); + })); + + source.controller.error(error); +} + +{ + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = newStreamWritableFromWritableStream(writableStream); + + writable.on('error', common.mustNotCall()); + writable.on('finish', common.mustCall()); + writable.on('close', common.mustCall(() => { + assert.strictEqual(source.chunks.length, 1); + assert.deepStrictEqual(source.chunks[0], Buffer.from('hello')); + })); + + writable.write('hello', common.mustCall()); + writable.end(); +} + +{ + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = + newStreamWritableFromWritableStream(writableStream, { + decodeStrings: false, + }); + + writable.on('error', common.mustNotCall()); + writable.on('finish', common.mustCall()); + writable.on('close', common.mustCall(() => { + assert.strictEqual(source.chunks.length, 1); + assert.strictEqual(source.chunks[0], 'hello'); + })); + + writable.write('hello', common.mustCall()); + writable.end(); +} + +{ + const source = new TestSource(); + const writableStream = new WritableStream(source); + const writable = + newStreamWritableFromWritableStream( + writableStream, { + objectMode: true + }); + assert(writable.writableObjectMode); + + writable.on('error', common.mustNotCall()); + writable.on('finish', common.mustCall()); + writable.on('close', common.mustCall(() => { + assert.strictEqual(source.chunks.length, 1); + assert.strictEqual(source.chunks[0], 'hello'); + })); + + writable.write('hello', common.mustCall()); + writable.end(); +} + +{ + const writableStream = new WritableStream({ + write: common.mustCall(5), + close: common.mustCall(), + }); + const writable = newStreamWritableFromWritableStream(writableStream); + + finished(writable, common.mustCall()); + + writable.write('hello'); + writable.write('hello'); + writable.write('hello'); + writable.write('world'); + writable.write('world'); + writable.end(); +} + +{ + const writableStream = new WritableStream({ + write: common.mustCall(2), + close: common.mustCall(), + }); + const writable = newStreamWritableFromWritableStream(writableStream); + + const readable = new Readable({ + read() { + readable.push(Buffer.from('hello')); + readable.push(Buffer.from('world')); + readable.push(null); + } + }); + + pipeline(readable, writable, common.mustCall()); +}