Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadableStream.from(asyncIterable) #1083

Merged
merged 22 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
13f2911
Add ReadableStream.from(asyncIterable)
MattiasBuelens Nov 10, 2020
c946567
Format literals as code
MattiasBuelens Nov 10, 2020
9b72815
Handle sync errors from calling return()
MattiasBuelens Nov 10, 2020
f931fc2
Move startAlgorithm steps to main steps
MattiasBuelens Nov 10, 2020
5c48420
Pass cancel reason to return()
MattiasBuelens Nov 10, 2020
8e151d6
Add ReadableStreamFromIterable abstract op
MattiasBuelens Nov 17, 2020
0a8f260
Add ReadableStream.of(...chunks)
MattiasBuelens Nov 17, 2020
d065c85
Fix GetIterator call
MattiasBuelens Nov 19, 2020
991477e
Fix highlighting for |stream|
MattiasBuelens Nov 19, 2020
0d07b3d
Use Reflect.apply
MattiasBuelens Nov 19, 2020
99b858d
Fix "Type(x) is Object" checks
MattiasBuelens Nov 19, 2020
76080da
Add todo if we want to allow changing the queuing strategy
MattiasBuelens Nov 19, 2020
ca0daf7
Revert "Add ReadableStream.of(...chunks)"
MattiasBuelens Nov 19, 2020
0724c91
Fix Call abstract op
MattiasBuelens Dec 26, 2020
523f3d0
Implement CreateAsyncFromSyncIterator abstract op
MattiasBuelens Dec 26, 2020
7b8db87
Set HWM to 0 for ReadableStream.from()
MattiasBuelens Dec 26, 2020
078a335
Update spec text to set HWM to 0
MattiasBuelens Feb 8, 2021
3337ff7
Add error message
MattiasBuelens Apr 15, 2021
5275f3b
Add more error messages
MattiasBuelens Apr 15, 2021
1796989
Throw if iterator.return() does not fulfill with an object
MattiasBuelens Apr 4, 2023
d016390
Roll WPT
MattiasBuelens Jun 7, 2023
bf5f360
Put abstract op in alphabetical order
MattiasBuelens Jun 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ urlPrefix: https://tc39.es/ecma262/; spec: ECMASCRIPT
text: Number type; url: #sec-ecmascript-language-types-number-type
text: Data Block; url: #sec-data-blocks
type: abstract-op
text: Call; url: #sec-call
text: CloneArrayBuffer; url: #sec-clonearraybuffer
text: CopyDataBlockBytes; url: #sec-copydatablockbytes
text: CreateArrayFromList; url: #sec-createarrayfromlist
Expand All @@ -53,9 +54,14 @@ urlPrefix: https://tc39.es/ecma262/; spec: ECMASCRIPT
text: Construct; url: #sec-construct
text: DetachArrayBuffer; url: #sec-detacharraybuffer
text: Get; url: #sec-get-o-p
text: GetIterator; url: #sec-getiterator
text: GetMethod; url: #sec-getmethod
text: GetV; url: #sec-getv
text: IsDetachedBuffer; url: #sec-isdetachedbuffer
text: IsInteger; url: #sec-isinteger
text: IteratorComplete; url: #sec-iteratorcomplete
text: IteratorNext; url: #sec-iteratornext
text: IteratorValue; url: #sec-iteratorvalue
text: OrdinaryObjectCreate; url: #sec-ordinaryobjectcreate
text: SameValue; url: #sec-samevalue
text: Type; url: #sec-ecmascript-data-types-and-values
Expand Down Expand Up @@ -478,6 +484,8 @@ The Web IDL definition for the {{ReadableStream}} class is given as follows:
interface ReadableStream {
constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});

static ReadableStream from(any asyncIterable);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be excellent to see this move forward. With regards to the question about the special case for byte sources, the signature here can echo the signature for the new ReadableStream() constructor...

