Communication between workers/windows via Streams API #244

Open
tyoshino opened this issue Nov 5, 2014 · 26 comments
Comments

@tyoshino
Member

tyoshino commented Nov 5, 2014

/cc @sicking @domenic

A separate thread to continue discussion started at #97 (comment).

@tyoshino
Member Author

We've finalized the ReadableStream. Let's investigate the plans again based on the up-to-date ReadableStream design.

The two plans proposed are:

  • (a) just allow postMessage()-ing a ReadableStream
  • (b) have a method that creates a ReadableStream (or WritableStream) for the caller and gives the destination worker a WritableStream (or ReadableStream), with the two connected to each other

The key difference between them is whether the initiator creates/sees the stream for the peer or not.

(a) has the following open questions:

  • how to neuter the ReadableStream on the original thread
  • whether we should allow postMessage()-ing a reader, a stream, or both
  • how much of the logic (ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader) should stay on the original thread

For these reasons, I proposed (b).

Regarding the third point, I think it's no longer problematic. We need to fail when chunks, or an error object passed to controller.error(), are not structured cloneable, but apart from those, the ReadableStream class itself doesn't hold anything externally given. The queuing strategy and underlying source logic are now held by the ReadableStreamDefaultController instance. So, we need to neuter the public-interface part of the ReadableStream in the original worker (or, if that's difficult, maybe just error it), create a ReadableStream in the destination worker, and connect them by some mechanism that transfers the protocol between the newly created ReadableStream and the ReadableStreamDefaultController (with structured cloning).

We need to investigate the pipeTo() optimization story before moving forward, though. #359

@wanderview
Member

I feel kind of strongly that we should just allow postMessage() of the ReadableStream. I thought we had preliminary agreement to this effect in #276.

If someone wants a WritableStream they should be able to create an identity transform, write to it, and then postMessage() the readable side.

I still like just requiring postMessage() to lock the ReadableStream and then drain the underlying source, as mentioned in:

#276 (comment)

This lets the browser grab a C++ underlying source and optimize it off-thread, but it also works for JS underlying sources. Interposing a writable step seems like it would make this harder, not easier.

@wanderview
Member

Sorry, I see your last comment in #276 now. Maybe we have more agreement than I realized.

@domenic
Member

domenic commented Jun 23, 2016

I think both plans are valuable, but we've heard several times from multiple people at Mozilla that plan (a) is something they are very interested in, so we should probably prioritize it higher. I think we're in agreement on that. And as @wanderview says, there's an easy polyfill of (b) on top of (a).

how to neuter the ReadableStream on the original thread

I think just locking it is enough of a neuter. We never need to unlock it either IMO.

whether we should allow postMessage()-ing a reader, a stream, or both

Definitely the stream, as that allows the browser to grab a lock.

@isonmad
Contributor

isonmad commented Oct 27, 2016

I think just locking it is enough of a neuter. We never need to unlock it either IMO.

What if you postMessage a stream to a worker, and the worker postMessages the same stream back? Is it the same stream, so the original stream gets unlocked, or is it a new object?

@domenic
Member

domenic commented Oct 27, 2016

That would be a new object, just like when you postMessage any other non-primitive back and forth.

@isonmad
Contributor

isonmad commented Oct 27, 2016

Actually, where should the error-checking be for when someone further up the pipe chain enqueues chunks that contain things which can't be structured-cloned?

Would there be a third type besides default and "bytes", which would throw a TypeError if you try to controller.enqueue() such a chunk, like how ReadableByteStreamController throws? And would only that type of stream and byte streams be transferable?

@domenic
Member

domenic commented Oct 28, 2016

Actually, where should the error-checking be for when someone further up the pipe chain enqueues chunks that contain things which can't be structured-cloned?

It's a good question. I think what would happen is that, as the browser reads the chunks in the background, it tries to structured clone. Any errors go to "report the exception" (i.e. fire an error event on the global scope; show up in the console). At that point we probably cancel the source readable stream, and make the readable stream on the other side of the boundary error.

Would there be a third type besides default and "bytes", which would throw a TypeError if you try to controller.enqueue() such a chunk, like how ReadableByteStreamController throws? And would only that type of stream and byte streams be transferable?

That seems over-restrictive, and not really necessary. Also you can't really tell if something is cloneable until you try to clone it (e.g. it may have a throwing getter property). So we couldn't really perform this check until we're ready to do the clone/transfer.


The larger issue this might be considered blocked on is whatwg/html#935; see in particular my comment whatwg/html#935 (comment).

@isonmad
Contributor

isonmad commented Oct 30, 2016

you can't really tell if something is cloneable until you try to clone it (e.g. it may have a throwing getter property). So we couldn't really perform this check until we're ready to do the clone/transfer.

Is cloning synchronously on enqueue, instead of on dequeue, not feasible/performant? I guess the queue has to stay in the underlyingSource's thread so that desiredSize is always known synchronously, but can you not serialize on enqueue, and deserialize on dequeue?

Actually, I always wondered why structured clone isn't ever exposed as an API to JavaScript. Does any implementation of it exist outside of the engines themselves? realistic-structured-clone tries, but it says it's really incomplete (it can't handle Map, Set, ArrayBuffer, etc.).

@sicking

sicking commented Oct 30, 2016

Structured cloning has to happen synchronously when the data is put into the stream. Otherwise it is very unclear what data is actually written. Consider for example:

x = { a: 1 };
writer.write(x);
x.a = 2;

Or

x = { a: 1 };
writer.write(x);
someButton.onclick = () => { x.a = 2; };

This is why structured cloning happens synchronously in functions like port.postMessage() and objectStore.put().

@isonmad
Contributor

isonmad commented Oct 30, 2016

Yeah, that makes sense. I was focusing on the opposite case, transferred ReadableStreams, where the source enqueues things and they would be cloned/serialized synchronously at enqueue time, before the destination realm reads/dequeues them. But yeah, for a transferred WritableStream, it would be like you said. The destination realm would be enqueueing things at writer.write() time, putting the cloned/serialized chunk in the queue on the underlyingSink's thread, before it actually gets dequeued at sink.write() time.

isonmad pushed a commit to isonmad/streams that referenced this issue Dec 3, 2016
As [[storedError]] is observable, can be an arbitrary object,
and is very likely an uncloneable Error, it can't be sent to
a new realm reliably. So just forbid errored streams.

Still needs clearer semantics of when structured cloning occurs
and how DataCloneErrors are reported.

Cloning needs polyfilling somehow too.

Related to: whatwg#244, whatwg#276
isonmad pushed a commit to isonmad/streams that referenced this issue Dec 3, 2016
isonmad pushed a commit to isonmad/streams that referenced this issue Dec 4, 2016
@isonmad
Contributor

isonmad commented Jan 29, 2017

There's still the big question of when exactly the various writable stream write() promises fulfill to communicate backpressure across event loops, which I wrote up in #629 (comment).

@ricea
Collaborator

ricea commented Apr 23, 2018

Assuming we want chunks to be transferred where possible, rather than copied, we have three main options on the table:

1. Opportunistic greedy transfer.

Any part of the chunk that is transferable is transferred, everything else is copied. Example:
{ name: 'carpool', value: new Uint8Array([1, 2, 3, 4]) }
Here name is copied and value is transferred.

Advantages:

  • Easy to use

Disadvantages:

  • "Opportunistic greedy transfer" algorithm needs to be specified somewhere.
  • Can lead to surprising behaviour for deeply-nested objects, for example:
    { thirdPartyObject: { internalThing: Uint8Array } }
    Here the author has nested an object from third-party code inside their own object. The nested object
    contains a transferable field, and is unexpectedly broken by being passed as a chunk. In general,
    passing arbitrary third-party objects as chunks is not going to work well, so maybe this is okay.

2. Provided by strategy.

An extra function is added to the strategy which provides a list of objects that are to be transferred when the stream is transferred. Example:
strategy = { transfer(chunk) { return [chunk.value]; } }
would transfer the value property of any chunk, and copy everything else.

Advantages:

  • Easy to understand
  • Allows re-use of existing postMessage machinery--no extra algorithm needs to be standardised.

Disadvantages:

  • The transfer() function cannot itself be transferred, which means that if a stream is transferred twice it will lose its ability to transfer chunks.
  • Developers who want to be able to transfer arbitrary objects are going to have to write a lot of code.

3. New meta-protocol.

Objects which are capable of being transferred contain metadata saying how to do it. Example:
{ name: 'fitboot', value: new Uint8Array([3,5,7]), [Symbol.transfer]: ? }

Advantages:

  • A general solution which can be made to work for postMessage() too.

Disadvantages:

  • There's no way to explicitly say that the top-level object is transferable.
  • Not clear what the meta-protocol should be.
  • Probably requires the most standards and implementation work of any of the options.

@yutakahirano
Member

Regarding 2, the TransformStream constructor has strategy options, so having a strategy for postMessage doesn't sound so strange.

@domenic
Member

domenic commented Apr 23, 2018

For (3), I'm assuming the protocol would be a function of some sort. (Either one that returns the transferable parts, or one that somehow "does the transfer".) Doesn't that reintroduce

The transfer() function cannot itself be transferred, which means that if a stream is transferred twice it will lose its ability to transfer chunks.

to (3) as well?

To me, the transfer-twice problem is the worst issue here. Everything else in the disadvantages columns seems solvable.

@ricea
Collaborator

ricea commented Apr 24, 2018

For (3), I'm assuming the protocol would be a function of some sort. (Either one that returns the transferable parts, or one that somehow "does the transfer".) Doesn't that reintroduce

The transfer() function cannot itself be transferred, which means that if a stream is transferred twice it will lose its ability to transfer chunks.

to (3) as well?

I've been wondering whether a meta-protocol as simple as

[Symbol.transferKeys]: ['value']

could work. For a nested object you'd end up with something like

{
  key: 'outer',
  value: {
    label: 'car',
    data: new Uint8Array([1, 2]),
    [Symbol.transferKeys]: ['data']
  },
  [Symbol.transferKeys]: ['value']
}

It seems overly simplistic, but I haven't come up with a case where it wouldn't work yet.

To me, the transfer-twice problem is the worst issue here. Everything else in the disadvantages columns seems solvable.

Thinking about it further, I realised it can be a problem even with a single transfer. Consider these steps:

  1. Create a WritableStream on the main thread, with transfer(chunk) { return [chunk.value]; } in its strategy.
  2. Transfer it to a Worker
  3. Call ws.getWriter().write({value: new Uint8Array([1, 2])}) in the Worker.

The intent is that value gets transferred rather than copied, but since the transfer() function cannot be executed in the Worker, value has to be copied to transfer the chunk to the main thread.

@domenic
Member

domenic commented Apr 24, 2018

Hmm, I see. That seems to basically work, as far as I can tell...

So as much as I like (2)'s ergonomics, it seems like both (1) and your version of (3) are more workable. Let me comment on why I think the disadvantages are OK in both cases, before going to sleep:

On (1):

"Opportunistic greedy transfer" algorithm needs to be specified somewhere.

Not a big deal. I think we'd integrate it into StructuredSerializeWithTransfer, which already is doing object-graph crawling.

Can lead to surprising behaviour for deeply-nested objects, for example

Since this is opt-in on the stream level, this seems fine. (I.e., we aren't applying this to every ReadableStream.) It seems like an OK thing to say that passing your chunks to a transferred readable stream means that not only will you never see the readable stream again, you'll never see your chunks again.

On (3):

There's no way to explicitly say that the top-level object is transferable.

Hmm, OK, this is an issue. Somehow I missed this earlier.

Right now I'm feeling [Symbol.transferable]: '*' vs. [Symbol.transferable]: ['value'].

Not clear what the meta-protocol should be.

I think you've answered this :).

Probably requires the most standards and implementation work of any of the options.

Not so bad. The hardest thing is picking a place to park the symbol. (I'm not sure putting web platform stuff on Symbol is a great idea.)

Once we've done that, it's similar spec work to (1); we need to insert some steps into StructuredSerializeWithTransfer.

It does have more wide-ranging effects, but the spec and implementation work should be similar, I think: just add some extra auto-discovery-of-transferableness steps inside the existing graph-crawling.

@ricea
Collaborator

ricea commented Apr 24, 2018

I had a discussion with @yutakahirano about transferring strategies. He proposed the mental model of a transfer consisting of creating a special kind of TransformStream with one leg in the source context and one leg in the destination context. In the destination context you receive one of these legs directly from the MessageEvent, and in the source context the leg is piped to/from the source stream. I think this model is very helpful.

We talked about various ways of attaching a strategy to a WritableStream after it had arrived in the destination context, for example with an attachStrategy() method. This would enable (2)'s transfer() function to be used. However, the attachStrategy() method could only be used when the queue is empty (because otherwise it changes the meaning of queueTotalSize, which is bad).

We discussed changing postMessage() to be able to specify what the strategy would be on the destination side, but it doesn't really fit into the API.

I brought up the issue that write(chunk) should perform the transfer synchronously, i.e.

const chunk = new ArrayBuffer(10);
writer.write(chunk);
console.log(chunk.byteLength);

should log 0. The reasoning behind this is that if the chunk was transferred asynchronously at some later time it would be confusing and error-prone.

This means the chunk needs to be cloned into the target thread immediately. However, something still needs to be queued on the sending side in order for backpressure to work properly. I think we need some kind of placeholder that represents the chunk in the queue on the sending side until the chunk is read on the receiving side.

We concluded that if we do not have transfer() as part of the strategy, then the best way to customise the strategy in the transferred stream is just to put a TransformStream in front of it with the desired strategy.

We also discussed what happens if you try to transfer locked streams. Ideally postMessage() would throw. Maybe this also applies if IsDisturbed() is true?

@domenic
Member

domenic commented Apr 25, 2018

We talked about various ways of attaching a strategy to a WritableStream after it had arrived in the destination context, for example with an attachStrategy() method.

This is interesting, but seems pretty un-ergonomic.

We discussed changing postMessage() to be able to specify what the strategy would be on the destination side, but it doesn't really fit into the API.

Even if we put it in, I'm not sure how it would solve the essential problem that you can't transfer functions across threads :(.

I brought up the issue that write(chunk) should perform the transfer synchronously

Agreed. This and your subsequent conclusions all make sense.

An alternate approach would be to do two transfers: one same-realm that transfers from the producer's control into the internal queue on the sender side, and one that transfers from the sender side to the receiver side. Not sure if that's better.

We concluded that if we do not have transfer() as part of the strategy, then the best way to customise the strategy in the transferred stream is just to put a TransformStream in front of it with the desired strategy.

I'm not quite sure I follow this case.

We also discussed what happens if you try to transfer locked streams. Ideally postMessage() would throw.

Oh, definitely. To be transferable you need a [[Detached]] slot, and the contract is you're supposed to set that to true once you get transferred, so that future attempts to transfer (or clone) throw. So on a spec level at least this is pretty much a given.

Maybe this also applies if IsDisturbed() is true?

I still see IsDisturbed as kind of weird. But maybe the reasons we had for using it in fetch also apply here? I dunno.

@yutakahirano
Member

I'll be happy if postMessage doesn't care about IsDisturbed.

@ricea
Collaborator

ricea commented May 1, 2018

An alternate approach would be to do two transfers: one same-realm that transfers from the producer's control into the internal queue on the sender side, and one that transfers from the sender side to the receiver side. Not sure if that's better.

I think in practice implementations will want to do only one transfer, to avoid walking the object graph twice. Unless it makes the standard hugely more complicated, I'd rather spec it the way browsers will implement it.

We concluded that if we do not have transfer() as part of the strategy, then the best way to customise the strategy in the transferred stream is just to put a TransformStream in front of it with the desired strategy.

I'm not quite sure I follow this case.

Let's say a developer transfers a readable stream to a worker, and wants to have 64 KB of buffering in the worker in addition to whatever buffering is configured in the main page. Then it would work just as well to write

const rs = transferredReadableStream.pipeThrough(
  new TransformStream({}, new ByteLengthQueuingStrategy({highWaterMark: 65536})));

as to write

rs.attachStrategy(new ByteLengthQueuingStrategy({highWaterMark: 65536}));

So there's no need for an API like attachStrategy() to exist to support this use case.

@ricea
Collaborator

ricea commented May 1, 2018

I'll be happy if postMessage doesn't care about IsDisturbed.

Yes. I've thought about it some more and I don't think there's a need for it to look at IsDisturbed.

@ricea
Collaborator

ricea commented Jun 5, 2018

My current plan is to do the work in two stages. The first stage will only clone chunks. This will be inefficient for ArrayBuffers, and transferring a stream of streams will still not be supported.

As a second stage, the write() and enqueue() APIs will gain a second transferList argument. When this argument is used, the chunks will be serialised immediately and queued in their serialised form. They will then be deserialised before being returned from read() or passed to underlying sink write() or transformer transform(). In particular, they will not be deserialised when sent through a pipe.

The new syntax can be used with an empty transferList to get "unshared object" semantics, meaning that changing an object after passing it to a stream will have no impact on the object that is returned from the stream. This is a similar idea to how the byte stream API protects against concurrent modification of the data, and is nice from the point of view of enforcing correctness.

@chcunningham

Hi @ricea - What is the timeline for implementing the plan described in the comment above?

@ricea
Collaborator

ricea commented Jul 21, 2020

Hi @ricea - What is the timeline for implementing the plan described in the comment above?

I have a PR in progress for stage 1 #1053 which I hope to land soon. It can be tested in Chrome using the --enable-experimental-web-platform-features flag.

Stage 2 is still in the early design stages and I wouldn't expect it to be implemented this year.

@alvestrand

Tagging @guidou for interest.
