Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement R2 multipart upload bindings #486

Merged
merged 2 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions packages/core/src/standards/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export function _headersFromIncomingRequest(
export const _kInner = Symbol("kInner");

const kBodyStream = Symbol("kBodyStream");
const kBodyStreamBrand = Symbol("kBodyStreamBrand");
const kInputGated = Symbol("kInputGated");
const kFormDataFiles = Symbol("kFormDataFiles");
const kCloned = Symbol("kCloned");
Expand Down Expand Up @@ -197,7 +198,7 @@ export class Body<Inner extends BaseRequest | BaseResponse> {

if (body === null) return body;
// Only transform body stream once
const bodyStream = this[kBodyStream];
let bodyStream = this[kBodyStream];
if (bodyStream) return bodyStream;
assert(body instanceof ReadableStream);

Expand Down Expand Up @@ -263,7 +264,9 @@ export class Body<Inner extends BaseRequest | BaseResponse> {
cancel: (reason) => reader.cancel(reason),
};
// TODO: maybe set { highWaterMark: 0 } as a strategy here?
return (this[kBodyStream] = new ReadableStream(source));
bodyStream = new ReadableStream(source);
Object.defineProperty(bodyStream, kBodyStreamBrand, { value: true });
return (this[kBodyStream] = bodyStream);
}
get bodyUsed(): boolean {
return this[_kInner].bodyUsed;
Expand Down Expand Up @@ -357,6 +360,13 @@ export function withStringFormDataFiles<
return body;
}

/** @internal */
export function _isBodyStream(stream: ReadableStream): boolean {
return (
(stream as { [kBodyStreamBrand]?: unknown })[kBodyStreamBrand] === true
);
}

export type RequestInfo = BaseRequestInfo | Request;

export interface RequestInit extends BaseRequestInit {
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/standards/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export {
Body,
withInputGating,
withStringFormDataFiles,
_isBodyStream,
Request,
withImmutableHeaders,
Response,
Expand Down Expand Up @@ -48,6 +49,8 @@ export {
CompressionStream,
DecompressionStream,
_isByteStream,
_isDisturbedStream,
_isFixedLengthStream,
} from "./streams";
export type { ArrayBufferViewConstructor } from "./streams";
export * from "./navigator";
Expand Down
20 changes: 20 additions & 0 deletions packages/core/src/standards/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ export function _isByteStream(
return false;
}

/** @internal */
export function _isDisturbedStream(stream: ReadableStream): boolean {
// Try to determine if stream is a stream has been consumed at all by
// inspecting its state.
for (const symbol of Object.getOwnPropertySymbols(stream)) {
if (symbol.description === "kState") {
// @ts-expect-error symbol properties are not included in type definitions
return !!stream[symbol].disturbed;
}
}
return false;
}

function convertToByteStream(
stream: ReadableStream<Uint8Array>,
clone = false
Expand Down Expand Up @@ -250,6 +263,13 @@ export class FixedLengthStream extends IdentityTransformStream {
};
}

/** @internal */
export function _isFixedLengthStream(stream: ReadableStream): boolean {
return (
(stream as { [kContentLength]?: unknown })[kContentLength] !== undefined
);
}

function createTransformerFromTransform(transform: Transform): Transformer {
// TODO: backpressure? see https://github.com/nodejs/node/blob/440d95a878a1a19bf72a2685fc8fc0f47100b510/lib/internal/webstreams/adapters.js#L538
return {
Expand Down
30 changes: 30 additions & 0 deletions packages/core/test/standards/streams.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
Request,
Response,
_isByteStream,
_isDisturbedStream,
} from "@miniflare/core";
import { utf8Decode, utf8Encode } from "@miniflare/shared-test";
import test, { Macro, ThrowsExpectation } from "ava";
Expand Down Expand Up @@ -74,6 +75,35 @@ test("_isByteStream: determines if a ReadableStream is a byte stream", (t) => {
t.false(_isByteStream(regularStream));
t.true(_isByteStream(byteStream));
});
test("_isDisturbedStream: determines if a ReadableStream is disturbed", async (t) => {
const regularStream = new ReadableStream({
pull(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue("thing");
controller.close();
},
});
const byteStream = new ReadableStream({
type: "bytes",
pull(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
},
});
t.false(_isDisturbedStream(regularStream));
t.false(_isDisturbedStream(byteStream));

// @ts-expect-error ReadableStream types are incompatible
await arrayBuffer(regularStream);
t.true(_isDisturbedStream(regularStream));

const reader = byteStream.getReader();
t.false(_isDisturbedStream(byteStream));
await reader.read();
t.true(_isDisturbedStream(byteStream));
await reader.releaseLock();
t.true(_isDisturbedStream(byteStream));
});

test("ReadableStreamBYOBReader: readAtLeast: reads at least n bytes", async (t) => {
const stream = chunkedStream([[1, 2, 3], [4], [5, 6]]);
Expand Down
Loading