Skip to content

Commit

Permalink
Add function to abort async operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Sep 1, 2024
1 parent bf0b23f commit 49ab1f9
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 35 deletions.
18 changes: 13 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,23 @@ read(buffer: Uint8Array, offset: number, length: number): Promise<number>
```

Parameters:
- `uint8Array`: `Uint8Array`: The buffer into which the data will be peeked.
This is where the peeked data will be stored.
- `offset`: `number`: The offset in the Uint8Array where the peeked data should start being written.
- `length`: `number`: The number of bytes to peek from the stream.
- `uint8Array`: `Uint8Array`: The buffer into which the data will be read.
This is where the read data will be stored.
- `offset`: `number`: The offset in the Uint8Array where the read data should start being written.
- `length`: `number`: The number of bytes to read from the stream.

Returns `Promise<number>`:
A promise that resolves with the number of bytes actually peeked into the buffer.
A promise that resolves with the number of bytes actually read into the buffer.
This number may be less than the requested length if the end of the stream is reached.

##### `abort` function

Abort active asynchronous operation (`read` or `peak`) before it has completed.

```ts
abort(): Promise<void>
```

## Examples

In the following example we read the first 16 bytes from a stream and store them in our buffer.
Expand Down
31 changes: 23 additions & 8 deletions lib/AbstractStreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,28 @@ import { EndOfStreamError } from "./EndOfStreamError.js";


export interface IStreamReader {
/**
* Peak ahead (peek) from stream. Subsequent read or peeks will return the same data.
* @param uint8Array - Uint8Array (or Buffer) to store data read from stream in
* @param offset - Offset target
* @param length - Number of bytes to read
* @returns Number of bytes peeked
*/
peek(uint8Array: Uint8Array, offset: number, length: number): Promise<number>;
read(buffer: Uint8Array, offset: number, length: number): Promise<number>

/**
* Read from stream the stream.
* @param uint8Array - Uint8Array (or Buffer) to store data read from stream in
* @param offset - Offset target
* @param length - Number of bytes to read
* @returns Number of bytes peeked
*/
read(uint8Array: Uint8Array, offset: number, length: number): Promise<number>;

/**
* Abort active asynchronous operation before it has completed.
*/
abort(): Promise<void>;
}

export abstract class AbstractStreamReader implements IStreamReader {
Expand All @@ -20,13 +40,6 @@ export abstract class AbstractStreamReader implements IStreamReader {
*/
protected peekQueue: Uint8Array[] = [];

/**
* Read ahead (peek) from stream. Subsequent read or peeks will return the same data
* @param uint8Array - Uint8Array (or Buffer) to store data read from stream in
* @param offset - Offset target
* @param length - Number of bytes to read
* @returns Number of bytes peeked
*/
public async peek(uint8Array: Uint8Array, offset: number, length: number): Promise<number> {
const bytesRead = await this.read(uint8Array, offset, length);
this.peekQueue.push(uint8Array.subarray(offset, offset + bytesRead)); // Put read data back to peek buffer
Expand Down Expand Up @@ -90,4 +103,6 @@ export abstract class AbstractStreamReader implements IStreamReader {
}

protected abstract readFromStream(buffer: Uint8Array, offset: number, length: number): Promise<number>;

public abstract abort(): Promise<void>;
}
4 changes: 4 additions & 0 deletions lib/StreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,8 @@ export class StreamReader extends AbstractStreamReader {
this.deferred = null;
}
}

public async abort(): Promise<void> {
this.s.destroy();
}
}
6 changes: 6 additions & 0 deletions lib/WebStreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ export type AnyWebByteStream = NodeReadableStream<Uint8Array> | ReadableStream<U
* Reference: https://nodejs.org/api/webstreams.html#class-readablestreambyobreader
*/
export class WebStreamReader extends AbstractStreamReader {

private reader: ReadableStreamBYOBReader;
private abortController = new AbortController();

public constructor(stream: AnyWebByteStream) {
super();
Expand All @@ -36,4 +38,8 @@ export class WebStreamReader extends AbstractStreamReader {

return 0;
}

public abort(): Promise<void> {
return this.reader.cancel();
}
}
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
],
"devDependencies": {
"@biomejs/biome": "1.8.3",
"@types/chai": "^4.3.17",
"@types/chai": "^4.3.19",
"@types/chai-as-promised": "^8.0.0",
"@types/mocha": "^10.0.7",
"@types/node": "^22.1.0",
"c8": "^10.1.2",
"chai": "^5.1.1",
"chai-as-promised": "^8.0.0",
"del-cli": "^5.1.0",
"mocha": "^10.7.0",
"remark-cli": "^12.0.1",
Expand Down
31 changes: 25 additions & 6 deletions test/test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import {assert, expect} from 'chai';
import {assert, expect, use} from 'chai';
import chaiAsPromised from 'chai-as-promised';
import {EventEmitter} from 'node:events';
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
import {EndOfStreamError, type IStreamReader, StreamReader, WebStreamReader} from '../lib/index.js';
import {SourceStream, stringToReadableStream} from './util.js';


type StringToStreamFactory = (input: string) => IStreamReader;
use(chaiAsPromised);

interface StreamFactorySuite {
description: string;
fromString: StringToStreamFactory;
fromString: (input: string, delay?: number) => IStreamReader;
}

const latin1TextDecoder = new TextDecoder('latin1');
Expand All @@ -19,10 +19,10 @@ describe('Matrix', () => {

const streamFactories: StreamFactorySuite[] = [{
description: 'Node.js StreamReader',
fromString: input => new StreamReader(new SourceStream(input))
fromString: (input, delay) => new StreamReader(new SourceStream(input, delay))
}, {
description: 'WebStream Reader',
fromString: input => new WebStreamReader(stringToReadableStream(input))
fromString: (input, delay) => new WebStreamReader(stringToReadableStream(input, delay))
}];

streamFactories
Expand Down Expand Up @@ -204,6 +204,25 @@ describe('Matrix', () => {

});

describe('Handle delayed read', () => {

it('handle delay', async () => {
const fileReadStream = factory.fromString('123', 500);
const res = new Uint8Array(3);
const promise = fileReadStream.read(res, 0, 3);
assert.strictEqual(await promise, 3);
});

it('abort async operation', async () => {
const fileReadStream = factory.fromString('123', 500);
const res = new Uint8Array(3);
const promise = fileReadStream.read(res, 0, 3);
await fileReadStream.abort();
await expect(promise).to.be.rejectedWith(Error)
});

});

});
});
});
Expand Down
22 changes: 13 additions & 9 deletions test/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,23 @@ export class SourceStream extends Readable {

private buf: Uint8Array;

constructor(private str = '') {
constructor(private str = '', private delay = 0) {
super();

this.buf = new TextEncoder().encode(str);
this.buf = new TextEncoder().encode(str);
}

public _read() {
this.push(this.buf);
this.push(null); // push the EOF-signaling `null` chunk
setTimeout(() => {
this.push(this.buf);
this.push(null); // Signal end of stream
}, this.delay);
}
}


// Function to convert a string to a BYOB ReadableStream
function stringToBYOBStream(inputString: string): ReadableStream<Uint8Array> {
function stringToBYOBStream(inputString: string, delay = 0): ReadableStream<Uint8Array> {
// Convert the string to a Uint8Array using TextEncoder
const encoder = new TextEncoder();
const uint8Array = encoder.encode(inputString);
Expand All @@ -48,8 +50,10 @@ function stringToBYOBStream(inputString: string): ReadableStream<Uint8Array> {
// @ts-ignore
controller.byobRequest.respond(bytesRead);
} else {
controller.enqueue(uint8Array);
position = uint8Array.length;
setTimeout(() => {
controller.enqueue(uint8Array);
position = uint8Array.length;
}, delay);
}
if (position >= uint8Array.length) {
controller.close();
Expand All @@ -60,6 +64,6 @@ function stringToBYOBStream(inputString: string): ReadableStream<Uint8Array> {
}

// Function to convert a string to a ReadableStreamBYOBReader
export function stringToReadableStream(inputString: string): ReadableStream<Uint8Array> {
return stringToBYOBStream(inputString);
export function stringToReadableStream(inputString: string, delay?: number): ReadableStream<Uint8Array> {
return stringToBYOBStream(inputString, delay);
}
34 changes: 28 additions & 6 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,19 @@ __metadata:
languageName: node
linkType: hard

"@types/chai@npm:^4.3.17":
version: 4.3.17
resolution: "@types/chai@npm:4.3.17"
checksum: 10c0/322a74489cdfde9c301b593d086c539584924c4c92689a858e0930708895a5ab229c31c64ac26b137615ef3ffbff1866851c280c093e07b3d3de05983d3793e0
"@types/chai-as-promised@npm:^8.0.0":
version: 8.0.0
resolution: "@types/chai-as-promised@npm:8.0.0"
dependencies:
"@types/chai": "npm:*"
checksum: 10c0/85c91bad8a5f1665a51a3fbb8cdd80b8eca663cf42816ccdfaddff1a9b8705060235220c26e32d361f2568386216c7ee484ab4e1735c971dbdcacced7acc857d
languageName: node
linkType: hard

"@types/chai@npm:*, @types/chai@npm:^4.3.19":
version: 4.3.19
resolution: "@types/chai@npm:4.3.19"
checksum: 10c0/8fd573192e486803c4d04185f2b0fab554660d9a1300dbed5bde9747ab8bef15f462a226f560ed5ca48827eecaf8d71eed64aa653ff9aec72fb2eae272e43a84
languageName: node
linkType: hard

Expand Down Expand Up @@ -774,6 +783,17 @@ __metadata:
languageName: node
linkType: hard

"chai-as-promised@npm:^8.0.0":
version: 8.0.0
resolution: "chai-as-promised@npm:8.0.0"
dependencies:
check-error: "npm:^2.0.0"
peerDependencies:
chai: ">= 2.1.2 < 6"
checksum: 10c0/60200ea9cdac24394a97e768edd7d0cd8962097a02e3e6158932c13f7e9d89489d20e6a2e540e80dbe845c71362e7a7a311672fd16a2eab4214a6952fd0db804
languageName: node
linkType: hard

"chai@npm:^5.1.1":
version: 5.1.1
resolution: "chai@npm:5.1.1"
Expand Down Expand Up @@ -850,7 +870,7 @@ __metadata:
languageName: node
linkType: hard

"check-error@npm:^2.1.1":
"check-error@npm:^2.0.0, check-error@npm:^2.1.1":
version: 2.1.1
resolution: "check-error@npm:2.1.1"
checksum: 10c0/979f13eccab306cf1785fa10941a590b4e7ea9916ea2a4f8c87f0316fc3eab07eabefb6e587424ef0f88cbcd3805791f172ea739863ca3d7ce2afc54641c7f0e
Expand Down Expand Up @@ -2868,11 +2888,13 @@ __metadata:
resolution: "peek-readable@workspace:."
dependencies:
"@biomejs/biome": "npm:1.8.3"
"@types/chai": "npm:^4.3.17"
"@types/chai": "npm:^4.3.19"
"@types/chai-as-promised": "npm:^8.0.0"
"@types/mocha": "npm:^10.0.7"
"@types/node": "npm:^22.1.0"
c8: "npm:^10.1.2"
chai: "npm:^5.1.1"
chai-as-promised: "npm:^8.0.0"
del-cli: "npm:^5.1.0"
mocha: "npm:^10.7.0"
remark-cli: "npm:^12.0.1"
Expand Down

0 comments on commit 49ab1f9

Please sign in to comment.