Skip to content

Commit

Permalink
Implement R2 multipart upload bindings (#486)
Browse files Browse the repository at this point in the history
* Implement R2 multipart upload bindings

* fixup! Implement R2 multipart upload bindings
  • Loading branch information
mrbbot authored Feb 14, 2023
1 parent 547fda9 commit 7174947
Show file tree
Hide file tree
Showing 18 changed files with 1,808 additions and 125 deletions.
14 changes: 12 additions & 2 deletions packages/core/src/standards/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export function _headersFromIncomingRequest(
export const _kInner = Symbol("kInner");

const kBodyStream = Symbol("kBodyStream");
const kBodyStreamBrand = Symbol("kBodyStreamBrand");
const kInputGated = Symbol("kInputGated");
const kFormDataFiles = Symbol("kFormDataFiles");
const kCloned = Symbol("kCloned");
Expand Down Expand Up @@ -197,7 +198,7 @@ export class Body<Inner extends BaseRequest | BaseResponse> {

if (body === null) return body;
// Only transform body stream once
const bodyStream = this[kBodyStream];
let bodyStream = this[kBodyStream];
if (bodyStream) return bodyStream;
assert(body instanceof ReadableStream);

Expand Down Expand Up @@ -263,7 +264,9 @@ export class Body<Inner extends BaseRequest | BaseResponse> {
cancel: (reason) => reader.cancel(reason),
};
// TODO: maybe set { highWaterMark: 0 } as a strategy here?
return (this[kBodyStream] = new ReadableStream(source));
bodyStream = new ReadableStream(source);
Object.defineProperty(bodyStream, kBodyStreamBrand, { value: true });
return (this[kBodyStream] = bodyStream);
}
get bodyUsed(): boolean {
return this[_kInner].bodyUsed;
Expand Down Expand Up @@ -357,6 +360,13 @@ export function withStringFormDataFiles<
return body;
}

/** @internal */
export function _isBodyStream(stream: ReadableStream): boolean {
return (
(stream as { [kBodyStreamBrand]?: unknown })[kBodyStreamBrand] === true
);
}

export type RequestInfo = BaseRequestInfo | Request;

export interface RequestInit extends BaseRequestInit {
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/standards/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export {
Body,
withInputGating,
withStringFormDataFiles,
_isBodyStream,
Request,
withImmutableHeaders,
Response,
Expand Down Expand Up @@ -48,6 +49,8 @@ export {
CompressionStream,
DecompressionStream,
_isByteStream,
_isDisturbedStream,
_isFixedLengthStream,
} from "./streams";
export type { ArrayBufferViewConstructor } from "./streams";
export * from "./navigator";
Expand Down
20 changes: 20 additions & 0 deletions packages/core/src/standards/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ export function _isByteStream(
return false;
}

/** @internal */
export function _isDisturbedStream(stream: ReadableStream): boolean {
// Try to determine if stream is a stream has been consumed at all by
// inspecting its state.
for (const symbol of Object.getOwnPropertySymbols(stream)) {
if (symbol.description === "kState") {
// @ts-expect-error symbol properties are not included in type definitions
return !!stream[symbol].disturbed;
}
}
return false;
}

function convertToByteStream(
stream: ReadableStream<Uint8Array>,
clone = false
Expand Down Expand Up @@ -250,6 +263,13 @@ export class FixedLengthStream extends IdentityTransformStream {
};
}

/** @internal */
export function _isFixedLengthStream(stream: ReadableStream): boolean {
return (
(stream as { [kContentLength]?: unknown })[kContentLength] !== undefined
);
}

function createTransformerFromTransform(transform: Transform): Transformer {
// TODO: backpressure? see https://github.com/nodejs/node/blob/440d95a878a1a19bf72a2685fc8fc0f47100b510/lib/internal/webstreams/adapters.js#L538
return {
Expand Down
30 changes: 30 additions & 0 deletions packages/core/test/standards/streams.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
Request,
Response,
_isByteStream,
_isDisturbedStream,
} from "@miniflare/core";
import { utf8Decode, utf8Encode } from "@miniflare/shared-test";
import test, { Macro, ThrowsExpectation } from "ava";
Expand Down Expand Up @@ -74,6 +75,35 @@ test("_isByteStream: determines if a ReadableStream is a byte stream", (t) => {
t.false(_isByteStream(regularStream));
t.true(_isByteStream(byteStream));
});
test("_isDisturbedStream: determines if a ReadableStream is disturbed", async (t) => {
const regularStream = new ReadableStream({
pull(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue("thing");
controller.close();
},
});
const byteStream = new ReadableStream({
type: "bytes",
pull(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
},
});
t.false(_isDisturbedStream(regularStream));
t.false(_isDisturbedStream(byteStream));

// @ts-expect-error ReadableStream types are incompatible
await arrayBuffer(regularStream);
t.true(_isDisturbedStream(regularStream));

const reader = byteStream.getReader();
t.false(_isDisturbedStream(byteStream));
await reader.read();
t.true(_isDisturbedStream(byteStream));
await reader.releaseLock();
t.true(_isDisturbedStream(byteStream));
});

test("ReadableStreamBYOBReader: readAtLeast: reads at least n bytes", async (t) => {
const stream = chunkedStream([[1, 2, 3], [4], [5, 6]]);
Expand Down
Loading

0 comments on commit 7174947

Please sign in to comment.