Skip to content

Commit

Permalink
Add function to abort async operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Sep 2, 2024
1 parent 1a27c6f commit fc44366
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 38 deletions.
9 changes: 9 additions & 0 deletions lib/AbstractTokenizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ export abstract class AbstractTokenizer implements ITokenizer {
protected constructor(options?: ITokenizerOptions) {
this.fileInfo = options?.fileInfo ?? {};
this.onClose = options?.onClose;
if (options?.abortSignal) {
options.abortSignal.addEventListener('abort', () => {
this.abort();
})
}
}

/**
Expand Down Expand Up @@ -146,4 +151,8 @@ export abstract class AbstractTokenizer implements ITokenizer {
position: this.position
};
}

public abort(): Promise<void> {
return Promise.resolve(); // Ignore abort signal
}
}
4 changes: 4 additions & 0 deletions lib/ReadStreamTokenizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,8 @@ export class ReadStreamTokenizer extends AbstractTokenizer {
}
return totBytesRead;
}

public abort(): Promise<void> {
return this.streamReader.abort();
}
}
12 changes: 12 additions & 0 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ export interface ITokenizer {
* It does not close the stream for StreamReader, but is does close the file-descriptor.
*/
close(): Promise<void>;

/**
* Abort pending asynchronous operations
*/
abort(): Promise<void>;
}

