Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: implement ReadableStream.from #48395

Merged
merged 1 commit into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,49 @@ port1.onmessage = ({ data }) => {
port2.postMessage(stream, [stream]);
```

### `ReadableStream.from(iterable)`

<!-- YAML
added: REPLACEME
-->

* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
`Symbol.iterator` iterable protocol.

A utility method that creates a new {ReadableStream} from an iterable.

```mjs
import { ReadableStream } from 'node:stream/web';

async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}

const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
console.log(chunk); // Prints 'a', 'b', 'c'
```

```cjs
const { ReadableStream } = require('node:stream/web');

async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}

(async () => {
const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
console.log(chunk); // Prints 'a', 'b', 'c'
})();
```

### Class: `ReadableStreamDefaultReader`

<!-- YAML
Expand Down
59 changes: 59 additions & 0 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ const {
nonOpCancel,
nonOpPull,
nonOpStart,
getIterator,
iteratorNext,
kType,
kState,
} = require('internal/webstreams/util');
Expand Down Expand Up @@ -312,6 +314,10 @@ class ReadableStream {
return isReadableStreamLocked(this);
}

static from(iterable) {
return readableStreamFromIterable(iterable);
}

/**
* @param {any} [reason]
* @returns { Promise<void> }
Expand Down Expand Up @@ -1248,6 +1254,59 @@ const isReadableStreamBYOBReader =

// ---- ReadableStream Implementation

function readableStreamFromIterable(iterable) {
let stream;
const iteratorRecord = getIterator(iterable, 'async');

const startAlgorithm = nonOpStart;

async function pullAlgorithm() {
const nextResult = iteratorNext(iteratorRecord);
const nextPromise = PromiseResolve(nextResult);
return PromisePrototypeThen(nextPromise, (iterResult) => {
if (typeof iterResult !== 'object' || iterResult === null) {
throw new ERR_INVALID_STATE.TypeError(
'The promise returned by the iterator.next() method must fulfill with an object');
}
if (iterResult.done) {
readableStreamDefaultControllerClose(stream[kState].controller);
} else {
readableStreamDefaultControllerEnqueue(stream[kState].controller, iterResult.value);
}
});
}

async function cancelAlgorithm(reason) {
const iterator = iteratorRecord.iterator;
const returnMethod = iterator.return;
if (returnMethod === undefined) {
return PromiseResolve();
}
const returnResult = FunctionPrototypeCall(returnMethod, iterator, reason);
const returnPromise = PromiseResolve(returnResult);
return PromisePrototypeThen(returnPromise, (iterResult) => {
if (typeof iterResult !== 'object' || iterResult === null) {
throw new ERR_INVALID_STATE.TypeError(
'The promise returned by the iterator.return() method must fulfill with an object');
}
return undefined;
});
}

stream = new ReadableStream({
start: startAlgorithm,
pull: pullAlgorithm,
cancel: cancelAlgorithm,
}, {
size() {
return 1;
},
highWaterMark: 0,
});

return stream;
}

function readableStreamPipeTo(
source,
dest,
Expand Down
53 changes: 53 additions & 0 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ const {
PromiseReject,
ReflectGet,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
Uint8Array,
} = primordials;

const {
codes: {
ERR_INVALID_ARG_VALUE,
ERR_OPERATION_FAILED,
ERR_INVALID_STATE,
},
} = require('internal/errors');

Expand Down Expand Up @@ -217,6 +220,54 @@ function lazyTransfer() {
return transfer;
}

function createAsyncFromSyncIterator(syncIteratorRecord) {
const syncIterable = {
[SymbolIterator]: () => syncIteratorRecord.iterator,
};

const asyncIterator = (async function* () {
return yield* syncIterable;
}());

const nextMethod = asyncIterator.next;
return { iterator: asyncIterator, nextMethod, done: false };
}

function getIterator(obj, kind = 'sync', method) {
if (method === undefined) {
if (kind === 'async') {
method = obj[SymbolAsyncIterator];
if (method === undefined) {
const syncMethod = obj[SymbolIterator];
const syncIteratorRecord = getIterator(obj, 'sync', syncMethod);
return createAsyncFromSyncIterator(syncIteratorRecord);
}
} else {
method = obj[SymbolIterator];
}
}

const iterator = FunctionPrototypeCall(method, obj);
if (typeof iterator !== 'object' || iterator === null) {
throw new ERR_INVALID_STATE.TypeError('The iterator method must return an object');
}
const nextMethod = iterator.next;
return { iterator, nextMethod, done: false };
}
aduh95 marked this conversation as resolved.
Show resolved Hide resolved

function iteratorNext(iteratorRecord, value) {
let result;
if (value === undefined) {
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator);
} else {
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator, [value]);
}
debadree25 marked this conversation as resolved.
Show resolved Hide resolved
if (typeof result !== 'object' || result === null) {
throw new ERR_INVALID_STATE.TypeError('The iterator.next() method must return an object');
}
return result;
}

module.exports = {
ArrayBufferViewGetBuffer,
ArrayBufferViewGetByteLength,
Expand All @@ -243,6 +294,8 @@ module.exports = {
nonOpPull,
nonOpStart,
nonOpWrite,
getIterator,
iteratorNext,
kType,
kState,
};
2 changes: 1 addition & 1 deletion test/fixtures/wpt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Last update:
- performance-timeline: https://github.com/web-platform-tests/wpt/tree/17ebc3aea0/performance-timeline
- resource-timing: https://github.com/web-platform-tests/wpt/tree/22d38586d0/resource-timing
- resources: https://github.com/web-platform-tests/wpt/tree/919874f84f/resources
- streams: https://github.com/web-platform-tests/wpt/tree/51750bc8d7/streams
- streams: https://github.com/web-platform-tests/wpt/tree/517e945bbf/streams
- url: https://github.com/web-platform-tests/wpt/tree/84782d9315/url
- user-timing: https://github.com/web-platform-tests/wpt/tree/5ae85bf826/user-timing
- wasm/jsapi: https://github.com/web-platform-tests/wpt/tree/cde25e7e3c/wasm/jsapi
Expand Down
15 changes: 15 additions & 0 deletions test/fixtures/wpt/streams/piping/general-addition.any.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// META: global=window,worker
'use strict';

promise_test(async t => {
/** @type {ReadableStreamDefaultController} */
var con;
let synchronous = false;
new ReadableStream({ start(c) { con = c }}, { highWaterMark: 0 }).pipeTo(
new WritableStream({ write() { synchronous = true; } })
)
// wait until start algorithm finishes
await Promise.resolve();
con.enqueue();
assert_false(synchronous, 'write algorithm must not run synchronously');
}, "enqueue() must not synchronously call write algorithm");
Loading