Skip to content

Commit

Permalink
Support web-streams (WHATWG standard streams)
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Dec 12, 2023
1 parent 174a035 commit e9ad62b
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 13 deletions.
1 change: 0 additions & 1 deletion .github/workflows/nodejs-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ jobs:
test/**/*.js.map
test:

runs-on: ubuntu-latest
needs: build

Expand Down
17 changes: 17 additions & 0 deletions .nycrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"cache": false,
"check-coverage": false,
"extension": [
".ts"
],
"sourceMap": true,
"instrument": true,
"reporter": [
"lcov",
"text"
],
"all": true,
"instrument": true,
"report-dir": "coverage",
"include": "lib/**/*.ts"
}
52 changes: 52 additions & 0 deletions lib/WebStreamReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// @ts-ignore
import { ReadableStream, ReadableStreamBYOBReader } from 'node:stream/web';
import { EndOfStreamError } from './EndOfFileStream.js';

export { EndOfStreamError } from './EndOfFileStream.js';

export class WebStreamReader {

private reader: ReadableStreamBYOBReader;

/**
* Store peeked data
* @type {Array}
*/
private peekQueue: Uint8Array[] = [];

public constructor(stream: ReadableStream) {
this.reader = stream.getReader({mode: 'byob'}) as ReadableStreamBYOBReader;
}

/**
* Read ahead (peek) from stream. Subsequent read or peeks will return the same data
* @param buffer - Uint8Array (or Buffer) to store data read from stream in
* @param offset - Offset target
* @param length - Number of bytes to read
* @returns Number of bytes peeked
*/
public async peek(buffer: Uint8Array, offset: number, length: number): Promise<number> {
const bytesRead = await this.read(buffer, offset, length);
this.peekQueue.push(buffer.subarray(offset, offset + bytesRead)); // Put read data back to peek buffer
return bytesRead;
}

/**
* Read chunk from stream
* @param buffer - Target Uint8Array (or Buffer) to store data read from stream in
* @param offset - Offset target
* @param length - Number of bytes to read
* @returns Number of bytes read
*/
public async read(buffer: Uint8Array, offset: number, length: number): Promise<number> {
if (length === 0) {
return 0;
}
const result = await this.reader.read(buffer);

if (result.done) {
throw new EndOfStreamError();
}
return length;
}
}
1 change: 1 addition & 0 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { EndOfStreamError } from './EndOfFileStream.js';
export { StreamReader } from './StreamReader.js';
export { WebStreamReader } from './WebStreamReader.js';
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"devDependencies": {
"@types/chai": "^4.3.4",
"@types/mocha": "^10.0.1",
"@types/node": "^18.11.13",
"@types/node": "^20.10.4",
"@typescript-eslint/eslint-plugin": "^4.33.0",
"@typescript-eslint/parser": "^4.33.0",
"c8": "^7.12.0",
Expand Down
2 changes: 1 addition & 1 deletion test/examples.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable no-console */

import { assert } from 'chai';
import fs from 'node:fs';
import * as fs from 'node:fs';
import { EndOfStreamError, StreamReader } from '../lib/index.js';

describe('Examples', () => {
Expand Down
57 changes: 50 additions & 7 deletions test/test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { assert, expect } from 'chai';
import { EventEmitter } from 'node:events';
import fs from 'node:fs';
import { Readable } from 'node:stream';
import { EndOfStreamError, StreamReader } from '../lib/index.js';
import { SourceStream } from './util.js';
import {assert, expect} from 'chai';
import {EventEmitter} from 'node:events';
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
import {EndOfStreamError, StreamReader, WebStreamReader} from '../lib/index.js';
import {SourceStream, stringToReadableStream} from './util.js';

describe('StreamReader', () => {
describe('Node.js StreamReader', () => {

it('should throw an exception if constructor argument is not a stream', () => {
class MyEmitter extends EventEmitter {
Expand Down Expand Up @@ -327,3 +327,46 @@ describe('StreamReader', () => {
});

});

describe('WebStreamReader', () => {


it('should be able to handle 0 byte read request', async () => {
const webStreamReader = new WebStreamReader(stringToReadableStream('abcdefg'));

const buf = new Uint8Array(0);
const bytesRead = await webStreamReader.read(buf, 0, 0);
assert.strictEqual(bytesRead, 0, 'Should return');
});

it('read from a streamed data chunk', async () => {
const webStreamReader = new WebStreamReader(stringToReadableStream('\x05peter'));

let uint8Array: Uint8Array;
let bytesRead: number;

// read only one byte from the chunk
uint8Array = new Uint8Array(1);
bytesRead = await webStreamReader.read(uint8Array, 0, 1);
assert.strictEqual(bytesRead, 1, 'Should read exactly one byte');
assert.strictEqual(uint8Array[0], 5, '0x05 == 5');


// should decode string from chunk
uint8Array = new Uint8Array(5);
bytesRead = await webStreamReader.read(uint8Array, 0, 5);
assert.strictEqual(bytesRead, 5, 'Should read 5 bytes');
assert.strictEqual(Buffer.from(uint8Array).toString('latin1'), 'peter');

// should reject at the end of the stream
uint8Array = new Uint8Array(1);
try {
await webStreamReader.read(uint8Array, 0, 1);
assert.fail('Should reject due to end-of-stream');
} catch (err) {
assert.instanceOf(err, EndOfStreamError);
}
});


});
80 changes: 79 additions & 1 deletion test/util.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
// Utilities for testing

import { Readable } from 'node:stream';
import { Blob } from 'node:buffer';

// @ts-ignore
import { ReadableStream, ReadableByteStreamController } from 'node:stream/web';


/**
* A mock readable-stream, using string to read from
* A mock Node.js readable-stream, using string to read from
*/
export class SourceStream extends Readable {

Expand All @@ -20,3 +25,76 @@ export class SourceStream extends Readable {
this.push(null); // push the EOF-signaling `null` chunk
}
}

// /**
// * Convert Uint8Array to ReadableStream
// */
// function uint8ArrayToReadableStream(uint8Array: Uint8Array): ReadableStream<Uint8Array> {
// let position = 0;
// return new ReadableStream({
// type: 'bytes',
// start(controller) {},
// async pull(controller) {
// // Called when there is a pull request for data
// // @ts-ignore
// const theView = controller.byobRequest.view;
// theView.set(uint8Array.subarray(position), theView.byteOffset, theView.byteLength);
// // @ts-ignore
// controller.byobRequest.respond(theView.byteLength);
// position += theView.byteLength;
// },
// cancel(reason) {}
// });
// }
//
// /**
// * Convert string to ReadableStream
// */
// export function stringToReadableStream(str: string) {
// const buffer = Buffer.from(str, 'latin1');
// return uint8ArrayToReadableStream(buffer);
// }

// Function to convert a string to a BYOB ReadableStream
function stringToBYOBStream(inputString: string): ReadableStream<Uint8Array> {
// Convert the string to a Uint8Array using TextEncoder
const encoder = new TextEncoder();
const uint8Array = encoder.encode(inputString);

let position = 0;

// Create a BYOBReadableStream
const stream = new ReadableStream({
type: 'bytes',
async pull(controller) {
// Check if there is data left to be pushed
if (position < uint8Array.length) {
// Push the chunk to the controller
if (controller.byobRequest) {
const remaining = uint8Array.length - position;
// @ts-ignore
const v = controller.byobRequest.view;
const bytesRead = Math.min(remaining, v.byteLength);
v.set(uint8Array.subarray(position, position + bytesRead));
position += bytesRead;
// @ts-ignore
controller.byobRequest.respond(bytesRead);
} else {
controller.enqueue(uint8Array);
position = uint8Array.length;
}
} else {
// If no more data is left, close the stream
controller.close();
}
}
});

return stream;
}


// Function to convert a string to a ReadableStreamBYOBReader
export function stringToReadableStream(inputString: string): ReadableStream<Uint8Array> {
return stringToBYOBStream(inputString);
}
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"inlineSources": false,
"module": "node16",
"moduleResolution": "node16",
"target": "ES2019",
"target": "ES2020",
"esModuleInterop": true,
"strict": true
}
Expand Down
14 changes: 13 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,18 @@
resolved "https://registry.yarnpkg.com/@types/ms/-/ms-0.7.31.tgz#31b7ca6407128a3d2bbc27fe2d21b345397f6197"
integrity sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==

"@types/node@*", "@types/node@^18.0.0", "@types/node@^18.11.13":
"@types/node@*", "@types/node@^18.0.0":
version "18.11.13"
resolved "https://registry.yarnpkg.com/@types/node/-/node-18.11.13.tgz#dff34f226ec1ac0432ae3b136ec5552bd3b9c0fe"
integrity sha512-IASpMGVcWpUsx5xBOrxMj7Bl8lqfuTY7FKAnPmu5cHkfQVWF8GulWS1jbRqA934qZL35xh5xN/+Xe/i26Bod4w==

"@types/node@^20.10.4":
version "20.10.4"
resolved "https://registry.yarnpkg.com/@types/node/-/node-20.10.4.tgz#b246fd84d55d5b1b71bf51f964bd514409347198"
integrity sha512-D08YG6rr8X90YB56tSIuBaddy/UXAA9RKJoFvrsnogAum/0pmjkgi4+2nx96A330FmioegBWmEYQ+syqCFaveg==
dependencies:
undici-types "~5.26.4"

"@types/normalize-package-data@^2.4.0":
version "2.4.1"
resolved "https://registry.yarnpkg.com/@types/normalize-package-data/-/normalize-package-data-2.4.1.tgz#d3357479a0fdfdd5907fe67e17e0a85c906e1301"
Expand Down Expand Up @@ -3661,6 +3668,11 @@ unbox-primitive@^1.0.2:
has-symbols "^1.0.3"
which-boxed-primitive "^1.0.2"

undici-types@~5.26.4:
version "5.26.5"
resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-5.26.5.tgz#bcd539893d00b56e964fd2657a4866b221a65617"
integrity sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==

unified-args@^10.0.0:
version "10.0.0"
resolved "https://registry.yarnpkg.com/unified-args/-/unified-args-10.0.0.tgz#95994c5558fea83ff07006cb560fd88cdcf31134"
Expand Down

0 comments on commit e9ad62b

Please sign in to comment.