
ReadableStream.from(asyncIterable) #1083

Merged: 22 commits, Jun 8, 2023

Conversation

@MattiasBuelens (Collaborator) commented Nov 10, 2020:

I want to give #1018 a try. 😁

This adds a static ReadableStream.from() method, which takes an async iterable and returns a ReadableStream pulling chunks from that async iterable. Sync iterables (including arrays and generators) are also supported, since GetIterator() already has all the necessary handling to adapt a sync iterator into an async iterator. So that's nice. 😄
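
For illustration, here is a minimal usage sketch of the proposed method (assuming the from() signature described above):

async function* generateChunks() {
  yield 'a';
  yield 'b';
}

// From an async iterable (e.g. an async generator):
const rs1 = ReadableStream.from(generateChunks());

// From a sync iterable such as an array:
const rs2 = ReadableStream.from(['a', 'b', 'c']);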

I think it should be fairly easy to add ReadableStream.of(...chunks) as well, which would simply call through to ReadableStream.from(chunks). I'll look into it.
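
In other words, roughly (a hypothetical polyfill-style sketch, not spec text):

// ReadableStream.of(...chunks) would just delegate to from():
ReadableStream.of = (...chunks) => ReadableStream.from(chunks);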

This is a very early draft, so there are no tests yet. I first want to get most of the semantics done, then we can worry about all the nasty error handling and edge cases.

(See WHATWG Working Mode: Changes for more details.)



@domenic (Member) left a comment:

Very nice!! I originally was thinking we should add an async iterable type (e.g. async sequence<>) to Web IDL, but probably it's best to stick with any plus manual plumbing for now until there's a second consumer on the web platform that would use it.

MattiasBuelens force-pushed the rs-from branch 2 times, most recently from b365e91 to abe1ce3, on November 10, 2020.
index.bs (outdated excerpt):
1. Perform ! [$ReadableStreamDefaultControllerClose$](stream.[=ReadableStream/[[controller]]=]).
1. Otherwise:
1. Let |value| be ? [$IteratorValue$](|iterResult|).
1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](stream.[=ReadableStream/[[controller]]=],
@MattiasBuelens (Collaborator, PR author) commented:

This may throw if strategy.size() throws or returns an invalid chunk size. We need to catch this error, and then close the async iterator.
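
A rough sketch of that handling (the names pullNextChunk, iterator, and nextValue are illustrative only, not from the spec or the reference implementation):

async function pullNextChunk(controller, iterator, nextValue) {
  try {
    controller.enqueue(nextValue); // may throw if strategy.size() misbehaves
  } catch (e) {
    // Give the async iterator a chance to clean up before erroring the stream.
    if (typeof iterator.return === 'function') {
      await iterator.return();
    }
    throw e;
  }
}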

@MattiasBuelens (Collaborator, PR author) commented:

Ah, that reminds me: there's currently no way to change the queuing strategy. 😅

Do we want ReadableStream.from(asyncIterable, queuingStrategy = {})?

That might conflict with a possible future extension to pass { type: 'bytes' } to create a readable byte stream. However, we could also introduce ReadableStream.fromBytes(asyncIterable, queuingStrategy) for that, as suggested by Jake.

A collaborator commented:

I suggest we leave that to a future extension. I think there should be no backwards-compatibility issues if we add strategy and/or byte support later?

@MattiasBuelens (Collaborator, PR author) commented:

No, there shouldn't be. But when we decide to add such an extension in the future, we mustn't forget to turn this ! into a ? and handle the error (by closing the iterator).

A member commented:

A future extension seems reasonable to me. Feel free to add a <!-- comment --> to the source file to remind us.

That said, it's hard enough to motivate implementers to add convenience features like async iteration in the first place; I worry that doing this in two pieces would reduce the chance of the second piece ever happening. Hmm.

@MattiasBuelens (Collaborator, PR author) commented Nov 19, 2020:

I've added a todo for now. I'll leave this conversation open, to discuss whether we already want to add a strategy argument.

@MattiasBuelens (Collaborator, PR author) commented:

One possible use case would be if you had a (slow) async iterable and you wanted to quickly add buffering around it:

const bufferedReadable = ReadableStream
  .from(createSlowIterable(), new CountQueuingStrategy({ highWaterMark: 10 }));

That said, you can also achieve this with an intermediate TransformStream, which is already explained in this example:

const slowReadable = ReadableStream.from(createSlowIterable());
const bufferedReadable = slowReadable
  .pipeThrough(new TransformStream(undefined, new CountQueuingStrategy({ highWaterMark: 10 })));

(Note that this assumes that ReadableStream.from creates a stream with HWM = 0, see other thread.)

I don't know how compelling this use case is though... 🤷

@MattiasBuelens (Collaborator, PR author) commented Nov 19, 2020:

I've moved the implementation to a new abstract op ReadableStreamFromIterable. Unfortunately, this marks most of the review comments as outdated. Sorry about that. 😞

I also added the suggested ReadableStream.of(...chunks) method, since it's almost trivial now.

I think I'll start working on some tests next. 🙂

@MattiasBuelens (Collaborator, PR author) left a comment:

Found some time to write a few tests. More to come. 🙂

Don't worry too much about reviewing this right now, it's the holiday season after all. Enjoy! 😄

@@ -1456,6 +1456,6 @@ function ReadableStreamFromIterable(asyncIterable) {
    return promiseResolvedWith(returnResult);
  }

- stream = CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm);
+ stream = CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, 0);
@MattiasBuelens (Collaborator, PR author) commented:

Setting the high-water mark to zero ensures ReadableStream.from(otherReadableStream) doesn't immediately start pulling from otherReadableStream, and matches the default HWM for TransformStream.readable (step 5 in the TransformStream constructor).

In comparison, ReadableStream.tee() does immediately start pulling, even if read() was never called on any of the returned branches. In steps 16 and 17 of the ReadableStreamTee abstract op, no value is passed for highWaterMark, so it defaults to 1. This seems to be intentional, as there are tests that rely on this. (In particular, we rely on it to forward closing of the parent stream to the two branches.)

I think we want to follow the example of TransformStream.readable here, and use zero as HWM? Or am I missing something important?
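
For illustration, the observable difference the HWM choice makes (a sketch assuming ReadableStream.from() is available):

const source = new ReadableStream({
  pull(c) {
    console.log('pull');
    c.enqueue('x');
  }
}, { highWaterMark: 0 });
// no "pull" logged yet

const wrapped = ReadableStream.from(source);
// With HWM = 0 on the wrapped stream, still nothing is logged: from() only
// pulls from `source` once someone reads from `wrapped`.
// With the default HWM = 1, from() would pull one chunk right away.
await wrapped.getReader().read(); // logs "pull"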

A member commented:

Hmm. I have a slight bias toward keeping our default, since it must have been picked for a reason :). In particular, I believe our historical intention has been that by default readable streams should try to keep something in their internal queue, whether that something is derived from a controller.enqueue() call in start(), or one in pull(). I think that same logic applies to deriving it from an async iterator.

Transform streams are a bit different, as we want introducing them to be more of a no-op and not cause a bunch of chunks to sit in intermediate transform-stream queues, unless explicitly requested.

On the other hand, you could view ReadableStream.from() as a sort of transform, such that the same no-op logic should apply. Hmm. @ricea, any thoughts?

@MattiasBuelens (Collaborator, PR author) commented Jan 14, 2021:

Oh right, I didn't update the spec text yet to pass a zero high-water mark. But I'll first wait until we decide what that high-water mark should be. 😉

Transform streams are a bit different, as we want introducing them to be more of a no-op and not cause a bunch of chunks to sit in intermediate transform-stream queues, unless explicitly requested.

Hmm... it doesn't look like that's working as intended? 😕

By default, a TransformStream is indeed created with a readable HWM of 0, but it still has a writable HWM of 1. So piping a stream into it will pull at least one chunk, to fill the writable end's queue.

var rs1 = new ReadableStream({
  pull(c) {
    console.log('pull');
    c.enqueue('a');
  }
}, { highWaterMark: 0 });
// no "pull" is logged yet
var rs2 = rs1.pipeThrough(new TransformStream());
// logs "pull" once

There's also no good way to fix this. You can't set both the readable and writable HWM to 0, since then the pipe will stall:

var rs1 = new ReadableStream({
  pull(c) {
    console.log('pull');
    c.enqueue('a');
  }
}, { highWaterMark: 0 });
var rs2 = rs1.pipeThrough(new TransformStream({}, { highWaterMark: 0 }));
var r = rs2.getReader();
await r.read(); // never resolves

@MattiasBuelens (Collaborator, PR author) commented:

It feels like there's a connection missing somewhere... There's no way for the writable end of a transform stream to know that the readable end is being read from. 🤔

let { readable, writable } = new TransformStream({}, { highWaterMark: 0 });
let r = readable.getReader();
let w = writable.getWriter();

// we start reading from the readable end...
let read1 = r.read();

// ...but the writable end is still applying backpressure. :-/
w.desiredSize; // still 0
w.ready; // still pending

A collaborator commented:

By default, a TransformStream is indeed created with a readable HWM of 0, but it still has a writable HWM of 1. So piping a stream into it will pull at least one chunk, to fill the writable end's queue.

Yes. As you observed, a writable stream with a HWM of 0 will always have backpressure. So adding an identity TransformStream to a pipe can't be a complete no-op: it always increases the total queue size by 1.

Hmm. I have a slight bias toward keeping our default, since it must have been picked for a reason :).

In implementation practice we seem to be setting readable HWM to 0 whenever we create a platform stream, because it permits maximum control over backpressure. So I'm not sure the default of 1 is correct.

@MattiasBuelens (Collaborator, PR author) commented:

Another reason for setting HWM to 0: you can treat ReadableStream.from() as the "inverse" of ReadableStream.prototype.values().

const it1 = ['a', 'b', 'c'];
// This async iterator should behave the same as the original
const it2 = ReadableStream.from(it1).values();
for await (const chunk of it2) { /* ... */ }

const rs1 = new ReadableStream({
  start(c) {
    c.enqueue('a');
    c.enqueue('b');
    c.enqueue('c');
    c.close();
  }
});
// This stream should behave the same as the original
const rs2 = ReadableStream.from(rs1.values());
const reader = rs2.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) { /* ... */ }

Base automatically changed from master to main on January 15, 2021.
MattiasBuelens marked this pull request as ready for review on March 19, 2021.
@MattiasBuelens (Collaborator, PR author) commented:

I can't think of any more interesting tests to add, so I'm putting this up for review. 🙂

Quick recap of the discussions so far:

  • An optional queuingStrategy argument is currently left as a future extension. However, Domenic is concerned that implementers might not be motivated to add a convenience method if they already know that they might have to revisit it in the future.
  • The high water mark of the stream is currently set to 0 (instead of the default value of 1). The question remains whether we should keep it at 0 (to align with TransformStream.readable) or revert it to 1 (to restore the default).

@ricea (Collaborator) left a comment:

lgtm.

Sorry for the delay.

@ricea (Collaborator) commented Apr 15, 2021:

Chrome is interested.

MattiasBuelens force-pushed the rs-from branch 3 times, most recently from ca24228 to ea345b9, on April 15, 2021.
@ricea (Collaborator) left a comment:

lgtm

@ricea (Collaborator) commented Apr 19, 2021:

I realised that for synchronous iterators we could get optimum performance if we just enqueued everything in startAlgorithm. It's probably not worth the extra complexity, and I guess an iterator could return an infinite stream, which would be bad. What do you think?
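
For concreteness, a sketch of that eager approach (hypothetical; the closure shape and syncIterable are illustrative, not spec text):

function makeStartAlgorithm(syncIterable) {
  return controller => {
    // Drain the whole sync iterable up front...
    for (const chunk of syncIterable) {
      controller.enqueue(chunk);
    }
    controller.close();
    // ...which never terminates for an infinite iterator, and runs any
    // side effects of next() eagerly instead of on demand.
  };
}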

@MattiasBuelens (Collaborator, PR author) commented:

I think we should only do that if we are absolutely sure that the user-visible behavior is exactly the same. So we shouldn't do this with arbitrary synchronous iterators, since they can be infinite (as you correctly noted) or they might be modifying some external state inside next() (such as variables or synchronous I/O).

That doesn't mean we can't allow some optimizations. For example, if the input is an Array with its original [Symbol.iterator] method, then the implementation can indeed use a "fast path" for such a case. The same logic could apply to other built-in iterables like Set or Map or Uint8Array (although I don't know if these are worth optimizing).
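
A hypothetical sketch of what such a fast-path check might look like (illustrative only, not part of this PR):

function canUseArrayFastPath(input) {
  // Only a real Array whose default iteration hasn't been tampered with;
  // an engine would likely also need to confirm that %ArrayIteratorPrototype%.next
  // is still the built-in for the optimization to stay unobservable.
  return Array.isArray(input) &&
         input[Symbol.iterator] === Array.prototype[Symbol.iterator];
}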

@ricea (Collaborator) commented Apr 19, 2021:

Thanks for the feedback. I have a suspicion that it's detectable, but I don't know how.

I'm ready to land this if you are.

@MattiasBuelens (Collaborator, PR author) commented:

Good to go. 🙂

MattiasBuelens force-pushed the rs-from branch 2 times, most recently from 574fa8e to d016390, on June 7, 2023.
@MattiasBuelens (Collaborator, PR author) commented:

@domenic or @ricea: Tests have landed and implementation bugs have been filed. Okay if I merge? 🙂

@MattiasBuelens (Collaborator, PR author) commented:

I've also opened implementation bugs for Deno and Node.

I just realized that I forgot to write the "For web developers" documentation for the new from() method... 😓 Expect a follow-up PR soon.

Labels: addition/proposal (new features or enhancements)
8 participants