Skip to content

Commit

Permalink
Match streams behaviour with Workers runtime, closes #168
Browse files Browse the repository at this point in the history
- Support `IdentityTransformStream`
- Support the `streams_enable_constructors` compatibility flag:
  - Throw when constructing `ReadableStream`/`WritableStream`s if not
    set
  - Only include `ReadableByteStreamController`,
    `ReadableStreamBYOBRequest`, `ReadableStreamDefaultController`
    and `WritableStreamDefaultController` if set
- Support the `transformstream_enable_standard_constructor`
  compatibility flag
  - Use `IdentityTransformStream` as `TransformStream` implementation
    if not set
  - Warn if custom transformer specified but not set
  - Throw when constructing `TransformStream` if set, but
    `streams_enable_constructors` isn't
  - Only include `TransformStreamDefaultController` if it and
    `streams_enable_constructors` set
- Enable BYOB reads for `FixedLengthStream`
  • Loading branch information
mrbbot committed Sep 3, 2022
1 parent 8a8c5ae commit 5ab7cb3
Show file tree
Hide file tree
Showing 7 changed files with 593 additions and 124 deletions.
106 changes: 87 additions & 19 deletions packages/core/src/plugins/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import {
DecompressionStream,
FetchEvent,
FixedLengthStream,
IdentityTransformStream,
Navigator,
Request,
Response,
Expand Down Expand Up @@ -83,6 +84,21 @@ function proxyStringFormDataFiles<
});
}

function proxyDisableStreamConstructor<
Class extends
| typeof ReadableStream
| typeof WritableStream
| typeof TransformStream
>(klass: Class) {
return new Proxy(klass, {
construct() {
throw new Error(
`To use the new ${klass.name}() constructor, enable the streams_enable_constructors feature flag.`
);
},
});
}

// Approximation of structuredClone for Node < 17.0.0
function structuredCloneBuffer<T>(value: T): T {
return deserialize(serialize(value));
Expand Down Expand Up @@ -398,21 +414,73 @@ export class CorePlugin extends Plugin<CoreOptions> implements CoreOptions {
);
}

const extraGlobals: Context = {};