const readable1 = ReadableStream.from(iter, new CountQueueingStrategy({ highWaterMark: 10 });

const readable2 = ReadableStream.from({ source: iter, type: 'bytes', autoAllocateChunkSize: 1024 });

That is, make the first argument either a dictionary or an iterator...

dictionary FromInit {
  (AsyncIterator | Iterator) source;
  ReadableStreamType type;
  unsigned long long autoAllocateChunkSize;
}

Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens Oct 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel like there's much precedent for an API that accepts two different types of objects (Iterable or FromInit). What should happen if you pass an object that is both Iterable and FromInit?

const readable = ReadableStream.from({ source: iter1, [Symbol.iterator]: () => iter2 });
// will this iterate over iter1 or iter2?

What's the autoAllocateChunkSize for? We don't really have a way to expose the BYOB request to the async iterable. Or do you expect it to be passed as an argument to next()?

const readable = ReadableStream.from({
  type: bytes,
  autoAllocateChunkSize: 1024,
  source: {
    next(view) {
      // view is controller.byobRequest.view
      // Fill it with some data
      view[0] = 0xaa;
      view[1] = 0xbb;
      view[2] = 0xcc;
      // Return the filled sub-view.
      // (This doesn't currently work, since enqueue() transfers the given view's buffer
      // and the first pull-into descriptor's buffer separately.)
      return { done: false, value: view.subarray(0, 3) };
    },
    [Symbol.iterator]() {
      return this;
    }
  }
});

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine. It was largely just a strawman anyway. I'm just as happy deferring any support for BYOB in this mechanism.


readonly attribute boolean locked;

Promise<undefined> cancel(optional any reason);
Expand Down Expand Up @@ -808,6 +816,13 @@ option. If {{UnderlyingSource/type}} is set to undefined (including via omission
|underlyingSource|, |underlyingSourceDict|, |highWaterMark|, |sizeAlgorithm|).
</div>

<div algorithm>
The static <dfn id="rs-from" method for="ReadableStream">from(|asyncIterable|)</dfn> method steps
are:

1. Return ? [$ReadableStreamFromIterable$](|asyncIterable|).
</div>

<div algorithm>
The <dfn id="rs-locked" attribute for="ReadableStream">locked</dfn> getter steps are:

Expand Down Expand Up @@ -2095,6 +2110,49 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
1. Return true.
</div>

<div algorithm>
<dfn abstract-op lt="ReadableStreamFromIterable" id="readable-stream-from-iterable">
ReadableStreamFromIterable(|asyncIterable|)</dfn> performs the following steps:

1. Let |stream| be undefined.
1. Let |iteratorRecord| be ? [$GetIterator$](|asyncIterable|, async).
1. Let |startAlgorithm| be an algorithm that returns undefined.
1. Let |pullAlgorithm| be the following steps:
1. Let |nextResult| be [$IteratorNext$](|iteratorRecord|).
1. If |nextResult| is an abrupt completion, return [=a promise rejected with=]
|nextResult|.\[[Value]].
1. Let |nextPromise| be [=a promise resolved with=] |nextResult|.\[[Value]].
1. Return the result of [=reacting=] to |nextPromise| with the following fulfillment steps,
given |iterResult|:
1. If [$Type$](|iterResult|) is not Object, throw a {{TypeError}}.
1. Let |done| be ? [$IteratorComplete$](|iterResult|).
1. If |done| is true:
1. Perform ! [$ReadableStreamDefaultControllerClose$](|stream|.[=ReadableStream/[[controller]]=]).
1. Otherwise:
1. Let |value| be ? [$IteratorValue$](|iterResult|).
1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](|stream|.[=ReadableStream/[[controller]]=],
|value|).
<!-- TODO (future): If we allow changing the queuing strategy, this Enqueue might throw.
We'll then need to catch the error and close the async iterator. -->
1. Let |cancelAlgorithm| be the following steps, given |reason|:
1. Let |iterator| be |iteratorRecord|.\[[Iterator]].
1. Let |returnMethod| be [$GetMethod$](|iterator|, "`return`").
1. If |returnMethod| is an abrupt completion, return [=a promise rejected with=]
|returnMethod|.\[[Value]].
1. If |returnMethod|.\[[Value]] is undefined, return [=a promise resolved with=] undefined.
1. Let |returnResult| be [$Call$](|returnMethod|.\[[Value]], |iterator|, « |reason| »).
1. If |returnResult| is an abrupt completion, return [=a promise rejected with=]
|returnResult|.\[[Value]].
1. Let |returnPromise| be [=a promise resolved with=] |returnResult|.\[[Value]].
1. Return the result of [=reacting=] to |returnPromise| with the following fulfillment steps,
given |iterResult|:
1. If [$Type$](|iterResult|) is not Object, throw a {{TypeError}}.
1. Return undefined.
1. Set |stream| to ! [$CreateReadableStream$](|startAlgorithm|, |pullAlgorithm|, |cancelAlgorithm|,
0).
1. Return |stream|.
</div>

