diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 41fe626..441975c 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -10,11 +10,10 @@ jobs: fail-fast: false matrix: node-version: - - 14 - - 12 + - 16 steps: - uses: actions/checkout@v2 - - uses: actions/setup-node@v1 + - uses: actions/setup-node@v2 with: node-version: ${{ matrix.node-version }} - run: npm install diff --git a/readme.md b/readme.md index dc32de4..cd92ce8 100644 --- a/readme.md +++ b/readme.md @@ -118,7 +118,7 @@ Note: If your items can potentially throw an exception, you must handle those er Type: `Function` -Promise-returning/async function. +Promise-returning/async function. When executed, it will receive `{signal}` as the first argument. #### options @@ -131,6 +131,41 @@ Default: `0` Priority of operation. Operations with greater priority will be scheduled first. +##### signal + +[`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) for cancellation of the operation. When aborted, it will be removed from the queue and the `queue.add()` call will reject with an `AbortError`. If the operation is already running, the signal will need to be handled by the operation itself. + +```js +import PQueue, {AbortError} from 'p-queue'; +import got, {CancelError} from 'got'; + +const queue = new PQueue(); + +const controller = new AbortController(); + +try { + await queue.add(({signal}) => { + const request = got('https://sindresorhus.com'); + + signal.addEventListener('abort', () => { + request.cancel(); + }); + + try { + return await request; + } catch (error) { + if (!(error instanceof CancelError)) { + throw error; + } + } + }, {signal: controller.signal}); +} catch (error) { + if (!(error instanceof AbortError)) { + throw error; + } +} +``` + #### .addAll(fns, options?) Same as `.add()`, but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved. @@ -327,6 +362,10 @@ await queue.add(() => delay(600)); //=> 'Task is completed. Size: 0 Pending: 0' ``` +### AbortError + +The error thrown by `queue.add()` when a job is aborted before it is run. See [`signal`](#signal). + ## Advanced example A more advanced example to help you understand the flow. diff --git a/source/index.ts b/source/index.ts index 38e8ae5..2965430 100644 --- a/source/index.ts +++ b/source/index.ts @@ -2,57 +2,62 @@ import EventEmitter from 'eventemitter3'; import pTimeout, {TimeoutError} from 'p-timeout'; import {Queue, RunFunction} from './queue.js'; import PriorityQueue from './priority-queue.js'; -import {QueueAddOptions, DefaultAddOptions, Options} from './options.js'; +import {QueueAddOptions, Options, TaskOptions} from './options.js'; type ResolveFunction = (value?: T | PromiseLike) => void; type Task = - | (() => PromiseLike) - | (() => TaskResultType); + | ((options: TaskOptions) => PromiseLike) + | ((options: TaskOptions) => TaskResultType); // eslint-disable-next-line @typescript-eslint/no-empty-function const empty = (): void => {}; const timeoutError = new TimeoutError(); +/** +The error thrown by `queue.add()` when a job is aborted before it is run. See `signal`. +*/ +export class AbortError extends Error {} + /** Promise queue with concurrency control. */ -export default class PQueue = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next' | 'completed' | 'error'> { - private readonly _carryoverConcurrencyCount: boolean; +export default class PQueue = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next' | 'completed' | 'error'> { + readonly #carryoverConcurrencyCount: boolean; - private readonly _isIntervalIgnored: boolean; + readonly #isIntervalIgnored: boolean; - private _intervalCount = 0; + #intervalCount = 0; - private readonly _intervalCap: number; + readonly #intervalCap: number; - private readonly _interval: number; + readonly #interval: number; - private _intervalEnd = 0; + #intervalEnd = 0; - private _intervalId?: NodeJS.Timeout; + #intervalId?: NodeJS.Timeout; - private _timeoutId?: NodeJS.Timeout; + #timeoutId?: NodeJS.Timeout; - private _queue: QueueType; + #queue: QueueType; - private readonly _queueClass: new () => QueueType; + readonly #queueClass: new () => QueueType; - private _pendingCount = 0; + #pendingCount = 0; // The `!` is needed because of https://github.com/microsoft/TypeScript/issues/32194 - private _concurrency!: number; + #concurrency!: number; - private _isPaused: boolean; + #isPaused: boolean; - private _resolveEmpty: ResolveFunction = empty; + #resolveEmpty: ResolveFunction = empty; - private _resolveIdle: ResolveFunction = empty; + #resolveIdle: ResolveFunction = empty; - private _timeout?: number; + #timeout?: number; - private readonly _throwOnTimeout: boolean; + readonly #throwOnTimeout: boolean; constructor(options?: Options) { super(); @@ -76,64 +81,64 @@ export default class PQueue= 0, got \`${options.interval?.toString() ?? ''}\` (${typeof options.interval})`); } - this._carryoverConcurrencyCount = options.carryoverConcurrencyCount!; - this._isIntervalIgnored = options.intervalCap === Number.POSITIVE_INFINITY || options.interval === 0; - this._intervalCap = options.intervalCap; - this._interval = options.interval; - this._queue = new options.queueClass!(); - this._queueClass = options.queueClass!; + this.#carryoverConcurrencyCount = options.carryoverConcurrencyCount!; + this.#isIntervalIgnored = options.intervalCap === Number.POSITIVE_INFINITY || options.interval === 0; + this.#intervalCap = options.intervalCap; + this.#interval = options.interval; + this.#queue = new options.queueClass!(); + this.#queueClass = options.queueClass!; this.concurrency = options.concurrency!; - this._timeout = options.timeout; - this._throwOnTimeout = options.throwOnTimeout === true; - this._isPaused = options.autoStart === false; + this.#timeout = options.timeout; + this.#throwOnTimeout = options.throwOnTimeout === true; + this.#isPaused = options.autoStart === false; } - private get _doesIntervalAllowAnother(): boolean { - return this._isIntervalIgnored || this._intervalCount < this._intervalCap; + get #doesIntervalAllowAnother(): boolean { + return this.#isIntervalIgnored || this.#intervalCount < this.#intervalCap; } - private get _doesConcurrentAllowAnother(): boolean { - return this._pendingCount < this._concurrency; + get #doesConcurrentAllowAnother(): boolean { + return this.#pendingCount < this.#concurrency; } - private _next(): void { - this._pendingCount--; - this._tryToStartAnother(); + #next(): void { + this.#pendingCount--; + this.#tryToStartAnother(); this.emit('next'); } - private _resolvePromises(): void { - this._resolveEmpty(); - this._resolveEmpty = empty; + #resolvePromises(): void { + this.#resolveEmpty(); + this.#resolveEmpty = empty; - if (this._pendingCount === 0) { - this._resolveIdle(); - this._resolveIdle = empty; + if (this.#pendingCount === 0) { + this.#resolveIdle(); + this.#resolveIdle = empty; this.emit('idle'); } } - private _onResumeInterval(): void { - this._onInterval(); - this._initializeIntervalIfNeeded(); - this._timeoutId = undefined; + #onResumeInterval(): void { + this.#onInterval(); + this.#initializeIntervalIfNeeded(); + this.#timeoutId = undefined; } - private _isIntervalPaused(): boolean { + #isIntervalPaused(): boolean { const now = Date.now(); - if (this._intervalId === undefined) { - const delay = this._intervalEnd - now; + if (this.#intervalId === undefined) { + const delay = this.#intervalEnd - now; if (delay < 0) { // Act as the interval was done // We don't need to resume it here because it will be resumed on line 160 - this._intervalCount = (this._carryoverConcurrencyCount) ? this._pendingCount : 0; + this.#intervalCount = (this.#carryoverConcurrencyCount) ? this.#pendingCount : 0; } else { // Act as the interval is pending - if (this._timeoutId === undefined) { - this._timeoutId = setTimeout( + if (this.#timeoutId === undefined) { + this.#timeoutId = setTimeout( () => { - this._onResumeInterval(); + this.#onResumeInterval(); }, delay, ); @@ -146,25 +151,25 @@ export default class PQueue { - this._onInterval(); + this.#onInterval(); }, - this._interval, + this.#interval, ); - this._intervalEnd = Date.now() + this._interval; + this.#intervalEnd = Date.now() + this.#interval; } - private _onInterval(): void { - if (this._intervalCount === 0 && this._pendingCount === 0 && this._intervalId) { - clearInterval(this._intervalId); - this._intervalId = undefined; + #onInterval(): void { + if (this.#intervalCount === 0 && this.#pendingCount === 0 && this.#intervalId) { + clearInterval(this.#intervalId); + this.#intervalId = undefined; } - this._intervalCount = this._carryoverConcurrencyCount ? this._pendingCount : 0; - this._processQueue(); + this.#intervalCount = this.#carryoverConcurrencyCount ? this.#pendingCount : 0; + this.#processQueue(); } /** Executes all queued functions until it reaches the limit. */ - private _processQueue(): void { + #processQueue(): void { // eslint-disable-next-line no-empty - while (this._tryToStartAnother()) {} + while (this.#tryToStartAnother()) {} } get concurrency(): number { - return this._concurrency; + return this.#concurrency; } set concurrency(newConcurrency: number) { @@ -225,9 +230,9 @@ export default class PQueue(fn: Task, options: Partial = {}): Promise { return new Promise((resolve, reject) => { const run = async (): Promise => { - this._pendingCount++; - this._intervalCount++; + this.#pendingCount++; + this.#intervalCount++; try { - const operation = (this._timeout === undefined && options.timeout === undefined) ? fn() : pTimeout( - Promise.resolve(fn()), - (options.timeout === undefined ? this._timeout : options.timeout) as number, + if (options.signal?.aborted) { + // TODO: Use ABORT_ERR code when targeting Node.js 16 (https://nodejs.org/docs/latest-v16.x/api/errors.html#abort_err) + reject(new AbortError('The task was aborted.')); + return; + } + + const operation = (this.#timeout === undefined && options.timeout === undefined) ? fn({signal: options.signal}) : pTimeout( + Promise.resolve(fn({signal: options.signal})), + (options.timeout === undefined ? this.#timeout : options.timeout)!, () => { - if (options.throwOnTimeout === undefined ? this._throwOnTimeout : options.throwOnTimeout) { + if (options.throwOnTimeout === undefined ? this.#throwOnTimeout : options.throwOnTimeout) { reject(timeoutError); } @@ -260,11 +271,11 @@ export default class PQueue { // Instantly resolve if the queue is empty - if (this._queue.size === 0) { + if (this.#queue.size === 0) { return; } return new Promise(resolve => { - const existingResolve = this._resolveEmpty; - this._resolveEmpty = () => { + const existingResolve = this.#resolveEmpty; + this.#resolveEmpty = () => { existingResolve(); resolve(); }; @@ -338,13 +349,13 @@ export default class PQueue { // Instantly resolve if the queue is empty. - if (this._queue.size < limit) { + if (this.#queue.size < limit) { return; } return new Promise(resolve => { const listener = () => { - if (this._queue.size < limit) { + if (this.#queue.size < limit) { this.removeListener('next', listener); resolve(); } @@ -361,13 +372,13 @@ export default class PQueue { // Instantly resolve if none pending and if nothing else is queued - if (this._pendingCount === 0 && this._queue.size === 0) { + if (this.#pendingCount === 0 && this.#queue.size === 0) { return; } return new Promise(resolve => { - const existingResolve = this._resolveIdle; - this._resolveIdle = () => { + const existingResolve = this.#resolveIdle; + this.#resolveIdle = () => { existingResolve(); resolve(); }; @@ -378,7 +389,7 @@ export default class PQueue>): number { // eslint-disable-next-line unicorn/no-array-callback-reference - return this._queue.filter(options).length; + return this.#queue.filter(options).length; } /** Number of running items (no longer in the queue). */ get pending(): number { - return this._pendingCount; + return this.#pendingCount; } /** Whether the queue is currently paused. */ get isPaused(): boolean { - return this._isPaused; + return this.#isPaused; } get timeout(): number | undefined { - return this._timeout; + return this.#timeout; } /** Set the timeout for future operations. */ set timeout(milliseconds: number | undefined) { - this._timeout = milliseconds; + this.#timeout = milliseconds; } } -export {Queue, QueueAddOptions, DefaultAddOptions, Options}; +// TODO: Rename `DefaultAddOptions` to `QueueAddOptions` in next major version +export {Queue, QueueAddOptions, QueueAddOptions as DefaultAddOptions, Options}; diff --git a/source/options.ts b/source/options.ts index 82fd7b1..af5a196 100644 --- a/source/options.ts +++ b/source/options.ts @@ -1,8 +1,20 @@ import {Queue, RunFunction} from './queue.js'; -export type QueueAddOptions = Readonly>; +interface TimeoutOptions { + /** + Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already. + */ + timeout?: number; + + /** + Whether or not a timeout is considered an exception. -export interface Options, QueueOptions extends QueueAddOptions> { + @default false + */ + throwOnTimeout?: boolean; +} + +export interface Options, QueueOptions extends QueueAddOptions> extends TimeoutOptions { /** Concurrency limit. @@ -48,21 +60,9 @@ export interface Options, Que @default false */ readonly carryoverConcurrencyCount?: boolean; - - /** - Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already. - */ - timeout?: number; - - /** - Whether or not a timeout is considered an exception. - - @default false - */ - throwOnTimeout?: boolean; } -export interface DefaultAddOptions extends QueueAddOptions { +export interface QueueAddOptions extends TaskOptions, TimeoutOptions { /** Priority of operation. Operations with greater priority will be scheduled first. @@ -70,3 +70,42 @@ export interface DefaultAddOptions extends QueueAddOptions { */ readonly priority?: number; } + +export interface TaskOptions { + /** + [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) for cancellation of the operation. When aborted, it will be removed from the queue and the `queue.add()` call will reject with an `AbortError`. If the operation is already running, the signal will need to be handled by the operation itself. + + @example + ``` + import PQueue, {AbortError} from 'p-queue'; + import got, {CancelError} from 'got'; + + const queue = new PQueue(); + + const controller = new AbortController(); + + try { + await queue.add(({signal}) => { + const request = got('https://sindresorhus.com'); + + signal.addEventListener('abort', () => { + request.cancel(); + }); + + try { + return await request; + } catch (error) { + if (!(error instanceof CancelError)) { + throw error; + } + } + }, {signal: controller.signal}); + } catch (error) { + if (!(error instanceof AbortError)) { + throw error; + } + } + ``` + */ + readonly signal?: AbortSignal; +} diff --git a/source/priority-queue.ts b/source/priority-queue.ts index acc3e73..45993bc 100644 --- a/source/priority-queue.ts +++ b/source/priority-queue.ts @@ -7,7 +7,7 @@ export interface PriorityQueueOptions extends QueueAddOptions { } export default class PriorityQueue implements Queue { - private readonly _queue: Array = []; + readonly #queue: Array = []; enqueue(run: RunFunction, options?: Partial): void { options = { @@ -20,30 +20,30 @@ export default class PriorityQueue implements Queue= options.priority!) { - this._queue.push(element); + if (this.size && this.#queue[this.size - 1]!.priority! >= options.priority!) { + this.#queue.push(element); return; } const index = lowerBound( - this._queue, element, + this.#queue, element, (a: Readonly, b: Readonly) => b.priority! - a.priority!, ); - this._queue.splice(index, 0, element); + this.#queue.splice(index, 0, element); } dequeue(): RunFunction | undefined { - const item = this._queue.shift(); + const item = this.#queue.shift(); return item?.run; } filter(options: Readonly>): RunFunction[] { - return this._queue.filter( + return this.#queue.filter( (element: Readonly) => element.priority === options.priority, ).map((element: Readonly<{run: RunFunction}>) => element.run); } get size(): number { - return this._queue.length; + return this.#queue.length; } } diff --git a/test/test.ts b/test/test.ts index 3fb6359..878e000 100644 --- a/test/test.ts +++ b/test/test.ts @@ -5,7 +5,7 @@ import delay from 'delay'; import inRange from 'in-range'; import timeSpan from 'time-span'; import randomInt from 'random-int'; -import PQueue from '../source/index.js'; +import PQueue, {AbortError} from '../source/index.js'; const fixture = Symbol('fixture'); @@ -1062,3 +1062,25 @@ test('should verify timeout overrides passed to add', async t => { await queue.onIdle(); }); + +test('should skip an aborted job', async t => { + const queue = new PQueue(); + + const controller = new AbortController(); + + controller.abort(); + // eslint-disable-next-line @typescript-eslint/no-empty-function + await t.throwsAsync(queue.add(() => {}, {signal: controller.signal}), { + instanceOf: AbortError, + }); +}); + +test('should pass AbortSignal instance to job', async t => { + const queue = new PQueue(); + + const controller = new AbortController(); + + await queue.add(async ({signal}) => { + t.is(controller.signal, signal); + }, {signal: controller.signal}); +});