From 33b232a8cd6cce0df6cd64acb861ed341882cde8 Mon Sep 17 00:00:00 2001 From: Borewit Date: Sun, 1 Sep 2024 20:17:24 +0200 Subject: [PATCH] Add function to abort async operations --- lib/AbstractTokenizer.ts | 9 +++++ lib/ReadStreamTokenizer.ts | 4 ++ lib/types.ts | 12 ++++++ package.json | 4 +- test/test.ts | 71 +++++++++++++++++++++++++-------- test/util.ts | 82 +++++++++++++++++++++++++++++++------- yarn.lock | 41 ++++++++++++++++--- 7 files changed, 185 insertions(+), 38 deletions(-) diff --git a/lib/AbstractTokenizer.ts b/lib/AbstractTokenizer.ts index 824a2443..fd78a7da 100644 --- a/lib/AbstractTokenizer.ts +++ b/lib/AbstractTokenizer.ts @@ -26,6 +26,11 @@ export abstract class AbstractTokenizer implements ITokenizer { protected constructor(options?: ITokenizerOptions) { this.fileInfo = options?.fileInfo ?? {}; this.onClose = options?.onClose; + if (options?.abortSignal) { + options.abortSignal.addEventListener('abort', () => { + this.abort(); + }) + } } /** @@ -146,4 +151,8 @@ export abstract class AbstractTokenizer implements ITokenizer { position: this.position }; } + + public abort(): Promise { + return Promise.resolve(); // Ignore abort signal + } } diff --git a/lib/ReadStreamTokenizer.ts b/lib/ReadStreamTokenizer.ts index 0168825b..42dce0f1 100644 --- a/lib/ReadStreamTokenizer.ts +++ b/lib/ReadStreamTokenizer.ts @@ -98,4 +98,8 @@ export class ReadStreamTokenizer extends AbstractTokenizer { } return totBytesRead; } + + public abort(): Promise { + return this.streamReader.abort(); + } } diff --git a/lib/types.ts b/lib/types.ts index 4a033bb2..f483bd80 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -118,6 +118,11 @@ export interface ITokenizer { * It does not close the stream for StreamReader, but is does close the file-descriptor. */ close(): Promise; + + /** + * Abort pending asynchronous operations + */ + abort(): Promise; } export type OnClose = () => Promise; @@ -127,8 +132,15 @@ export interface ITokenizerOptions { * Pass additional file information to the tokenizer */ fileInfo?: IFileInfo; + /** * On tokenizer close handler */ onClose?: OnClose; + + /** + * Pass `AbortSignal` which can stop active async operations + * Ref: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal + */ + abortSignal?: AbortSignal; } diff --git a/package.json b/package.json index 76ba3922..27f5d7a5 100644 --- a/package.json +++ b/package.json @@ -53,11 +53,13 @@ "devDependencies": { "@biomejs/biome": "^1.8.3", "@types/chai": "^4.3.17", + "@types/chai-as-promised": "^8.0.0", "@types/debug": "^4.1.12", "@types/mocha": "^10.0.7", "@types/node": "^22.2.0", "c8": "^10.1.2", "chai": "^5.1.1", + "chai-as-promised": "^8.0.0", "del-cli": "^5.1.0", "mocha": "^10.7.3", "remark-cli": "^12.0.1", @@ -69,7 +71,7 @@ }, "dependencies": { "@tokenizer/token": "^0.3.0", - "peek-readable": "^5.1.4" + "peek-readable": "^5.2.0" }, "keywords": [ "tokenizer", diff --git a/test/test.ts b/test/test.ts index 44f9a008..d808d096 100644 --- a/test/test.ts +++ b/test/test.ts @@ -5,7 +5,8 @@ import { dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; import * as Token from 'token-types'; -import { assert } from 'chai'; +import { assert, expect, use } from 'chai'; +import chaiAsPromised from 'chai-as-promised'; import { fromStream, fromWebStream, fromFile, fromBuffer, type ITokenizer } from '../lib/index.js'; import Path from 'node:path'; import { FileTokenizer } from '../lib/FileTokenizer.js'; @@ -14,7 +15,9 @@ import { EndOfStreamError } from 'peek-readable'; import mocha from 'mocha'; import { stringToUint8Array } from 'uint8array-extras'; -import { makeReadableByteFileStream } from './util.js'; +import { DelayedStream, makeReadableByteFileStream } from './util.js'; + +use(chaiAsPromised); const __dirname = dirname(fileURLToPath(import .meta.url)); @@ -22,18 +25,19 @@ const {describe, it} = mocha; interface ITokenizerTest { name: string; - loadTokenizer: (testFile: string) => Promise; + loadTokenizer: (testFile: string, delay?: number, abortSignal?: AbortSignal) => Promise; hasFileInfo: boolean; + abortable: boolean; } function getResourcePath(testFile: string) { return Path.join(__dirname, 'resources', testFile); } -async function getTokenizerWithData(testData: string, test: ITokenizerTest): Promise { +async function getTokenizerWithData(testData: string, test: ITokenizerTest, delay?: number, abortSignal?: AbortSignal): Promise { const testPath = getResourcePath('tmp.dat'); await fs.writeFile(testPath, testData, {encoding: 'latin1'}); - return test.loadTokenizer('tmp.dat'); + return test.loadTokenizer('tmp.dat', delay, abortSignal); } describe('Matrix tests', () => { @@ -41,24 +45,28 @@ describe('Matrix tests', () => { const tokenizerTests: ITokenizerTest[] = [ { name: 'fromStream()', - loadTokenizer: async testFile => { + loadTokenizer: async (testFile, delay, abortSignal?: AbortSignal) => { const stream = createReadStream(getResourcePath(testFile)); - return fromStream(stream); + const delayedStream = new DelayedStream(stream, delay); + return fromStream(delayedStream, {abortSignal}); }, - hasFileInfo: true + hasFileInfo: true, + abortable: true }, { name: 'fromWebStream()', - loadTokenizer: async testFile => { - const fileStream = await makeReadableByteFileStream(Path.join(__dirname, 'resources', testFile)); - return fromWebStream(fileStream.stream, {onClose: () => fileStream.closeFile()}); + loadTokenizer: async (testFile, delay, abortSignal?: AbortSignal) => { + const fileStream = await makeReadableByteFileStream(Path.join(__dirname, 'resources', testFile), delay); + return fromWebStream(fileStream.stream, {onClose: () => fileStream.closeFile(), abortSignal}); }, - hasFileInfo: false + hasFileInfo: false, + abortable: true }, { name: 'fromFile()', loadTokenizer: async testFile => { return fromFile(Path.join(__dirname, 'resources', testFile)); }, - hasFileInfo: true + hasFileInfo: true, + abortable: false }, { name: 'fromBuffer()', loadTokenizer: async testFile => { @@ -66,7 +74,8 @@ describe('Matrix tests', () => { return fromBuffer(data); }); }, - hasFileInfo: true + hasFileInfo: true, + abortable: false } ]; @@ -114,6 +123,7 @@ describe('Matrix tests', () => { assert.deepEqual(buffer, Uint8Array.from([0x02, 0x03, 0x04, 0x05, 0x06])); await rst.close(); }); + }); describe('tokenizer peek options', () => { @@ -560,7 +570,7 @@ describe('Matrix tests', () => { v = await rst.readNumber(Token.UINT8); assert.strictEqual(v, expected % 255, `offset=${expected}`); ++expected; - } while (v>0); + } while (v > 0); } catch (err) { assert.instanceOf(err, EndOfStreamError); assert.strictEqual(expected, size, 'total number of parsed bytes'); @@ -881,6 +891,35 @@ describe('Matrix tests', () => { await tokenizer.close(); }); + if (tokenizerType.abortable) { + + describe('Abort delayed read', () => { + + it('without aborting', async () => { + const fileReadStream = await getTokenizerWithData('123', tokenizerType, 500); + const promise = fileReadStream.readToken(new Token.StringType(3, 'utf-8'), 0); + assert.strictEqual(await promise, '123'); + }); + + it('abort async operation using `abort()`', async () => { + const fileReadStream = await getTokenizerWithData('123', tokenizerType, 500); + const promise = fileReadStream.readToken(new Token.StringType(3, 'utf-8'), 0); + await fileReadStream.abort(); + await expect(promise).to.be.rejectedWith(Error); + }); + + + it('abort async operation using `AbortController`', async () => { + const abortController = new AbortController(); + const fileReadStream = await getTokenizerWithData('123', tokenizerType, 500, abortController.signal); + const promise = fileReadStream.readToken(new Token.StringType(3, 'utf-8'), 0); + abortController.abort(); + await expect(promise).to.be.rejectedWith(Error); + }); + }); + + } + }); // End of test "Tokenizer-types" }); }); @@ -919,7 +958,7 @@ describe('fromStream with mayBeLess flag', () => { } return; } finally { - if(tokenizer) { + if (tokenizer) { await tokenizer.close(); } } diff --git a/test/util.ts b/test/util.ts index 76b01402..747bcc4e 100644 --- a/test/util.ts +++ b/test/util.ts @@ -1,7 +1,8 @@ import * as fs from 'node:fs/promises'; import { ReadableStream } from 'node:stream/web'; +import { Readable } from 'node:stream'; -export async function makeReadableByteFileStream(filename: string): Promise<{ stream: ReadableStream, closeFile: () => Promise }> { +export async function makeReadableByteFileStream(filename: string, delay = 0): Promise<{ stream: ReadableStream, closeFile: () => Promise }> { let position = 0; const fileHandle = await fs.open(filename, 'r'); @@ -15,22 +16,24 @@ export async function makeReadableByteFileStream(filename: string): Promise<{ st // @ts-ignore const view = controller.byobRequest.view; - try { - const {bytesRead} = await fileHandle.read(view, 0, view.byteLength, position); - if (bytesRead === 0) { + setTimeout(async () => { + try { + const {bytesRead} = await fileHandle.read(view, 0, view.byteLength, position); + if (bytesRead === 0) { + await fileHandle.close(); + controller.close(); + // @ts-ignore + controller.byobRequest.respond(0); + } else { + position += bytesRead; + // @ts-ignore + controller.byobRequest.respond(bytesRead); + } + } catch (err) { + controller.error(err); await fileHandle.close(); - controller.close(); - // @ts-ignore - controller.byobRequest.respond(0); - } else { - position += bytesRead; - // @ts-ignore - controller.byobRequest.respond(bytesRead); } - } catch (err) { - controller.error(err); - await fileHandle.close(); - } + }, delay); }, async cancel() { @@ -44,3 +47,52 @@ export async function makeReadableByteFileStream(filename: string): Promise<{ st } }; } + +export class DelayedStream extends Readable { + + private buffer: (Uint8Array | null)[]; + private isReading: boolean; + private path: string | undefined; + + constructor(private sourceStream: Readable, private delay = 0) { + super(); + this.path = (sourceStream as unknown as {path: string}).path; + this.buffer = []; + this.isReading = false; + + this.sourceStream.on('data', (chunk) => { + this.buffer.push(chunk); + this.emitDelayed(); + }); + + this.sourceStream.on('end', () => { + this.buffer.push(null); // Signal the end of the stream + this.emitDelayed(); + }); + } + + _read() { + if (!this.isReading && this.buffer.length > 0) { + this.emitDelayed(); + } + } + + emitDelayed() { + if (this.isReading) return; + + if (this.buffer.length > 0) { + this.isReading = true; + const chunk = this.buffer.shift(); + + setTimeout(() => { + this.push(chunk); + this.isReading = false; + + if (this.buffer.length > 0) { + this.emitDelayed(); + } + }, this.delay); + } + } +} + diff --git a/yarn.lock b/yarn.lock index 50e90a63..52bd738a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -349,6 +349,22 @@ __metadata: languageName: node linkType: hard +"@types/chai-as-promised@npm:^8.0.0": + version: 8.0.0 + resolution: "@types/chai-as-promised@npm:8.0.0" + dependencies: + "@types/chai": "npm:*" + checksum: 10c0/85c91bad8a5f1665a51a3fbb8cdd80b8eca663cf42816ccdfaddff1a9b8705060235220c26e32d361f2568386216c7ee484ab4e1735c971dbdcacced7acc857d + languageName: node + linkType: hard + +"@types/chai@npm:*": + version: 4.3.19 + resolution: "@types/chai@npm:4.3.19" + checksum: 10c0/8fd573192e486803c4d04185f2b0fab554660d9a1300dbed5bde9747ab8bef15f462a226f560ed5ca48827eecaf8d71eed64aa653ff9aec72fb2eae272e43a84 + languageName: node + linkType: hard + "@types/chai@npm:^4.3.17": version: 4.3.17 resolution: "@types/chai@npm:4.3.17" @@ -787,6 +803,17 @@ __metadata: languageName: node linkType: hard +"chai-as-promised@npm:^8.0.0": + version: 8.0.0 + resolution: "chai-as-promised@npm:8.0.0" + dependencies: + check-error: "npm:^2.0.0" + peerDependencies: + chai: ">= 2.1.2 < 6" + checksum: 10c0/60200ea9cdac24394a97e768edd7d0cd8962097a02e3e6158932c13f7e9d89489d20e6a2e540e80dbe845c71362e7a7a311672fd16a2eab4214a6952fd0db804 + languageName: node + linkType: hard + "chai@npm:^5.1.1": version: 5.1.1 resolution: "chai@npm:5.1.1" @@ -856,7 +883,7 @@ __metadata: languageName: node linkType: hard -"check-error@npm:^2.1.1": +"check-error@npm:^2.0.0, check-error@npm:^2.1.1": version: 2.1.1 resolution: "check-error@npm:2.1.1" checksum: 10c0/979f13eccab306cf1785fa10941a590b4e7ea9916ea2a4f8c87f0316fc3eab07eabefb6e587424ef0f88cbcd3805791f172ea739863ca3d7ce2afc54641c7f0e @@ -2829,10 +2856,10 @@ __metadata: languageName: node linkType: hard -"peek-readable@npm:^5.1.4": - version: 5.1.4 - resolution: "peek-readable@npm:5.1.4" - checksum: 10c0/19015142b2f2556bec8b51c53111209bd6d774181df26dbf68908495819e35f4103e83915aa7ea35505d2a65d2712f2ab58f36b329cf61219c012621e65211b3 +"peek-readable@npm:^5.2.0": + version: 5.2.0 + resolution: "peek-readable@npm:5.2.0" + checksum: 10c0/7647d56786c94fc7f5f39923ef5f6ed1da7be92a2c221b743db74f1517821b1e944cc965d8f018a7e6984d969b6ced8a0c421aece7996c0b1981eac9daa4c323 languageName: node linkType: hard @@ -3584,14 +3611,16 @@ __metadata: "@biomejs/biome": "npm:^1.8.3" "@tokenizer/token": "npm:^0.3.0" "@types/chai": "npm:^4.3.17" + "@types/chai-as-promised": "npm:^8.0.0" "@types/debug": "npm:^4.1.12" "@types/mocha": "npm:^10.0.7" "@types/node": "npm:^22.2.0" c8: "npm:^10.1.2" chai: "npm:^5.1.1" + chai-as-promised: "npm:^8.0.0" del-cli: "npm:^5.1.0" mocha: "npm:^10.7.3" - peek-readable: "npm:^5.1.4" + peek-readable: "npm:^5.2.0" remark-cli: "npm:^12.0.1" remark-preset-lint-recommended: "npm:^7.0.0" token-types: "npm:^6.0.0"