Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streams/unstable): toByteStream() #6046

Merged
merged 11 commits into from
Oct 8, 2024
1 change: 1 addition & 0 deletions streams/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"./text-line-stream": "./text_line_stream.ts",
"./to-array-buffer": "./to_array_buffer.ts",
"./to-blob": "./to_blob.ts",
"./unstable-to-byte-stream": "./unstable_to_byte_stream.ts",
"./unstable-to-bytes": "./unstable_to_bytes.ts",
"./to-json": "./to_json.ts",
"./unstable-to-lines": "./unstable_to_lines.ts",
Expand Down
73 changes: 73 additions & 0 deletions streams/unstable_to_byte_stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

/**
* The function takes a `ReadableStream<Uint8Array>` and wraps it in a BYOB
* stream if it doesn't already support it.
*
* @experimental **UNSTABLE**: New API, yet to be vetted.
*
* @example Usage
* ```ts
* import { assertEquals } from "@std/assert";
* import { toByteStream } from "@std/streams/unstable-to-byte-stream";
*
* const reader = toByteStream(ReadableStream.from([new Uint8Array(100)]))
* .getReader({ mode: "byob" });
*
* while (true) {
* const { done, value } = await reader.read(new Uint8Array(10), { min: 10 });
* if (done) break;
* assertEquals(value.length, 10);
* }
*
* reader.releaseLock();
* ```
*
* @param readable The ReadableStream to be wrapped if needed.
* @returns A BYOB ReadableStream.
*/
export function toByteStream(
readable: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
try {
const reader = readable.getReader({ mode: "byob" });
reader.releaseLock();
return readable;
} catch {
const reader = readable.getReader();
return new ReadableStream({
type: "bytes",
async pull(controller) {
const value = await async function () {
while (true) {
const { done, value } = await reader.read();
if (done) return undefined;
if (value.length) return value;
}
}();

if (value == undefined) {
controller.close();
return controller.byobRequest?.respond(0);
}

if (controller.byobRequest?.view) {
const buffer = new Uint8Array(controller.byobRequest.view.buffer);
const offset = controller.byobRequest.view.byteOffset;
const size = buffer.length - offset;
if (value.length > size) {
buffer.set(value.slice(0, size), offset);
controller.byobRequest.respond(size);
controller.enqueue(value.slice(size));
} else {
buffer.set(value, offset);
controller.byobRequest.respond(value.length);
}
} else controller.enqueue(value);
},
async cancel(reason) {
await reader.cancel(reason);
},
});
}
}
101 changes: 101 additions & 0 deletions streams/unstable_to_byte_stream_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

import { assertEquals } from "@std/assert";
import { toByteStream } from "./unstable_to_byte_stream.ts";

Deno.test(
"toByteStream() consumable as BYOB with min property set",
async () => {
let size = 0;
const reader = toByteStream(ReadableStream.from(async function* () {
for (let i = 0; i < 100; ++i) {
const bytes = Math.floor(Math.random() * 10 + 5);
size += bytes;
yield new Uint8Array(bytes).map((_) => Math.random() * 256);
}
}())).getReader({ mode: "byob" });

let count = 0;
while (true) {
const { done, value } = await reader.read(new Uint8Array(10), {
min: 10,
});
if (done) {
count += value!.length;
break;
}
count += value.length;
}

assertEquals(count, size);
},
);

Deno.test("toByteStream() consumable as BYOB", async () => {
const size = 100;

const reader = toByteStream(
ReadableStream.from([
new Uint8Array(size),
new Uint8Array(size * 0),
new Uint8Array(size * 2),
new Uint8Array(5),
new Uint8Array(3),
new Uint8Array(size * 3),
]),
)
.getReader({ mode: "byob" });

let count = 0;
while (true) {
const { done, value } = await reader.read(new Uint8Array(10));
if (done) break;
count += value.length;
}

assertEquals(count, size * 6 + 5 + 3);
});

Deno.test("toByteStream() consumable as default", async () => {
const size = 100;

const reader = toByteStream(
ReadableStream.from([
new Uint8Array(size),
new Uint8Array(size * 0),
new Uint8Array(size * 2),
new Uint8Array(5),
new Uint8Array(3),
new Uint8Array(size * 3),
]),
)
.getReader();

let count = 0;
while (true) {
const { done, value } = await reader.read();
if (done) break;
count += value.length;
}

assertEquals(count, size * 6 + 5 + 3);
});

Deno.test("toByteStream() cancellable", async () => {
let receivedCancel = false;

const readable = toByteStream(
new ReadableStream({
pull(controller) {
controller.enqueue(new Uint8Array(100));
},
cancel(reason) {
receivedCancel = true;
assertEquals(reason, "Potato");
},
}),
);

await readable.cancel("Potato");
assertEquals(receivedCancel, true);
});