Skip to content

Commit

Permalink
Various fixes for readable byte streams
Browse files Browse the repository at this point in the history
* respondWithNewView(newView) can now be called with a newView that is *smaller than* the BYOB request's view. This aligns it with respond(bytesWritten), which also allows bytesWritten <= byobRequest.view.byteLength.

* respondWithNewView() must now be called with an empty view when the stream is closed. This aligns it with respond(bytesWritten), which requires bytesWritten to be 0 when closed.

* respondWithNewView(newView) must now be called with a view whose view.buffer.byteLength matches that of the BYOB request. Ideally, we would like to assert that the new view's buffer is the "transferred version" of the BYOB request's buffer, but that's not possible yet with the tools currently provided by the ECMAScript specification.

* enqueue(chunk) and respondWithNewView(newView) now check that the given view's buffer is actually *transferable*. Previously, we only checked whether the buffer is *not yet detached*, but this is insufficient: a WebAssembly.Memory's buffer is *never* transferable. We also make sure to not transfer the given buffer until *after* we've checked all other preconditions, so the buffer is still intact if these methods were to throw an error.

* enqueue() and respond() now check that the BYOB request's view has *not* been transferred, since otherwise it's not possible to copy bytes into its buffer and/or transfer the buffer when committing.

* enqueue(), respond(), and respondWithNewView() immediately invalidate the BYOB request. Previously, we only did this if we actually filled the first pull-into descriptor, which doesn't *always* happen. (For example: if the pull-into descriptor's element size is 4, but we only have filled 1 or 2 bytes.)

* We now always transfer the pull-into descriptor's buffer when committing it (to fulfill a read request or read-into request). This is mainly a sanity check: the stream should never use this buffer after it has been committed.
  • Loading branch information