export type OnClose = () => Promise<void>;
Expand All @@ -127,8 +132,15 @@ export interface ITokenizerOptions {
* Pass additional file information to the tokenizer
*/
fileInfo?: IFileInfo;

/**
* On tokenizer close handler
*/
onClose?: OnClose;

/**
* Pass `AbortSignal` which can stop active async operations
* Ref: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
*/
abortSignal?: AbortSignal;
}
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@
"devDependencies": {
"@biomejs/biome": "^1.8.3",
"@types/chai": "^4.3.17",
"@types/chai-as-promised": "^8.0.0",
"@types/debug": "^4.1.12",
"@types/mocha": "^10.0.7",
"@types/node": "^22.2.0",
"c8": "^10.1.2",
"chai": "^5.1.1",
"chai-as-promised": "^8.0.0",
"del-cli": "^5.1.0",
"mocha": "^10.7.3",
"remark-cli": "^12.0.1",
Expand All @@ -69,7 +71,7 @@
},
"dependencies": {
"@tokenizer/token": "^0.3.0",
"peek-readable": "^5.1.4"
"peek-readable": "^5.2.0"
},
"keywords": [
"tokenizer",
Expand Down
71 changes: 55 additions & 16 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { dirname } from 'node:path';
import { fileURLToPath } from 'node:url';

import * as Token from 'token-types';
import { assert } from 'chai';
import { assert, expect, use } from 'chai';
import chaiAsPromised from 'chai-as-promised';
import { fromStream, fromWebStream, fromFile, fromBuffer, type ITokenizer } from '../lib/index.js';
import Path from 'node:path';
import { FileTokenizer } from '../lib/FileTokenizer.js';
Expand All @@ -14,59 +15,67 @@ import { EndOfStreamError } from 'peek-readable';
import mocha from 'mocha';
import { stringToUint8Array } from 'uint8array-extras';

import { makeReadableByteFileStream } from './util.js';
import { DelayedStream, makeReadableByteFileStream } from './util.js';

use(chaiAsPromised);

const __dirname = dirname(fileURLToPath(import .meta.url));

const {describe, it} = mocha;

interface ITokenizerTest {
name: string;
loadTokenizer: (testFile: string) => Promise<ITokenizer>;
loadTokenizer: (testFile: string, delay?: number, abortSignal?: AbortSignal) => Promise<ITokenizer>;
hasFileInfo: boolean;
abortable: boolean;
}

function getResourcePath(testFile: string) {
return Path.join(__dirname, 'resources', testFile);
}

async function getTokenizerWithData(testData: string, test: ITokenizerTest): Promise<ITokenizer> {
async function getTokenizerWithData(testData: string, test: ITokenizerTest, delay?: number, abortSignal?: AbortSignal): Promise<ITokenizer> {
const testPath = getResourcePath('tmp.dat');
await fs.writeFile(testPath, testData, {encoding: 'latin1'});
return test.loadTokenizer('tmp.dat');
return test.loadTokenizer('tmp.dat', delay, abortSignal);
}

describe('Matrix tests', () => {

const tokenizerTests: ITokenizerTest[] = [
{
name: 'fromStream()',
loadTokenizer: async testFile => {
loadTokenizer: async (testFile, delay, abortSignal?: AbortSignal) => {
const stream = createReadStream(getResourcePath(testFile));
return fromStream(stream);
const delayedStream = new DelayedStream(stream, delay);
return fromStream(delayedStream, {abortSignal});
},
hasFileInfo: true
hasFileInfo: true,
abortable: true
}, {
name: 'fromWebStream()',
loadTokenizer: async testFile => {
const fileStream = await makeReadableByteFileStream(Path.join(__dirname, 'resources', testFile));
return fromWebStream(fileStream.stream, {onClose: () => fileStream.closeFile()});
loadTokenizer: async (testFile, delay, abortSignal?: AbortSignal) => {
const fileStream = await makeReadableByteFileStream(Path.join(__dirname, 'resources', testFile), delay);
return fromWebStream(fileStream.stream, {onClose: () => fileStream.closeFile(), abortSignal});
},
hasFileInfo: false
hasFileInfo: false,
abortable: true
}, {
name: 'fromFile()',
loadTokenizer: async testFile => {
return fromFile(Path.join(__dirname, 'resources', testFile));
},
hasFileInfo: true
hasFileInfo: true,
abortable: false
}, {
name: 'fromBuffer()',
loadTokenizer: async testFile => {
return fs.readFile(Path.join(__dirname, 'resources', testFile)).then(data => {
return fromBuffer(data);
});
},
hasFileInfo: true
hasFileInfo: true,
abortable: false
}
];

Expand Down Expand Up @@ -114,6 +123,7 @@ describe('Matrix tests', () => {
assert.deepEqual(buffer, Uint8Array.from([0x02, 0x03, 0x04, 0x05, 0x06]));
await rst.close();
});

});

describe('tokenizer peek options', () => {
Expand Down Expand Up @@ -560,7 +570,7 @@ describe('Matrix tests', () => {
v = await rst.readNumber(Token.UINT8);
assert.strictEqual(v, expected % 255, `offset=${expected}`);
++expected;
} while (v>0);
} while (v > 0);
} catch (err) {
assert.instanceOf(err, EndOfStreamError);
assert.strictEqual(expected, size, 'total number of parsed bytes');
Expand Down Expand Up @@ -881,6 +891,35 @@ describe('Matrix tests', () => {
await tokenizer.close();
});

if (tokenizerType.abortable) {

describe('Abort delayed read', () => {

it('without aborting', async () => {
const fileReadStream = await getTokenizerWithData('123', tokenizerType, 500);
const promise = fileReadStream.readToken(new Token.StringType(3, 'utf-8'), 0);
assert.strictEqual(await promise, '123');
});

it('abort async operation using `abort()`', async () => {
const fileReadStream = await getTokenizerWithData('123', tokenizerType, 500);
const promise = fileReadStream.readToken(new Token.StringType(3, 'utf-8'), 0);
await fileReadStream.abort();
await expect(promise).to.be.rejectedWith(Error);
});


it('abort async operation using `AbortController`', async () => {
const abortController = new AbortController();
const fileReadStream = await getTokenizerWithData('123', tokenizerType, 500, abortController.signal);
const promise = fileReadStream.readToken(new Token.StringType(3, 'utf-8'), 0);
abortController.abort();
await expect(promise).to.be.rejectedWith(Error);
});
});

}

}); // End of test "Tokenizer-types"
});
});
Expand Down Expand Up @@ -919,7 +958,7 @@ describe('fromStream with mayBeLess flag', () => {
}
return;
} finally {
if(tokenizer) {
if (tokenizer) {
await tokenizer.close();
}
}
Expand Down
82 changes: 67 additions & 15 deletions test/util.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as fs from 'node:fs/promises';
import { ReadableStream } from 'node:stream/web';
import { Readable } from 'node:stream';

export async function makeReadableByteFileStream(filename: string): Promise<{ stream: ReadableStream<Uint8Array>, closeFile: () => Promise<void> }> {
export async function makeReadableByteFileStream(filename: string, delay = 0): Promise<{ stream: ReadableStream<Uint8Array>, closeFile: () => Promise<void> }> {

let position = 0;
const fileHandle = await fs.open(filename, 'r');
Expand All @@ -15,22 +16,24 @@ export async function makeReadableByteFileStream(filename: string): Promise<{ st
// @ts-ignore
const view = controller.byobRequest.view;

try {
const {bytesRead} = await fileHandle.read(view, 0, view.byteLength, position);
if (bytesRead === 0) {
setTimeout(async () => {
try {
const {bytesRead} = await fileHandle.read(view, 0, view.byteLength, position);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
// @ts-ignore
controller.byobRequest.respond(0);
} else {
position += bytesRead;
// @ts-ignore
controller.byobRequest.respond(bytesRead);
}
} catch (err) {
controller.error(err);
await fileHandle.close();
controller.close();
// @ts-ignore
controller.byobRequest.respond(0);
} else {
position += bytesRead;
// @ts-ignore
controller.byobRequest.respond(bytesRead);
}
} catch (err) {
controller.error(err);
await fileHandle.close();
}
}, delay);
},

async cancel() {
Expand All @@ -44,3 +47,52 @@ export async function makeReadableByteFileStream(filename: string): Promise<{ st
}
};
}

export class DelayedStream extends Readable {

private buffer: (Uint8Array | null)[];
private isReading: boolean;
private path: string | undefined;

constructor(private sourceStream: Readable, private delay = 0) {
super();
this.path = (sourceStream as unknown as {path: string}).path;
this.buffer = [];
this.isReading = false;

this.sourceStream.on('data', (chunk) => {
this.buffer.push(chunk);
this.emitDelayed();
});

this.sourceStream.on('end', () => {
this.buffer.push(null); // Signal the end of the stream
this.emitDelayed();
});
}

_read() {
if (!this.isReading && this.buffer.length > 0) {
this.emitDelayed();
}
}

emitDelayed() {
if (this.isReading) return;

if (this.buffer.length > 0) {
this.isReading = true;
const chunk = this.buffer.shift();

setTimeout(() => {
this.push(chunk);
this.isReading = false;

if (this.buffer.length > 0) {
this.emitDelayed();
}
}, this.delay);
}
}
}

Loading

0 comments on commit fc44366

Please sign in to comment.