Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function to abort async operations #748

Merged
merged 1 commit into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,23 @@ read(buffer: Uint8Array, offset: number, length: number): Promise<number>
```

Parameters:
- `uint8Array`: `Uint8Array`: The buffer into which the data will be peeked.
This is where the peeked data will be stored.
- `offset`: `number`: The offset in the Uint8Array where the peeked data should start being written.
- `length`: `number`: The number of bytes to peek from the stream.
- `uint8Array`: `Uint8Array`: The buffer into which the data will be read.
This is where the read data will be stored.
- `offset`: `number`: The offset in the Uint8Array where the read data should start being written.
- `length`: `number`: The number of bytes to read from the stream.

Returns `Promise<number>`:
A promise that resolves with the number of bytes actually peeked into the buffer.
A promise that resolves with the number of bytes actually read into the buffer.
This number may be less than the requested length if the end of the stream is reached.

##### `abort` function

Abort active asynchronous operation (`read` or `peak`) before it has completed.

```ts
abort(): Promise<void>
```

## Examples

In the following example we read the first 16 bytes from a stream and store them in our buffer.
Expand Down
31 changes: 23 additions & 8 deletions lib/AbstractStreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,28 @@ import { EndOfStreamError } from "./EndOfStreamError.js";


export interface IStreamReader {
/**
* Peak ahead (peek) from stream. Subsequent read or peeks will return the same data.
* @param uint8Array - Uint8Array (or Buffer) to store data read from stream in
* @param offset - Offset target
* @param length - Number of bytes to read
* @returns Number of bytes peeked
*/
peek(uint8Array: Uint8Array, offset: number, length: number): Promise<number>;
read(buffer: Uint8Array, offset: number, length: number): Promise<number>

/**
* Read from stream the stream.
* @param uint8Array - Uint8Array (or Buffer) to store data read from stream in
* @param offset - Offset target
* @param length - Number of bytes to read
* @returns Number of bytes peeked
*/
read(uint8Array: Uint8Array, offset: number, length: number): Promise<number>;

/**
* Abort active asynchronous operation before it has completed.
*/
abort(): Promise<void>;
}

export abstract class AbstractStreamReader implements IStreamReader {
Expand All @@ -20,13 +40,6 @@ export abstract class AbstractStreamReader implements IStreamReader {
*/
protected peekQueue: Uint8Array[] = [];

/**
* Read ahead (peek) from stream. Subsequent read or peeks will return the same data
* @param uint8Array - Uint8Array (or Buffer) to store data read from stream in
* @param offset - Offset target
* @param length - Number of bytes to read
* @returns Number of bytes peeked
*/
public async peek(uint8Array: Uint8Array, offset: number, length: number): Promise<number> {
const bytesRead = await this.read(uint8Array, offset, length);
this.peekQueue.push(uint8Array.subarray(offset, offset + bytesRead)); // Put read data back to peek buffer
Expand Down Expand Up @@ -90,4 +103,6 @@ export abstract class AbstractStreamReader implements IStreamReader {
}

protected abstract readFromStream(buffer: Uint8Array, offset: number, length: number): Promise<number>;

public abstract abort(): Promise<void>;
}
4 changes: 4 additions & 0 deletions lib/StreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,8 @@ export class StreamReader extends AbstractStreamReader {
this.deferred = null;
}
}

public async abort(): Promise<void> {
this.s.destroy();
}
}
6 changes: 6 additions & 0 deletions lib/WebStreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ export type AnyWebByteStream = NodeReadableStream<Uint8Array> | ReadableStream<U
* Reference: https://nodejs.org/api/webstreams.html#class-readablestreambyobreader
*/
export class WebStreamReader extends AbstractStreamReader {

private reader: ReadableStreamBYOBReader;
private abortController = new AbortController();

public constructor(stream: AnyWebByteStream) {
super();
Expand All @@ -36,4 +38,8 @@ export class WebStreamReader extends AbstractStreamReader {

return 0;
}

public abort(): Promise<void> {
return this.reader.cancel();
}
}
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
],
"devDependencies": {
"@biomejs/biome": "1.8.3",
"@types/chai": "^4.3.17",
"@types/chai": "^4.3.19",
"@types/chai-as-promised": "^8.0.0",
"@types/mocha": "^10.0.7",
"@types/node": "^22.1.0",
"c8": "^10.1.2",
"chai": "^5.1.1",
"chai-as-promised": "^8.0.0",
"del-cli": "^5.1.0",
"mocha": "^10.7.0",
"remark-cli": "^12.0.1",
Expand Down
31 changes: 25 additions & 6 deletions test/test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import {assert, expect} from 'chai';
import {assert, expect, use} from 'chai';
import chaiAsPromised from 'chai-as-promised';
import {EventEmitter} from 'node:events';
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
import {EndOfStreamError, type IStreamReader, StreamReader, WebStreamReader} from '../lib/index.js';
import {SourceStream, stringToReadableStream} from './util.js';


type StringToStreamFactory = (input: string) => IStreamReader;
use(chaiAsPromised);

interface StreamFactorySuite {
description: string;
fromString: StringToStreamFactory;
fromString: (input: string, delay?: number) => IStreamReader;
}

const latin1TextDecoder = new TextDecoder('latin1');
Expand All @@ -19,10 +19,10 @@ describe('Matrix', () => {

const streamFactories: StreamFactorySuite[] = [{
description: 'Node.js StreamReader',
fromString: input => new StreamReader(new SourceStream(input))
fromString: (input, delay) => new StreamReader(new SourceStream(input, delay))
}, {
description: 'WebStream Reader',
fromString: input => new WebStreamReader(stringToReadableStream(input))
fromString: (input, delay) => new WebStreamReader(stringToReadableStream(input, delay))
}];

streamFactories
Expand Down Expand Up @@ -204,6 +204,25 @@ describe('Matrix', () => {

});

describe('Handle delayed read', () => {

it('handle delay', async () => {
const fileReadStream = factory.fromString('123', 500);
const res = new Uint8Array(3);
const promise = fileReadStream.read(res, 0, 3);
assert.strictEqual(await promise, 3);
});

it('abort async operation', async () => {
const fileReadStream = factory.fromString('123', 500);
const res = new Uint8Array(3);
const promise = fileReadStream.read(res, 0, 3);
await fileReadStream.abort();
await expect(promise).to.be.rejectedWith(Error)
});

});

});
});
});
Expand Down
22 changes: 13 additions & 9 deletions test/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,23 @@ export class SourceStream extends Readable {

private buf: Uint8Array;

constructor(private str = '') {
constructor(private str = '', private delay = 0) {
super();

this.buf = new TextEncoder().encode(str);
this.buf = new TextEncoder().encode(str);
}

public _read() {
this.push(this.buf);
this.push(null); // push the EOF-signaling `null` chunk
setTimeout(() => {
this.push(this.buf);
this.push(null); // Signal end of stream
}, this.delay);
}
}


// Function to convert a string to a BYOB ReadableStream
function stringToBYOBStream(inputString: string): ReadableStream<Uint8Array> {
function stringToBYOBStream(inputString: string, delay = 0): ReadableStream<Uint8Array> {
// Convert the string to a Uint8Array using TextEncoder
const encoder = new TextEncoder();
const uint8Array = encoder.encode(inputString);
Expand All @@ -48,8 +50,10 @@ function stringToBYOBStream(inputString: string): ReadableStream<Uint8Array> {
// @ts-ignore
controller.byobRequest.respond(bytesRead);
} else {
controller.enqueue(uint8Array);
position = uint8Array.length;
setTimeout(() => {
controller.enqueue(uint8Array);
position = uint8Array.length;
}, delay);
}
if (position >= uint8Array.length) {
controller.close();
Expand All @@ -60,6 +64,6 @@ function stringToBYOBStream(inputString: string): ReadableStream<Uint8Array> {
}

// Function to convert a string to a ReadableStreamBYOBReader
export function stringToReadableStream(inputString: string): ReadableStream<Uint8Array> {
return stringToBYOBStream(inputString);
export function stringToReadableStream(inputString: string, delay?: number): ReadableStream<Uint8Array> {
return stringToBYOBStream(inputString, delay);
}
34 changes: 28 additions & 6 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,19 @@ __metadata:
languageName: node
linkType: hard

"@types/chai@npm:^4.3.17":
version: 4.3.17
resolution: "@types/chai@npm:4.3.17"
checksum: 10c0/322a74489cdfde9c301b593d086c539584924c4c92689a858e0930708895a5ab229c31c64ac26b137615ef3ffbff1866851c280c093e07b3d3de05983d3793e0
"@types/chai-as-promised@npm:^8.0.0":
version: 8.0.0
resolution: "@types/chai-as-promised@npm:8.0.0"
dependencies:
"@types/chai": "npm:*"
checksum: 10c0/85c91bad8a5f1665a51a3fbb8cdd80b8eca663cf42816ccdfaddff1a9b8705060235220c26e32d361f2568386216c7ee484ab4e1735c971dbdcacced7acc857d
languageName: node
linkType: hard

"@types/chai@npm:*, @types/chai@npm:^4.3.19":
version: 4.3.19
resolution: "@types/chai@npm:4.3.19"
checksum: 10c0/8fd573192e486803c4d04185f2b0fab554660d9a1300dbed5bde9747ab8bef15f462a226f560ed5ca48827eecaf8d71eed64aa653ff9aec72fb2eae272e43a84
languageName: node
linkType: hard

Expand Down Expand Up @@ -774,6 +783,17 @@ __metadata:
languageName: node
linkType: hard

"chai-as-promised@npm:^8.0.0":
version: 8.0.0
resolution: "chai-as-promised@npm:8.0.0"
dependencies:
check-error: "npm:^2.0.0"
peerDependencies:
chai: ">= 2.1.2 < 6"
checksum: 10c0/60200ea9cdac24394a97e768edd7d0cd8962097a02e3e6158932c13f7e9d89489d20e6a2e540e80dbe845c71362e7a7a311672fd16a2eab4214a6952fd0db804
languageName: node
linkType: hard

"chai@npm:^5.1.1":
version: 5.1.1
resolution: "chai@npm:5.1.1"
Expand Down Expand Up @@ -850,7 +870,7 @@ __metadata:
languageName: node
linkType: hard

"check-error@npm:^2.1.1":
"check-error@npm:^2.0.0, check-error@npm:^2.1.1":
version: 2.1.1
resolution: "check-error@npm:2.1.1"
checksum: 10c0/979f13eccab306cf1785fa10941a590b4e7ea9916ea2a4f8c87f0316fc3eab07eabefb6e587424ef0f88cbcd3805791f172ea739863ca3d7ce2afc54641c7f0e
Expand Down Expand Up @@ -2868,11 +2888,13 @@ __metadata:
resolution: "peek-readable@workspace:."
dependencies:
"@biomejs/biome": "npm:1.8.3"
"@types/chai": "npm:^4.3.17"
"@types/chai": "npm:^4.3.19"
"@types/chai-as-promised": "npm:^8.0.0"
"@types/mocha": "npm:^10.0.7"
"@types/node": "npm:^22.1.0"
c8: "npm:^10.1.2"
chai: "npm:^5.1.1"
chai-as-promised: "npm:^8.0.0"
del-cli: "npm:^5.1.0"
mocha: "npm:^10.7.0"
remark-cli: "npm:^12.0.1"
Expand Down
Loading