From 573f5dfd8a05a2f6a4c9003b9b49302fa0442f30 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Wed, 1 May 2024 12:28:29 +0200 Subject: [PATCH] stream: fixes for webstreams PR-URL: https://github.com/nodejs/node/pull/51168 Backport-PR-URL: https://github.com/nodejs/node/pull/52773 Reviewed-By: Matteo Collina --- lib/internal/webstreams/readablestream.js | 178 ++++++++++---------- lib/internal/webstreams/transformstream.js | 140 +++++++--------- lib/internal/webstreams/util.js | 25 ++- lib/internal/webstreams/writablestream.js | 179 +++++++++++---------- test/wpt/status/streams.json | 7 - 5 files changed, 249 insertions(+), 280 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index abd896abeaa980..28b97208922ed4 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -11,7 +11,6 @@ const { FunctionPrototypeCall, MathMin, NumberIsInteger, - ObjectCreate, ObjectDefineProperties, ObjectSetPrototypeOf, Promise, @@ -96,9 +95,9 @@ const { AsyncIterator, cloneAsUint8Array, copyArrayBuffer, + createPromiseCallback, customInspect, dequeueValue, - ensureIsPromise, enqueueValueWithSize, extractHighWaterMark, extractSizeAlgorithm, @@ -251,19 +250,7 @@ class ReadableStream { constructor(source = {}, strategy = kEmptyObject) { if (source === null) throw new ERR_INVALID_ARG_VALUE('source', 'Object', source); - this[kState] = { - disturbed: false, - reader: undefined, - state: 'readable', - storedError: undefined, - stream: undefined, - transfer: { - writable: undefined, - port1: undefined, - port2: undefined, - promise: undefined, - }, - }; + this[kState] = createReadableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); this[kControllerErrorFunction] = () => {}; @@ -655,17 +642,7 @@ function TransferredReadableStream() { return makeTransferable(ReflectConstruct( function() { this[kType] = 'ReadableStream'; - this[kState] = { - disturbed: false, - state: 'readable', - storedError: undefined, - stream: undefined, - transfer: { - writable: undefined, - port: undefined, - promise: undefined, - }, - }; + this[kState] = createReadableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); }, [], ReadableStream)); @@ -1223,43 +1200,58 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, { [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name), }); -function TeeReadableStream(start, pull, cancel) { +function InternalReadableStream(start, pull, cancel, highWaterMark, size) { this[kType] = 'ReadableStream'; - this[kState] = { - disturbed: false, - state: 'readable', - storedError: undefined, - stream: undefined, - transfer: { - writable: undefined, - port: undefined, - promise: undefined, - }, - }; + this[kState] = createReadableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); - setupReadableStreamDefaultControllerFromSource( + const controller = new ReadableStreamDefaultController(kSkipThrow); + setupReadableStreamDefaultController( this, - ObjectCreate(null, { - start: { __proto__: null, value: start }, - pull: { __proto__: null, value: pull }, - cancel: { __proto__: null, value: cancel }, - }), - 1, - () => 1); + controller, + start, + pull, + cancel, + highWaterMark, + size); + return makeTransferable(this); +} +ObjectSetPrototypeOf(InternalReadableStream.prototype, ReadableStream.prototype); +ObjectSetPrototypeOf(InternalReadableStream, ReadableStream); + +function createReadableStream(start, pull, cancel, highWaterMark = 1, size = () => 1) { + const stream = new InternalReadableStream(start, pull, cancel, highWaterMark, size); + + // For spec compliance the InternalReadableStream must be a ReadableStream + stream.constructor = ReadableStream; + return stream; +} +function InternalReadableByteStream(start, pull, cancel) { + this[kType] = 'ReadableStream'; + this[kState] = createReadableStreamState(); + this[kIsClosedPromise] = createDeferredPromise(); + const controller = new ReadableByteStreamController(kSkipThrow); + setupReadableByteStreamController( + this, + controller, + start, + pull, + cancel, + 0, + undefined); return makeTransferable(this); } -ObjectSetPrototypeOf(TeeReadableStream.prototype, ReadableStream.prototype); -ObjectSetPrototypeOf(TeeReadableStream, ReadableStream); +ObjectSetPrototypeOf(InternalReadableByteStream.prototype, ReadableStream.prototype); +ObjectSetPrototypeOf(InternalReadableByteStream, ReadableStream); -function createTeeReadableStream(start, pull, cancel) { - const tee = new TeeReadableStream(start, pull, cancel); +function createReadableByteStream(start, pull, cancel) { + const stream = new InternalReadableByteStream(start, pull, cancel); - // For spec compliance the Tee must be a ReadableStream - tee.constructor = ReadableStream; - return tee; + // For spec compliance the InternalReadableByteStream must be a ReadableStream + stream.constructor = ReadableStream; + return stream; } const isReadableStream = @@ -1275,6 +1267,23 @@ const isReadableStreamBYOBReader = // ---- ReadableStream Implementation +function createReadableStreamState() { + return { + __proto__: null, + disturbed: false, + reader: undefined, + state: 'readable', + storedError: undefined, + transfer: { + __proto__: null, + writable: undefined, + port1: undefined, + port2: undefined, + promise: undefined, + }, + }; +} + function readableStreamFromIterable(iterable) { let stream; const iteratorRecord = getIterator(iterable, 'async'); @@ -1314,16 +1323,12 @@ function readableStreamFromIterable(iterable) { }); } - stream = new ReadableStream({ - start: startAlgorithm, - pull: pullAlgorithm, - cancel: cancelAlgorithm, - }, { - size() { - return 1; - }, - highWaterMark: 0, - }); + stream = createReadableStream( + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + 0, + ); return stream; } @@ -1649,9 +1654,9 @@ function readableStreamDefaultTee(stream, cloneForBranch2) { } branch1 = - createTeeReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm); + createReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm); branch2 = - createTeeReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm); + createReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm); PromisePrototypeThen( reader[kState].close.promise, @@ -1928,16 +1933,10 @@ function readableByteStreamTee(stream) { return cancelDeferred.promise; } - branch1 = new ReadableStream({ - type: 'bytes', - pull: pull1Algorithm, - cancel: cancel1Algorithm, - }); - branch2 = new ReadableStream({ - type: 'bytes', - pull: pull2Algorithm, - cancel: cancel2Algorithm, - }); + branch1 = + createReadableByteStream(nonOpStart, pull1Algorithm, cancel1Algorithm); + branch2 = + createReadableByteStream(nonOpStart, pull2Algorithm, cancel2Algorithm); forwardReaderError(reader); @@ -1988,10 +1987,7 @@ function readableStreamCancel(stream, reason) { } return PromisePrototypeThen( - ensureIsPromise( - stream[kState].controller[kCancel], - stream[kState].controller, - reason), + stream[kState].controller[kCancel](reason), () => {}); } @@ -2356,7 +2352,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) { assert(!controller[kState].pullAgain); controller[kState].pulling = true; PromisePrototypeThen( - ensureIsPromise(controller[kState].pullAlgorithm, controller), + controller[kState].pullAlgorithm(controller), () => { controller[kState].pulling = false; if (controller[kState].pullAgain) { @@ -2386,12 +2382,9 @@ function readableStreamDefaultControllerError(controller, error) { function readableStreamDefaultControllerCancelSteps(controller, reason) { resetQueue(controller); - try { - const result = controller[kState].cancelAlgorithm(reason); - return result; - } finally { - readableStreamDefaultControllerClearAlgorithms(controller); - } + const result = controller[kState].cancelAlgorithm(reason); + readableStreamDefaultControllerClearAlgorithms(controller); + return result; } function readableStreamDefaultControllerPullSteps(controller, readRequest) { @@ -2465,11 +2458,10 @@ function setupReadableStreamDefaultControllerFromSource( FunctionPrototypeBind(start, source, controller) : nonOpStart; const pullAlgorithm = pull ? - FunctionPrototypeBind(pull, source, controller) : + createPromiseCallback('source.pull', pull, source) : nonOpPull; - const cancelAlgorithm = cancel ? - FunctionPrototypeBind(cancel, source) : + createPromiseCallback('source.cancel', cancel, source) : nonOpCancel; setupReadableStreamDefaultController( @@ -3097,7 +3089,7 @@ function readableByteStreamControllerCallPullIfNeeded(controller) { assert(!controller[kState].pullAgain); controller[kState].pulling = true; PromisePrototypeThen( - ensureIsPromise(controller[kState].pullAlgorithm, controller), + controller[kState].pullAlgorithm(controller), () => { controller[kState].pulling = false; if (controller[kState].pullAgain) { @@ -3264,10 +3256,10 @@ function setupReadableByteStreamControllerFromSource( FunctionPrototypeBind(start, source, controller) : nonOpStart; const pullAlgorithm = pull ? - FunctionPrototypeBind(pull, source, controller) : + createPromiseCallback('source.pull', pull, source, controller) : nonOpPull; const cancelAlgorithm = cancel ? - FunctionPrototypeBind(cancel, source) : + createPromiseCallback('source.cancel', cancel, source) : nonOpCancel; if (autoAllocateChunkSize === 0) { @@ -3364,4 +3356,6 @@ module.exports = { readableByteStreamControllerPullSteps, setupReadableByteStreamController, setupReadableByteStreamControllerFromSource, + createReadableStream, + createReadableByteStream, }; diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 5e780705311ac2..486faf884741aa 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -1,7 +1,6 @@ 'use strict'; const { - FunctionPrototypeBind, FunctionPrototypeCall, ObjectDefineProperties, PromisePrototypeThen, @@ -38,8 +37,8 @@ const { } = require('internal/worker/js_transferable'); const { + createPromiseCallback, customInspect, - ensureIsPromise, extractHighWaterMark, extractSizeAlgorithm, isBrandCheck, @@ -50,7 +49,7 @@ const { } = require('internal/webstreams/util'); const { - ReadableStream, + createReadableStream, readableStreamDefaultControllerCanCloseOrEnqueue, readableStreamDefaultControllerClose, readableStreamDefaultControllerEnqueue, @@ -60,7 +59,7 @@ const { } = require('internal/webstreams/readablestream'); const { - WritableStream, + createWritableStream, writableStreamDefaultControllerErrorIfNeeded, } = require('internal/webstreams/writablestream'); @@ -254,10 +253,12 @@ function TransferredTransformStream() { function() { this[kType] = 'TransformStream'; this[kState] = { + __proto__: null, readable: undefined, writable: undefined, backpressure: undefined, backpressureChange: { + __proto__: null, promise: undefined, resolve: undefined, reject: undefined, @@ -353,43 +354,33 @@ function initializeTransformStream( readableHighWaterMark, readableSizeAlgorithm) { - const writable = new WritableStream({ - __proto__: null, - start() { return startPromise.promise; }, - write(chunk) { - return transformStreamDefaultSinkWriteAlgorithm(stream, chunk); - }, - abort(reason) { - return transformStreamDefaultSinkAbortAlgorithm(stream, reason); - }, - close() { - return transformStreamDefaultSinkCloseAlgorithm(stream); - }, - }, { - highWaterMark: writableHighWaterMark, - size: writableSizeAlgorithm, - }); + const startAlgorithm = () => startPromise.promise; - const readable = new ReadableStream({ - __proto__: null, - start() { return startPromise.promise; }, - pull() { - return transformStreamDefaultSourcePullAlgorithm(stream); - }, - cancel(reason) { - return transformStreamDefaultSourceCancelAlgorithm(stream, reason); - }, - }, { - highWaterMark: readableHighWaterMark, - size: readableSizeAlgorithm, - }); + const writable = createWritableStream( + startAlgorithm, + (chunk) => transformStreamDefaultSinkWriteAlgorithm(stream, chunk), + () => transformStreamDefaultSinkCloseAlgorithm(stream), + (reason) => transformStreamDefaultSinkAbortAlgorithm(stream, reason), + writableHighWaterMark, + writableSizeAlgorithm, + ); + + const readable = createReadableStream( + startAlgorithm, + () => transformStreamDefaultSourcePullAlgorithm(stream), + (reason) => transformStreamDefaultSourceCancelAlgorithm(stream, reason), + readableHighWaterMark, + readableSizeAlgorithm, + ); stream[kState] = { + __proto__: null, readable, writable, controller: undefined, backpressure: undefined, backpressureChange: { + __proto__: null, promise: undefined, resolve: undefined, reject: undefined, @@ -444,6 +435,7 @@ function setupTransformStreamDefaultController( assert(isTransformStream(stream)); assert(stream[kState].controller === undefined); controller[kState] = { + __proto__: null, stream, transformAlgorithm, flushAlgorithm, @@ -456,15 +448,18 @@ function setupTransformStreamDefaultControllerFromTransformer( stream, transformer) { const controller = new TransformStreamDefaultController(kSkipThrow); - const transform = transformer?.transform || defaultTransformAlgorithm; - const flush = transformer?.flush || nonOpFlush; - const cancel = transformer?.cancel || nonOpCancel; - const transformAlgorithm = - FunctionPrototypeBind(transform, transformer); - const flushAlgorithm = - FunctionPrototypeBind(flush, transformer); - const cancelAlgorithm = - FunctionPrototypeBind(cancel, transformer); + const transform = transformer?.transform; + const flush = transformer?.flush; + const cancel = transformer?.cancel; + const transformAlgorithm = transform ? + createPromiseCallback('transformer.transform', transform, transformer) : + defaultTransformAlgorithm; + const flushAlgorithm = flush ? + createPromiseCallback('transformer.flush', flush, transformer) : + nonOpFlush; + const cancelAlgorithm = cancel ? + createPromiseCallback('transformer.cancel', cancel, transformer) : + nonOpCancel; setupTransformStreamDefaultController( stream, @@ -512,11 +507,7 @@ function transformStreamDefaultControllerError(controller, error) { async function transformStreamDefaultControllerPerformTransform(controller, chunk) { try { - return await ensureIsPromise( - controller[kState].transformAlgorithm, - controller, - chunk, - controller); + return await controller[kState].transformAlgorithm(chunk, controller); } catch (error) { transformStreamError(controller[kState].stream, error); throw error; @@ -577,10 +568,7 @@ async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { const { promise, resolve, reject } = createDeferredPromise(); controller[kState].finishPromise = promise; - const cancelPromise = ensureIsPromise( - controller[kState].cancelAlgorithm, - controller, - reason); + const cancelPromise = controller[kState].cancelAlgorithm(reason); transformStreamDefaultControllerClearAlgorithms(controller); PromisePrototypeThen( @@ -613,11 +601,7 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) { } const { promise, resolve, reject } = createDeferredPromise(); controller[kState].finishPromise = promise; - const flushPromise = - ensureIsPromise( - controller[kState].flushAlgorithm, - controller, - controller); + const flushPromise = controller[kState].flushAlgorithm(controller); transformStreamDefaultControllerClearAlgorithms(controller); PromisePrototypeThen( flushPromise, @@ -655,31 +639,29 @@ function transformStreamDefaultSourceCancelAlgorithm(stream, reason) { const { promise, resolve, reject } = createDeferredPromise(); controller[kState].finishPromise = promise; - const cancelPromise = ensureIsPromise( - controller[kState].cancelAlgorithm, - controller, - reason); + const cancelPromise = controller[kState].cancelAlgorithm(reason); transformStreamDefaultControllerClearAlgorithms(controller); - PromisePrototypeThen(cancelPromise, - () => { - if (writable[kState].state === 'errored') - reject(writable[kState].storedError); - else { - writableStreamDefaultControllerErrorIfNeeded( - writable[kState].controller, - reason); - transformStreamUnblockWrite(stream); - resolve(); - } - }, - (error) => { - writableStreamDefaultControllerErrorIfNeeded( - writable[kState].controller, - error); - transformStreamUnblockWrite(stream); - reject(error); - }, + PromisePrototypeThen( + cancelPromise, + () => { + if (writable[kState].state === 'errored') + reject(writable[kState].storedError); + else { + writableStreamDefaultControllerErrorIfNeeded( + writable[kState].controller, + reason); + transformStreamUnblockWrite(stream); + resolve(); + } + }, + (error) => { + writableStreamDefaultControllerErrorIfNeeded( + writable[kState].controller, + error); + transformStreamUnblockWrite(stream); + reject(error); + }, ); return controller[kState].finishPromise; diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 1979c55667b167..e862b3ffe25724 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -9,8 +9,6 @@ const { MathMax, NumberIsNaN, PromisePrototypeThen, - PromiseResolve, - PromiseReject, ReflectGet, Symbol, SymbolAsyncIterator, @@ -31,10 +29,6 @@ const { detachArrayBuffer, } = internalBinding('buffer'); -const { - isPromise, -} = require('internal/util/types'); - const { inspect, } = require('util'); @@ -180,13 +174,15 @@ function enqueueValueWithSize(controller, value, size) { controller[kState].queueTotalSize += size; } -function ensureIsPromise(fn, thisArg, ...args) { - try { - const value = FunctionPrototypeCall(fn, thisArg, ...args); - return isPromise(value) ? value : PromiseResolve(value); - } catch (error) { - return PromiseReject(error); - } +// This implements "invoke a callback function type" for callback functions that return a promise. +// See https://webidl.spec.whatwg.org/#es-invoking-callback-functions +async function invokePromiseCallback(fn, thisArg, ...args) { + return FunctionPrototypeCall(fn, thisArg, ...args); +} + +function createPromiseCallback(name, fn, thisArg) { + validateFunction(fn, name); + return (...args) => invokePromiseCallback(fn, thisArg, ...args); } function isPromisePending(promise) { @@ -273,15 +269,16 @@ module.exports = { ArrayBufferViewGetByteLength, ArrayBufferViewGetByteOffset, AsyncIterator, + createPromiseCallback, cloneAsUint8Array, copyArrayBuffer, customInspect, dequeueValue, - ensureIsPromise, enqueueValueWithSize, extractHighWaterMark, extractSizeAlgorithm, lazyTransfer, + invokePromiseCallback, isBrandCheck, isPromisePending, isViewedArrayBufferDetached, diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 0e807982ee7471..82129a8586da3e 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -6,6 +6,7 @@ const { FunctionPrototypeBind, FunctionPrototypeCall, ObjectDefineProperties, + ObjectSetPrototypeOf, Promise, PromisePrototypeThen, PromiseResolve, @@ -49,9 +50,9 @@ const { } = require('internal/worker/js_transferable'); const { + createPromiseCallback, customInspect, dequeueValue, - ensureIsPromise, enqueueValueWithSize, extractHighWaterMark, extractSizeAlgorithm, @@ -159,45 +160,7 @@ class WritableStream { if (type !== undefined) throw new ERR_INVALID_ARG_VALUE.RangeError('type', type); - this[kState] = { - close: createDeferredPromise(), - closeRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightWriteRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightCloseRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - pendingAbortRequest: { - abort: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - reason: undefined, - wasAlreadyErroring: false, - }, - backpressure: false, - controller: undefined, - state: 'writable', - storedError: undefined, - writeRequests: [], - writer: undefined, - transfer: { - readable: undefined, - port1: undefined, - port2: undefined, - promise: undefined, - }, - }; + this[kState] = createWritableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); this[kControllerErrorFunction] = () => {}; @@ -335,45 +298,7 @@ function TransferredWritableStream() { return makeTransferable(ReflectConstruct( function() { this[kType] = 'WritableStream'; - this[kState] = { - close: createDeferredPromise(), - closeRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightWriteRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightCloseRequest: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - pendingAbortRequest: { - abort: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - reason: undefined, - wasAlreadyErroring: false, - }, - backpressure: false, - controller: undefined, - state: 'writable', - storedError: undefined, - writeRequests: [], - writer: undefined, - transfer: { - promise: undefined, - port1: undefined, - port2: undefined, - readable: undefined, - }, - }; + this[kState] = createWritableStreamState(); this[kIsClosedPromise] = createDeferredPromise(); this[kControllerErrorFunction] = () => {}; }, @@ -576,6 +501,35 @@ ObjectDefineProperties(WritableStreamDefaultController.prototype, { [SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStreamDefaultController.name), }); +function InternalWritableStream(start, write, close, abort, highWaterMark, size) { + this[kType] = 'WritableStream'; + this[kState] = createWritableStreamState(); + this[kIsClosedPromise] = createDeferredPromise(); + const controller = new WritableStreamDefaultController(kSkipThrow); + setupWritableStreamDefaultController( + this, + controller, + start, + write, + close, + abort, + highWaterMark, + size, + ); + return makeTransferable(this); +} + +ObjectSetPrototypeOf(InternalWritableStream.prototype, WritableStream.prototype); +ObjectSetPrototypeOf(InternalWritableStream, WritableStream); + +function createWritableStream(start, write, close, abort, highWaterMark = 1, size = () => 1) { + const stream = new InternalWritableStream(start, write, close, abort, highWaterMark, size); + + // For spec compliance the InternalWritableStream must be a WritableStream + stream.constructor = WritableStream; + return stream; +} + const isWritableStream = isBrandCheck('WritableStream'); const isWritableStreamDefaultWriter = @@ -583,6 +537,55 @@ const isWritableStreamDefaultWriter = const isWritableStreamDefaultController = isBrandCheck('WritableStreamDefaultController'); +function createWritableStreamState() { + return { + __proto__: null, + close: createDeferredPromise(), + closeRequest: { + __proto__: null, + promise: undefined, + resolve: undefined, + reject: undefined, + }, + inFlightWriteRequest: { + __proto__: null, + promise: undefined, + resolve: undefined, + reject: undefined, + }, + inFlightCloseRequest: { + __proto__: null, + promise: undefined, + resolve: undefined, + reject: undefined, + }, + pendingAbortRequest: { + __proto__: null, + abort: { + __proto__: null, + promise: undefined, + resolve: undefined, + reject: undefined, + }, + reason: undefined, + wasAlreadyErroring: false, + }, + backpressure: false, + controller: undefined, + state: 'writable', + storedError: undefined, + writeRequests: [], + writer: undefined, + transfer: { + __proto__: null, + readable: undefined, + port1: undefined, + port2: undefined, + promise: undefined, + }, + }; +} + function isWritableStreamLocked(stream) { return stream[kState].writer !== undefined; } @@ -902,10 +905,7 @@ function writableStreamFinishErroring(stream) { return; } PromisePrototypeThen( - ensureIsPromise( - stream[kState].controller[kAbort], - stream[kState].controller, - abortRequest.reason), + stream[kState].controller[kAbort](abortRequest.reason), () => { abortRequest.abort.resolve?.(); writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); @@ -1109,7 +1109,7 @@ function writableStreamDefaultControllerProcessWrite(controller, chunk) { writableStreamMarkFirstWriteRequestInFlight(stream); PromisePrototypeThen( - ensureIsPromise(writeAlgorithm, controller, chunk, controller), + writeAlgorithm(chunk, controller), () => { writableStreamFinishInFlightWrite(stream); const { @@ -1142,7 +1142,7 @@ function writableStreamDefaultControllerProcessClose(controller) { writableStreamMarkCloseRequestInFlight(stream); dequeueValue(controller); assert(!queue.length); - const sinkClosePromise = ensureIsPromise(closeAlgorithm, controller); + const sinkClosePromise = closeAlgorithm(); writableStreamDefaultControllerClearAlgorithms(controller); PromisePrototypeThen( sinkClosePromise, @@ -1241,12 +1241,14 @@ function setupWritableStreamDefaultControllerFromSink( FunctionPrototypeBind(start, sink, controller) : nonOpStart; const writeAlgorithm = write ? - FunctionPrototypeBind(write, sink) : + createPromiseCallback('sink.write', write, sink) : nonOpWrite; const closeAlgorithm = close ? - FunctionPrototypeBind(close, sink) : nonOpCancel; + createPromiseCallback('sink.close', close, sink) : + nonOpCancel; const abortAlgorithm = abort ? - FunctionPrototypeBind(abort, sink) : nonOpCancel; + createPromiseCallback('sink.abort', abort, sink) : + nonOpCancel; setupWritableStreamDefaultController( stream, controller, @@ -1356,4 +1358,5 @@ module.exports = { writableStreamDefaultControllerAdvanceQueueIfNeeded, setupWritableStreamDefaultControllerFromSink, setupWritableStreamDefaultController, + createWritableStream, }; diff --git a/test/wpt/status/streams.json b/test/wpt/status/streams.json index 8d6a4c6d2fe27b..3b6e0ce6429f9d 100644 --- a/test/wpt/status/streams.json +++ b/test/wpt/status/streams.json @@ -59,12 +59,5 @@ }, "readable-streams/read-task-handling.window.js": { "skip": "Browser-specific test" - }, - "transform-streams/cancel.any.js": { - "fail": { - "expected": [ - "readable.cancel() and a parallel writable.close() should reject if a transformer.cancel() calls controller.error()" - ] - } } }