From f57a0e4d8b1b21f67fb5b01ad7ff3d9c1f60516a Mon Sep 17 00:00:00 2001
From: James M Snell
Date: Fri, 30 Jul 2021 15:02:13 -0700
Subject: [PATCH] stream: utility consumers for web and node.js streams

Signed-off-by: James M Snell

PR-URL: https://github.com/nodejs/node/pull/39594
Reviewed-By: Matteo Collina
---
 doc/api/webstreams.md                  | 124 +++++++++++++
 lib/stream/consumers.js                |  84 +++++++++
 test/parallel/test-stream-consumers.js | 234 +++++++++++++++++++++++++
 3 files changed, 442 insertions(+)
 create mode 100644 lib/stream/consumers.js
 create mode 100644 test/parallel/test-stream-consumers.js

diff --git a/doc/api/webstreams.md b/doc/api/webstreams.md
index 42f1481e3ff15f..67675d62975e90 100644
--- a/doc/api/webstreams.md
+++ b/doc/api/webstreams.md
@@ -1219,5 +1219,129 @@ added: v16.6.0
 * Type: {WritableStream}
 
+### Class: `CompressionStream`
+
+#### `new CompressionStream(format)`
+
+* `format` {string} One of either `'deflate'` or `'gzip'`.
+
+#### `compressionStream.readable`
+
+* Type: {ReadableStream}
+
+#### `compressionStream.writable`
+
+* Type: {WritableStream}
+
+### Class: `DecompressionStream`
+
+#### `new DecompressionStream(format)`
+
+* `format` {string} One of either `'deflate'` or `'gzip'`.
+
+#### `decompressionStream.readable`
+
+* Type: {ReadableStream}
+
+#### `decompressionStream.writable`
+
+* Type: {WritableStream}
+
+### Utility Consumers
+
+The utility consumer functions provide common options for consuming
+streams.
+
+They are accessed using:
+
+```mjs
+import {
+  arrayBuffer,
+  blob,
+  buffer,
+  json,
+  text,
+} from 'node:stream/consumers';
+```
+
+```cjs
+const {
+  arrayBuffer,
+  blob,
+  buffer,
+  json,
+  text,
+} = require('stream/consumers');
+```
+
+#### `streamConsumers.arrayBuffer(stream)`
+
+* `stream` {ReadableStream|stream.Readable|AsyncIterator}
+* Returns: {Promise} Fulfills with an `ArrayBuffer` containing the full
+  contents of the stream.
+
+#### `streamConsumers.blob(stream)`
+
+* `stream` {ReadableStream|stream.Readable|AsyncIterator}
+* Returns: {Promise} Fulfills with a {Blob} containing the full contents
+  of the stream.
+
+#### `streamConsumers.buffer(stream)`
+
+* `stream` {ReadableStream|stream.Readable|AsyncIterator}
+* Returns: {Promise} Fulfills with a {Buffer} containing the full
+  contents of the stream.
+
+#### `streamConsumers.json(stream)`
+
+* `stream` {ReadableStream|stream.Readable|AsyncIterator}
+* Returns: {Promise} Fulfills with the contents of the stream parsed as a
+  UTF-8 encoded string that is then passed through `JSON.parse()`.
+
+#### `streamConsumers.text(stream)`
+
+* `stream` {ReadableStream|stream.Readable|AsyncIterator}
+* Returns: {Promise} Fulfills with the contents of the stream parsed as a
+  UTF-8 encoded string.
 
 [Streams]: stream.md
 [WHATWG Streams Standard]: https://streams.spec.whatwg.org/
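As a usage sketch (an editorial illustration, not part of the patch): the documented consumers accept any Node.js `Readable`, web `ReadableStream`, or async iterable, so reading and parsing a file in one step might look like the following. The `data.json` path is an assumption made up for the example.

```js
// Hypothetical usage of the consumers documented above.
// Assumes a file named 'data.json' exists next to the script.
const { json, text, buffer } = require('stream/consumers');
const { createReadStream } = require('fs');

async function main() {
  // Collect the stream and run the result through JSON.parse().
  const parsed = await json(createReadStream('data.json'));
  console.log(parsed);

  // The same kind of stream can also be collected as a UTF-8 string...
  const str = await text(createReadStream('data.json'));
  console.log(str.length);

  // ...or as a Buffer of the raw bytes.
  const buf = await buffer(createReadStream('data.json'));
  console.log(buf.byteLength);
}

main().catch(console.error);
```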
diff --git a/lib/stream/consumers.js b/lib/stream/consumers.js
new file mode 100644
index 00000000000000..ffe6e531205e7f
--- /dev/null
+++ b/lib/stream/consumers.js
@@ -0,0 +1,84 @@
+'use strict';
+
+const {
+  JSONParse,
+} = primordials;
+
+const {
+  TextDecoder,
+} = require('internal/encoding');
+
+const {
+  Blob,
+} = require('internal/blob');
+
+const {
+  Buffer,
+} = require('buffer');
+
+/**
+ * @typedef {import('../internal/webstreams/readablestream').ReadableStream
+ * } ReadableStream
+ * @typedef {import('../internal/streams/readable')} Readable
+ */
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<Blob>}
+ */
+async function blob(stream) {
+  const chunks = [];
+  for await (const chunk of stream)
+    chunks.push(chunk);
+  return new Blob(chunks);
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<ArrayBuffer>}
+ */
+async function arrayBuffer(stream) {
+  const ret = await blob(stream);
+  return ret.arrayBuffer();
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<Buffer>}
+ */
+async function buffer(stream) {
+  return Buffer.from(await arrayBuffer(stream));
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<string>}
+ */
+async function text(stream) {
+  const dec = new TextDecoder();
+  let str = '';
+  for await (const chunk of stream) {
+    if (typeof chunk === 'string')
+      str += chunk;
+    else
+      str += dec.decode(chunk, { stream: true });
+  }
+  return str;
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<any>}
+ */
+async function json(stream) {
+  const str = await text(stream);
+  return JSONParse(str);
+}
+
+module.exports = {
+  arrayBuffer,
+  blob,
+  buffer,
+  text,
+  json,
+};
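The `text()` implementation above appends string chunks directly and decodes binary chunks with a streaming `TextDecoder`, so multi-byte characters split across chunk boundaries are handled. A rough userland approximation of that pattern (a sketch, not the internal implementation: it uses the global `TextDecoder` instead of the internal module and adds a final flush for completeness) could look like:

```js
// Userland sketch of the text() consumer pattern shown above.
const { PassThrough } = require('stream');

async function textLike(stream) {
  const dec = new TextDecoder();
  let str = '';
  for await (const chunk of stream) {
    if (typeof chunk === 'string')
      str += chunk;                                // string chunks pass through
    else
      str += dec.decode(chunk, { stream: true });  // incremental binary decode
  }
  // Flush any partial multi-byte sequence still buffered in the decoder.
  return str + dec.decode();
}

// Example: chunks written to a PassThrough yield one decoded string.
const pt = new PassThrough();
textLike(pt).then((s) => console.log(s)); // expected: 'hellothere'
pt.write('hello');
pt.end(Buffer.from('there'));
```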
diff --git a/test/parallel/test-stream-consumers.js b/test/parallel/test-stream-consumers.js
new file mode 100644
index 00000000000000..8f6a9deb1c27dc
--- /dev/null
+++ b/test/parallel/test-stream-consumers.js
@@ -0,0 +1,234 @@
+// Flags: --no-warnings
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+
+const {
+  arrayBuffer,
+  blob,
+  buffer,
+  text,
+  json,
+} = require('stream/consumers');
+
+const {
+  PassThrough
+} = require('stream');
+
+const {
+  TransformStream,
+} = require('stream/web');
+
+const buf = Buffer.from('hellothere');
+const kArrayBuffer =
+  buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
+
+{
+  const passthrough = new PassThrough();
+
+  blob(passthrough).then(common.mustCall(async (blob) => {
+    assert.strictEqual(blob.size, 10);
+    assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer);
+  }));
+
+  passthrough.write('hello');
+  setTimeout(() => passthrough.end('there'), 10);
+}
+
+{
+  const passthrough = new PassThrough();
+
+  arrayBuffer(passthrough).then(common.mustCall(async (ab) => {
+    assert.strictEqual(ab.byteLength, 10);
+    assert.deepStrictEqual(ab, kArrayBuffer);
+  }));
+
+  passthrough.write('hello');
+  setTimeout(() => passthrough.end('there'), 10);
+}
+
+{
+  const passthrough = new PassThrough();
+
+  buffer(passthrough).then(common.mustCall(async (buf) => {
+    assert.strictEqual(buf.byteLength, 10);
+    assert.deepStrictEqual(buf.buffer, kArrayBuffer);
+  }));
+
+  passthrough.write('hello');
+  setTimeout(() => passthrough.end('there'), 10);
+}
+
+{
+  const passthrough = new PassThrough();
+
+  text(passthrough).then(common.mustCall(async (str) => {
+    assert.strictEqual(str.length, 10);
+    assert.deepStrictEqual(str, 'hellothere');
+  }));
+
+  passthrough.write('hello');
+  setTimeout(() => passthrough.end('there'), 10);
+}
+
+{
+  const passthrough = new PassThrough();
+
+  json(passthrough).then(common.mustCall(async (str) => {
+    assert.strictEqual(str.length, 10);
+    assert.deepStrictEqual(str, 'hellothere');
+  }));
+
+  passthrough.write('"hello');
+  setTimeout(() => passthrough.end('there"'), 10);
+}
+
+{
+  const { writable, readable } = new TransformStream();
+
+  blob(readable).then(common.mustCall(async (blob) => {
+    assert.strictEqual(blob.size, 10);
+    assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer);
+  }));
+
+  const writer = writable.getWriter();
+  writer.write('hello');
+  setTimeout(() => {
+    writer.write('there');
+    writer.close();
+  }, 10);
+
+  assert.rejects(blob(readable), { code: 'ERR_INVALID_STATE' });
+}
+
+{
+  const { writable, readable } = new TransformStream();
+
+  arrayBuffer(readable).then(common.mustCall(async (ab) => {
+    assert.strictEqual(ab.byteLength, 10);
+    assert.deepStrictEqual(ab, kArrayBuffer);
+  }));
+
+  const writer = writable.getWriter();
+  writer.write('hello');
+  setTimeout(() => {
+    writer.write('there');
+    writer.close();
+  }, 10);
+
+  assert.rejects(arrayBuffer(readable), { code: 'ERR_INVALID_STATE' });
+}
+
+{
+  const { writable, readable } = new TransformStream();
+
+  text(readable).then(common.mustCall(async (str) => {
+    assert.strictEqual(str.length, 10);
+    assert.deepStrictEqual(str, 'hellothere');
+  }));
+
+  const writer = writable.getWriter();
+  writer.write('hello');
+  setTimeout(() => {
+    writer.write('there');
+    writer.close();
+  }, 10);
+
+  assert.rejects(text(readable), { code: 'ERR_INVALID_STATE' });
+}
+
+{
+  const { writable, readable } = new TransformStream();
+
+  json(readable).then(common.mustCall(async (str) => {
+    assert.strictEqual(str.length, 10);
+    assert.deepStrictEqual(str, 'hellothere');
+  }));
+
+  const writer = writable.getWriter();
+  writer.write('"hello');
+  setTimeout(() => {
+    writer.write('there"');
+    writer.close();
+  }, 10);
+
+  assert.rejects(json(readable), { code: 'ERR_INVALID_STATE' });
+}
+
+{
+  const stream = new PassThrough({
+    readableObjectMode: true,
+    writableObjectMode: true,
+  });
+
+  blob(stream).then(common.mustCall((blob) => {
+    assert.strictEqual(blob.size, 30);
+  }));
+
+  stream.write({});
+  stream.end({});
+}
+
+{
+  const stream = new PassThrough({
+    readableObjectMode: true,
+    writableObjectMode: true,
+  });
+
+  arrayBuffer(stream).then(common.mustCall((ab) => {
+    assert.strictEqual(ab.byteLength, 30);
+    assert.strictEqual(
+      Buffer.from(ab).toString(),
+      '[object Object][object Object]');
+  }));
+
+  stream.write({});
+  stream.end({});
+}
+
+{
+  const stream = new PassThrough({
+    readableObjectMode: true,
+    writableObjectMode: true,
+  });
+
+  buffer(stream).then(common.mustCall((buf) => {
+    assert.strictEqual(buf.byteLength, 30);
+    assert.strictEqual(
+      buf.toString(),
+      '[object Object][object Object]');
+  }));
+
+  stream.write({});
+  stream.end({});
+}
+
+{
+  const stream = new PassThrough({
+    readableObjectMode: true,
+    writableObjectMode: true,
+  });
+
+  assert.rejects(text(stream), {
+    code: 'ERR_INVALID_ARG_TYPE',
+  });
+
+  stream.write({});
+  stream.end({});
+}
+
+{
+  const stream = new PassThrough({
+    readableObjectMode: true,
+    writableObjectMode: true,
+  });
+
+  assert.rejects(json(stream), {
+    code: 'ERR_INVALID_ARG_TYPE',
+  });
+
+  stream.write({});
+  stream.end({});
+}
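The `TransformStream` cases above also assert that a second consumer call on the same web readable rejects with `ERR_INVALID_STATE`, because the first consumer locks the stream while iterating it. A minimal standalone sketch of that behavior (a plain script without the test harness; the logged strings are illustrative, and the error code is taken from the test's expectation):

```js
// Sketch of the single-consumption behaviour exercised by the tests:
// the first consumer locks the web ReadableStream, so a second
// consumer on the same readable is expected to reject.
const { text } = require('stream/consumers');
const { TransformStream } = require('stream/web');

const { readable, writable } = new TransformStream();
const writer = writable.getWriter();

text(readable).then((str) => {
  console.log('first consumer resolved with:', str); // 'hellothere'
});

text(readable).catch((err) => {
  console.log('second consumer rejected:', err.code); // ERR_INVALID_STATE per the test
});

writer.write('hello');
writer.write('there');
writer.close();
```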