Skip to content

Commit

Permalink
stream: fix deadlock when cloning webstreams
Browse files Browse the repository at this point in the history
  • Loading branch information
tsctx committed Dec 17, 2023
1 parent 1b60054 commit 37ddb34
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 14 deletions.
14 changes: 10 additions & 4 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -589,18 +589,24 @@ class ReadableStream {
'DataCloneError');
}

const { port1, port2 } = this[kState].transfer;

this[kState].transfer.port2 = undefined;

const transfer = lazyTransfer();
const {
writable,
source,
promise,
} = lazyTransfer().newCrossRealmWritableSink(
this,
this[kState].transfer.port1);
} = transfer.newCrossRealmWritableSink(this, port1);

transfer.closeRegistry().register(port2, source);

this[kState].transfer.writable = writable;
this[kState].transfer.promise = promise;

return {
data: { port: this[kState].transfer.port2 },
data: { port: port2 },
deserializeInfo:
'internal/webstreams/readablestream:TransferredReadableStream',
};
Expand Down
24 changes: 18 additions & 6 deletions lib/internal/webstreams/transfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const {
ObjectDefineProperties,
PromiseResolve,
ReflectConstruct,
SafeFinalizationRegistry,
} = primordials;

const {
Expand Down Expand Up @@ -261,10 +262,18 @@ class CrossRealmTransformWritableSink {
}
}

let finalizer = null;

function closeRegistry() {
return (finalizer ??= new SafeFinalizationRegistry((source) => {
source[kState].port.close();
}));
}

function newCrossRealmReadableStream(writable, port) {
const readable =
new ReadableStream(
new CrossRealmTransformReadableSource(port));
const source = new CrossRealmTransformReadableSource(port);

const readable = new ReadableStream(source);

const promise =
readableStreamPipeTo(readable, writable, false, false, false);
Expand All @@ -273,19 +282,21 @@ function newCrossRealmReadableStream(writable, port) {

return {
readable,
source,
promise,
};
}

function newCrossRealmWritableSink(readable, port) {
const writable =
new WritableStream(
new CrossRealmTransformWritableSink(port));
const source = new CrossRealmTransformWritableSink(port);

const writable = new WritableStream(source);

const promise = readableStreamPipeTo(readable, writable, false, false, false);
setPromiseHandled(promise);
return {
writable,
source,
promise,
};
}
Expand All @@ -297,4 +308,5 @@ module.exports = {
CrossRealmTransformReadableSource,
CloneableDOMException,
InternalCloneableDOMException,
closeRegistry,
};
15 changes: 11 additions & 4 deletions lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,20 +282,27 @@ class WritableStream {
'DataCloneError');
}

const { port1, port2 } = this[kState].transfer;

this[kState].transfer.port2 = undefined;

const transfer = lazyTransfer();
const {
readable,
source,
promise,
} = lazyTransfer().newCrossRealmReadableStream(
} = transfer.newCrossRealmReadableStream(
this,
this[kState].transfer.port1);
port1);

transfer.closeRegistry().register(port2, source);

this[kState].transfer.readable = readable;
this[kState].transfer.promise = promise;

setPromiseHandled(this[kState].transfer.promise);

return {
data: { port: this[kState].transfer.port2 },
data: { port: port2 },
deserializeInfo:
'internal/webstreams/writablestream:TransferredWritableStream',
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict';

const common = require('../common');
const { Worker } = require('worker_threads');
const { once } = require('node:events');

const worker = new Worker(
`
const { ReadableStream } = require('stream/web');
const rs = new ReadableStream();
const cloned = structuredClone(rs, { transfer: [rs] });
`,
{ eval: true },
);

const worker2 = new Worker(
`
const { WritableStream } = require('stream/web');
const ws = new WritableStream();
const cloned = structuredClone(ws, { transfer: [ws] });
`,
{ eval: true },
);

(async () => {
// A timer is used here to detect the end of a process.
const timer = setTimeout(common.mustNotCall(), common.platformTimeout(10000));

await Promise.all([once(worker, 'exit'), once(worker2, 'exit')]);

clearTimeout(timer);
})().then(common.mustCall());

0 comments on commit 37ddb34

Please sign in to comment.