<div algorithm="ReadableStreamPipeTo">
<dfn abstract-op lt="ReadableStreamPipeTo"
id="readable-stream-pipe-to">ReadableStreamPipeTo(|source|, |dest|, |preventClose|, |preventAbort|,
Expand Down
4 changes: 4 additions & 0 deletions reference-implementation/lib/ReadableStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ exports.implementation = class ReadableStreamImpl {
aos.ReadableStreamDefaultReaderRelease(reader);
return promiseResolvedWith(undefined);
}

static from(asyncIterable) {
return aos.ReadableStreamFromIterable(asyncIterable);
}
};

// See pipeTo()/pipeThrough() for why this is needed.
Expand Down
2 changes: 2 additions & 0 deletions reference-implementation/lib/ReadableStream.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
interface ReadableStream {
constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});

static ReadableStream from(any asyncIterable);
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved

readonly attribute boolean locked;

Promise<undefined> cancel(optional any reason);
Expand Down
83 changes: 83 additions & 0 deletions reference-implementation/lib/abstract-ops/ecmascript.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ const assert = require('assert');

const isFakeDetached = Symbol('is "detached" for our purposes');

exports.typeIsObject = x => (typeof x === 'object' && x !== null) || typeof x === 'function';

exports.CreateArrayFromList = elements => {
// We use arrays to represent lists, so this is basically a no-op.
// Do a slice though just in case we happen to depend on the unique-ness.
Expand Down Expand Up @@ -39,3 +41,84 @@ exports.CanTransferArrayBuffer = O => {
exports.IsDetachedBuffer = O => {
return isFakeDetached in O;
};

exports.Call = (F, V, args = []) => {
if (typeof F !== 'function') {
throw new TypeError('Argument is not a function');
}

return Reflect.apply(F, V, args);
};

exports.GetMethod = (V, P) => {
const func = V[P];
if (func === undefined || func === null) {
return undefined;
}
if (typeof func !== 'function') {
throw new TypeError(`${P} is not a function`);
}
return func;
};

exports.CreateAsyncFromSyncIterator = syncIteratorRecord => {
// Instead of re-implementing CreateAsyncFromSyncIterator and %AsyncFromSyncIteratorPrototype%,
// we use yield* inside an async generator function to achieve the same result.

// Wrap the sync iterator inside a sync iterable, so we can use it with yield*.
const syncIterable = {
[Symbol.iterator]: () => syncIteratorRecord.iterator
};
// Create an async generator function and immediately invoke it.
const asyncIterator = (async function* () {
return yield* syncIterable;
}());
// Return as an async iterator record.
const nextMethod = asyncIterator.next;
return { iterator: asyncIterator, nextMethod, done: false };
};

exports.GetIterator = (obj, hint = 'sync', method) => {
assert(hint === 'sync' || hint === 'async');
if (method === undefined) {
if (hint === 'async') {
method = exports.GetMethod(obj, Symbol.asyncIterator);
if (method === undefined) {
const syncMethod = exports.GetMethod(obj, Symbol.iterator);
const syncIteratorRecord = exports.GetIterator(obj, 'sync', syncMethod);
return exports.CreateAsyncFromSyncIterator(syncIteratorRecord);
}
} else {
method = exports.GetMethod(obj, Symbol.iterator);
}
}
const iterator = exports.Call(method, obj);
if (!exports.typeIsObject(iterator)) {
throw new TypeError('The iterator method must return an object');
}
const nextMethod = iterator.next;
return { iterator, nextMethod, done: false };
};

exports.IteratorNext = (iteratorRecord, value) => {
let result;
if (value === undefined) {
result = exports.Call(iteratorRecord.nextMethod, iteratorRecord.iterator);
} else {
result = exports.Call(iteratorRecord.nextMethod, iteratorRecord.iterator, [value]);
}
if (!exports.typeIsObject(result)) {
throw new TypeError('The iterator.next() method must return an object');
}
return result;
};

exports.IteratorComplete = iterResult => {
assert(exports.typeIsObject(iterResult));
return Boolean(iterResult.done);
};

exports.IteratorValue = iterResult => {
assert(exports.typeIsObject(iterResult));
return iterResult.value;
};
63 changes: 61 additions & 2 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ const assert = require('assert');
const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, rejectPromise, uponPromise,
setPromiseIsHandledToTrue, waitForAllPromise, transformPromiseWith, uponFulfillment, uponRejection } =
require('../helpers/webidl.js');
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
require('./ecmascript.js');
const { CanTransferArrayBuffer, Call, CopyDataBlockBytes, CreateArrayFromList, GetIterator, GetMethod, IsDetachedBuffer,
IteratorComplete, IteratorNext, IteratorValue, TransferArrayBuffer, typeIsObject } = require('./ecmascript.js');
const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
Expand Down Expand Up @@ -55,6 +55,7 @@ Object.assign(exports, {
ReadableStreamDefaultControllerHasBackpressure,
ReadableStreamDefaultReaderRead,
ReadableStreamDefaultReaderRelease,
ReadableStreamFromIterable,
ReadableStreamGetNumReadRequests,
ReadableStreamHasDefaultReader,
ReadableStreamPipeTo,
Expand Down Expand Up @@ -1879,3 +1880,61 @@ function SetUpReadableByteStreamControllerFromUnderlyingSource(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize
);
}

function ReadableStreamFromIterable(asyncIterable) {
let stream;
const iteratorRecord = GetIterator(asyncIterable, 'async');

const startAlgorithm = () => undefined;

function pullAlgorithm() {
let nextResult;
try {
nextResult = IteratorNext(iteratorRecord);
} catch (e) {
return promiseRejectedWith(e);
}
const nextPromise = promiseResolvedWith(nextResult);
return transformPromiseWith(nextPromise, iterResult => {
if (!typeIsObject(iterResult)) {
throw new TypeError('The promise returned by the iterator.next() method must fulfill with an object');
}
const done = IteratorComplete(iterResult);
if (done === true) {
ReadableStreamDefaultControllerClose(stream._controller);
} else {
const value = IteratorValue(iterResult);
ReadableStreamDefaultControllerEnqueue(stream._controller, value);
}
});
}

function cancelAlgorithm(reason) {
const iterator = iteratorRecord.iterator;
let returnMethod;
try {
returnMethod = GetMethod(iterator, 'return');
} catch (e) {
return promiseRejectedWith(e);
}
if (returnMethod === undefined) {
return promiseResolvedWith(undefined);
}
let returnResult;
try {
returnResult = Call(returnMethod, iterator, [reason]);
} catch (e) {
return promiseRejectedWith(e);
}
const returnPromise = promiseResolvedWith(returnResult);
return transformPromiseWith(returnPromise, iterResult => {
if (!typeIsObject(iterResult)) {
throw new TypeError('The promise returned by the iterator.return() method must fulfill with an object');
}
return undefined;
});
}

stream = CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, 0);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the high-water mark to zero ensures ReadableStream.from(otherReadableStream) doesn't immediately start pulling from otherReadableStream, and matches the default HWM for TransformStream.readable (step 5 in the TransformStream constructor).

In comparison, ReadableStream.tee() does immediately start pulling, even if no read() was called on any of the returned branches. In step 16 and 17 of the ReadableStreamTee abstract op, no value is passed for highWaterMark and thus it defaults to 1. This seems to be intentional, as there are tests that rely on this. (In particular, we rely on it to forward closing of the parent stream to the two branches.)

I think we want to follow the example of TransformStream.readable here, and use zero as HWM? Or am I missing something important?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I have a slight bias toward keeping our default, since it must have been picked for a reason :). In particular, I believe our historical intention has been that by default readable streams should try to keep something in their internal queue, whether that something is derived from a controller.enqueue() call in start(), or one in pull(). I think that same logic applies to deriving it from an async iterator.

Transform streams are a bit different, as we want introducing them to be more of a no-op and not cause a bunch of chunks to sit in intermediate transform-stream queues, unless explicitly requested.

On the other hand, you could view ReadableStream.from() as a sort of transform, such that the same no-op logic should apply. Hmm. @ricea, any thoughts?

Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens Jan 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right, I didn't update the spec text yet to pass a zero high-water mark. But I'll first wait until we decide what that high-water mark should be. 😉

Transform streams are a bit different, as we want introducing them to be more of a no-op and not cause a bunch of chunks to sit in intermediate transform-stream queues, unless explicitly requested.

Hmm... it doesn't look like that's working as intended? 😕

By default, a TransformStream is indeed created with a readable HWM of 0, but it still has a writable HWM of 1. So piping a stream into it will pull at least one chunk, to fill the writable end's queue.

var rs1 = new ReadableStream({
  pull(c) {
    console.log('pull');
    c.enqueue('a');
  }
}, { highWaterMark: 0 });
// no "pull" is logged yet
var rs2 = rs1.pipeThrough(new TransformStream());
// logs "pull" once

There's also no good way to fix this. You can't set both the readable and writable HWM to 0, since then the pipe will stall:

var rs1 = new ReadableStream({
  pull(c) {
    console.log('pull');
    c.enqueue('a');
  }
}, { highWaterMark: 0 });
var rs2 = rs1.pipeThrough(new TransformStream({}, { highWaterMark: 0 }));
var r = rs2.getReader();
await r.read(); // never resolves

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like there's a connection missing somewhere... There's no way for the writable end of a transform stream to know that the readable end is being read from. 🤔

let { readable, writable } = new TransformStream({}, { highWaterMark: 0 });
let r = readable.getReader();
let w = writable.getWriter();

// we start reading from the readable end...
let read1 = r.read();

// ...but the writable end is still applying backpressure. :-/
w.desiredSize; // still 0
w.ready; // still pending

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, a TransformStream is indeed created with a readable HWM of 0, but it still has a writable HWM of 1. So piping a stream into it will pull at least one chunk, to fill the writable end's queue.

Yes. As you observed, a writable stream with a HWM of 0 will always have backpressure. So adding an identity TransformStream to a pipe can't be a complete no-op: it always increases the total queue size by 1.

Hmm. I have a slight bias toward keeping our default, since it must have been picked for a reason :).

In implementation practice we seem to be setting readable HWM to 0 whenever we create a platform stream, because it permits maximum control over backpressure. So I'm not sure the default of 1 is correct.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another reason for setting HWM to 0: you can treat ReadableStream.from() as the "inverse" of ReadableStream.prototype.values().

const it1 = ['a', 'b', 'c'];
// This async iterator should behave the same as the original
const it2 = ReadableStream.from(it1).values();
for await (const chunk of it2) { /* ... */ }
const rs1 = new ReadableStream({
  start(c) {
    c.enqueue('a');
    c.enqueue('b');
    c.enqueue('c');
    c.close();
  }
});
// This stream should behave the same as the original
const rs2 = ReadableStream.from(rs1.values());
const reader = rs2.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) { /* ... */ }

return stream;
}