diff --git a/packages/utils/package.json b/packages/utils/package.json index 57e2351ec1..a2695c0531 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -171,6 +171,7 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { + "@libp2p/peer-id": "^5.0.4", "@types/netmask": "^2.0.5", "aegir": "^44.0.1", "benchmark": "^2.1.4", diff --git a/packages/utils/src/errors.ts b/packages/utils/src/errors.ts index d2b62ebace..f801149fe1 100644 --- a/packages/utils/src/errors.ts +++ b/packages/utils/src/errors.ts @@ -18,3 +18,12 @@ export class RateLimitError extends Error { this.isFirstInDuration = props.isFirstInDuration } } + +export class QueueFullError extends Error { + static name = 'QueueFullError' + + constructor (message: string = 'The queue was full') { + super(message) + this.name = 'QueueFullError' + } +} diff --git a/packages/utils/src/queue/index.ts b/packages/utils/src/queue/index.ts index 3c9969ba25..44c06fc3b9 100644 --- a/packages/utils/src/queue/index.ts +++ b/packages/utils/src/queue/index.ts @@ -1,6 +1,7 @@ import { AbortError, TypedEventEmitter } from '@libp2p/interface' import { pushable } from 'it-pushable' import { raceEvent } from 'race-event' +import { QueueFullError } from '../errors.js' import { Job } from './job.js' import type { AbortOptions, Metrics } from '@libp2p/interface' @@ -21,6 +22,14 @@ export interface QueueInit extends TypedEventEmitter> { public concurrency: number + public maxSize: number public queue: Array> private pending: number private readonly sort?: Comparator> @@ -122,6 +132,7 @@ export class Queue, options?: JobOptions): Promise { options?.signal?.throwIfAborted() + if (this.size === this.maxSize) { + throw new QueueFullError() + } + const job = new Job(fn, options) this.enqueue(job) this.safeDispatchEvent('add') diff --git a/packages/utils/test/peer-job-queue.spec.ts b/packages/utils/test/peer-job-queue.spec.ts index e69de29bb2..66e4e5c268 100644 --- a/packages/utils/test/peer-job-queue.spec.ts +++ b/packages/utils/test/peer-job-queue.spec.ts @@ -0,0 +1,172 @@ +/* eslint-env mocha */ + +import { generateKeyPair } from '@libp2p/crypto/keys' +import { peerIdFromPrivateKey } from '@libp2p/peer-id' +import { expect } from 'aegir/chai' +import delay from 'delay' +import pDefer from 'p-defer' +import { raceEvent } from 'race-event' +import { PeerQueue, type PeerQueueJobOptions } from '../src/peer-queue.js' +import type { QueueJobFailure, QueueJobSuccess } from '../src/queue/index.js' + +describe('peer queue', () => { + it('should have jobs', async () => { + const deferred = pDefer() + + const privateKeyA = await generateKeyPair('Ed25519') + const peerIdA = peerIdFromPrivateKey(privateKeyA) + const privateKeyB = await generateKeyPair('Ed25519') + const peerIdB = peerIdFromPrivateKey(privateKeyB) + const queue = new PeerQueue({ + concurrency: 1 + }) + + expect(queue.has(peerIdA)).to.be.false() + + void queue.add(async () => { + await deferred.promise + }, { + peerId: peerIdB + }) + + void queue.add(async () => { + await deferred.promise + }, { + peerId: peerIdA + }) + + expect(queue.has(peerIdA)).to.be.true() + + deferred.resolve() + + await queue.onIdle() + + expect(queue.has(peerIdA)).to.be.false() + }) + + it('can join existing jobs', async () => { + const value = 'hello world' + const deferred = pDefer() + + const privateKeyA = await generateKeyPair('Ed25519') + const peerIdA = peerIdFromPrivateKey(privateKeyA) + const queue = new PeerQueue({ + concurrency: 1 + }) + + expect(queue.has(peerIdA)).to.be.false() + expect(queue.find(peerIdA)).to.be.undefined() + + void queue.add(async () => { + return deferred.promise + }, { + peerId: peerIdA + }) + + const job = queue.find(peerIdA) + const join = job?.join() + + deferred.resolve(value) + + await expect(join).to.eventually.equal(value) + + expect(queue.has(peerIdA)).to.be.false() + expect(queue.find(peerIdA)).to.be.undefined() + }) + + it('can join an existing job that fails', async () => { + const error = new Error('nope!') + const deferred = pDefer() + + const privateKeyA = await generateKeyPair('Ed25519') + const peerIdA = peerIdFromPrivateKey(privateKeyA) + const queue = new PeerQueue({ + concurrency: 1 + }) + + void queue.add(async () => { + return deferred.promise + }, { + peerId: peerIdA + }) + .catch(() => {}) + + const job = queue.find(peerIdA) + const joinedJob = job?.join() + + deferred.reject(error) + + await expect(joinedJob).to.eventually.rejected + .with.property('message', error.message) + }) + + it('cannot join jobs after clear', async () => { + const value = 'hello world' + const deferred = pDefer() + + const privateKeyA = await generateKeyPair('Ed25519') + const peerIdA = peerIdFromPrivateKey(privateKeyA) + const queue = new PeerQueue({ + concurrency: 1 + }) + + expect(queue.has(peerIdA)).to.be.false() + expect(queue.find(peerIdA)).to.be.undefined() + + void queue.add(async () => { + return deferred.promise + }, { + peerId: peerIdA + }).catch(() => {}) + + queue.clear() + + expect(queue.find(peerIdA)).to.be.undefined() + + deferred.resolve(value) + }) + + it('emits success event', async () => { + const value = 'hello world' + + const privateKeyA = await generateKeyPair('Ed25519') + const peerIdA = peerIdFromPrivateKey(privateKeyA) + const queue = new PeerQueue({ + concurrency: 1 + }) + + void queue.add(async () => { + await delay(100) + return value + }, { + peerId: peerIdA + }).catch(() => {}) + + const event = await raceEvent>>(queue, 'success') + + expect(event.detail.job.options.peerId).to.equal(peerIdA) + expect(event.detail.result).to.equal(value) + }) + + it('emits failure event', async () => { + const err = new Error('Oh no!') + + const privateKeyA = await generateKeyPair('Ed25519') + const peerIdA = peerIdFromPrivateKey(privateKeyA) + const queue = new PeerQueue({ + concurrency: 1 + }) + + void queue.add(async () => { + await delay(100) + throw err + }, { + peerId: peerIdA + }).catch(() => {}) + + const event = await raceEvent>>(queue, 'failure') + + expect(event.detail.job.options.peerId).to.equal(peerIdA) + expect(event.detail.error).to.equal(err) + }) +}) diff --git a/packages/utils/test/queue.spec.ts b/packages/utils/test/queue.spec.ts index 02d0faa56c..7b512f60a2 100644 --- a/packages/utils/test/queue.spec.ts +++ b/packages/utils/test/queue.spec.ts @@ -800,4 +800,24 @@ describe('queue', () => { // job not in queue any more expect(queue.queue.find(job => !job.options.slow)).to.be.undefined() }) + + it('rejects job when the queue is full', async () => { + const queue = new Queue({ + concurrency: 1, + maxSize: 1 + }) + + const job = async (): Promise => { + await delay(100) + return 'hello' + } + + const p = queue.add(job) + + await expect(queue.add(job)).to.eventually.be.rejected + .with.property('name', 'QueueFullError') + + await expect(p).to.eventually.equal('hello') + await expect(queue.add(job)).to.eventually.equal('hello') + }) })