diff --git a/README.md b/README.md index 700dd732..03c01610 100644 --- a/README.md +++ b/README.md @@ -69,15 +69,23 @@ read(buffer: Uint8Array, offset: number, length: number): Promise ``` Parameters: -- `uint8Array`: `Uint8Array`: The buffer into which the data will be peeked. - This is where the peeked data will be stored. -- `offset`: `number`: The offset in the Uint8Array where the peeked data should start being written. -- `length`: `number`: The number of bytes to peek from the stream. +- `uint8Array`: `Uint8Array`: The buffer into which the data will be read. + This is where the read data will be stored. +- `offset`: `number`: The offset in the Uint8Array where the read data should start being written. +- `length`: `number`: The number of bytes to read from the stream. Returns `Promise`: -A promise that resolves with the number of bytes actually peeked into the buffer. +A promise that resolves with the number of bytes actually read into the buffer. This number may be less than the requested length if the end of the stream is reached. +##### `abort` function + +Abort active asynchronous operation (`read` or `peak`) before it has completed. + +```ts +abort(): Promise +``` + ## Examples In the following example we read the first 16 bytes from a stream and store them in our buffer. diff --git a/lib/AbstractStreamReader.ts b/lib/AbstractStreamReader.ts index 833b156f..997446c4 100644 --- a/lib/AbstractStreamReader.ts +++ b/lib/AbstractStreamReader.ts @@ -2,8 +2,28 @@ import { EndOfStreamError } from "./EndOfStreamError.js"; export interface IStreamReader { + /** + * Peak ahead (peek) from stream. Subsequent read or peeks will return the same data. + * @param uint8Array - Uint8Array (or Buffer) to store data read from stream in + * @param offset - Offset target + * @param length - Number of bytes to read + * @returns Number of bytes peeked + */ peek(uint8Array: Uint8Array, offset: number, length: number): Promise; - read(buffer: Uint8Array, offset: number, length: number): Promise + + /** + * Read from stream the stream. + * @param uint8Array - Uint8Array (or Buffer) to store data read from stream in + * @param offset - Offset target + * @param length - Number of bytes to read + * @returns Number of bytes peeked + */ + read(uint8Array: Uint8Array, offset: number, length: number): Promise; + + /** + * Abort active asynchronous operation before it has completed. + */ + abort(): Promise; } export abstract class AbstractStreamReader implements IStreamReader { @@ -20,13 +40,6 @@ export abstract class AbstractStreamReader implements IStreamReader { */ protected peekQueue: Uint8Array[] = []; - /** - * Read ahead (peek) from stream. Subsequent read or peeks will return the same data - * @param uint8Array - Uint8Array (or Buffer) to store data read from stream in - * @param offset - Offset target - * @param length - Number of bytes to read - * @returns Number of bytes peeked - */ public async peek(uint8Array: Uint8Array, offset: number, length: number): Promise { const bytesRead = await this.read(uint8Array, offset, length); this.peekQueue.push(uint8Array.subarray(offset, offset + bytesRead)); // Put read data back to peek buffer @@ -90,4 +103,6 @@ export abstract class AbstractStreamReader implements IStreamReader { } protected abstract readFromStream(buffer: Uint8Array, offset: number, length: number): Promise; + + public abstract abort(): Promise; } diff --git a/lib/StreamReader.ts b/lib/StreamReader.ts index b9400b24..911e8e1e 100644 --- a/lib/StreamReader.ts +++ b/lib/StreamReader.ts @@ -91,4 +91,8 @@ export class StreamReader extends AbstractStreamReader { this.deferred = null; } } + + public async abort(): Promise { + this.s.destroy(); + } } diff --git a/lib/WebStreamReader.ts b/lib/WebStreamReader.ts index ffafbc1c..9b9cdec8 100644 --- a/lib/WebStreamReader.ts +++ b/lib/WebStreamReader.ts @@ -10,7 +10,9 @@ export type AnyWebByteStream = NodeReadableStream | ReadableStream { + return this.reader.cancel(); + } } diff --git a/package.json b/package.json index ee3443f5..adfadae8 100644 --- a/package.json +++ b/package.json @@ -43,11 +43,13 @@ ], "devDependencies": { "@biomejs/biome": "1.8.3", - "@types/chai": "^4.3.17", + "@types/chai": "^4.3.19", + "@types/chai-as-promised": "^8.0.0", "@types/mocha": "^10.0.7", "@types/node": "^22.1.0", "c8": "^10.1.2", "chai": "^5.1.1", + "chai-as-promised": "^8.0.0", "del-cli": "^5.1.0", "mocha": "^10.7.0", "remark-cli": "^12.0.1", diff --git a/test/test.ts b/test/test.ts index 7421768c..27b96c48 100644 --- a/test/test.ts +++ b/test/test.ts @@ -1,16 +1,16 @@ -import {assert, expect} from 'chai'; +import {assert, expect, use} from 'chai'; +import chaiAsPromised from 'chai-as-promised'; import {EventEmitter} from 'node:events'; import * as fs from 'node:fs'; import {Readable} from 'node:stream'; import {EndOfStreamError, type IStreamReader, StreamReader, WebStreamReader} from '../lib/index.js'; import {SourceStream, stringToReadableStream} from './util.js'; - -type StringToStreamFactory = (input: string) => IStreamReader; +use(chaiAsPromised); interface StreamFactorySuite { description: string; - fromString: StringToStreamFactory; + fromString: (input: string, delay?: number) => IStreamReader; } const latin1TextDecoder = new TextDecoder('latin1'); @@ -19,10 +19,10 @@ describe('Matrix', () => { const streamFactories: StreamFactorySuite[] = [{ description: 'Node.js StreamReader', - fromString: input => new StreamReader(new SourceStream(input)) + fromString: (input, delay) => new StreamReader(new SourceStream(input, delay)) }, { description: 'WebStream Reader', - fromString: input => new WebStreamReader(stringToReadableStream(input)) + fromString: (input, delay) => new WebStreamReader(stringToReadableStream(input, delay)) }]; streamFactories @@ -204,6 +204,25 @@ describe('Matrix', () => { }); + describe('Handle delayed read', () => { + + it('handle delay', async () => { + const fileReadStream = factory.fromString('123', 500); + const res = new Uint8Array(3); + const promise = fileReadStream.read(res, 0, 3); + assert.strictEqual(await promise, 3); + }); + + it('abort async operation', async () => { + const fileReadStream = factory.fromString('123', 500); + const res = new Uint8Array(3); + const promise = fileReadStream.read(res, 0, 3); + await fileReadStream.abort(); + await expect(promise).to.be.rejectedWith(Error) + }); + + }); + }); }); }); diff --git a/test/util.ts b/test/util.ts index fa8eed45..a3ec336d 100644 --- a/test/util.ts +++ b/test/util.ts @@ -10,21 +10,23 @@ export class SourceStream extends Readable { private buf: Uint8Array; - constructor(private str = '') { + constructor(private str = '', private delay = 0) { super(); - this.buf = new TextEncoder().encode(str); + this.buf = new TextEncoder().encode(str); } public _read() { - this.push(this.buf); - this.push(null); // push the EOF-signaling `null` chunk + setTimeout(() => { + this.push(this.buf); + this.push(null); // Signal end of stream + }, this.delay); } } // Function to convert a string to a BYOB ReadableStream -function stringToBYOBStream(inputString: string): ReadableStream { +function stringToBYOBStream(inputString: string, delay = 0): ReadableStream { // Convert the string to a Uint8Array using TextEncoder const encoder = new TextEncoder(); const uint8Array = encoder.encode(inputString); @@ -48,8 +50,10 @@ function stringToBYOBStream(inputString: string): ReadableStream { // @ts-ignore controller.byobRequest.respond(bytesRead); } else { - controller.enqueue(uint8Array); - position = uint8Array.length; + setTimeout(() => { + controller.enqueue(uint8Array); + position = uint8Array.length; + }, delay); } if (position >= uint8Array.length) { controller.close(); @@ -60,6 +64,6 @@ function stringToBYOBStream(inputString: string): ReadableStream { } // Function to convert a string to a ReadableStreamBYOBReader -export function stringToReadableStream(inputString: string): ReadableStream { - return stringToBYOBStream(inputString); +export function stringToReadableStream(inputString: string, delay?: number): ReadableStream { + return stringToBYOBStream(inputString, delay); } diff --git a/yarn.lock b/yarn.lock index c8c1838d..25b21111 100644 --- a/yarn.lock +++ b/yarn.lock @@ -342,10 +342,19 @@ __metadata: languageName: node linkType: hard -"@types/chai@npm:^4.3.17": - version: 4.3.17 - resolution: "@types/chai@npm:4.3.17" - checksum: 10c0/322a74489cdfde9c301b593d086c539584924c4c92689a858e0930708895a5ab229c31c64ac26b137615ef3ffbff1866851c280c093e07b3d3de05983d3793e0 +"@types/chai-as-promised@npm:^8.0.0": + version: 8.0.0 + resolution: "@types/chai-as-promised@npm:8.0.0" + dependencies: + "@types/chai": "npm:*" + checksum: 10c0/85c91bad8a5f1665a51a3fbb8cdd80b8eca663cf42816ccdfaddff1a9b8705060235220c26e32d361f2568386216c7ee484ab4e1735c971dbdcacced7acc857d + languageName: node + linkType: hard + +"@types/chai@npm:*, @types/chai@npm:^4.3.19": + version: 4.3.19 + resolution: "@types/chai@npm:4.3.19" + checksum: 10c0/8fd573192e486803c4d04185f2b0fab554660d9a1300dbed5bde9747ab8bef15f462a226f560ed5ca48827eecaf8d71eed64aa653ff9aec72fb2eae272e43a84 languageName: node linkType: hard @@ -774,6 +783,17 @@ __metadata: languageName: node linkType: hard +"chai-as-promised@npm:^8.0.0": + version: 8.0.0 + resolution: "chai-as-promised@npm:8.0.0" + dependencies: + check-error: "npm:^2.0.0" + peerDependencies: + chai: ">= 2.1.2 < 6" + checksum: 10c0/60200ea9cdac24394a97e768edd7d0cd8962097a02e3e6158932c13f7e9d89489d20e6a2e540e80dbe845c71362e7a7a311672fd16a2eab4214a6952fd0db804 + languageName: node + linkType: hard + "chai@npm:^5.1.1": version: 5.1.1 resolution: "chai@npm:5.1.1" @@ -850,7 +870,7 @@ __metadata: languageName: node linkType: hard -"check-error@npm:^2.1.1": +"check-error@npm:^2.0.0, check-error@npm:^2.1.1": version: 2.1.1 resolution: "check-error@npm:2.1.1" checksum: 10c0/979f13eccab306cf1785fa10941a590b4e7ea9916ea2a4f8c87f0316fc3eab07eabefb6e587424ef0f88cbcd3805791f172ea739863ca3d7ce2afc54641c7f0e @@ -2868,11 +2888,13 @@ __metadata: resolution: "peek-readable@workspace:." dependencies: "@biomejs/biome": "npm:1.8.3" - "@types/chai": "npm:^4.3.17" + "@types/chai": "npm:^4.3.19" + "@types/chai-as-promised": "npm:^8.0.0" "@types/mocha": "npm:^10.0.7" "@types/node": "npm:^22.1.0" c8: "npm:^10.1.2" chai: "npm:^5.1.1" + chai-as-promised: "npm:^8.0.0" del-cli: "npm:^5.1.0" mocha: "npm:^10.7.0" remark-cli: "npm:^12.0.1"