MattiasBuelens authored May 26, 2021
1 parent 0ff6d45 commit 033c6d9
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 66 deletions.
136 changes: 101 additions & 35 deletions index.bs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ exports.implementation = class ReadableByteStreamControllerImpl {

const pullIntoDescriptor = {
buffer,
bufferByteLength: autoAllocateChunkSize,
byteOffset: 0,
byteLength: autoAllocateChunkSize,
bytesFilled: 0,
Expand Down
4 changes: 4 additions & 0 deletions reference-implementation/lib/ReadableStreamBYOBReader-impl.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const { newPromise, resolvePromise, rejectPromise, promiseRejectedWith } = require('./helpers/webidl.js');
const { IsDetachedBuffer } = require('./abstract-ops/ecmascript.js');
const aos = require('./abstract-ops/readable-streams.js');
const { mixin } = require('./helpers/miscellaneous.js');
const ReadableStreamGenericReaderImpl = require('./ReadableStreamGenericReader-impl.js').implementation;
Expand All @@ -17,6 +18,9 @@ class ReadableStreamBYOBReaderImpl {
if (view.buffer.byteLength === 0) {
return promiseRejectedWith(new TypeError('view\'s buffer must have non-zero byteLength'));
}
if (IsDetachedBuffer(view.buffer) === true) {
return promiseRejectedWith(new TypeError('view\'s buffer has been detached'));
}

if (this._stream === undefined) {
return promiseRejectedWith(readerLockException('read'));
Expand Down
11 changes: 4 additions & 7 deletions reference-implementation/lib/ReadableStreamBYOBRequest-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,14 @@ exports.implementation = class ReadableStreamBYOBRequestImpl {
}

respondWithNewView(view) {
if (view.byteLength === 0) {
throw new TypeError('chunk must have non-zero byteLength');
}
if (view.buffer.byteLength === 0) {
throw new TypeError('chunk\'s buffer must have non-zero byteLength');
}

if (this._controller === undefined) {
throw new TypeError('This BYOB request has been invalidated');
}

if (IsDetachedBuffer(view.buffer) === true) {
throw new TypeError('The given view\'s buffer has been detached and so cannot be used as a response');
}

aos.ReadableByteStreamControllerRespondWithNewView(this._controller, view);
}
};
5 changes: 5 additions & 0 deletions reference-implementation/lib/abstract-ops/ecmascript.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ exports.TransferArrayBuffer = O => {
return transferredIshVersion;
};

// Not implemented correctly
exports.CanTransferArrayBuffer = O => {
return !exports.IsDetachedBuffer(O);
};

// Not implemented correctly
exports.IsDetachedBuffer = O => {
return isFakeDetached in O;
Expand Down
97 changes: 75 additions & 22 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ const assert = require('assert');
const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, rejectPromise, uponPromise,
setPromiseIsHandledToTrue, waitForAllPromise, transformPromiseWith, uponFulfillment, uponRejection } =
require('../helpers/webidl.js');
const { CopyDataBlockBytes, CreateArrayFromList, TransferArrayBuffer } = require('./ecmascript.js');
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
require('./ecmascript.js');
const { IsNonNegativeNumber } = require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
Expand Down Expand Up @@ -983,8 +984,8 @@ function ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescripto
assert(bytesFilled <= pullIntoDescriptor.byteLength);
assert(bytesFilled % elementSize === 0);

return new pullIntoDescriptor.viewConstructor(
pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize);
const buffer = TransferArrayBuffer(pullIntoDescriptor.buffer);
return new pullIntoDescriptor.viewConstructor(buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize);
}

function ReadableByteStreamControllerEnqueue(controller, chunk) {
Expand All @@ -997,8 +998,23 @@ function ReadableByteStreamControllerEnqueue(controller, chunk) {
const buffer = chunk.buffer;
const byteOffset = chunk.byteOffset;
const byteLength = chunk.byteLength;
if (IsDetachedBuffer(buffer) === true) {
throw new TypeError('chunk\'s buffer is detached and so cannot be enqueued');
}
const transferredBuffer = TransferArrayBuffer(buffer);

if (controller._pendingPullIntos.length > 0) {
const firstPendingPullInto = controller._pendingPullIntos[0];
if (IsDetachedBuffer(firstPendingPullInto.buffer) === true) {
throw new TypeError(
'The BYOB request\'s buffer has been detached and so cannot be filled with an enqueued chunk'
);
}
firstPendingPullInto.buffer = TransferArrayBuffer(firstPendingPullInto.buffer);
}

ReadableByteStreamControllerInvalidateBYOBRequest(controller);

if (ReadableStreamHasDefaultReader(stream) === true) {
if (ReadableStreamGetNumReadRequests(stream) === 0) {
ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);
Expand Down Expand Up @@ -1041,8 +1057,7 @@ function ReadableByteStreamControllerError(controller, e) {

function ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, size, pullIntoDescriptor) {
assert(controller._pendingPullIntos.length === 0 || controller._pendingPullIntos[0] === pullIntoDescriptor);

ReadableByteStreamControllerInvalidateBYOBRequest(controller);
assert(controller._byobRequest === null);
pullIntoDescriptor.bytesFilled += size;
}

Expand Down Expand Up @@ -1160,9 +1175,17 @@ function ReadableByteStreamControllerPullInto(controller, view, readIntoRequest)

const ctor = view.constructor;

const buffer = TransferArrayBuffer(view.buffer);
let buffer;
try {
buffer = TransferArrayBuffer(view.buffer);
} catch (e) {
readIntoRequest.errorSteps(e);
return;
}

const pullIntoDescriptor = {
buffer,
bufferByteLength: buffer.byteLength,
byteOffset: view.byteOffset,
byteLength: view.byteLength,
bytesFilled: 0,
Expand Down Expand Up @@ -1216,12 +1239,29 @@ function ReadableByteStreamControllerPullInto(controller, view, readIntoRequest)
function ReadableByteStreamControllerRespond(controller, bytesWritten) {
assert(controller._pendingPullIntos.length > 0);

const firstDescriptor = controller._pendingPullIntos[0];
const state = controller._stream._state;

if (state === 'closed') {
if (bytesWritten !== 0) {
throw new TypeError('bytesWritten must be 0 when calling respond() on a closed stream');
}
} else {
assert(state === 'readable');
if (bytesWritten === 0) {
throw new TypeError('bytesWritten must be greater than 0 when calling respond() on a readable stream');
}
if (firstDescriptor.bytesFilled + bytesWritten > firstDescriptor.byteLength) {
throw new RangeError('bytesWritten out of range');
}
}

firstDescriptor.buffer = TransferArrayBuffer(firstDescriptor.buffer);

ReadableByteStreamControllerRespondInternal(controller, bytesWritten);
}

function ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor) {
firstDescriptor.buffer = TransferArrayBuffer(firstDescriptor.buffer);

assert(firstDescriptor.bytesFilled === 0);

const stream = controller._stream;
Expand All @@ -1234,14 +1274,11 @@ function ReadableByteStreamControllerRespondInClosedState(controller, firstDescr
}

function ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, pullIntoDescriptor) {
if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength) {
throw new RangeError('bytesWritten out of range');
}
assert(pullIntoDescriptor.bytesFilled + bytesWritten <= pullIntoDescriptor.byteLength);

ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor);

if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
// TODO: Figure out whether we should detach the buffer or not here.
return;
}

Expand All @@ -1254,7 +1291,6 @@ function ReadableByteStreamControllerRespondInReadableState(controller, bytesWri
ReadableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder.byteLength);
}

pullIntoDescriptor.buffer = TransferArrayBuffer(pullIntoDescriptor.buffer);
pullIntoDescriptor.bytesFilled -= remainderSize;
ReadableByteStreamControllerCommitPullIntoDescriptor(controller._stream, pullIntoDescriptor);

Expand All @@ -1263,18 +1299,17 @@ function ReadableByteStreamControllerRespondInReadableState(controller, bytesWri

function ReadableByteStreamControllerRespondInternal(controller, bytesWritten) {
const firstDescriptor = controller._pendingPullIntos[0];
assert(CanTransferArrayBuffer(firstDescriptor.buffer) === true);

const state = controller._stream._state;
ReadableByteStreamControllerInvalidateBYOBRequest(controller);

const state = controller._stream._state;
if (state === 'closed') {
if (bytesWritten !== 0) {
throw new TypeError('bytesWritten must be 0 when calling respond() on a closed stream');
}

assert(bytesWritten === 0);
ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor);
} else {
assert(state === 'readable');

assert(bytesWritten > 0);
ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor);
}

Expand All @@ -1283,24 +1318,42 @@ function ReadableByteStreamControllerRespondInternal(controller, bytesWritten) {

function ReadableByteStreamControllerRespondWithNewView(controller, view) {
assert(controller._pendingPullIntos.length > 0);
assert(IsDetachedBuffer(view.buffer) === false);

const firstDescriptor = controller._pendingPullIntos[0];
const state = controller._stream._state;

if (state === 'closed') {
if (view.byteLength !== 0) {
throw new TypeError('The view\'s length must be 0 when calling respondWithNewView() on a closed stream');
}
} else {
assert(state === 'readable');
if (view.byteLength === 0) {
throw new TypeError(
'The view\'s length must be greater than 0 when calling respondWithNewView() on a readable stream'
);
}
}

if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset) {
throw new RangeError('The region specified by view does not match byobRequest');
}
if (firstDescriptor.byteLength !== view.byteLength) {
if (firstDescriptor.bufferByteLength !== view.buffer.byteLength) {
throw new RangeError('The buffer of view has different capacity than byobRequest');
}
if (firstDescriptor.bytesFilled + view.byteLength > firstDescriptor.byteLength) {
throw new RangeError('The region specified by view is larger than byobRequest');
}

firstDescriptor.buffer = view.buffer;
firstDescriptor.buffer = TransferArrayBuffer(view.buffer);

ReadableByteStreamControllerRespondInternal(controller, view.byteLength);
}

function ReadableByteStreamControllerShiftPendingPullInto(controller) {
assert(controller._byobRequest === null);
const descriptor = controller._pendingPullIntos.shift();
ReadableByteStreamControllerInvalidateBYOBRequest(controller);
return descriptor;
}

Expand Down
8 changes: 7 additions & 1 deletion reference-implementation/run-web-platform-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ async function main() {
const testsPath = path.resolve(wptPath, 'streams');

const filterGlobs = process.argv.length >= 3 ? process.argv.slice(2) : ['**/*.html'];
const excludeGlobs = [
// These tests use ArrayBuffers backed by WebAssembly.Memory objects, which *should* be non-transferable.
// However, our TransferArrayBuffer implementation cannot detect these, and will incorrectly "transfer" them anyway.
'readable-byte-streams/non-transferable-buffers.any.html'
];
const anyTestPattern = /\.any\.html$/;

const bundledJS = await bundle(entryPath);
Expand Down Expand Up @@ -61,7 +66,8 @@ async function main() {
return false;
}

return filterGlobs.some(glob => minimatch(testPath, glob));
return filterGlobs.some(glob => minimatch(testPath, glob)) &&
!excludeGlobs.some(glob => minimatch(testPath, glob));
}
});

Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/web-platform-tests

0 comments on commit 033c6d9

Please sign in to comment.