diff --git a/packages/core/src/standards/http.ts b/packages/core/src/standards/http.ts index a6d956161..da34afd5e 100644 --- a/packages/core/src/standards/http.ts +++ b/packages/core/src/standards/http.ts @@ -158,6 +158,7 @@ export function _headersFromIncomingRequest( export const _kInner = Symbol("kInner"); const kBodyStream = Symbol("kBodyStream"); +const kBodyStreamBrand = Symbol("kBodyStreamBrand"); const kInputGated = Symbol("kInputGated"); const kFormDataFiles = Symbol("kFormDataFiles"); const kCloned = Symbol("kCloned"); @@ -197,7 +198,7 @@ export class Body { if (body === null) return body; // Only transform body stream once - const bodyStream = this[kBodyStream]; + let bodyStream = this[kBodyStream]; if (bodyStream) return bodyStream; assert(body instanceof ReadableStream); @@ -263,7 +264,9 @@ export class Body { cancel: (reason) => reader.cancel(reason), }; // TODO: maybe set { highWaterMark: 0 } as a strategy here? - return (this[kBodyStream] = new ReadableStream(source)); + bodyStream = new ReadableStream(source); + Object.defineProperty(bodyStream, kBodyStreamBrand, { value: true }); + return (this[kBodyStream] = bodyStream); } get bodyUsed(): boolean { return this[_kInner].bodyUsed; @@ -357,6 +360,13 @@ export function withStringFormDataFiles< return body; } +/** @internal */ +export function _isBodyStream(stream: ReadableStream): boolean { + return ( + (stream as { [kBodyStreamBrand]?: unknown })[kBodyStreamBrand] === true + ); +} + export type RequestInfo = BaseRequestInfo | Request; export interface RequestInit extends BaseRequestInit { diff --git a/packages/core/src/standards/index.ts b/packages/core/src/standards/index.ts index e2471a891..0a53c9501 100644 --- a/packages/core/src/standards/index.ts +++ b/packages/core/src/standards/index.ts @@ -10,6 +10,7 @@ export { Body, withInputGating, withStringFormDataFiles, + _isBodyStream, Request, withImmutableHeaders, Response, @@ -48,6 +49,8 @@ export { CompressionStream, DecompressionStream, _isByteStream, + _isDisturbedStream, + _isFixedLengthStream, } from "./streams"; export type { ArrayBufferViewConstructor } from "./streams"; export * from "./navigator"; diff --git a/packages/core/src/standards/streams.ts b/packages/core/src/standards/streams.ts index 95f9f64f2..4cbc0e52c 100644 --- a/packages/core/src/standards/streams.ts +++ b/packages/core/src/standards/streams.ts @@ -34,6 +34,19 @@ export function _isByteStream( return false; } +/** @internal */ +export function _isDisturbedStream(stream: ReadableStream): boolean { + // Try to determine if a stream has been consumed at all by + // inspecting its state. + for (const symbol of Object.getOwnPropertySymbols(stream)) { + if (symbol.description === "kState") { + // @ts-expect-error symbol properties are not included in type definitions + return !!stream[symbol].disturbed; + } + } + return false; +} + function convertToByteStream( stream: ReadableStream, clone = false @@ -250,6 +263,13 @@ export class FixedLengthStream extends IdentityTransformStream { }; } +/** @internal */ +export function _isFixedLengthStream(stream: ReadableStream): boolean { + return ( + (stream as { [kContentLength]?: unknown })[kContentLength] !== undefined + ); +} + function createTransformerFromTransform(transform: Transform): Transformer { // TODO: backpressure?
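// --- Illustrative sketch (not part of this patch) ---------------------------
// The three internal helpers added above are combined by downstream code
// (e.g. `isKnownLengthStream()` in packages/r2/src/multipart.ts below) to
// decide whether a ReadableStream can safely be buffered: body streams and
// FixedLengthStream readables have a known length, and `_isDisturbedStream`
// reports whether any bytes have already been read. `describeStream` is a
// hypothetical helper, shown only to demonstrate the intended usage.
import { ReadableStream } from "stream/web";
import {
  _isBodyStream,
  _isDisturbedStream,
  _isFixedLengthStream,
} from "@miniflare/core";

function describeStream(stream: ReadableStream): string {
  const knownLength = _isBodyStream(stream) || _isFixedLengthStream(stream);
  const disturbed = _isDisturbedStream(stream);
  return `knownLength=${knownLength}, disturbed=${disturbed}`;
}
// -----------------------------------------------------------------------------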
see https://github.com/nodejs/node/blob/440d95a878a1a19bf72a2685fc8fc0f47100b510/lib/internal/webstreams/adapters.js#L538 return { diff --git a/packages/core/test/standards/streams.spec.ts b/packages/core/test/standards/streams.spec.ts index 91b099dc9..baf415994 100644 --- a/packages/core/test/standards/streams.spec.ts +++ b/packages/core/test/standards/streams.spec.ts @@ -14,6 +14,7 @@ import { Request, Response, _isByteStream, + _isDisturbedStream, } from "@miniflare/core"; import { utf8Decode, utf8Encode } from "@miniflare/shared-test"; import test, { Macro, ThrowsExpectation } from "ava"; @@ -74,6 +75,35 @@ test("_isByteStream: determines if a ReadableStream is a byte stream", (t) => { t.false(_isByteStream(regularStream)); t.true(_isByteStream(byteStream)); }); +test("_isDisturbedStream: determines if a ReadableStream is disturbed", async (t) => { + const regularStream = new ReadableStream({ + pull(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + controller.enqueue("thing"); + controller.close(); + }, + }); + const byteStream = new ReadableStream({ + type: "bytes", + pull(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + controller.close(); + }, + }); + t.false(_isDisturbedStream(regularStream)); + t.false(_isDisturbedStream(byteStream)); + + // @ts-expect-error ReadableStream types are incompatible + await arrayBuffer(regularStream); + t.true(_isDisturbedStream(regularStream)); + + const reader = byteStream.getReader(); + t.false(_isDisturbedStream(byteStream)); + await reader.read(); + t.true(_isDisturbedStream(byteStream)); + await reader.releaseLock(); + t.true(_isDisturbedStream(byteStream)); +}); test("ReadableStreamBYOBReader: readAtLeast: reads at least n bytes", async (t) => { const stream = chunkedStream([[1, 2, 3], [4], [5, 6]]); diff --git a/packages/r2/src/bucket.ts b/packages/r2/src/bucket.ts index 46240cf1b..60c6a2ed8 100644 --- a/packages/r2/src/bucket.ts +++ b/packages/r2/src/bucket.ts @@ -7,18 +7,28 @@ import { ReadableStream } from "stream/web"; import { TextEncoder } from "util"; import { parseRanges } from "@miniflare/core"; import { - RangeStoredValueMeta, RequestContext, Storage, assertInRequest, getRequestContext, + parseRange, viewToArray, waitForOpenInputGate, waitForOpenOutputGate, } from "@miniflare/shared"; import { Headers } from "undici"; +import { + InternalR2MultipartUploadOptions, + R2MultipartUpload, + _INTERNAL_PREFIX, + createMultipartUpload, + deleteMultipartParts, + getMultipartValue, + validateMultipartKey, +} from "./multipart"; import { HEX_REGEXP, + MAX_KEY_SIZE, R2Checksums, R2HashAlgorithm, R2Object, @@ -78,6 +88,10 @@ export interface R2PutOptions extends R2Checksums { // A map of custom, user-defined metadata that will be stored with the object. 
customMetadata?: Record; } +export type R2MultipartOptions = Pick< + R2PutOptions, + "httpMetadata" | "customMetadata" +>; export type R2ListOptionsInclude = ("httpMetadata" | "customMetadata")[]; @@ -119,7 +133,6 @@ export interface R2Objects { } const MAX_LIST_KEYS = 1_000; -const MAX_KEY_SIZE = 1024; // https://developers.cloudflare.com/r2/platform/limits/ (5GB - 5MB) const MAX_VALUE_SIZE = 5 * 1_000 * 1_000 * 1_000 - 5 * 1_000 * 1_000; const UNPAIRED_SURROGATE_PAIR_REGEX = @@ -129,7 +142,7 @@ const encoder = new TextEncoder(); type Method = "HEAD" | "GET" | "PUT" | "LIST" | "DELETE"; -function throwR2Error(method: Method, status: number, message: string): void { +function throwR2Error(method: Method, status: number, message: string): never { throw new Error(`R2 ${method} failed: (${status}) ${message}`); } @@ -146,6 +159,10 @@ function validateKey(method: Method, key: string) { `UTF-8 encoded length of ${keyLength} exceeds key length limit of ${MAX_KEY_SIZE}.` ); } + // Check key doesn't start with internal prefix used for multipart storage + if (key.startsWith(_INTERNAL_PREFIX)) { + throwR2Error(method, 400, `Key cannot start with "${_INTERNAL_PREFIX}".`); + } return key; } @@ -408,30 +425,38 @@ function rangeHeaderToR2Range(headers: Headers, size: number): R2Range { return {}; } -function buildKeyTypeError(method: Lowercase): string { +function buildKeyTypeError(method: keyof R2Bucket): string { return `Failed to execute '${method}' on 'R2Bucket': parameter 1 is not of type 'string'.`; } export interface InternalR2BucketOptions { blockGlobalAsyncIO?: boolean; listRespectInclude?: boolean; + minMultipartUploadSize?: number; } export class R2Bucket { readonly #storage: Storage; readonly #blockGlobalAsyncIO: boolean; readonly #listRespectInclude: boolean; + readonly #multipartOpts: InternalR2MultipartUploadOptions; constructor( storage: Storage, { blockGlobalAsyncIO = false, listRespectInclude = true, + minMultipartUploadSize, }: InternalR2BucketOptions = {} ) { this.#storage = storage; this.#blockGlobalAsyncIO = blockGlobalAsyncIO; this.#listRespectInclude = listRespectInclude; + this.#multipartOpts = { + storage, + blockGlobalAsyncIO, + minMultipartUploadSize, + }; } #prepareCtx(): RequestContext | undefined { @@ -441,7 +466,7 @@ export class R2Bucket { return ctx; } - async #head(key: string): Promise { + async #head(key: string): Promise { // Get value, returning null if not found const stored = await this.#storage.head(key); // fix dates @@ -449,7 +474,7 @@ export class R2Bucket { const { metadata } = stored; parseR2ObjectMetadata(metadata); - return new R2Object(metadata); + return metadata; } async head(key: string): Promise { @@ -467,7 +492,7 @@ export class R2Bucket { await waitForOpenInputGate(); ctx?.advanceCurrentTime(); - return meta; + return meta === null ? 
null : new R2Object(meta); } /** @@ -507,10 +532,10 @@ export class R2Bucket { return null; } // test conditional should it exist - if (!testR2Conditional(onlyIf, meta) || meta?.size === 0) { + if (!testR2Conditional(onlyIf, meta)) { await waitForOpenInputGate(); ctx?.advanceCurrentTime(); - return meta; + return new R2Object(meta); } // Convert `Range` header to R2Range if specified @@ -518,28 +543,39 @@ export class R2Bucket { range = rangeHeaderToR2Range(range, meta.size); } - let stored: RangeStoredValueMeta | undefined; - - // get data dependent upon whether suffix or range exists + let value: Uint8Array | ReadableStream; try { - stored = await this.#storage.getRange(key, range); + if (meta.size === 0) { + value = new Uint8Array(); + } else if (meta.multipart !== undefined) { + const parsedRange = parseRange(range, meta.size); + value = getMultipartValue( + this.#storage, + key, + meta.multipart, + parsedRange + ); + meta.range = parsedRange; + } else { + const stored = await this.#storage.getRange( + key, + range + ); + if (stored === undefined) return null; + value = stored.value; + // Add range should it exist + if ("range" in stored && stored.range !== undefined) { + meta.range = stored.range; + } + } } catch { throwR2Error("GET", 400, "The requested range is not satisfiable."); } await waitForOpenInputGate(); ctx?.advanceCurrentTime(); - // if bad metadata, return null - if (stored?.metadata === undefined) return null; - const { value, metadata } = stored; - // fix dates - parseR2ObjectMetadata(metadata); - // add range should it exist - if ("range" in stored && stored.range !== undefined) { - metadata.range = stored.range; - } - return new R2ObjectBody(metadata, value); + return new R2ObjectBody(meta, value); } async put( @@ -617,6 +653,10 @@ export class R2Bucket { value: toStore, metadata, }); + // If existing value was multipart, remove its parts + if (meta?.multipart !== undefined) { + await deleteMultipartParts(this.#storage, key, meta.multipart.uploadId); + } await waitForOpenInputGate(); ctx?.advanceCurrentTime(); @@ -634,7 +674,23 @@ export class R2Bucket { keys = keys.map((key) => validateKey("DELETE", String(key))); await waitForOpenOutputGate(); + const keyMetas = await Promise.all(keys.map((key) => this.#head(key))); + await this.#storage.deleteMany(keys); + + // If any existing values were multipart, remove their parts + const deletePartsPromises = keys.map((key, i) => { + const keyMeta = keyMetas[i]; + if (keyMeta?.multipart !== undefined) { + return deleteMultipartParts( + this.#storage, + key, + keyMeta.multipart.uploadId + ); + } + }); + await Promise.all(deletePartsPromises); + await waitForOpenInputGate(); ctx?.advanceCurrentTime(); @@ -659,6 +715,7 @@ export class R2Bucket { const res = await this.#storage.list({ prefix, + excludePrefix: _INTERNAL_PREFIX, limit, cursor, start: startAfter, @@ -707,4 +764,77 @@ export class R2Bucket { delimitedPrefixes: [...delimitedPrefixes], }; } + + async createMultipartUpload( + key: string, + options: R2MultipartOptions = {} + ): Promise { + const ctx = this.#prepareCtx(); + + // The Workers runtime will coerce the key parameter to a string + if (arguments.length === 0) { + throw new TypeError(buildKeyTypeError("createMultipartUpload")); + } + key = String(key); + validateMultipartKey("createMultipartUpload", key); + + // Validate options + if (typeof options !== "object") { + throw new TypeError( + "Failed to execute 'createMultipartUpload' on 'R2Bucket': parameter 2 is not of type 'MultipartOptions'." 
+ ); + } + if ( + options.customMetadata !== undefined && + typeof options.customMetadata !== "object" + ) { + throw new TypeError( + "Incorrect type for the 'customMetadata' field on 'MultipartOptions': the provided value is not of type 'object'." + ); + } + if ( + options.httpMetadata !== undefined && + typeof options.httpMetadata !== "object" + ) { + throw new TypeError( + "Incorrect type for the 'httpMetadata' field on 'MultipartOptions': the provided value is not of type 'HttpMetadata or Headers'." + ); + } + const customMetadata = options.customMetadata ?? {}; + const httpMetadata = parseHttpMetadata(options.httpMetadata); + + // Creating a multipart upload isn't observable so no need to wait on + // output gate to open + const upload = await createMultipartUpload( + key, + { customMetadata, httpMetadata }, + this.#multipartOpts + ); + await waitForOpenInputGate(); + ctx?.advanceCurrentTime(); + + return upload; + } + + async resumeMultipartUpload( + key: string, + uploadId: string + ): Promise { + // The Workers runtime doesn't make a subrequest here, so no need to call + // `prepareCtx()` + + // The Workers runtime will coerce key and uploadId parameters to a string + if (arguments.length === 0) { + throw new TypeError(buildKeyTypeError("resumeMultipartUpload")); + } + if (arguments.length === 1) { + throw new TypeError( + "Failed to execute 'resumeMultipartUpload' on 'R2Bucket': parameter 2 is not of type 'string'." + ); + } + key = String(key); + uploadId = String(uploadId); + + return new R2MultipartUpload(key, uploadId, this.#multipartOpts); + } } diff --git a/packages/r2/src/index.ts b/packages/r2/src/index.ts index 0dc88e34c..97f74a5e5 100644 --- a/packages/r2/src/index.ts +++ b/packages/r2/src/index.ts @@ -1,3 +1,4 @@ export * from "./bucket"; +export * from "./multipart"; export * from "./plugin"; export * from "./r2Object"; diff --git a/packages/r2/src/multipart.ts b/packages/r2/src/multipart.ts new file mode 100644 index 000000000..976c4a3a2 --- /dev/null +++ b/packages/r2/src/multipart.ts @@ -0,0 +1,568 @@ +// This file implements R2's multipart uploads. Multipart uploads are created +// and later resumed. When creating a multipart upload, Miniflare will store +// an "index", containing passed HTTP and custom metadata. This index serves +// as a marker for the upload, and is used by other methods to check the upload +// exists. +// +// A new key is stored for each uploaded part, in the same namespace as the +// upload's index. Each part gets an associated ETag, which must be used in +// conjunction with the part number when completing an upload. If a part is +// uploaded with the same part number as an existing part, it will override it. +// +// To complete an upload, an array of part number and ETag objects is required. +// Miniflare will then put a file in the regular location for the key containing +// pointers to the uploaded parts. This means Miniflare doesn't need to load +// all parts into memory, concatenate them, and write them back out. An upload +// can also be aborted, in which case all its parts will be deleted. +// +// Note that when completing or aborting an upload, the index is NOT deleted. +// This is because uploads can be aborted more than once, and even aborted after +// completion (although in this case, aborting is a no-op). We need to be able +// to distinguish between a completed upload, an aborted upload and an upload +// that never existed to handle this, and match R2's error messages. 
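//
// For orientation, a minimal sketch of the flow this file supports (worker
// code, assuming an R2 binding named `BUCKET`; `firstFiveMiBChunk` and
// `finalChunk` stand in for the caller's data — illustrative only, not part
// of this patch):
//
//   const upload = await BUCKET.createMultipartUpload("big-key");
//   const part1 = await upload.uploadPart(1, firstFiveMiBChunk);
//   const part2 = await upload.uploadPart(2, finalChunk);
//   const object = await upload.complete([part1, part2]);
//   // ...or, from a later request, resume using the stored uploadId:
//   const resumed = await BUCKET.resumeMultipartUpload(upload.key, upload.uploadId);
//   const body = await BUCKET.get("big-key"); // parts are streamed lazily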
+// +// If regular `R2Bucket#{put,delete}()` methods are called on completed +// multipart keys, they will delete all parts in addition to the key itself. +// `R2Bucket#{head,get,list}()` will never return data from in-progress uploads. +// +// Unfortunately, Miniflare 2's storage abstraction is not very good at +// handling large data (doesn't support streaming reads/writes), or complex +// operations (doesn't support transactions). This limits the reliability of +// this multipart implementation, but it should still be useful for testing. +// We should aim to improve this in Miniflare 3. + +import assert from "assert"; +import { Blob } from "buffer"; +import crypto from "crypto"; +import { arrayBuffer } from "stream/consumers"; +import { ReadableStream } from "stream/web"; +import { TextEncoder } from "util"; +import { _isBodyStream, _isFixedLengthStream } from "@miniflare/core"; +import { + ParsedRange, + RequestContext, + Storage, + assertInRequest, + getRequestContext, + viewToArray, + waitForOpenInputGate, + waitForOpenOutputGate, +} from "@miniflare/shared"; +import { + MAX_KEY_SIZE, + R2MultipartReference, + R2Object, + R2ObjectMetadata, + createMD5Hash, + createVersion, +} from "./r2Object"; + +/** @internal */ +export const _INTERNAL_PREFIX = "__MINIFLARE_INTERNAL__"; + +const MIN_MULTIPART_UPLOAD_SIZE = 5 * 1024 * 1024; + +export interface R2UploadedPart { + partNumber: number; + etag: string; +} +export interface R2MultipartPendingIndexMetadata { + httpMetadata: R2ObjectMetadata["httpMetadata"]; + customMetadata: R2ObjectMetadata["customMetadata"]; +} +export type R2MultipartIndexMetadata = + | R2MultipartPendingIndexMetadata + | { aborted: true } + | { completed: true }; +interface R2MultipartPartMetadata { + size: number; + md5: string; + etag: string; +} + +type R2UploadState = + | { exists: true; meta: R2MultipartPendingIndexMetadata } + | { exists: false; aborted: boolean; completed: boolean }; + +const encoder = new TextEncoder(); +export function validateMultipartKey(method: string, key: string) { + if ( + Buffer.byteLength(key) > MAX_KEY_SIZE || + key.startsWith(_INTERNAL_PREFIX) + ) { + throw new TypeError( + `${method}: The specified object name is not valid. (10020)` + ); + } +} +function validatePartNumber(partNumber: number) { + if (partNumber >= 1 && partNumber <= 10_000) return; + throw new TypeError( + `Part number must be between 1 and 10000 (inclusive). Actual value was: ${partNumber}` + ); +} + +function generateId(likelyOnFilesystem = false) { + // Windows has a maximum path length of ~260 characters: + // https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation + // Miniflare R2 buckets will usually be backed by file-system storage, + // especially when using multipart uploads for large files. Therefore, reduce + // the size of the upload ID on Windows, preferring a longer ID otherwise + // to more closely match R2 behaviour. + const size = likelyOnFilesystem && process.platform === "win32" ? 32 : 128; + return crypto.randomBytes(size).toString("base64url"); +} +function generateMultipartEtag(md5Hexes: string[]) { + // TODO: R2's multipart ETags don't seem to be deterministic, should ours be? 
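  // Sketch of the scheme implemented below, mirroring the S3 convention and
  // the expectations in test/multipart.spec.ts: decode each part's hex MD5,
  // MD5-hash the concatenated digests, then append "-<part count>", e.g.
  //   generateMultipartEtag(["c4ca4238a0b923820dcc509a6f75849b" /* md5("1") */])
  //     === "46d1741e8075da4ac72c71d8130fcb71-1"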
+ // https://stackoverflow.com/a/19896823 + const hash = crypto.createHash("md5"); + for (const md5Hex of md5Hexes) hash.update(Buffer.from(md5Hex, "hex")); + return `${hash.digest("hex")}-${md5Hexes.length}`; +} + +const INDEX = "index"; +function buildKey(key: string, uploadId: string, part?: number) { + return `${_INTERNAL_PREFIX}:multipart:${uploadId}:${key}:${part ?? INDEX}`; +} + +function isKnownLengthStream(stream: ReadableStream): boolean { + return _isBodyStream(stream) || _isFixedLengthStream(stream); +} + +export interface InternalR2MultipartUploadOptions { + storage: Storage; + blockGlobalAsyncIO?: boolean; + minMultipartUploadSize?: number; +} +export async function createMultipartUpload( + key: string, + metadata: R2MultipartIndexMetadata, + opts: InternalR2MultipartUploadOptions +): Promise { + const uploadId = generateId(/* likelyOnFilesystem */ true); + const indexKey = buildKey(key, uploadId); + await opts.storage.put(indexKey, { + value: new Uint8Array(), + metadata, + }); + return new R2MultipartUpload(key, uploadId, opts); +} + +interface R2MultipartRange { + /* inclusive */ start: number; + /* exclusive */ end: number; +} +function overlaps(a: R2MultipartRange, b: R2MultipartRange): boolean { + return a.start < b.end && b.start < a.end; +} +export function getMultipartValue( + storage: Storage, + key: string, + multipart: R2MultipartReference, + range: ParsedRange +): ReadableStream { + // Convert from offset/length to start/end + const queryRange: R2MultipartRange = { + start: range.offset, + end: range.offset + range.length, + }; + + // Find required parts (and the ranges within them) to satisfy the query + const parts: ({ partNumber: number } & R2MultipartRange)[] = []; + let start = 0; + for (const part of multipart.parts) { + const partRange: R2MultipartRange = { start, end: start + part.size }; + if (overlaps(partRange, queryRange)) { + parts.push({ + partNumber: part.partNumber, + start: Math.max(partRange.start, queryRange.start) - partRange.start, + end: Math.min(partRange.end, queryRange.end) - partRange.start, + }); + } + start = partRange.end; + } + + // Return a stream that fetches the parts lazily when required + return new ReadableStream({ + type: "bytes", + async pull(controller) { + const part = parts.shift(); + if (part === undefined) { + // If there are no more parts left, close the stream + await waitForOpenInputGate(); + controller.close(); + // Not documented in MDN but if there's an ongoing request that's + // waiting, we need to tell it that there were 0 bytes delivered so that + // it unblocks and notices the end of stream. 
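        // Worked example of the part mapping computed above (illustrative):
        // with three 50-byte parts and a query range { offset: 40, length: 20 },
        // `parts` is [{ partNumber: 1, start: 40, end: 50 },
        // { partNumber: 2, start: 0, end: 10 }], so only the tail of part 1
        // and the head of part 2 are ever fetched from storage.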
+ // @ts-expect-error `byobRequest` has type `undefined` in `@types/node` + controller.byobRequest?.respond(0); + } else { + // Otherwise, fetch and enqueue the next part + const partKey = buildKey(key, multipart.uploadId, part.partNumber); + const value = await storage.getRange( + partKey, + { offset: part.start, length: part.end - part.start }, + /* skipMetadata */ true + ); + assert(value !== undefined); // The part must exist + await waitForOpenInputGate(); + if (value.value.byteLength > 0) controller.enqueue(value.value); + } + }, + }); +} + +export async function deleteMultipartParts( + storage: Storage, + key: string, + uploadId: string, + excludeKeys?: Set +): Promise { + const indexKey = buildKey(key, uploadId); + const partPrefix = indexKey.substring(0, indexKey.length - INDEX.length); + const { keys } = await storage.list({ prefix: partPrefix }); + const partKeys: string[] = []; + for (const key of keys) { + if ( + key.name !== indexKey && + (excludeKeys === undefined || !excludeKeys.has(key.name)) + ) { + partKeys.push(key.name); + } + } + await storage.deleteMany(partKeys); +} + +export class R2MultipartUpload { + readonly #storage: Storage; + readonly #blockGlobalAsyncIO: boolean; + readonly #minMultipartUploadSize: number; + + readonly key!: string; + readonly uploadId!: string; + + constructor( + key: string, + uploadId: string, + opts: InternalR2MultipartUploadOptions + ) { + this.#storage = opts.storage; + this.#blockGlobalAsyncIO = opts.blockGlobalAsyncIO ?? false; + this.#minMultipartUploadSize = + opts.minMultipartUploadSize ?? MIN_MULTIPART_UPLOAD_SIZE; + + // `key` and `uploadId` should be enumerable, readonly, instance properties: + // https://github.com/cloudflare/workerd/blob/main/src/workerd/api/r2-multipart.h#L40-L41 + Object.defineProperties(this, { + key: { + enumerable: true, + get() { + return key; + }, + set() { + throw new TypeError( + "Cannot assign to read only property 'key' of object '#'" + ); + }, + }, + uploadId: { + enumerable: true, + get() { + return uploadId; + }, + set() { + throw new TypeError( + "Cannot assign to read only property 'uploadId' of object '#'" + ); + }, + }, + }); + } + + #prepareCtx(): RequestContext | undefined { + if (this.#blockGlobalAsyncIO) assertInRequest(); + const ctx = getRequestContext(); + ctx?.incrementInternalSubrequests(); + return ctx; + } + + async #state(): Promise { + const meta = await this.#storage.head( + buildKey(this.key, this.uploadId) + ); + if (meta?.metadata === undefined) { + return { exists: false, aborted: false, completed: false }; + } + if ("aborted" in meta.metadata) { + return { exists: false, aborted: true, completed: false }; + } + if ("completed" in meta.metadata) { + return { exists: false, aborted: false, completed: true }; + } + return { exists: true, meta: meta.metadata }; + } + + async uploadPart( + partNumber: number, + value: ReadableStream | ArrayBuffer | ArrayBufferView | string | Blob + ): Promise { + const ctx = this.#prepareCtx(); + + // 1. Validate and coerce parameters + if (arguments.length === 0) { + throw new TypeError( + "Failed to execute 'uploadPart' on 'R2MultipartUpload': parameter 1 is not of type 'integer'." 
+ ); + } + // noinspection SuspiciousTypeOfGuard + if (typeof partNumber !== "number") { + partNumber = parseInt(String(partNumber)); + } + if (isNaN(partNumber)) partNumber = 0; + + let valueArray: Uint8Array; + if (typeof value === "string") { + valueArray = encoder.encode(value); + } else if (value instanceof ArrayBuffer) { + valueArray = new Uint8Array(value); + } else if (ArrayBuffer.isView(value)) { + valueArray = viewToArray(value); + } else if (value instanceof Blob) { + valueArray = new Uint8Array(await value.arrayBuffer()); + } else if (value instanceof ReadableStream) { + if (!isKnownLengthStream(value)) { + throw new TypeError( + "Provided readable stream must have a known length (request/response body or readable half of FixedLengthStream)" + ); + } + // @ts-expect-error @types/node stream/consumers doesn't accept ReadableStream + valueArray = new Uint8Array(await arrayBuffer(value)); + } else { + throw new TypeError( + "Failed to execute 'uploadPart' on 'R2MultipartUpload': parameter 2 is not of type 'ReadableStream or ArrayBuffer or ArrayBufferView or string or Blob'." + ); + } + + validatePartNumber(partNumber); + + // 2. Make sure this multipart upload exists + validateMultipartKey("uploadPart", this.key); + if (!(await this.#state()).exists) { + throw new Error( + "uploadPart: The specified multipart upload does not exist. (10024)" + ); + } + + // 3. Write part to storage + const partKey = buildKey(this.key, this.uploadId, partNumber); + const etag = generateId(); + // No need to wait for output gate here, as the user can't know the `etag` + // before this function resolves, so this change isn't externally visible + await this.#storage.put(partKey, { + value: valueArray, + metadata: { + size: valueArray.byteLength, + md5: createMD5Hash(valueArray), + etag, + }, + }); + await waitForOpenInputGate(); + ctx?.advanceCurrentTime(); + + return { partNumber, etag }; + } + + async abort(): Promise { + const ctx = this.#prepareCtx(); + + // 1. Make sure this multipart upload exists, ignoring the finalised state + validateMultipartKey("abortMultipartUpload", this.key); + const state = await this.#state(); + if (!state.exists) { + if (state.aborted || state.completed) { + // If this upload has already been finalised, return here. `abort()` can + // be called multiple times, and on already `complete()`ed uploads. In + // the later case, we really don't want to delete pointed-to parts. + await waitForOpenInputGate(); + ctx?.advanceCurrentTime(); + return; + } else { + throw new Error( + "abortMultipartUpload: We encountered an internal error. Please try again. (10001)" + ); + } + } + + // 3. Delete all parts, excluding the index + // No need to wait for output gate here, as we're just deleting hidden + // internal parts, so this change isn't externally visible + await deleteMultipartParts(this.#storage, this.key, this.uploadId); + + // 4. Mark upload as aborted + const indexKey = buildKey(this.key, this.uploadId); + await this.#storage.put(indexKey, { + value: new Uint8Array(), + metadata: { aborted: true }, + }); + await waitForOpenInputGate(); + ctx?.advanceCurrentTime(); + } + + async complete(uploadedParts: R2UploadedPart[]): Promise { + const ctx = this.#prepareCtx(); + + // 1. Validate and coerce parameters + if (!Array.isArray(uploadedParts)) { + throw new TypeError( + "Failed to execute 'complete' on 'R2MultipartUpload': parameter 1 is not of type 'Array'." 
+ ); + } + uploadedParts = uploadedParts.map((part, i) => { + if (typeof part !== "object") { + throw new TypeError( + `Incorrect type for array element ${i}: the provided value is not of type 'UploadedPart'.` + ); + } + // Create new part object, so we don't mutate parameters when coercing + part = { partNumber: part.partNumber, etag: part.etag }; + // noinspection SuspiciousTypeOfGuard + if (typeof part.partNumber !== "number") { + part.partNumber = parseInt(String(part.partNumber)); + } + if (isNaN(part.partNumber)) part.partNumber = 0; + + part.etag = String(part.etag); + + return part; + }); + for (const part of uploadedParts) { + validatePartNumber(part.partNumber); + } + + // 2. Make sure this multipart upload exists + validateMultipartKey("completeMultipartUpload", this.key); + const state = await this.#state(); + if (!state.exists) { + throw new Error( + state.completed + ? "completeMultipartUpload: The specified multipart upload does not exist. (10024)" + : "completeMultipartUpload: We encountered an internal error. Please try again. (10001)" + ); + } + + // 3. Make sure all part numbers are unique + const partNumberSet = new Set(); + for (const { partNumber } of uploadedParts) { + if (partNumberSet.has(partNumber)) { + throw new Error( + "completeMultipartUpload: We encountered an internal error. Please try again. (10001)" + ); + } + partNumberSet.add(partNumber); + } + + // 4. Get metadata for all parts, checking they all exist + const partMetas = await Promise.all( + uploadedParts.map(({ partNumber }) => { + const partKey = buildKey(this.key, this.uploadId, partNumber); + return this.#storage.head(partKey); + }) + ); + const parts = partMetas.map((partMeta, i) => { + const uploadedPart = uploadedParts[i]; + if ( + partMeta?.metadata === undefined || + partMeta.metadata.etag !== uploadedPart.etag + ) { + throw new Error( + "completeMultipartUpload: One or more of the specified parts could not be found. (10025)" + ); + } + // Note both `uploadedPart` and `partMeta.metadata` have an `etag` field, + // but we've just validated they're the same + return { ...uploadedPart, ...partMeta.metadata }; + }); + + // 5. Check all but last part meets minimum size requirements. First check + // the in argument order, throwing a friendly error... + for (const part of parts.slice(0, -1)) { + if (part.size < this.#minMultipartUploadSize) { + throw new Error( + "completeMultipartUpload: Your proposed upload is smaller than the minimum allowed object size." + ); + } + } + // ...then check again in ascending `partNumber` order, throwing an + // internal error. We won't know where the current last element ends + // up in the sort, so we just check all parts again. + // + // Also check that all but last parts are the same size... + parts.sort((a, b) => a.partNumber - b.partNumber); + let partSize: number | undefined; + for (const part of parts.slice(0, -1)) { + if (partSize === undefined) partSize = part.size; + if (part.size < this.#minMultipartUploadSize || part.size !== partSize) { + throw new Error( + "completeMultipartUpload: There was a problem with the multipart upload. (10048)" + ); + } + } + // ...and the last part is not greater than all others + // (if part size is defined, we must have at least one part) + if (partSize !== undefined && parts[parts.length - 1].size > partSize) { + throw new Error( + "completeMultipartUpload: There was a problem with the multipart upload. (10048)" + ); + } + + // 6. 
Write key to storage with pointers to parts, and mark upload as + // completed + const existingMeta = await this.#storage.head(this.key); + const indexKey = buildKey(this.key, this.uploadId); + const totalSize = parts.reduce((acc, { size }) => acc + size, 0); + const etag = generateMultipartEtag(parts.map(({ md5 }) => md5)); + const metadata: R2ObjectMetadata = { + key: this.key, + version: createVersion(), + size: totalSize, + etag, + httpEtag: `"${etag}"`, + uploaded: new Date(), + httpMetadata: state.meta.httpMetadata, + customMetadata: state.meta.customMetadata, + checksums: {}, + multipart: { + uploadId: this.uploadId, + parts: parts.map(({ partNumber, size }) => ({ partNumber, size })), + }, + }; + await waitForOpenOutputGate(); + await this.#storage.putMany([ + [this.key, { value: new Uint8Array(), metadata }], + [indexKey, { value: new Uint8Array(), metadata: { completed: true } }], + ]); + await waitForOpenInputGate(); + ctx?.advanceCurrentTime(); + + // 7. Cleanup redundant parts + // a) If we didn't use all upload parts, remove the unused + const used = new Set( + parts.map(({ partNumber }) => + buildKey(this.key, this.uploadId, partNumber) + ) + ); + await deleteMultipartParts(this.#storage, this.key, this.uploadId, used); + // b) If we had an existing multipart key, remove all its parts + if (existingMeta?.metadata?.multipart !== undefined) { + await deleteMultipartParts( + this.#storage, + this.key, + existingMeta.metadata.multipart.uploadId + ); + } + + // Note metadata is empty in objects returned from `complete()`, this feels + // like a bug... + return new R2Object(metadata); + } +} diff --git a/packages/r2/src/r2Object.ts b/packages/r2/src/r2Object.ts index 6b50a962d..2a3c7d62e 100644 --- a/packages/r2/src/r2Object.ts +++ b/packages/r2/src/r2Object.ts @@ -1,13 +1,15 @@ import assert from "assert"; import { Blob } from "buffer"; import crypto from "crypto"; -import { arrayBuffer } from "stream/consumers"; +import consumers from "stream/consumers"; import { ReadableStream } from "stream/web"; -import { TextDecoder } from "util"; +import { _isDisturbedStream } from "@miniflare/core"; import { viewToBuffer, waitForOpenInputGate } from "@miniflare/shared"; import { Headers } from "undici"; import { R2Conditional, R2Range } from "./bucket"; +export const MAX_KEY_SIZE = 1024; + export interface R2ConditionalUnparsed { etagMatches?: string | string[]; etagDoesNotMatch?: string | string[]; @@ -88,6 +90,10 @@ export class Checksums implements R2Checksums { } } +export interface R2MultipartReference { + uploadId: string; + parts: { partNumber: number; size: number }[]; +} export interface R2ObjectMetadata { // The object’s key. key: string; @@ -110,12 +116,10 @@ export interface R2ObjectMetadata { // Hashes used to check the received object’s integrity. At most one can be // specified. 
checksums?: R2Checksums; + // If this was a multipart upload, pointer to the constituent parts + multipart?: R2MultipartReference; } -const decoder = new TextDecoder(); - -// NOTE: Incase multipart is ever added to the worker -// refer to https://stackoverflow.com/questions/12186993/what-is-the-algorithm-to-compute-the-amazon-s3-etag-for-a-file-larger-than-5gb/19896823#19896823 export function createMD5Hash(input: Uint8Array): string { return crypto.createHash("md5").update(input).digest("hex"); } @@ -317,16 +321,18 @@ export class R2Object { this.customMetadata = metadata.customMetadata; this.range = metadata.range; - // We always need to store an MD5 hash in `checksums`, but never explicitly - // stored one. Luckily, `R2Bucket#put()` always makes `etag` an MD5 hash. - assert( - metadata.etag.length === 32 && HEX_REGEXP.test(metadata.etag), - "Expected `etag` to be an MD5 hash" - ); - const checksums: R2Checksums = { - md5: metadata.etag, - ...metadata.checksums, - }; + // For non-multipart uploads, we always need to store an MD5 hash in + // `checksums`, but never explicitly stored one. Luckily, `R2Bucket#put()` + // always makes `etag` an MD5 hash. + const checksums: R2Checksums = { ...metadata.checksums }; + if (metadata.multipart === undefined) { + assert( + metadata.etag.length === 32 && HEX_REGEXP.test(metadata.etag), + "Expected `etag` to be an MD5 hash" + ); + checksums.md5 = metadata.etag; + } + this.#checksums = new Checksums(checksums); } @@ -347,19 +353,21 @@ export class R2Object { export class R2ObjectBody extends R2Object { // The object’s value. readonly body: ReadableStream; - // Whether the object’s value has been consumed or not. - readonly bodyUsed: boolean = false; - constructor(metadata: R2ObjectMetadata, value: Uint8Array) { + + constructor( + metadata: R2ObjectMetadata, + value: Uint8Array | ReadableStream + ) { super(metadata); - // To maintain readonly, we build this clever work around to update upon consumption. - const setBodyUsed = (): void => { - (this.bodyUsed as R2ObjectBody["bodyUsed"]) = true; - }; + // Convert value to readable stream if not already + if (value instanceof ReadableStream) { + this.body = value; + return; + } - // convert value to readable stream - this.body = new ReadableStream({ - type: "bytes" as any, + this.body = new ReadableStream({ + type: "bytes", // Delay enqueuing chunk until it's actually requested so we can wait // for the input gate to open before delivering it async pull(controller) { @@ -371,32 +379,39 @@ export class R2ObjectBody extends R2Object { // and notices the end of stream. // @ts-expect-error `byobRequest` has type `undefined` in `@types/node` controller.byobRequest?.respond(0); - setBodyUsed(); }, }); } + get bodyUsed(): boolean { + return _isDisturbedStream(this.body); + } + // Returns a Promise that resolves to an ArrayBuffer containing the object’s value. async arrayBuffer(): Promise { if (this.bodyUsed) throw new TypeError("Body already used."); - // @ts-expect-error ReadableStream is missing properties - return arrayBuffer(this.body); + return consumers.arrayBuffer(this.body); } - // Returns a Promise that resolves to an string containing the object’s value. + // Returns a Promise that resolves to a string containing the object’s value. 
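  // Illustrative behaviour of the consumer methods and the `bodyUsed` getter
  // above (sketch, assuming `body` is an `R2ObjectBody` returned by `get()`):
  //
  //   body.bodyUsed;            // false
  //   await body.arrayBuffer(); // consumes the underlying stream
  //   body.bodyUsed;            // true (via `_isDisturbedStream`)
  //   await body.text();        // rejects: TypeError("Body already used.")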
async text(): Promise { - return decoder.decode(await this.arrayBuffer()); + if (this.bodyUsed) throw new TypeError("Body already used."); + // @ts-expect-error ReadableStream is missing properties + return consumers.text(this.body); } // Returns a Promise that resolves to the given object containing the object’s value. async json(): Promise { - return JSON.parse(await this.text()); + if (this.bodyUsed) throw new TypeError("Body already used."); + // @ts-expect-error ReadableStream is missing properties + return consumers.json(this.body); } // Returns a Promise that resolves to a binary Blob containing the object’s value. async blob(): Promise { - const ab = await this.arrayBuffer(); - return new Blob([new Uint8Array(ab)]); + if (this.bodyUsed) throw new TypeError("Body already used."); + // @ts-expect-error ReadableStream is missing properties + return consumers.blob(this.body); } } diff --git a/packages/r2/test/bucket.spec.ts b/packages/r2/test/bucket.spec.ts index 3c566b170..1a12dc72e 100644 --- a/packages/r2/test/bucket.spec.ts +++ b/packages/r2/test/bucket.spec.ts @@ -91,6 +91,9 @@ const validatesKeyMacro: Macro< instanceOf: Error, message: `R2 ${httpMethod} failed: (414) UTF-8 encoded length of 1025 exceeds key length limit of 1024.`, }); + await t.throwsAsync(func(r2, "__MINIFLARE_INTERNAL__:multipart"), { + message: `R2 ${method.toUpperCase()} failed: (400) Key cannot start with "__MINIFLARE_INTERNAL__".`, + }); }; validatesKeyMacro.title = (providedTitle, method) => `${method}: validates key`; @@ -1988,11 +1991,13 @@ test("list: startAfter: ensure limit and starting position are correct", async ( test("hides implementation details", (t) => { const { r2 } = t.context; t.deepEqual(getObjectProperties(r2), [ + "createMultipartUpload", "delete", "get", "head", "list", "put", + "resumeMultipartUpload", ]); }); test("operations throw outside request handler", async (t) => { diff --git a/packages/r2/test/multipart.spec.ts b/packages/r2/test/multipart.spec.ts new file mode 100644 index 000000000..609c42f78 --- /dev/null +++ b/packages/r2/test/multipart.spec.ts @@ -0,0 +1,902 @@ +import assert from "assert"; +import { Blob } from "buffer"; +import { ReadableStream } from "stream/web"; +import { FixedLengthStream, Request, Response } from "@miniflare/core"; +import { R2Bucket, R2ObjectBody, _INTERNAL_PREFIX } from "@miniflare/r2"; +import { + EXTERNAL_SUBREQUEST_LIMIT_BUNDLED, + RequestContext, + Storage, + viewToBuffer, +} from "@miniflare/shared"; +import { + advancesTime, + getObjectProperties, + testClock, + useTmp, + utf8Encode, + waitsForInputGate, + waitsForOutputGate, +} from "@miniflare/shared-test"; +import { FileStorage } from "@miniflare/storage-file"; +import anyTest, { TestInterface, ThrowsExpectation } from "ava"; + +const PART_SIZE = 50; + +interface Context { + storage: Storage; + r2: R2Bucket; +} + +const test = anyTest as TestInterface; + +test.beforeEach(async (t) => { + const tmp = await useTmp(t); + const storage = new FileStorage(tmp, true, testClock); + const r2 = new R2Bucket(storage, { minMultipartUploadSize: PART_SIZE }); + t.context = { storage, r2 }; +}); + +function objectNameNotValidExpectations(method: string) { + return { + instanceOf: Error, + message: `${method}: The specified object name is not valid. (10020)`, + }; +} +function doesNotExistExpectations(method: string) { + return { + instanceOf: Error, + message: `${method}: The specified multipart upload does not exist. 
(10024)`, + }; +} +function internalErrorExpectations(method: string) { + return { + instanceOf: Error, + message: `${method}: We encountered an internal error. Please try again. (10001)`, + }; +} + +// Check multipart operations on bucket +test("R2Bucket: createMultipartUpload", async (t) => { + const { r2 } = t.context; + + // Check creates upload + const upload1 = await r2.createMultipartUpload("key", { + customMetadata: { key: "value" }, + httpMetadata: { contentType: "text/plain" }, + }); + t.is(upload1.key, "key"); + t.not(upload1.uploadId, ""); + + // Check creates multiple distinct uploads with different uploadIds for key + const upload2 = await r2.createMultipartUpload("key"); + t.is(upload2.key, "key"); + t.not(upload2.uploadId, ""); + t.not(upload2.uploadId, upload1.uploadId); + + // Check validates key and metadata + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(r2.createMultipartUpload(), { + instanceOf: TypeError, + message: + "Failed to execute 'createMultipartUpload' on 'R2Bucket': parameter 1 is not of type 'string'.", + }); + await t.throwsAsync( + r2.createMultipartUpload("x".repeat(1025)), + objectNameNotValidExpectations("createMultipartUpload") + ); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(r2.createMultipartUpload("key", 42), { + instanceOf: TypeError, + message: + "Failed to execute 'createMultipartUpload' on 'R2Bucket': parameter 2 is not of type 'MultipartOptions'.", + }); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(r2.createMultipartUpload("key", { customMetadata: 42 }), { + instanceOf: TypeError, + message: + "Incorrect type for the 'customMetadata' field on 'MultipartOptions': the provided value is not of type 'object'.", + }); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(r2.createMultipartUpload("key", { httpMetadata: 42 }), { + instanceOf: TypeError, + message: + "Incorrect type for the 'httpMetadata' field on 'MultipartOptions': the provided value is not of type 'HttpMetadata or Headers'.", + }); + + // Check coerces key to string + // @ts-expect-error intentionally testing incorrect types + let upload = await r2.createMultipartUpload(42); + t.is(upload.key, "42"); + // @ts-expect-error intentionally testing incorrect types + upload = await r2.createMultipartUpload(undefined); + t.is(upload.key, "undefined"); +}); +test("R2Bucket: resumeMultipartUpload", async (t) => { + const { r2 } = t.context; + + // Check creates upload object with correct key and uploadId + let upload = await r2.resumeMultipartUpload("key", "upload"); + t.is(upload.key, "key"); + t.is(upload.uploadId, "upload"); + + // Check validates key and uploadId provided, but not key length + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(r2.resumeMultipartUpload(), { + instanceOf: TypeError, + message: + "Failed to execute 'resumeMultipartUpload' on 'R2Bucket': parameter 1 is not of type 'string'.", + }); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(r2.resumeMultipartUpload("key"), { + instanceOf: TypeError, + message: + "Failed to execute 'resumeMultipartUpload' on 'R2Bucket': parameter 2 is not of type 'string'.", + }); + upload = await r2.resumeMultipartUpload("x".repeat(1025), "upload"); + t.is(upload.key, "x".repeat(1025)); + + // Check coerces key and uploadId to string + // @ts-expect-error intentionally testing incorrect types + upload = await r2.resumeMultipartUpload(1, 
2); + t.is(upload.key, "1"); + t.is(upload.uploadId, "2"); + // @ts-expect-error intentionally testing incorrect types + upload = await r2.resumeMultipartUpload(undefined, undefined); + t.is(upload.key, "undefined"); + t.is(upload.uploadId, "undefined"); +}); + +// Check operations on upload objects +test("R2MultipartUpload: uploadPart", async (t) => { + const { storage, r2 } = t.context; + + // Check uploads parts of all types + const upload = await r2.createMultipartUpload("key"); + const part1 = await upload.uploadPart(1, "value1"); + t.is(part1.partNumber, 1); + t.not(part1.etag, ""); + const part2 = await upload.uploadPart(2, utf8Encode("value2")); + t.is(part2.partNumber, 2); + t.not(part2.etag, ""); + t.not(part2.etag, part1.etag); + await upload.uploadPart(3, viewToBuffer(utf8Encode("value3"))); + await upload.uploadPart(4, new Blob(["value4"])); + + // Check requires known-length stream + const { readable, writable } = new FixedLengthStream(6); + const writer = writable.getWriter(); + // noinspection ES6MissingAwait + void writer.write(utf8Encode("value5")); + // noinspection ES6MissingAwait + void writer.close(); + const request = new Request("http://localhost", { + method: "POST", + body: "value6", + }); + const response = new Response("value7"); + assert(request.body !== null && response.body !== null); + await upload.uploadPart(5, readable); + await upload.uploadPart(6, request.body); + await upload.uploadPart(7, response.body); + const unknownLengthReadable = new ReadableStream({ + type: "bytes", + pull(controller) { + controller.enqueue(utf8Encode("chunk")); + controller.close(); + }, + }); + await t.throwsAsync(upload.uploadPart(1, unknownLengthReadable), { + instanceOf: TypeError, + message: + "Provided readable stream must have a known length (request/response body or readable half of FixedLengthStream)", + }); + + const partKey = (part: number) => + `${_INTERNAL_PREFIX}:multipart:${upload.uploadId}:key:${part}`; + const value1 = await storage.get(partKey(1)); + const value2 = await storage.get(partKey(2)); + const value3 = await storage.get(partKey(3)); + const value4 = await storage.get(partKey(4)); + const value5 = await storage.get(partKey(5)); + const value6 = await storage.get(partKey(6)); + const value7 = await storage.get(partKey(7)); + t.deepEqual(value1?.value, utf8Encode("value1")); + t.deepEqual(value2?.value, utf8Encode("value2")); + t.deepEqual(value3?.value, utf8Encode("value3")); + t.deepEqual(value4?.value, utf8Encode("value4")); + t.deepEqual(value5?.value, utf8Encode("value5")); + t.deepEqual(value6?.value, utf8Encode("value6")); + t.deepEqual(value7?.value, utf8Encode("value7")); + + // Check upload part with same part number and same value + const part1b = await upload.uploadPart(1, "value1"); + t.is(part1b.partNumber, 1); + t.not(part1b.etag, part1.etag); + // Check upload part with different part number but same value + const part100 = await upload.uploadPart(100, "value1"); + t.is(part100.partNumber, 100); + t.not(part100.etag, part1.etag); + + // Check validates key and uploadId + let expectations = doesNotExistExpectations("uploadPart"); + let nonExistentUpload = await r2.resumeMultipartUpload("key", "bad"); + await t.throwsAsync(nonExistentUpload.uploadPart(1, "value"), expectations); + nonExistentUpload = await r2.resumeMultipartUpload("badkey", upload.uploadId); + await t.throwsAsync(nonExistentUpload.uploadPart(1, "value"), expectations); + expectations = objectNameNotValidExpectations("uploadPart"); + nonExistentUpload = await 
r2.resumeMultipartUpload("x".repeat(1025), "bad"); + await t.throwsAsync(nonExistentUpload.uploadPart(1, "value"), expectations); + + // Check validates part number (before key and uploadId) + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload.uploadPart(), { + instanceOf: TypeError, + message: + "Failed to execute 'uploadPart' on 'R2MultipartUpload': parameter 1 is not of type 'integer'.", + }); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload.uploadPart(undefined), { + instanceOf: TypeError, + message: + "Failed to execute 'uploadPart' on 'R2MultipartUpload': parameter 2 is not of type 'ReadableStream or ArrayBuffer or ArrayBufferView or string or Blob'.", + }); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload.uploadPart(undefined, "value"), { + instanceOf: TypeError, + message: + "Part number must be between 1 and 10000 (inclusive). Actual value was: 0", + }); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload.uploadPart("-42", "value"), { + instanceOf: TypeError, + message: + "Part number must be between 1 and 10000 (inclusive). Actual value was: -42", + }); + await t.throwsAsync(upload.uploadPart(NaN, "value"), { + instanceOf: TypeError, + message: + "Part number must be between 1 and 10000 (inclusive). Actual value was: 0", + }); + await t.throwsAsync(upload.uploadPart(0, "value"), { + instanceOf: TypeError, + message: + "Part number must be between 1 and 10000 (inclusive). Actual value was: 0", + }); + await t.throwsAsync(upload.uploadPart(10001, "value"), { + instanceOf: TypeError, + message: + "Part number must be between 1 and 10000 (inclusive). Actual value was: 10001", + }); + await t.throwsAsync(nonExistentUpload.uploadPart(0, "value"), { + instanceOf: TypeError, + message: + "Part number must be between 1 and 10000 (inclusive). 
Actual value was: 0", + }); + + // Check validates value type + expectations = { + instanceOf: TypeError, + message: + "Failed to execute 'uploadPart' on 'R2MultipartUpload': parameter 2 is not of type 'ReadableStream or ArrayBuffer or ArrayBufferView or string or Blob'.", + }; + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload.uploadPart(1), expectations); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload.uploadPart(1, undefined), expectations); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload.uploadPart(1, null), expectations); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload.uploadPart(1, 42), expectations); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload.uploadPart(1, [1, [2, 3]]), expectations); +}); +test("R2MultipartUpload: abort", async (t) => { + const { storage, r2 } = t.context; + + // Check deletes upload and all parts for corresponding upload + const upload1 = await r2.createMultipartUpload("key"); + const upload2 = await r2.createMultipartUpload("key"); + await upload1.uploadPart(1, "value1"); + await upload1.uploadPart(2, "value2"); + await upload1.uploadPart(3, "value3"); + let { keys } = await storage.list(); + t.is(keys.length, 2 /* uploads */ + 3 /* parts */); + await upload1.abort(); + ({ keys } = await storage.list()); + // upload1 kept after abort to ensure aborting already aborted doesn't throw + t.is(keys.length, 2 /* uploads */); + const keySet = new Set(keys.map(({ name }) => name)); + t.true( + keySet.has(`${_INTERNAL_PREFIX}:multipart:${upload1.uploadId}:key:index`) + ); + t.true( + keySet.has(`${_INTERNAL_PREFIX}:multipart:${upload2.uploadId}:key:index`) + ); + + // Check cannot upload after abort + let expectations = doesNotExistExpectations("uploadPart"); + await t.throwsAsync(upload1.uploadPart(4, "value4"), expectations); + + // Check can abort already aborted upload + await upload1.abort(); + + // Check can abort already completed upload + const part1 = await upload2.uploadPart(1, "value1"); + await upload2.complete([part1]); + await upload2.abort(); + t.is(await (await r2.get("key"))?.text(), "value1"); + + // Check validates key and uploadId + const upload3 = await r2.createMultipartUpload("key"); + // Note this is internalErrorExpectations, not doesNotExistExpectations + expectations = internalErrorExpectations("abortMultipartUpload"); + let nonExistentUpload = await r2.resumeMultipartUpload("key", "bad"); + await t.throwsAsync(nonExistentUpload.abort(), expectations); + nonExistentUpload = await r2.resumeMultipartUpload("bad", upload3.uploadId); + await t.throwsAsync(nonExistentUpload.abort(), expectations); + expectations = objectNameNotValidExpectations("abortMultipartUpload"); + nonExistentUpload = await r2.resumeMultipartUpload("x".repeat(1025), "bad"); + await t.throwsAsync(nonExistentUpload.abort(), expectations); +}); +test("R2MultipartUpload: complete", async (t) => { + const { storage, r2 } = t.context; + + // Check creates regular key with correct metadata, and returns object + const upload1 = await r2.createMultipartUpload("key", { + customMetadata: { key: "value" }, + httpMetadata: { contentType: "text/plain" }, + }); + const upload2 = await r2.createMultipartUpload("key"); + let part1 = await upload1.uploadPart(1, "1".repeat(PART_SIZE)); + let part2 = await upload1.uploadPart(2, "2".repeat(PART_SIZE)); + let part3 = await upload1.uploadPart(3, 
"3"); + let object = await upload1.complete([part1, part2, part3]); + t.is(object.key, "key"); + t.not(object.version, ""); + t.is(object.size, 2 * PART_SIZE + 1); + t.is(object.etag, "3b676245e58d988dc75f80c0c27a9645-3"); + t.is(object.httpEtag, '"3b676245e58d988dc75f80c0c27a9645-3"'); + t.is(object.range, undefined); + t.deepEqual(object.checksums.toJSON(), {}); + t.deepEqual(object.customMetadata, { key: "value" }); + t.deepEqual(object.httpMetadata, { contentType: "text/plain" }); + let { keys } = await storage.list(); + t.is(keys.length, 2 /* uploads */ + 3 /* parts */ + 1 /* complete */); + let objectBody = await r2.get("key"); + t.is( + await objectBody?.text(), + `${"1".repeat(PART_SIZE)}${"2".repeat(PART_SIZE)}3` + ); + + // Check requires all but last part to be greater than 5MB + part1 = await upload2.uploadPart(1, "1"); + part2 = await upload2.uploadPart(2, "2"); + part3 = await upload2.uploadPart(3, "3"); + const sizeExpectations: ThrowsExpectation = { + instanceOf: Error, + message: + "completeMultipartUpload: Your proposed upload is smaller than the minimum allowed object size.", + }; + await t.throwsAsync( + upload2.complete([part1, part2, part3]), + sizeExpectations + ); + await t.throwsAsync(upload2.complete([part1, part2]), sizeExpectations); + object = await upload2.complete([part1]); + t.is(object.size, 1); + t.is(object.etag, "46d1741e8075da4ac72c71d8130fcb71-1"); + + // Check completing multiple uploads overrides existing, deleting all parts + ({ keys } = await storage.list()); + t.is(keys.length, 2 /* uploads */ + 1 /* part */ + 1 /* complete */); + const keySet = new Set(keys.map(({ name }) => name)); + t.true( + keySet.has(`${_INTERNAL_PREFIX}:multipart:${upload1.uploadId}:key:index`) + ); + t.true( + keySet.has(`${_INTERNAL_PREFIX}:multipart:${upload2.uploadId}:key:index`) + ); + t.true(keySet.has(`${_INTERNAL_PREFIX}:multipart:${upload2.uploadId}:key:1`)); + t.true(keySet.has("key")); + objectBody = await r2.get("key"); + t.is(await objectBody?.text(), "1"); + + // Check completing with overridden part + const upload3 = await r2.createMultipartUpload("key"); + let part1a = await upload3.uploadPart(1, "value"); + let part1b = await upload3.uploadPart(1, "value"); + t.is(part1a.partNumber, part1b.partNumber); + t.not(part1a.etag, part1b.etag); + const notFoundExpectations: ThrowsExpectation = { + instanceOf: Error, + message: + "completeMultipartUpload: One or more of the specified parts could not be found. 
(10025)", + }; + await t.throwsAsync(upload3.complete([part1a]), notFoundExpectations); + object = await upload3.complete([part1b]); + t.is(object.size, 5); + + // Check completing with multiple parts of same part number + const upload4 = await r2.createMultipartUpload("key"); + part1a = await upload4.uploadPart(1, "1".repeat(PART_SIZE)); + part1b = await upload4.uploadPart(1, "2".repeat(PART_SIZE)); + const part1c = await upload4.uploadPart(1, "3".repeat(PART_SIZE)); + await t.throwsAsync( + upload4.complete([part1a, part1b, part1c]), + internalErrorExpectations("completeMultipartUpload") + ); + + // Check completing with out-of-order parts + const upload5a = await r2.createMultipartUpload("key"); + part1 = await upload5a.uploadPart(1, "1".repeat(PART_SIZE)); + part2 = await upload5a.uploadPart(2, "2".repeat(PART_SIZE)); + part3 = await upload5a.uploadPart(3, "3".repeat(PART_SIZE)); + object = await upload5a.complete([part2, part3, part1]); + t.is(object.size, 3 * PART_SIZE); + t.is(object.etag, "f1115cc5564e7e0b25bbd87d95c72c86-3"); + objectBody = await r2.get("key"); + t.is( + await objectBody?.text(), + `${"1".repeat(PART_SIZE)}${"2".repeat(PART_SIZE)}${"3".repeat(PART_SIZE)}` + ); + const upload5b = await r2.createMultipartUpload("key"); + part1 = await upload5b.uploadPart(1, "1"); + part2 = await upload5b.uploadPart(2, "2".repeat(PART_SIZE)); + part3 = await upload5b.uploadPart(3, "3".repeat(PART_SIZE)); + // Check part size checking happens in argument order (part1's size isn't + // checked until too late, as it's the last argument so ignored...) + await t.throwsAsync(upload5b.complete([part2, part3, part1]), { + instanceOf: Error, + message: + "completeMultipartUpload: There was a problem with the multipart upload. (10048)", + }); + const upload5c = await r2.createMultipartUpload("key"); + part1 = await upload5c.uploadPart(1, "1".repeat(PART_SIZE)); + part2 = await upload5c.uploadPart(2, "2".repeat(PART_SIZE)); + part3 = await upload5c.uploadPart(3, "3"); + // (...but here, part3 isn't the last argument, so get a regular size error) + await t.throwsAsync( + upload5c.complete([part2, part3, part1]), + sizeExpectations + ); + + // Check completing with missing parts + const upload6 = await r2.createMultipartUpload("key"); + part2 = await upload6.uploadPart(2, "2".repeat(PART_SIZE)); + const part5 = await upload6.uploadPart(5, "5".repeat(PART_SIZE)); + const part9 = await upload6.uploadPart(9, "9".repeat(PART_SIZE)); + object = await upload6.complete([part2, part5, part9]); + t.is(object.size, 3 * PART_SIZE); + t.is(object.etag, "471d773597286301a10c61cd8c84e659-3"); + objectBody = await r2.get("key"); + t.is( + await objectBody?.text(), + `${"2".repeat(PART_SIZE)}${"5".repeat(PART_SIZE)}${"9".repeat(PART_SIZE)}` + ); + + // Check completing with no parts + const upload7 = await r2.createMultipartUpload("key"); + object = await upload7.complete([]); + t.is(object.size, 0); + t.is(object.etag, "d41d8cd98f00b204e9800998ecf8427e-0"); + objectBody = await r2.get("key"); + t.is(await objectBody?.text(), ""); + + // Check cannot complete with parts from another upload + const upload8a = await r2.createMultipartUpload("key"); + const upload8b = await r2.createMultipartUpload("key"); + part1 = await upload8b.uploadPart(1, "value"); + await t.throwsAsync(upload8a.complete([part1]), notFoundExpectations); + + // Check cannot complete already completed upload + const upload9 = await r2.createMultipartUpload("key"); + part1 = await upload9.uploadPart(1, "value"); + await 
upload9.complete([part1]); + await t.throwsAsync( + upload9.complete([part1]), + doesNotExistExpectations("completeMultipartUpload") + ); + + // Check cannot complete aborted upload + const upload10 = await r2.createMultipartUpload("key"); + part1 = await upload10.uploadPart(1, "value"); + await upload10.abort(); + await t.throwsAsync( + upload10.complete([part1]), + internalErrorExpectations("completeMultipartUpload") + ); + + // Check validates key and uploadId + const upload11 = await r2.createMultipartUpload("key"); + // Note this is internalErrorExpectations, not doesNotExistExpectations + let expectations = internalErrorExpectations("completeMultipartUpload"); + let nonExistentUpload = await r2.resumeMultipartUpload("key", "bad"); + await t.throwsAsync(nonExistentUpload.complete([]), expectations); + nonExistentUpload = await r2.resumeMultipartUpload( + "badkey", + upload11.uploadId + ); + await t.throwsAsync(nonExistentUpload.complete([]), expectations); + expectations = objectNameNotValidExpectations("completeMultipartUpload"); + nonExistentUpload = await r2.resumeMultipartUpload("x".repeat(1025), "bad"); + await t.throwsAsync(nonExistentUpload.complete([]), expectations); + + // Check validates uploaded parts + const upload12 = await r2.createMultipartUpload("key"); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload12.complete(), { + instanceOf: TypeError, + message: + "Failed to execute 'complete' on 'R2MultipartUpload': parameter 1 is not of type 'Array'.", + }); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload12.complete(42), { + instanceOf: TypeError, + message: + "Failed to execute 'complete' on 'R2MultipartUpload': parameter 1 is not of type 'Array'.", + }); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload12.complete(["not a part"]), { + instanceOf: TypeError, + message: + "Incorrect type for array element 0: the provided value is not of type 'UploadedPart'.", + }); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload12.complete([{}]), { + instanceOf: TypeError, + message: + "Part number must be between 1 and 10000 (inclusive). Actual value was: 0", + }); + // @ts-expect-error intentionally testing incorrect types + await t.throwsAsync(upload12.complete([{ etag: "" }]), { + instanceOf: TypeError, + message: + "Part number must be between 1 and 10000 (inclusive). Actual value was: 0", + }); + await t.throwsAsync(upload12.complete([{ partNumber: 0, etag: "" }]), { + instanceOf: TypeError, + message: + "Part number must be between 1 and 10000 (inclusive). 
Actual value was: 0", + }); + await t.throwsAsync( + // @ts-expect-error intentionally testing incorrect types + upload12.complete([{ partNumber: 1 }]), + notFoundExpectations + ); + + // Check coerces uploaded part partNumber and etag + part1 = await upload12.uploadPart(1, "1".repeat(PART_SIZE)); + part2 = await upload12.uploadPart(2, "2".repeat(PART_SIZE)); + object = await upload12.complete([ + // @ts-expect-error intentionally testing incorrect types + { partNumber: String(part1.partNumber), etag: part1.etag }, + // @ts-expect-error intentionally testing incorrect types + { partNumber: part2.partNumber, etag: [part2.etag] }, + ]); + t.is(object.size, 2 * PART_SIZE); + t.is(object.etag, "2ccbacaf03d9cb4e5a1bdd692fae289a-2"); + objectBody = await r2.get("key"); + t.is( + await objectBody?.text(), + `${"1".repeat(PART_SIZE)}${"2".repeat(PART_SIZE)}` + ); + + // Check requires all but last part to have same size + const upload13 = await r2.createMultipartUpload("key"); + part1 = await upload13.uploadPart(1, "1".repeat(PART_SIZE)); + part2 = await upload13.uploadPart(2, "2".repeat(PART_SIZE + 1)); + part3 = await upload13.uploadPart(3, "3".repeat(PART_SIZE)); + expectations = { + instanceOf: Error, + message: + "completeMultipartUpload: There was a problem with the multipart upload. (10048)", + }; + await t.throwsAsync(upload13.complete([part1, part2, part3]), expectations); + part2 = await upload13.uploadPart(2, "2".repeat(PART_SIZE)); + // Check allows last part to have different size, only if <= others + part3 = await upload13.uploadPart(3, "3".repeat(PART_SIZE + 1)); + await t.throwsAsync(upload13.complete([part1, part2, part3]), expectations); + part3 = await upload13.uploadPart(3, "3".repeat(PART_SIZE - 1)); + object = await upload13.complete([part1, part2, part3]); + t.is(object.size, 3 * PART_SIZE - 1); +}); + +// Check regular operations on buckets with existing multipart keys +test("R2Bucket: multipart head", async (t) => { + const { r2 } = t.context; + + // Check returns nothing for in-progress multipart upload + const upload = await r2.createMultipartUpload("key", { + customMetadata: { key: "value" }, + httpMetadata: { contentType: "text/plain" }, + }); + const part1 = await upload.uploadPart(1, "1".repeat(PART_SIZE)); + const part2 = await upload.uploadPart(2, "2".repeat(PART_SIZE)); + const part3 = await upload.uploadPart(3, "3".repeat(PART_SIZE)); + t.is(await r2.head("key"), null); + + // Check returns metadata for completed upload + const completed = await upload.complete([part1, part2, part3]); + const object = await r2.head("key"); + t.is(object?.key, "key"); + t.is(object?.version, completed.version); + t.is(object?.size, 3 * PART_SIZE); + t.is(object?.etag, "f1115cc5564e7e0b25bbd87d95c72c86-3"); + t.is(object?.httpEtag, '"f1115cc5564e7e0b25bbd87d95c72c86-3"'); + t.is(object?.range, undefined); + t.deepEqual(object?.checksums.toJSON(), {}); + t.deepEqual(object?.customMetadata, { key: "value" }); + t.deepEqual(object?.httpMetadata, { contentType: "text/plain" }); +}); +test("R2Bucket: multipart get", async (t) => { + const { r2 } = t.context; + + // Check returns nothing for in-progress multipart upload + const upload = await r2.createMultipartUpload("key", { + customMetadata: { key: "value" }, + httpMetadata: { contentType: "text/plain" }, + }); + const part1 = await upload.uploadPart(1, "a".repeat(PART_SIZE)); + const part2 = await upload.uploadPart(2, "b".repeat(PART_SIZE)); + const part3 = await upload.uploadPart(3, "c".repeat(PART_SIZE)); + t.is(await r2.get("key"), 
null); + + // Check returns metadata and value for completed upload + const completed = await upload.complete([part1, part2, part3]); + let object = await r2.get("key"); + t.is(object?.key, "key"); + t.is(object?.version, completed.version); + t.is(object?.size, 3 * PART_SIZE); + t.is(object?.etag, "d63a28fd44cfddc0215c8da47e582eb7-3"); + t.is(object?.httpEtag, '"d63a28fd44cfddc0215c8da47e582eb7-3"'); + t.deepEqual(object?.range, { offset: 0, length: 3 * PART_SIZE }); + t.deepEqual(object?.checksums.toJSON(), {}); + t.deepEqual(object?.customMetadata, { key: "value" }); + t.deepEqual(object?.httpMetadata, { contentType: "text/plain" }); + t.is( + await object?.text(), + `${"a".repeat(PART_SIZE)}${"b".repeat(PART_SIZE)}${"c".repeat(PART_SIZE)}` + ); + + // Check ranged get accessing single part + const halfPartSize = Math.floor(PART_SIZE / 2); + const quarterPartSize = Math.floor(PART_SIZE / 4); + object = (await r2.get("key", { + range: { offset: halfPartSize, length: quarterPartSize }, + })) as R2ObjectBody | null; + t.is(await object?.text(), "a".repeat(quarterPartSize)); + // Check ranged get accessing multiple parts + object = (await r2.get("key", { + range: { + offset: halfPartSize, + length: halfPartSize + PART_SIZE + quarterPartSize, + }, + })) as R2ObjectBody | null; + t.is( + await object?.text(), + `${"a".repeat(halfPartSize)}${"b".repeat(PART_SIZE)}${"c".repeat( + quarterPartSize + )}` + ); + // Check ranged get of suffix + object = (await r2.get("key", { + range: { suffix: quarterPartSize + PART_SIZE }, + })) as R2ObjectBody | null; + t.is( + await object?.text(), + `${"b".repeat(quarterPartSize)}${"c".repeat(PART_SIZE)}` + ); +}); +test("R2Bucket: multipart put", async (t) => { + const { storage, r2 } = t.context; + + // Check doesn't overwrite parts for in-progress multipart upload + const upload = await r2.createMultipartUpload("key"); + const part1 = await upload.uploadPart(1, "1".repeat(PART_SIZE)); + const part2 = await upload.uploadPart(2, "2".repeat(PART_SIZE)); + const part3 = await upload.uploadPart(3, "3".repeat(PART_SIZE)); + await r2.put("key", "value"); + + const partKey = (part: number | "index") => + `${_INTERNAL_PREFIX}:multipart:${upload.uploadId}:key:${part}`; + + let { keys } = await storage.list(); + t.is(keys.length, 1 /* upload */ + 3 /* parts */ + 1 /* put */); + let keySet = new Set(keys.map(({ name }) => name)); + // noinspection DuplicatedCode + t.true(keySet.has(partKey("index"))); + t.true(keySet.has(partKey(1))); + t.true(keySet.has(partKey(2))); + t.true(keySet.has(partKey(3))); + t.true(keySet.has("key")); + + const object = await upload.complete([part1, part2, part3]); + t.is(object.size, 3 * PART_SIZE); + ({ keys } = await storage.list()); + t.is(keys.length, 1 /* upload */ + 3 /* parts */ + 1 /* completed */); + keySet = new Set(keys.map(({ name }) => name)); + // noinspection DuplicatedCode + t.true(keySet.has(partKey("index"))); + t.true(keySet.has(partKey(1))); + t.true(keySet.has(partKey(2))); + t.true(keySet.has(partKey(3))); + t.true(keySet.has("key")); + + // Check overwrites all multipart parts of completed upload + await r2.put("key", "new-value"); + ({ keys } = await storage.list()); + t.is(keys.length, 1 /* upload */ + 1 /* put */); + keySet = new Set(keys.map(({ name }) => name)); + t.true(keySet.has(partKey("index"))); + t.true(keySet.has("key")); +}); +test("R2Bucket: multipart delete deletes all parts", async (t) => { + const { storage, r2 } = t.context; + + // Check doesn't remove parts for in-progress multipart upload + const 
upload = await r2.createMultipartUpload("key"); + const part1 = await upload.uploadPart(1, "1".repeat(PART_SIZE)); + const part2 = await upload.uploadPart(2, "2".repeat(PART_SIZE)); + const part3 = await upload.uploadPart(3, "3".repeat(PART_SIZE)); + await r2.delete("key"); + + // Check removes all multipart parts of completed upload + const object = await upload.complete([part1, part2, part3]); + t.is(object.size, 3 * PART_SIZE); + await r2.delete("key"); + + const { keys } = await storage.list(); + t.is(keys.length, 1 /* upload */); + t.is( + keys[0].name, + `${_INTERNAL_PREFIX}:multipart:${upload.uploadId}:key:index` + ); +}); +test("R2Bucket: multipart list returns single entry", async (t) => { + const { r2 } = t.context; + + // Check returns nothing for in-progress multipart upload + const upload = await r2.createMultipartUpload("key", { + customMetadata: { key: "value" }, + httpMetadata: { contentType: "text/plain" }, + }); + const part1 = await upload.uploadPart(1, "x".repeat(PART_SIZE)); + const part2 = await upload.uploadPart(2, "y".repeat(PART_SIZE)); + const part3 = await upload.uploadPart(3, "z".repeat(PART_SIZE)); + let { objects } = await r2.list({ + include: ["httpMetadata", "customMetadata"], + }); + t.is(objects.length, 0); + + // Check returns metadata for completed upload + const completed = await upload.complete([part1, part2, part3]); + ({ objects } = await r2.list({ + include: ["httpMetadata", "customMetadata"], + })); + t.is(objects.length, 1); + const object = objects[0]; + t.is(object?.key, "key"); + t.is(object?.version, completed.version); + t.is(object?.size, 3 * PART_SIZE); + t.is(object?.etag, "9f4271a2af6d83c1d3fef1cc6d170f9f-3"); + t.is(object?.httpEtag, '"9f4271a2af6d83c1d3fef1cc6d170f9f-3"'); + t.is(object?.range, undefined); + t.deepEqual(object?.checksums.toJSON(), {}); + t.deepEqual(object?.customMetadata, { key: "value" }); + t.deepEqual(object?.httpMetadata, { contentType: "text/plain" }); +}); + +test("R2MultipartUpload: fields included with JSON.stringify and readonly", async (t) => { + const { r2 } = t.context; + const upload = await r2.createMultipartUpload("key"); + t.deepEqual(JSON.parse(JSON.stringify(upload)), { + key: upload.key, + uploadId: upload.uploadId, + }); + // @ts-expect-error intentionally testing incorrect types + // noinspection JSConstantReassignment + t.throws(() => (upload.key = "new"), { + instanceOf: TypeError, + message: + "Cannot assign to read only property 'key' of object '#<R2MultipartUpload>'", + }); + // @ts-expect-error intentionally testing incorrect types + // noinspection JSConstantReassignment + t.throws(() => (upload.uploadId = "new"), { + instanceOf: TypeError, + message: + "Cannot assign to read only property 'uploadId' of object '#<R2MultipartUpload>'", + }); +}); +test("R2MultipartUpload: hides implementation details", async (t) => { + const { r2 } = t.context; + const upload = await r2.createMultipartUpload("key"); + t.deepEqual(getObjectProperties(upload), [ + "abort", + "complete", + "key", + "uploadId", + "uploadPart", + ]); +}); + +test("R2Bucket/R2MultipartUpload: waits for appropriate input/output gates", async (t) => { + const { r2 } = t.context; + + // Check createMultipartUpload() waits for input gate to open before resolving + // (no need to wait for output gate here as createMultipartUpload() isn't + // externally observable: we don't know the `uploadId` before this resolves) + await waitsForInputGate(t, () => r2.createMultipartUpload("key")); + + // (resumeMultipartUpload() doesn't make subrequests, so doesn't need to wait + // for 
the input gate to open before resolving) + + // Check uploadPart() waits for input gate to open before resolving + // (no need to wait for output gate here as uploadPart() isn't externally + // observable: we don't know the `etag` before this resolves) + let upload = await r2.createMultipartUpload("key"); + let part = await waitsForInputGate(t, () => upload.uploadPart(1, "value")); + + // Check complete() waits for output gate to open before storing + await waitsForOutputGate( + t, + () => upload.complete([part]), + () => r2.head("key") + ); + // Check complete() waits for input gate to open before resolving + upload = await r2.createMultipartUpload("key"); + part = await upload.uploadPart(1, "value"); + await waitsForInputGate(t, () => upload.complete([part])); + + // Check abort() waits for input gate to open before resolving + // (no need to wait for output gate here as abort() isn't externally + // observable: just deletes hidden pending parts) + const upload2 = await r2.createMultipartUpload("key"); + await waitsForInputGate(t, () => upload2.abort()); + // ...even when aborting already completed upload + await waitsForInputGate(t, () => upload.abort()); +}); +test("R2Bucket/R2MultipartUpload: operations throw outside request handler", async (t) => { + const tmp = await useTmp(t); + const storage = new FileStorage(tmp, true, testClock); + const r2 = new R2Bucket(storage, { blockGlobalAsyncIO: true }); + const ctx = new RequestContext({ + externalSubrequestLimit: EXTERNAL_SUBREQUEST_LIMIT_BUNDLED, + }); + + const expectations: ThrowsExpectation = { + instanceOf: Error, + message: /^Some functionality, such as asynchronous I\/O/, + }; + await t.throwsAsync(r2.createMultipartUpload("key"), expectations); + // (resumeMultipartUpload() doesn't make any "network" calls, so can be called + // outside a request context) + await r2.resumeMultipartUpload("key", "upload"); + + t.is(ctx.internalSubrequests, 0); + const upload = await ctx.runWith(() => r2.createMultipartUpload("key")); + t.is(ctx.internalSubrequests, 1); + await t.throwsAsync(upload.uploadPart(1, "value"), expectations); + await t.throwsAsync(upload.complete([]), expectations); + await t.throwsAsync(upload.abort(), expectations); + + const part1 = await ctx.runWith(() => upload.uploadPart(1, "value")); + t.is(ctx.internalSubrequests, 2); + await ctx.runWith(() => upload.complete([part1])); + t.is(ctx.internalSubrequests, 3); + await ctx.runWith(() => upload.abort()); + t.is(ctx.internalSubrequests, 4); +}); +test("R2Bucket/R2MultipartUpload: operations advance current time", async (t) => { + const { r2 } = t.context; + const upload = await advancesTime(t, () => r2.createMultipartUpload("key")); + // (resumeMultipartUpload() doesn't make any "network" calls, so shouldn't + // advance the current time) + + const part1 = await advancesTime(t, () => upload.uploadPart(1, "value")); + await advancesTime(t, () => upload.complete([part1])); + await advancesTime(t, () => upload.abort()); + const upload2 = await r2.createMultipartUpload("key"); + await advancesTime(t, () => upload2.abort()); +}); diff --git a/packages/shared-test/src/asserts.ts b/packages/shared-test/src/asserts.ts index 1b34fff1d..b22814e9e 100644 --- a/packages/shared-test/src/asserts.ts +++ b/packages/shared-test/src/asserts.ts @@ -54,14 +54,15 @@ export function getObjectProperties(obj: T): string[] { .sort(); } -export async function advancesTime( +export async function advancesTime( t: ExecutionContext, - closure: () => Promise + closure: () => Promise ) { const ctx = 
new RequestContext(); const previous = ctx.currentTime; await setTimeout(50); t.is(ctx.currentTime, previous); - await ctx.runWith(closure); + const result = await ctx.runWith(closure); t.not(ctx.currentTime, previous); + return result; } diff --git a/packages/shared-test/src/storage/list.ts b/packages/shared-test/src/storage/list.ts index ed9e0bbc1..f725bf5c3 100644 --- a/packages/shared-test/src/storage/list.ts +++ b/packages/shared-test/src/storage/list.ts @@ -54,7 +54,7 @@ export const listStartMacro = listMacro( ); export const listEndMacro = listMacro( - " lists keys ending at end exclusive", + "lists keys ending at end exclusive", [["section1key1", "section1key2"]], { end: "section2key1" } ); @@ -104,6 +104,12 @@ export const listPrefixMacro = listMacro( { prefix: "section2" } ); +export const listExcludePrefixMacro = listMacro( + "lists keys not matching excludePrefix", + [["section1key1", "section1key2", "section3key1", "section3key2"]], + { excludePrefix: "section2" } +); + export const listCombinationMacro = listMacro( "paginates keys with start, limit and prefix in reverse", [ @@ -113,6 +119,15 @@ export const listCombinationMacro = listMacro( { start: "section2", prefix: "section", limit: 2, reverse: true } ); +export const listCombination2Macro = listMacro( + "paginates keys with limit, prefix and excludePrefix in reverse", + [ + ["section2key2", "section2key1"], + ["section1key2", "section1key1"], + ], + { prefix: "section", excludePrefix: "section3", limit: 2, reverse: true } +); + export const listStartAfterAllMacro = listMacro( "returns empty list with start after all", [[]], diff --git a/packages/shared/src/storage.ts b/packages/shared/src/storage.ts index 596f0cbf7..2f1a8ac19 100644 --- a/packages/shared/src/storage.ts +++ b/packages/shared/src/storage.ts @@ -37,11 +37,42 @@ export interface Range { length?: number; suffix?: number; } +export interface ParsedRange { + offset: number; + length: number; +} +export function parseRange( + { offset, length, suffix }: Range, + size: number +): ParsedRange { + if (suffix !== undefined) { + if (suffix <= 0) { + throw new Error("Suffix must be > 0"); + } + if (suffix > size) suffix = size; + offset = size - suffix; + length = size - offset; + } + if (offset === undefined) offset = 0; + if (length === undefined) length = size - offset; + + // If offset is negative or greater than size, throw an error + if (offset < 0) throw new Error("Offset must be >= 0"); + if (offset > size) throw new Error("Offset must be < size"); + // If length is less than or equal to 0, throw an error + if (length <= 0) throw new Error("Length must be > 0"); + // If length goes beyond actual length, adjust length to the end of the value + if (offset + length > size) length = size - offset; + + return { offset, length }; +} export interface StorageListOptions { // Stage 1: filtering /** Returned keys must start with this string if defined */ prefix?: string; + /** Returned keys must NOT start with this string if defined */ + excludePrefix?: string; /** Returned keys must be lexicographically >= this string if defined */ start?: string; /** Returned keys must be lexicographically < this string if defined */ diff --git a/packages/storage-file/src/helpers.ts b/packages/storage-file/src/helpers.ts index 6eb678039..58a8b201b 100644 --- a/packages/storage-file/src/helpers.ts +++ b/packages/storage-file/src/helpers.ts @@ -1,5 +1,6 @@ import fs from "fs/promises"; import path from "path"; +import { parseRange } from "@miniflare/shared"; export interface FileRange 
{ value: Buffer; @@ -41,25 +42,7 @@ export async function readFileRange( filePath = await fs.realpath(filePath); const { size } = await fs.lstat(filePath); // build offset and length as necessary - if (suffix !== undefined) { - if (suffix <= 0) { - throw new Error("Suffix must be > 0"); - } - if (suffix > size) suffix = size; - offset = size - suffix; - length = size - offset; - } - if (offset === undefined) offset = 0; - if (length === undefined) { - // get length of file - length = size - offset; - } - - // check offset and length are valid - if (offset < 0) throw new Error("Offset must be >= 0"); - if (offset >= size) throw new Error("Offset must be < size"); - if (length <= 0) throw new Error("Length must be > 0"); - if (offset + length > size) length = size - offset; + ({ offset, length } = parseRange({ offset, length, suffix }, size)); // read file fd = await fs.open(filePath, "r"); diff --git a/packages/storage-memory/src/helpers.ts b/packages/storage-memory/src/helpers.ts index ca9ea47fa..4cdbed651 100644 --- a/packages/storage-memory/src/helpers.ts +++ b/packages/storage-memory/src/helpers.ts @@ -17,9 +17,12 @@ export function listFilterMatch( name: string ): boolean { return !( - (options?.prefix && !name.startsWith(options.prefix)) || - (options?.start && lexicographicCompare(name, options.start) < 0) || - (options?.end && lexicographicCompare(name, options.end) >= 0) + (options?.prefix !== undefined && !name.startsWith(options.prefix)) || + (options?.excludePrefix !== undefined && + name.startsWith(options.excludePrefix)) || + (options?.start !== undefined && + lexicographicCompare(name, options.start) < 0) || + (options?.end !== undefined && lexicographicCompare(name, options.end) >= 0) ); } diff --git a/packages/storage-memory/src/memory.ts b/packages/storage-memory/src/memory.ts index 3dae5eaaa..a5a4b2ebd 100644 --- a/packages/storage-memory/src/memory.ts +++ b/packages/storage-memory/src/memory.ts @@ -4,6 +4,7 @@ import { SqliteDB, createSQLiteDB, defaultClock, + parseRange, } from "@miniflare/shared"; import { StoredKeyMeta, StoredMeta, StoredValueMeta } from "@miniflare/shared"; import { cloneMetadata } from "./helpers"; @@ -54,32 +55,13 @@ export class MemoryStorage extends LocalStorage { getRangeMaybeExpired( key: string, - { offset, length, suffix }: Range + range: Range ): RangeStoredValueMeta | undefined { const stored = this.map.get(key); if (stored === undefined) return; const { value } = stored; const size = value.byteLength; - // build proper offset and length - if (suffix !== undefined) { - if (suffix <= 0) { - throw new Error("Suffix must be > 0"); - } - if (suffix > size) suffix = size; - offset = size - suffix; - length = size - offset; - } - if (offset === undefined) offset = 0; - if (length === undefined) length = size - offset; - - // if offset is negative or greater than size, throw an error - if (offset < 0) throw new Error("Offset must be >= 0"); - if (offset > size) throw new Error("Offset must be < size"); - // if length is less than or equal to 0, throw an error - if (length <= 0) throw new Error("Length must be > 0"); - - // if length goes beyond actual length, adjust length to the end of the value - if (offset + length > size) length = size - offset; + const { offset, length } = parseRange(range, size); return { value: value.slice(offset, offset + length), diff --git a/packages/storage-redis/src/index.ts b/packages/storage-redis/src/index.ts index 4c8712f2a..f651ebab1 100644 --- a/packages/storage-redis/src/index.ts +++ 
b/packages/storage-redis/src/index.ts @@ -12,6 +12,7 @@ import { StoredValue, StoredValueMeta, millisToSeconds, + parseRange, viewToArray, } from "@miniflare/shared"; import { listFilterMatch, listPaginate } from "@miniflare/storage-memory"; @@ -203,27 +204,10 @@ export class RedisStorage extends Storage { range: Range = {}, skipMetadata?: boolean ): Promise<RangeStoredValueMeta<Meta> | undefined> { - let { offset, length, suffix } = range; // ensure offset and length are prepared const size = await this.#redis.strlen(this.#key(key)); if (size === 0) return undefined; - if (suffix !== undefined) { - if (suffix <= 0) { - throw new Error("Suffix must be > 0"); - } - if (suffix > size) suffix = size; - offset = size - suffix; - length = size - offset; - } - if (offset === undefined) offset = 0; - if (length === undefined) length = size - offset; - - // if offset is negative, throw an error - if (offset < 0) throw new Error("Offset must be >= 0"); - if (offset > size) throw new Error("Offset must be < size"); - if (length <= 0) throw new Error("Length must be > 0"); - // if length goes beyond actual length, adjust length to the end of the value - if (offset + length > size) length = size - offset; + const { offset, length } = parseRange(range, size); if (skipMetadata) { // If we don't need metadata, just get the value, Redis handles expiry diff --git a/types/vm.d.ts b/types/vm.d.ts index 3c588ecc6..ccaad4588 100644 --- a/types/vm.d.ts +++ b/types/vm.d.ts @@ -69,6 +69,6 @@ declare module "vm" { evaluateCallback: SyntheticModuleEvaluate, options?: SyntheticModuleOptions ); - setExport(name: keyof Namespace, value: any): void; + setExport<K extends keyof Namespace>(name: K, value: Namespace[K]): void; } }
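Taken together, the completion tests above pin down the multipart calling pattern. A usage sketch only (the helper name, metadata values and `MIN_PART_SIZE` constant are illustrative; `R2Bucket` is the bucket type these tests construct, and 5 MiB stands in for the minimum size enforced on every part except the last):

```ts
const MIN_PART_SIZE = 5 * 1024 * 1024; // every part except the last must be at least this big

async function uploadInParts(bucket: R2Bucket, key: string, data: Uint8Array) {
  const upload = await bucket.createMultipartUpload(key, {
    customMetadata: { source: "sketch" },
    httpMetadata: { contentType: "application/octet-stream" },
  });
  try {
    const parts: { partNumber: number; etag: string }[] = [];
    const partCount = Math.max(1, Math.ceil(data.byteLength / MIN_PART_SIZE));
    for (let i = 0; i < partCount; i++) {
      const chunk = data.subarray(i * MIN_PART_SIZE, (i + 1) * MIN_PART_SIZE);
      // Part numbers are 1-based; uploading the same number again replaces the
      // earlier part, and only the most recently returned etag is accepted.
      parts.push(await upload.uploadPart(i + 1, chunk));
    }
    // Parts may be passed in any order; completion orders them by partNumber.
    return await upload.complete(parts);
  } catch (e) {
    // Aborting deletes the hidden pending parts; an aborted (or already
    // completed) upload can no longer be completed.
    await upload.abort();
    throw e;
  }
}
```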
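The expected etags above (e.g. "3b676245e58d988dc75f80c0c27a9645-3", or "d41d8cd98f00b204e9800998ecf8427e-0" for the empty upload) are consistent with the S3-style multipart convention: the MD5 of the concatenated binary MD5 digests of the parts, suffixed with "-" and the part count. A sketch under that assumption, using Node's crypto:

```ts
import { createHash } from "crypto";

// Assumed convention: multipart etag = md5(md5(part1) + md5(part2) + ...) + "-" + partCount.
function multipartEtag(parts: Uint8Array[]): string {
  const digests = parts.map((part) => createHash("md5").update(part).digest());
  const combined = createHash("md5").update(Buffer.concat(digests)).digest("hex");
  return `${combined}-${parts.length}`;
}

// multipartEtag([]) === "d41d8cd98f00b204e9800998ecf8427e-0",
// matching the empty-upload case asserted above.
```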
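The ranged get tests read across part boundaries. Because every part except the last shares one size, a parsed { offset, length } range can be mapped onto per-part slices with simple arithmetic; this is a sketch of the idea, not the actual getMultipartValue implementation:

```ts
interface PartSlice {
  partNumber: number; // 1-based, matching uploadPart
  start: number;      // offset within that part
  end: number;        // exclusive
}

// Map an absolute { offset, length } range onto slices of equally-sized parts.
function sliceRangeOntoParts(
  offset: number,
  length: number,
  partSize: number,
  totalSize: number
): PartSlice[] {
  const slices: PartSlice[] = [];
  let remaining = Math.min(length, totalSize - offset);
  let position = offset;
  while (remaining > 0) {
    const partIndex = Math.floor(position / partSize);
    const start = position - partIndex * partSize;
    const available = Math.min(partSize - start, remaining);
    slices.push({ partNumber: partIndex + 1, start, end: start + available });
    position += available;
    remaining -= available;
  }
  return slices;
}

// e.g. sliceRangeOntoParts(PART_SIZE / 2, PART_SIZE / 4, PART_SIZE, 3 * PART_SIZE)
// yields a single slice inside part 1, as in the "accessing single part" case above.
```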
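The gate tests encode miniflare's general rule: an operation whose effect is externally observable (complete() makes the object visible to r2.head()) must hold the output gate until storage is written, and every operation waits for the input gate before resolving back into user code. A sketch of that shape, assuming the waitForOpenInputGate/waitForOpenOutputGate helpers exported by @miniflare/shared take no arguments and resolve once their gate opens:

```ts
import { waitForOpenInputGate, waitForOpenOutputGate } from "@miniflare/shared";

// Illustrative shape only, not the actual R2MultipartUpload#complete implementation.
async function observableWrite<T>(commit: () => Promise<T>): Promise<T> {
  // The write will be visible to other code (e.g. a concurrent r2.head()),
  // so wait for the output gate to open before committing it...
  await waitForOpenOutputGate();
  const result = await commit();
  // ...and wait for the input gate before handing the result back to user code.
  await waitForOpenInputGate();
  return result;
}
```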
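parseRange, now shared by the file, memory and Redis backends, normalises an { offset, length, suffix } range against a known size, clamping overlong lengths and rejecting impossible ranges. Its behaviour, read straight off the implementation above:

```ts
import { strict as assert } from "assert";
import { parseRange } from "@miniflare/shared";

const size = 10;
assert.deepEqual(parseRange({}, size), { offset: 0, length: 10 }); // whole value
assert.deepEqual(parseRange({ offset: 4 }, size), { offset: 4, length: 6 }); // to the end
assert.deepEqual(parseRange({ offset: 4, length: 100 }, size), { offset: 4, length: 6 }); // clamped
assert.deepEqual(parseRange({ suffix: 3 }, size), { offset: 7, length: 3 }); // last 3 bytes
assert.deepEqual(parseRange({ suffix: 100 }, size), { offset: 0, length: 10 }); // suffix clamped
assert.throws(() => parseRange({ offset: 11 }, size)); // "Offset must be < size"
assert.throws(() => parseRange({ offset: 4, length: 0 }, size)); // "Length must be > 0"
```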
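excludePrefix is the mirror image of prefix: keys starting with it are filtered out, which is what lets R2 hide its multipart bookkeeping keys from user-facing list() calls. A sketch against listFilterMatch, assuming its usual (options, name) parameter order; the prefix literal here is made up, the real code uses the exported _INTERNAL_PREFIX constant:

```ts
import type { StorageListOptions } from "@miniflare/shared";
import { listFilterMatch } from "@miniflare/storage-memory";

// Hypothetical value for illustration only.
const INTERNAL_PREFIX = "__INTERNAL";

const options: StorageListOptions = { excludePrefix: `${INTERNAL_PREFIX}:` };
listFilterMatch(options, "key"); // true: user-visible object survives filtering
listFilterMatch(options, `${INTERNAL_PREFIX}:multipart:upload-id:key:1`); // false: hidden part record

// Matches the listCombination2Macro semantics above:
listFilterMatch({ prefix: "section", excludePrefix: "section3" }, "section1key1"); // true
listFilterMatch({ prefix: "section", excludePrefix: "section3" }, "section3key1"); // false
```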
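Finally, the vm.d.ts change ties setExport's value type to the named export. A sketch of what the stricter signature catches, assuming the project's declaration makes SyntheticModule generic over its namespace (as keyof Namespace in the old signature implies):

```ts
import { SyntheticModule } from "vm"; // available at runtime with --experimental-vm-modules

interface Exports {
  default: string;
  count: number;
}

declare const mod: SyntheticModule<Exports>;

mod.setExport("count", 42); // ok: Exports["count"] is number
// @ts-expect-error a number export cannot be set to a string
mod.setExport("count", "oops");
// @ts-expect-error "missing" is not a declared export name
mod.setExport("missing", 1);
```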