// Make sure the kFormDataFiles flag is set correctly when constructing
let CompatRequest = Request;
let CompatResponse = Response;
const formDataFiles = ctx.compat.isEnabled(
"formdata_parser_supports_files"
);
if (!formDataFiles) {
CompatRequest = proxyStringFormDataFiles(CompatRequest);
CompatResponse = proxyStringFormDataFiles(CompatResponse);
if (!ctx.compat.isEnabled("formdata_parser_supports_files")) {
CompatRequest = proxyStringFormDataFiles(Request);
CompatResponse = proxyStringFormDataFiles(Response);
}

// Only include `navigator` if `global_navigator` compatibility flag is set
const compatGlobals: Context = {};
if (ctx.compat.isEnabled("global_navigator")) {
compatGlobals.navigator = new Navigator();
extraGlobals.navigator = new Navigator();
extraGlobals.Navigator = Navigator;
}

const enableStreamConstructors = ctx.compat.isEnabled(
"streams_enable_constructors"
);
const enableTransformStreamConstructor = ctx.compat.isEnabled(
"transformstream_enable_standard_constructor"
);
let CompatReadableStream = ReadableStream;
let CompatWritableStream = WritableStream;
let CompatTransformStream:
| typeof TransformStream
| typeof IdentityTransformStream = TransformStream;
// Disable stream constructors if `streams_enable_constructors`
// compatibility flag not set
if (!enableStreamConstructors) {
CompatReadableStream = proxyDisableStreamConstructor(ReadableStream);
CompatWritableStream = proxyDisableStreamConstructor(WritableStream);
// If `transformstream_enable_standard_constructor` flag set, but
// `streams_enable_constructors` not set, disable `TransformStream`
// constructor
if (enableTransformStreamConstructor) {
CompatTransformStream = proxyDisableStreamConstructor(TransformStream);
}
}
// If `transformstream_enable_standard_constructor` flag not set, use
// non-spec `IdentityTransformStream` implementation instead
if (!enableTransformStreamConstructor) {
CompatTransformStream = new Proxy(IdentityTransformStream, {
construct(target, args, newTarget) {
if (args.length > 0) {
ctx.log.warn(
"To use the new TransformStream() constructor with a custom transformer, enable the transformstream_enable_standard_constructor feature flag."
);
}
return Reflect.construct(target, args, newTarget);
},
});
}

// Only include stream controllers if constructors enabled
if (enableStreamConstructors) {
extraGlobals.ReadableByteStreamController = ReadableByteStreamController;
extraGlobals.ReadableStreamBYOBRequest = ReadableStreamBYOBRequest;
extraGlobals.ReadableStreamDefaultController =
ReadableStreamDefaultController;
extraGlobals.WritableStreamDefaultController =
WritableStreamDefaultController;

if (enableTransformStreamConstructor) {
extraGlobals.TransformStreamDefaultController =
TransformStreamDefaultController;
}
}

// Try to parse upstream URL if set
Expand Down Expand Up @@ -477,20 +545,20 @@ export class CorePlugin extends Plugin<CoreOptions> implements CoreOptions {
URLSearchParams,
URLPattern,

ByteLengthQueuingStrategy,
CountQueuingStrategy,
ReadableByteStreamController,
ReadableStream,
ReadableStream: CompatReadableStream,
WritableStream: CompatWritableStream,
TransformStream: CompatTransformStream,

ReadableStreamBYOBReader,
ReadableStreamBYOBRequest,
ReadableStreamDefaultController,
ReadableStreamDefaultReader,
TransformStream,
TransformStreamDefaultController,
WritableStream,
WritableStreamDefaultController,
WritableStreamDefaultWriter,

ByteLengthQueuingStrategy,
CountQueuingStrategy,

IdentityTransformStream,
FixedLengthStream,

CompressionStream: CompressionStreamImpl,
DecompressionStream: DecompressionStreamImpl,

Expand All @@ -511,7 +579,7 @@ export class CorePlugin extends Plugin<CoreOptions> implements CoreOptions {

Date: createDate(this.actualTime),

...compatGlobals,
...extraGlobals,

// The types below would be included automatically, but it's not possible
// to create instances of them without using their constructors and they
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/standards/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export type {
ResponseRedirectStatus,
} from "./http";
export {
IdentityTransformStream,
FixedLengthStream,
CompressionStream,
DecompressionStream,
Expand Down
135 changes: 102 additions & 33 deletions packages/core/src/standards/streams.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import type { Transform } from "stream";
import {
ReadableStream,
ReadableStreamBYOBReadResult,
ReadableStreamBYOBReader,
ReadableStreamDefaultReader,
TransformStream,
TransformStreamDefaultController,
Transformer,
} from "stream/web";
import zlib from "zlib";
Expand Down Expand Up @@ -80,56 +83,122 @@ ReadableStreamBYOBReader.prototype.readAtLeast = async function <
return { value: value as any, done };
};

export const kContentLength = Symbol("kContentLength");
const kTransformHook = Symbol("kTransformHook");
const kFlushHook = Symbol("kFlushHook");

export class FixedLengthStream extends TransformStream<Uint8Array, Uint8Array> {
constructor(expectedLength: number) {
// noinspection SuspiciousTypeOfGuard
if (typeof expectedLength !== "number" || expectedLength < 0) {
throw new TypeError(
"FixedLengthStream requires a non-negative integer expected length."
);
}
export class IdentityTransformStream extends TransformStream<
Uint8Array,
Uint8Array
> {
#readableByteStream?: ReadableStream<Uint8Array>;

// Keep track of the number of bytes written
let written = 0;
// Hooks for FixedLengthStream
[kTransformHook]?: (
chunk: Uint8Array,
controller: TransformStreamDefaultController<Uint8Array>
) => boolean;
[kFlushHook]?: (
controller: TransformStreamDefaultController<Uint8Array>
) => void;

constructor() {
super({
transform(chunk, controller) {
transform: (chunk, controller) => {
// Make sure this chunk is an ArrayBuffer(View)
if (isBufferSource(chunk)) {
const array = bufferSourceToArray(chunk);

// Throw if written too many bytes
written += array.byteLength;
if (written > expectedLength) {
return controller.error(
new TypeError(
"Attempt to write too many bytes through a FixedLengthStream."
)
);
}

if (this[kTransformHook]?.(array, controller) === false) return;
controller.enqueue(array);
} else {
controller.error(new TypeError(buildNotBufferSourceError(chunk)));
}
},
flush(controller) {
// Throw if not written enough bytes on close
if (written < expectedLength) {
controller.error(
new TypeError(
"FixedLengthStream did not see all expected bytes before close()."
)
);
flush: (controller) => this[kFlushHook]?.(controller),
});
}

get readable() {
if (this.#readableByteStream !== undefined) return this.#readableByteStream;
const readable = super.readable;
let reader: ReadableStreamDefaultReader;
return (this.#readableByteStream = new ReadableStream({
type: "bytes",
start() {
reader = readable.getReader();
},
async pull(controller) {
const { done, value } = await reader.read();
if (done) {
queueMicrotask(() => {
controller.close();
// Not documented in MDN but if there's an ongoing request that's waiting,
// we need to tell it that there were 0 bytes delivered so that it unblocks
// and notices the end of stream.
// @ts-expect-error `byobRequest` has type `undefined` in `@types/node`
controller.byobRequest?.respond(0);
});
} else {
controller.enqueue(value);
}
},
});
cancel() {
return reader.cancel();
},
}));
}
}

export const kContentLength = Symbol("kContentLength");

export class FixedLengthStream extends IdentityTransformStream {
readonly #expectedLength: number;
#bytesWritten = 0;

constructor(expectedLength: number) {
super();

// noinspection SuspiciousTypeOfGuard
if (typeof expectedLength !== "number" || expectedLength < 0) {
throw new TypeError(
"FixedLengthStream requires a non-negative integer expected length."
);
}
this.#expectedLength = expectedLength;

// When used as Request/Response body, override the Content-Length header
// with the expectedLength
(this.readable as any)[kContentLength] = expectedLength;
Object.defineProperty(this.readable, kContentLength, {
value: expectedLength,
});
}

[kTransformHook] = (
chunk: Uint8Array,
controller: TransformStreamDefaultController<Uint8Array>
) => {
// Throw if written too many bytes
this.#bytesWritten += chunk.byteLength;
if (this.#bytesWritten > this.#expectedLength) {
controller.error(
new TypeError(
"Attempt to write too many bytes through a FixedLengthStream."
)
);
return false;
}
return true;
};

[kFlushHook] = (controller: TransformStreamDefaultController<Uint8Array>) => {
// Throw if not written enough bytes on close
if (this.#bytesWritten < this.#expectedLength) {
controller.error(
new TypeError(
"FixedLengthStream did not see all expected bytes before close()."
)
);
}
};
}

function createTransformerFromTransform(transform: Transform): Transformer {
Expand Down
Loading

0 comments on commit 5ab7cb3

Please sign in to comment.