Skip to content

Commit

Permalink
Add error & completed events (#130)
Browse files Browse the repository at this point in the history
Co-authored-by: Sindre Sorhus <sindresorhus@gmail.com>
  • Loading branch information
dobesv and sindresorhus authored Mar 31, 2021
1 parent 8d0a356 commit a176837
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 5 deletions.
38 changes: 37 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ If `true`, specifies that any [pending](https://developer.mozilla.org/en-US/docs

Adds a sync or async task to the queue. Always returns a promise.

Note: If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately.

##### fn

Type: `Function`
Expand Down Expand Up @@ -229,6 +231,40 @@ queue.add(() => Promise.resolve());
queue.add(() => delay(500));
```

#### completed

Emitted when an item completes without error.

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.on('completed', result => {
console.log(result);
});

queue.add(() => Promise.resolve('hello, world!'));
```

#### error

Emitted if an item throws an error.

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.on('error', error => {
console.error(error);
});

queue.add(() => Promise.reject(new Error('error')));
```

#### idle

Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`.
Expand Down Expand Up @@ -262,7 +298,7 @@ Emitted every time the add method is called and the number of pending or queued

#### next

Emitted every time a task is completed and the number of pending or queued tasks is decreased.
Emitted every time a task is completed and the number of pending or queued tasks is decreased. This is emitted regardless of whether the task completed normally or with an error.

```js
import delay from 'delay';
Expand Down
9 changes: 5 additions & 4 deletions source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const timeoutError = new TimeoutError();
/**
Promise queue with concurrency control.
*/
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next'> {
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next' | 'completed' | 'error'> {
private readonly _carryoverConcurrencyCount: boolean;

private readonly _isIntervalIgnored: boolean;
Expand Down Expand Up @@ -252,11 +252,12 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
}
);

// TODO: Fix this ignore.
// @ts-expect-error
resolve(await operation);
const result = await operation;
resolve(result!);
this.emit('completed', result);
} catch (error: unknown) {
reject(error);
this.emit('error', error);
}

this._next();
Expand Down
57 changes: 57 additions & 0 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,63 @@ test('should emit next event when completing task', async t => {
t.is(timesCalled, 3);
});

test('should emit completed / error events', async t => {
const queue = new PQueue({concurrency: 1});

let errorEvents = 0;
let completedEvents = 0;
queue.on('error', () => {
errorEvents++;
});
queue.on('completed', () => {
completedEvents++;
});

const job1 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(errorEvents, 0);
t.is(completedEvents, 0);

const job2 = queue.add(async () => {
await delay(1);
throw new Error('failure');
});

t.is(queue.pending, 1);
t.is(queue.size, 1);
t.is(errorEvents, 0);
t.is(completedEvents, 0);

await job1;

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(errorEvents, 0);
t.is(completedEvents, 1);

await t.throwsAsync(job2);

t.is(queue.pending, 0);
t.is(queue.size, 0);
t.is(errorEvents, 1);
t.is(completedEvents, 1);

const job3 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(errorEvents, 1);
t.is(completedEvents, 1);

await job3;
t.is(queue.pending, 0);
t.is(queue.size, 0);
t.is(errorEvents, 1);
t.is(completedEvents, 2);
});

test('should verify timeout overrides passed to add', async t => {
const queue = new PQueue({timeout: 200, throwOnTimeout: true});

Expand Down

0 comments on commit a176837

Please sign in to comment.