Skip to content

Commit

Permalink
stream: utility consumers for web and node.js streams
Browse files Browse the repository at this point in the history
Signed-off-by: James M Snell <jasnell@gmail.com>
  • Loading branch information
jasnell committed Aug 3, 2021
1 parent dd18795 commit 344dba7
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 0 deletions.
73 changes: 73 additions & 0 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -1270,5 +1270,78 @@ added: REPLACEME
* Type: {WritableStream}
### Utility Consumers
<!-- YAML
added: REPLACEME
-->
The utility consumer functions provide common options for consuming
streams.
They are accessed using:
```mjs
import {
arrayBuffer,
blob,
json,
text,
} from 'node:stream/consumers';
```
```cjs
const {
arrayBuffer,
blob,
json,
text,
} = require('stream/consumers');
```
#### `streamConsumers.arrayBuffer(stream)`
<!-- YAML
added: REPLACEME
-->
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with an `ArrayBuffer` containing the full
contents of the stream.
#### `streamConsumers.blob(stream)`
<!-- YAML
added: REPLACEME
-->
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with a {Blob} containing the full contents
of the stream.
#### `streamConsumers.buffer(stream)`
<!-- YAML
added: REPLACEME
-->
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with a {Buffer} containing the full
contents of the stream.
#### `streamConsumers.json(stream)`
<!-- YAML
added: REPLACEME
-->
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with the contents of the stream parsed as a
UTF-8 encoded string that is then passed through `JSON.parse()`.
#### `streamConsumers.text(stream)`
<!-- YAML
added: REPLACEME
-->
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
* Returns: {Promise} Fulfills with the contents of the stream parsed as a
UTF-8 encoded string.
[Streams]: stream.md
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
84 changes: 84 additions & 0 deletions lib/stream/consumers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
'use strict';

const {
JSONParse,
} = primordials;

const {
TextDecoder,
} = require('internal/encoding');

const {
Blob,
} = require('internal/blob');

const {
Buffer,
} = require('buffer');

/**
* @typedef {import('../internal/webstreams/readablestream').ReadableStream
* } ReadableStream
* @typedef {import('../internal/streams/readable')} Readable
*/

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<Blob>}
*/
async function blob(stream) {
const chunks = [];
for await (const chunk of stream)
chunks.push(chunk);
return new Blob(chunks);
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<ArrayBuffer>}
*/
async function arrayBuffer(stream) {
const ret = await blob(stream);
return ret.arrayBuffer();
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<Buffer>}
*/
async function buffer(stream) {
return Buffer.from(await arrayBuffer(stream));
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<string>}
*/
async function text(stream) {
const dec = new TextDecoder(undefined, { stream: true });
let str = '';
for await (const chunk of stream) {
if (typeof chunk === 'string')
str += chunk;
else
str += dec.decode(chunk);
}
return str;
}

/**
* @param {AsyncIterable|ReadableStream|Readable} stream
* @returns {Promise<any>}
*/
async function json(stream) {
const str = await text(stream);
return JSONParse(str);
}

module.exports = {
arrayBuffer,
blob,
buffer,
text,
json,
};
158 changes: 158 additions & 0 deletions test/parallel/test-stream-consumers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Flags: --no-warnings
'use strict';

const common = require('../common');
const assert = require('assert');

const {
arrayBuffer,
blob,
buffer,
text,
json,
} = require('stream/consumers');

const {
PassThrough
} = require('stream');

const {
TransformStream,
} = require('stream/web');

const buf = Buffer.from('hellothere');
const kArrayBuffer =
buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);

{
const passthrough = new PassThrough();

blob(passthrough).then(common.mustCall(async (blob) => {
assert.strictEqual(blob.size, 10);
assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer);
}));

passthrough.write('hello');
setTimeout(() => passthrough.end('there'), 10);
}

{
const passthrough = new PassThrough();

arrayBuffer(passthrough).then(common.mustCall(async (ab) => {
assert.strictEqual(ab.byteLength, 10);
assert.deepStrictEqual(ab, kArrayBuffer);
}));

passthrough.write('hello');
setTimeout(() => passthrough.end('there'), 10);
}

{
const passthrough = new PassThrough();

buffer(passthrough).then(common.mustCall(async (buf) => {
assert.strictEqual(buf.byteLength, 10);
assert.deepStrictEqual(buf.buffer, kArrayBuffer);
}));

passthrough.write('hello');
setTimeout(() => passthrough.end('there'), 10);
}


{
const passthrough = new PassThrough();

text(passthrough).then(common.mustCall(async (str) => {
assert.strictEqual(str.length, 10);
assert.deepStrictEqual(str, 'hellothere');
}));

passthrough.write('hello');
setTimeout(() => passthrough.end('there'), 10);
}

{
const passthrough = new PassThrough();

json(passthrough).then(common.mustCall(async (str) => {
assert.strictEqual(str.length, 10);
assert.deepStrictEqual(str, 'hellothere');
}));

passthrough.write('"hello');
setTimeout(() => passthrough.end('there"'), 10);
}

{
const { writable, readable } = new TransformStream();

blob(readable).then(common.mustCall(async (blob) => {
assert.strictEqual(blob.size, 10);
assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer);
}));

const writer = writable.getWriter();
writer.write('hello');
setTimeout(() => {
writer.write('there');
writer.close();
}, 10);

assert.rejects(blob(readable), { code: 'ERR_INVALID_STATE' });
}

{
const { writable, readable } = new TransformStream();

arrayBuffer(readable).then(common.mustCall(async (ab) => {
assert.strictEqual(ab.byteLength, 10);
assert.deepStrictEqual(ab, kArrayBuffer);
}));

const writer = writable.getWriter();
writer.write('hello');
setTimeout(() => {
writer.write('there');
writer.close();
}, 10);

assert.rejects(arrayBuffer(readable), { code: 'ERR_INVALID_STATE' });
}

{
const { writable, readable } = new TransformStream();

text(readable).then(common.mustCall(async (str) => {
assert.strictEqual(str.length, 10);
assert.deepStrictEqual(str, 'hellothere');
}));

const writer = writable.getWriter();
writer.write('hello');
setTimeout(() => {
writer.write('there');
writer.close();
}, 10);

assert.rejects(text(readable), { code: 'ERR_INVALID_STATE' });
}

{
const { writable, readable } = new TransformStream();

json(readable).then(common.mustCall(async (str) => {
assert.strictEqual(str.length, 10);
assert.deepStrictEqual(str, 'hellothere');
}));

const writer = writable.getWriter();
writer.write('"hello');
setTimeout(() => {
writer.write('there"');
writer.close();
}, 10);

assert.rejects(json(readable), { code: 'ERR_INVALID_STATE' });
}

0 comments on commit 344dba7

Please sign in to comment.