diff --git a/yarn-project/circuit-types/src/interfaces/server_circuit_prover.ts b/yarn-project/circuit-types/src/interfaces/server_circuit_prover.ts index 69c096cca47..40e4e01f33c 100644 --- a/yarn-project/circuit-types/src/interfaces/server_circuit_prover.ts +++ b/yarn-project/circuit-types/src/interfaces/server_circuit_prover.ts @@ -37,6 +37,7 @@ export interface ServerCircuitProver { getBaseParityProof( inputs: BaseParityInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise>; /** @@ -46,6 +47,7 @@ export interface ServerCircuitProver { getRootParityProof( inputs: RootParityInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise>; /** @@ -55,6 +57,7 @@ export interface ServerCircuitProver { getBaseRollupProof( baseRollupInput: BaseRollupInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise>; /** @@ -64,6 +67,7 @@ export interface ServerCircuitProver { getTubeProof( tubeInput: TubeInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise<{ tubeVK: VerificationKeyData; tubeProof: RecursiveProof }>; /** @@ -73,6 +77,7 @@ export interface ServerCircuitProver { getMergeRollupProof( input: MergeRollupInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise>; /** @@ -82,6 +87,7 @@ export interface ServerCircuitProver { getRootRollupProof( input: RootRollupInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise>; /** @@ -91,6 +97,7 @@ export interface ServerCircuitProver { getPublicKernelProof( kernelRequest: PublicKernelNonTailRequest, signal?: AbortSignal, + epochNumber?: number, ): Promise>; /** @@ -100,23 +107,26 @@ export interface ServerCircuitProver { getPublicTailProof( kernelRequest: PublicKernelTailRequest, signal?: AbortSignal, + epochNumber?: number, ): Promise>; getEmptyPrivateKernelProof( inputs: PrivateKernelEmptyInputData, signal?: AbortSignal, + epochNumber?: number, ): Promise>; getEmptyTubeProof( inputs: PrivateKernelEmptyInputData, signal?: AbortSignal, + epochNumber?: number, ): Promise>; /** * Create a proof for the AVM circuit. * @param inputs - Inputs to the AVM circuit. */ - getAvmProof(inputs: AvmCircuitInputs, signal?: AbortSignal): Promise; + getAvmProof(inputs: AvmCircuitInputs, signal?: AbortSignal, epochNumber?: number): Promise; } /** diff --git a/yarn-project/circuit-types/src/l2_block_downloader/l2_block_downloader.ts b/yarn-project/circuit-types/src/l2_block_downloader/l2_block_downloader.ts index 8c43ce23925..ce4e2eb5f67 100644 --- a/yarn-project/circuit-types/src/l2_block_downloader/l2_block_downloader.ts +++ b/yarn-project/circuit-types/src/l2_block_downloader/l2_block_downloader.ts @@ -1,6 +1,6 @@ import { INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js/constants'; -import { MemoryFifo, Semaphore, SerialQueue } from '@aztec/foundation/fifo'; import { createDebugLogger } from '@aztec/foundation/log'; +import { FifoMemoryQueue, Semaphore, SerialQueue } from '@aztec/foundation/queue'; import { InterruptibleSleep } from '@aztec/foundation/sleep'; import { type L2Block } from '../l2_block.js'; @@ -21,7 +21,7 @@ export class L2BlockDownloader { private interruptibleSleep = new InterruptibleSleep(); private readonly semaphore: Semaphore; private readonly jobQueue = new SerialQueue(); - private readonly blockQueue = new MemoryFifo(); + private readonly blockQueue = new FifoMemoryQueue(); private readonly proven: boolean; private readonly pollIntervalMS: number; diff --git a/yarn-project/foundation/package.json b/yarn-project/foundation/package.json index 9940003d2f5..cc7b6512aef 100644 --- a/yarn-project/foundation/package.json +++ b/yarn-project/foundation/package.json @@ -17,7 +17,7 @@ "./crypto": "./dest/crypto/index.js", "./error": "./dest/error/index.js", "./eth-address": "./dest/eth-address/index.js", - "./fifo": "./dest/fifo/index.js", + "./queue": "./dest/queue/index.js", "./fs": "./dest/fs/index.js", "./hash": "./dest/hash/index.js", "./json-rpc": "./dest/json-rpc/index.js", diff --git a/yarn-project/foundation/src/index.ts b/yarn-project/foundation/src/index.ts index 0f662455cab..a422558dd66 100644 --- a/yarn-project/foundation/src/index.ts +++ b/yarn-project/foundation/src/index.ts @@ -9,7 +9,7 @@ export * as crypto from './crypto/index.js'; export * as errors from './error/index.js'; export * as ethAddress from './eth-address/index.js'; export * as fields from './fields/index.js'; -export * as fifo from './fifo/index.js'; +export * as fifo from './queue/index.js'; export * as fs from './fs/index.js'; export * as jsonRpc from './json-rpc/index.js'; export * as jsonRpcClient from './json-rpc/client/index.js'; diff --git a/yarn-project/foundation/src/fifo/memory_fifo.ts b/yarn-project/foundation/src/queue/base_memory_queue.ts similarity index 90% rename from yarn-project/foundation/src/fifo/memory_fifo.ts rename to yarn-project/foundation/src/queue/base_memory_queue.ts index d7ef956d869..4677aa8e0b3 100644 --- a/yarn-project/foundation/src/fifo/memory_fifo.ts +++ b/yarn-project/foundation/src/queue/base_memory_queue.ts @@ -1,18 +1,19 @@ import { TimeoutError } from '../error/index.js'; import { createDebugLogger } from '../log/index.js'; -/** - * A simple fifo queue. It can grow unbounded. It can have multiple producers and consumers. - * Putting an item onto the queue always succeeds, unless either end() or cancel() has been called in which case - * the item being pushed is simply discarded. - */ -export class MemoryFifo { +export abstract class BaseMemoryQueue { private waiting: ((item: T | null) => void)[] = []; - private items: T[] = []; private flushing = false; constructor(private log = createDebugLogger('aztec:foundation:memory_fifo')) {} + protected abstract get items(): { + length: number; + get(): T | undefined; + put(item: T): void; + clear: () => void; + }; + /** * Returns the current number of items in the queue. * The length represents the size of the queue at the time of invocation and may change as new items are added or consumed. @@ -36,7 +37,7 @@ export class MemoryFifo { */ public get(timeoutSec?: number): Promise { if (this.items.length) { - return Promise.resolve(this.items.shift()!); + return Promise.resolve(this.items.get()!); } if (this.items.length === 0 && this.flushing) { @@ -78,7 +79,7 @@ export class MemoryFifo { this.waiting.shift()!(item); return true; } else { - this.items.push(item); + this.items.put(item); return true; } } @@ -100,7 +101,7 @@ export class MemoryFifo { */ public cancel() { this.flushing = true; - this.items = []; + this.items.clear(); this.waiting.forEach(resolve => resolve(null)); } diff --git a/yarn-project/foundation/src/fifo/bounded_serial_queue.ts b/yarn-project/foundation/src/queue/bounded_serial_queue.ts similarity index 100% rename from yarn-project/foundation/src/fifo/bounded_serial_queue.ts rename to yarn-project/foundation/src/queue/bounded_serial_queue.ts diff --git a/yarn-project/foundation/src/queue/fifo_memory_queue.ts b/yarn-project/foundation/src/queue/fifo_memory_queue.ts new file mode 100644 index 00000000000..e2271143ac6 --- /dev/null +++ b/yarn-project/foundation/src/queue/fifo_memory_queue.ts @@ -0,0 +1,39 @@ +import { type DebugLogger } from '../log/logger.js'; +import { BaseMemoryQueue } from './base_memory_queue.js'; + +/** + * A simple fifo queue. It can grow unbounded. It can have multiple producers and consumers. + * Putting an item onto the queue always succeeds, unless either end() or cancel() has been called in which case + * the item being pushed is simply discarded. + */ +export class FifoMemoryQueue extends BaseMemoryQueue { + private container = new FifoQueue(); + + constructor(log?: DebugLogger) { + super(log); + } + + protected override get items() { + return this.container; + } +} + +class FifoQueue { + private items: T[] = []; + + public put(item: T): void { + this.items.push(item); + } + + public get(): T | undefined { + return this.items.shift(); + } + + public get length(): number { + return this.items.length; + } + + public clear() { + this.items = []; + } +} diff --git a/yarn-project/foundation/src/fifo/index.ts b/yarn-project/foundation/src/queue/index.ts similarity index 56% rename from yarn-project/foundation/src/fifo/index.ts rename to yarn-project/foundation/src/queue/index.ts index 73b23f23dbd..e018dc2c2d3 100644 --- a/yarn-project/foundation/src/fifo/index.ts +++ b/yarn-project/foundation/src/queue/index.ts @@ -1,4 +1,5 @@ -export * from './memory_fifo.js'; +export * from './fifo_memory_queue.js'; +export * from './priority_memory_queue.js'; export * from './serial_queue.js'; export * from './bounded_serial_queue.js'; export * from './semaphore.js'; diff --git a/yarn-project/foundation/src/queue/priority_memory_queue.test.ts b/yarn-project/foundation/src/queue/priority_memory_queue.test.ts new file mode 100644 index 00000000000..e8b3f9b5ce2 --- /dev/null +++ b/yarn-project/foundation/src/queue/priority_memory_queue.test.ts @@ -0,0 +1,25 @@ +import { PriorityMemoryQueue } from './priority_memory_queue.js'; + +describe('PriorityMemoryQueue', () => { + let queue: PriorityMemoryQueue; + + beforeEach(() => { + queue = new PriorityMemoryQueue((a, b) => a - b); + }); + + it('returns items in the correct order', async () => { + expect(queue.put(3)).toBeTruthy(); + expect(queue.put(1)).toBeTruthy(); + expect(queue.put(2)).toBeTruthy(); + + expect(queue.length()).toEqual(3); + + expect(await queue.get()).toBe(1); + expect(await queue.get()).toBe(2); + expect(await queue.get()).toBe(3); + + expect(queue.length()).toEqual(0); + + await expect(queue.get(1)).rejects.toThrow(/timeout/i); + }); +}); diff --git a/yarn-project/foundation/src/queue/priority_memory_queue.ts b/yarn-project/foundation/src/queue/priority_memory_queue.ts new file mode 100644 index 00000000000..9dd3fe8ae46 --- /dev/null +++ b/yarn-project/foundation/src/queue/priority_memory_queue.ts @@ -0,0 +1,20 @@ +import { BaseMemoryQueue } from './base_memory_queue.js'; +import { PriorityQueue } from './priority_queue.js'; + +/** + * A priority queue. It can grow unbounded. It can have multiple producers and consumers. + * Putting an item onto the queue always succeeds, unless either end() or cancel() has been called in which case + * the item being pushed is simply discarded. + */ +export class PriorityMemoryQueue extends BaseMemoryQueue { + private container: PriorityQueue; + + constructor(comparator: (a: T, b: T) => number) { + super(); + this.container = new PriorityQueue(comparator); + } + + protected override get items() { + return this.container; + } +} diff --git a/yarn-project/foundation/src/queue/priority_queue.ts b/yarn-project/foundation/src/queue/priority_queue.ts new file mode 100644 index 00000000000..3da3feff66d --- /dev/null +++ b/yarn-project/foundation/src/queue/priority_queue.ts @@ -0,0 +1,34 @@ +/** + * Priority queue implementation based on a custom comparator. + */ +export class PriorityQueue { + private items: T[]; + + constructor(private comparator: (a: T, b: T) => number) { + this.items = []; + } + + public put(item: T): void { + let i = 0; + while (i < this.items.length && this.comparator(item, this.items[i]) >= 0) { + i++; + } + this.items.splice(i, 0, item); + } + + public get(): T | undefined { + return this.items.shift(); + } + + public peek(): T | undefined { + return this.items[0]; + } + + public clear() { + this.items = []; + } + + public get length(): number { + return this.items.length; + } +} diff --git a/yarn-project/foundation/src/fifo/semaphore.ts b/yarn-project/foundation/src/queue/semaphore.ts similarity index 90% rename from yarn-project/foundation/src/fifo/semaphore.ts rename to yarn-project/foundation/src/queue/semaphore.ts index b1d17734049..d20bd9c4cd2 100644 --- a/yarn-project/foundation/src/fifo/semaphore.ts +++ b/yarn-project/foundation/src/queue/semaphore.ts @@ -1,10 +1,10 @@ -import { MemoryFifo } from './memory_fifo.js'; +import { FifoMemoryQueue } from './fifo_memory_queue.js'; /** * Allows the acquiring of up to `size` tokens before calls to acquire block, waiting for a call to release(). */ export class Semaphore { - private readonly queue = new MemoryFifo(); + private readonly queue = new FifoMemoryQueue(); constructor(size: number) { new Array(size).fill(true).map(() => this.queue.put(true)); diff --git a/yarn-project/foundation/src/fifo/serial_queue.ts b/yarn-project/foundation/src/queue/serial_queue.ts similarity index 95% rename from yarn-project/foundation/src/fifo/serial_queue.ts rename to yarn-project/foundation/src/queue/serial_queue.ts index 6854cdc6ddc..b6bcd9baadd 100644 --- a/yarn-project/foundation/src/fifo/serial_queue.ts +++ b/yarn-project/foundation/src/queue/serial_queue.ts @@ -1,10 +1,10 @@ -import { MemoryFifo } from './memory_fifo.js'; +import { FifoMemoryQueue } from './fifo_memory_queue.js'; /** * A more specialized fifo queue that enqueues functions to execute. Enqueued functions are executed in serial. */ export class SerialQueue { - private readonly queue = new MemoryFifo<() => Promise>(); + private readonly queue = new FifoMemoryQueue<() => Promise>(); private runningPromise!: Promise; /** diff --git a/yarn-project/foundation/src/wasm/wasm_module.ts b/yarn-project/foundation/src/wasm/wasm_module.ts index 20851a05cc6..f0c3a15debe 100644 --- a/yarn-project/foundation/src/wasm/wasm_module.ts +++ b/yarn-project/foundation/src/wasm/wasm_module.ts @@ -1,8 +1,8 @@ import { Buffer } from 'buffer'; import { randomBytes } from '../crypto/index.js'; -import { MemoryFifo } from '../fifo/index.js'; import { type LogFn, createDebugOnlyLogger } from '../log/index.js'; +import { FifoMemoryQueue } from '../queue/index.js'; import { getEmptyWasiSdk } from './empty_wasi_sdk.js'; /** @@ -47,7 +47,7 @@ export class WasmModule implements IWasmModule { private memory!: WebAssembly.Memory; private heap!: Uint8Array; private instance?: WebAssembly.Instance; - private mutexQ = new MemoryFifo(); + private mutexQ = new FifoMemoryQueue(); private debug: LogFn; /** diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 6f8d9d9d234..4570c8291f5 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -1,6 +1,6 @@ import { type Gossipable, type RawGossipMessage, TopicType, TopicTypeMap, Tx } from '@aztec/circuit-types'; -import { SerialQueue } from '@aztec/foundation/fifo'; import { createDebugLogger } from '@aztec/foundation/log'; +import { SerialQueue } from '@aztec/foundation/queue'; import { RunningPromise } from '@aztec/foundation/running-promise'; import type { AztecKVStore } from '@aztec/kv-store'; diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator.ts b/yarn-project/prover-client/src/orchestrator/orchestrator.ts index 1a8f795b39e..ae00044dcda 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator.ts @@ -345,6 +345,7 @@ export class ProvingOrchestrator implements BlockProver { getVKTreeRoot(), ), signal, + provingState.epochNumber, ), ), result => { @@ -577,7 +578,7 @@ export class ProvingOrchestrator implements BlockProver { return; } - // We build tha base rollup inputs using a mock proof and verification key. + // We build the base rollup inputs using a mock proof and verification key. // These will be overwritten later once we have proven the tube circuit and any public kernels const [ms, inputs] = await elapsed( buildBaseRollupInput( @@ -697,7 +698,7 @@ export class ProvingOrchestrator implements BlockProver { [Attributes.PROTOCOL_CIRCUIT_TYPE]: 'server', [Attributes.PROTOCOL_CIRCUIT_NAME]: 'base-rollup' as CircuitName, }, - signal => this.prover.getBaseRollupProof(tx.baseRollupInputs, signal), + signal => this.prover.getBaseRollupProof(tx.baseRollupInputs, signal, provingState.epochNumber), ), result => { logger.debug(`Completed proof for base rollup for tx ${tx.processedTx.hash.toString()}`); @@ -733,7 +734,12 @@ export class ProvingOrchestrator implements BlockProver { [Attributes.PROTOCOL_CIRCUIT_TYPE]: 'server', [Attributes.PROTOCOL_CIRCUIT_NAME]: 'tube-circuit' as CircuitName, }, - signal => this.prover.getTubeProof(new TubeInputs(txProvingState.processedTx.clientIvcProof), signal), + signal => + this.prover.getTubeProof( + new TubeInputs(txProvingState.processedTx.clientIvcProof), + signal, + provingState.epochNumber, + ), ), result => { logger.debug(`Completed tube proof for tx index: ${txIndex}`); @@ -772,7 +778,7 @@ export class ProvingOrchestrator implements BlockProver { [Attributes.PROTOCOL_CIRCUIT_TYPE]: 'server', [Attributes.PROTOCOL_CIRCUIT_NAME]: 'merge-rollup' as CircuitName, }, - signal => this.prover.getMergeRollupProof(inputs, signal), + signal => this.prover.getMergeRollupProof(inputs, signal, provingState.epochNumber), ), result => { this.storeAndExecuteNextMergeLevel(provingState, level, index, [ @@ -817,7 +823,7 @@ export class ProvingOrchestrator implements BlockProver { [Attributes.PROTOCOL_CIRCUIT_TYPE]: 'server', [Attributes.PROTOCOL_CIRCUIT_NAME]: 'root-rollup' as CircuitName, }, - signal => this.prover.getRootRollupProof(inputs, signal), + signal => this.prover.getRootRollupProof(inputs, signal, provingState.epochNumber), ), result => { provingState.rootRollupPublicInputs = result.inputs; @@ -847,7 +853,7 @@ export class ProvingOrchestrator implements BlockProver { [Attributes.PROTOCOL_CIRCUIT_TYPE]: 'server', [Attributes.PROTOCOL_CIRCUIT_NAME]: 'base-parity' as CircuitName, }, - signal => this.prover.getBaseParityProof(inputs, signal), + signal => this.prover.getBaseParityProof(inputs, signal, provingState.epochNumber), ), rootInput => { provingState.setRootParityInputs(rootInput, index); @@ -866,7 +872,7 @@ export class ProvingOrchestrator implements BlockProver { // Runs the root parity circuit ans stored the outputs // Enqueues the root rollup proof if all inputs are available - private enqueueRootParityCircuit(provingState: ProvingState | undefined, inputs: RootParityInputs) { + private enqueueRootParityCircuit(provingState: ProvingState, inputs: RootParityInputs) { this.deferredProving( provingState, wrapCallbackInSpan( @@ -876,7 +882,7 @@ export class ProvingOrchestrator implements BlockProver { [Attributes.PROTOCOL_CIRCUIT_TYPE]: 'server', [Attributes.PROTOCOL_CIRCUIT_NAME]: 'root-parity' as CircuitName, }, - signal => this.prover.getRootParityProof(inputs, signal), + signal => this.prover.getRootParityProof(inputs, signal, provingState.epochNumber), ), async rootInput => { provingState!.finalRootParityInput = rootInput; @@ -962,7 +968,7 @@ export class ProvingOrchestrator implements BlockProver { publicFunction.vmRequest!.avmHints, ); try { - return await this.prover.getAvmProof(inputs, signal); + return await this.prover.getAvmProof(inputs, signal, provingState.epochNumber); } catch (err) { if (process.env.AVM_PROVING_STRICT) { throw err; @@ -1073,9 +1079,9 @@ export class ProvingOrchestrator implements BlockProver { signal, ): Promise> => { if (request.type === PublicKernelType.TAIL) { - return this.prover.getPublicTailProof(request, signal); + return this.prover.getPublicTailProof(request, signal, provingState.epochNumber); } else { - return this.prover.getPublicKernelProof(request, signal); + return this.prover.getPublicKernelProof(request, signal, provingState.epochNumber); } }, ), diff --git a/yarn-project/prover-client/src/orchestrator/proving-state.ts b/yarn-project/prover-client/src/orchestrator/proving-state.ts index e247b6f0d20..b375bb7e0ac 100644 --- a/yarn-project/prover-client/src/orchestrator/proving-state.ts +++ b/yarn-project/prover-client/src/orchestrator/proving-state.ts @@ -143,6 +143,11 @@ export class ProvingState { return this.txs; } + /** Returns the block number as an epoch number. Used for prioritizing proof requests. */ + public get epochNumber(): number { + return this.globalVariables.blockNumber.toNumber(); + } + /** * Stores the inputs to a merge circuit and determines if the circuit is ready to be executed * @param mergeInputs - The inputs to store diff --git a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts index 41eb09a9e54..a581c4012d0 100644 --- a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts +++ b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts @@ -7,7 +7,12 @@ import { VerificationKeyAsFields, makeRecursiveProof, } from '@aztec/circuits.js'; -import { makeBaseParityInputs, makeBaseRollupInputs, makeParityPublicInputs } from '@aztec/circuits.js/testing'; +import { + makeBaseParityInputs, + makeBaseRollupInputs, + makeParityPublicInputs, + makeRootRollupInputs, +} from '@aztec/circuits.js/testing'; import { makeTuple } from '@aztec/foundation/array'; import { AbortError } from '@aztec/foundation/error'; import { sleep } from '@aztec/foundation/sleep'; @@ -42,6 +47,32 @@ describe('MemoryProvingQueue', () => { expect(job2?.request.type).toEqual(ProvingRequestType.BASE_ROLLUP); }); + it('returns jobs ordered by priority', async () => { + // We push base rollup proof requests for a first block + void queue.getBaseRollupProof(makeBaseRollupInputs(), undefined, 1); + void queue.getBaseRollupProof(makeBaseRollupInputs(), undefined, 1); + + // The agent consumes one of them + expect((await queue.getProvingJob())!.request.type).toEqual(ProvingRequestType.BASE_ROLLUP); + + // A new block comes along with its base rollups, and the orchestrator then pushes a root request for the first one + void queue.getBaseRollupProof(makeBaseRollupInputs(), undefined, 2); + void queue.getBaseRollupProof(makeBaseRollupInputs(), undefined, 2); + void queue.getBaseRollupProof(makeBaseRollupInputs(), undefined, 2); + void queue.getBaseRollupProof(makeBaseRollupInputs(), undefined, 2); + void queue.getRootRollupProof(makeRootRollupInputs(), undefined, 1); + + // The next jobs for the agent should be the ones from block 1, skipping the ones for block 2 + expect((await queue.getProvingJob())!.request.type).toEqual(ProvingRequestType.BASE_ROLLUP); + expect((await queue.getProvingJob())!.request.type).toEqual(ProvingRequestType.ROOT_ROLLUP); + + // And the base rollups for block 2 should go next + expect((await queue.getProvingJob())!.request.type).toEqual(ProvingRequestType.BASE_ROLLUP); + expect((await queue.getProvingJob())!.request.type).toEqual(ProvingRequestType.BASE_ROLLUP); + expect((await queue.getProvingJob())!.request.type).toEqual(ProvingRequestType.BASE_ROLLUP); + expect((await queue.getProvingJob())!.request.type).toEqual(ProvingRequestType.BASE_ROLLUP); + }); + it('returns undefined when no jobs are available', async () => { await expect(queue.getProvingJob({ timeoutSec: 0 })).resolves.toBeUndefined(); }); diff --git a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts index 0ee31b82acc..a161722315b 100644 --- a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts +++ b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts @@ -31,34 +31,35 @@ import type { } from '@aztec/circuits.js'; import { randomBytes } from '@aztec/foundation/crypto'; import { AbortError, TimeoutError } from '@aztec/foundation/error'; -import { MemoryFifo } from '@aztec/foundation/fifo'; import { createDebugLogger } from '@aztec/foundation/log'; import { type PromiseWithResolvers, RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; +import { PriorityMemoryQueue } from '@aztec/foundation/queue'; import { serializeToBuffer } from '@aztec/foundation/serialize'; import { type TelemetryClient } from '@aztec/telemetry-client'; import { ProvingQueueMetrics } from './queue_metrics.js'; -type ProvingJobWithResolvers = { - id: string; - request: T; - signal?: AbortSignal; - attempts: number; - heartbeat: number; -} & PromiseWithResolvers>; +type ProvingJobWithResolvers = ProvingJob & + PromiseWithResolvers> & { + signal?: AbortSignal; + epochNumber?: number; + attempts: number; + heartbeat: number; + }; const MAX_RETRIES = 3; const defaultIdGenerator = () => randomBytes(4).toString('hex'); const defaultTimeSource = () => Date.now(); - /** * A helper class that sits in between services that need proofs created and agents that can create them. - * The queue accumulates jobs and provides them to agents in FIFO order. + * The queue accumulates jobs and provides them to agents prioritized by block number. */ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource { private log = createDebugLogger('aztec:prover-client:prover-pool:queue'); - private queue = new MemoryFifo(); + private queue = new PriorityMemoryQueue( + (a, b) => (a.epochNumber ?? 0) - (b.epochNumber ?? 0), + ); private jobsInProgress = new Map(); private runningPromise: RunningPromise; @@ -220,6 +221,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource private enqueue( request: T, signal?: AbortSignal, + epochNumber?: number, ): Promise> { if (!this.runningPromise.isRunning()) { return Promise.reject(new Error('Proving queue is not running.')); @@ -235,6 +237,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource reject, attempts: 1, heartbeat: 0, + epochNumber, }; if (signal) { @@ -258,22 +261,25 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource getEmptyPrivateKernelProof( inputs: PrivateKernelEmptyInputData, signal?: AbortSignal, + epochNumber?: number, ): Promise> { - return this.enqueue({ type: ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs }, signal); + return this.enqueue({ type: ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs }, signal, epochNumber); } getTubeProof( inputs: TubeInputs, - signal?: AbortSignal | undefined, + signal?: AbortSignal, + epochNumber?: number, ): Promise<{ tubeVK: VerificationKeyData; tubeProof: RecursiveProof }> { - return this.enqueue({ type: ProvingRequestType.TUBE_PROOF, inputs }, signal); + return this.enqueue({ type: ProvingRequestType.TUBE_PROOF, inputs }, signal, epochNumber); } getEmptyTubeProof( inputs: PrivateKernelEmptyInputData, signal?: AbortSignal, + epochNumber?: number, ): Promise> { - return this.enqueue({ type: ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs }, signal); + return this.enqueue({ type: ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs }, signal, epochNumber); } /** @@ -283,14 +289,9 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource getBaseParityProof( inputs: BaseParityInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise> { - return this.enqueue( - { - type: ProvingRequestType.BASE_PARITY, - inputs, - }, - signal, - ); + return this.enqueue({ type: ProvingRequestType.BASE_PARITY, inputs }, signal, epochNumber); } /** @@ -300,14 +301,9 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource getRootParityProof( inputs: RootParityInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise> { - return this.enqueue( - { - type: ProvingRequestType.ROOT_PARITY, - inputs, - }, - signal, - ); + return this.enqueue({ type: ProvingRequestType.ROOT_PARITY, inputs }, signal, epochNumber); } /** @@ -317,14 +313,9 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource getBaseRollupProof( baseRollupInput: BaseRollupInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise> { - return this.enqueue( - { - type: ProvingRequestType.BASE_ROLLUP, - inputs: baseRollupInput, - }, - signal, - ); + return this.enqueue({ type: ProvingRequestType.BASE_ROLLUP, inputs: baseRollupInput }, signal, epochNumber); } /** @@ -334,14 +325,9 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource getMergeRollupProof( input: MergeRollupInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise> { - return this.enqueue( - { - type: ProvingRequestType.MERGE_ROLLUP, - inputs: input, - }, - signal, - ); + return this.enqueue({ type: ProvingRequestType.MERGE_ROLLUP, inputs: input }, signal, epochNumber); } /** @@ -351,14 +337,9 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource getRootRollupProof( input: RootRollupInputs, signal?: AbortSignal, + epochNumber?: number, ): Promise> { - return this.enqueue( - { - type: ProvingRequestType.ROOT_ROLLUP, - inputs: input, - }, - signal, - ); + return this.enqueue({ type: ProvingRequestType.ROOT_ROLLUP, inputs: input }, signal, epochNumber); } /** @@ -368,14 +349,12 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource getPublicKernelProof( kernelRequest: PublicKernelNonTailRequest, signal?: AbortSignal, + epochNumber?: number, ): Promise> { return this.enqueue( - { - type: ProvingRequestType.PUBLIC_KERNEL_NON_TAIL, - kernelType: kernelRequest.type, - inputs: kernelRequest.inputs, - }, + { type: ProvingRequestType.PUBLIC_KERNEL_NON_TAIL, kernelType: kernelRequest.type, inputs: kernelRequest.inputs }, signal, + epochNumber, ); } @@ -386,28 +365,20 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource getPublicTailProof( kernelRequest: PublicKernelTailRequest, signal?: AbortSignal, + epochNumber?: number, ): Promise> { return this.enqueue( - { - type: ProvingRequestType.PUBLIC_KERNEL_TAIL, - kernelType: kernelRequest.type, - inputs: kernelRequest.inputs, - }, + { type: ProvingRequestType.PUBLIC_KERNEL_TAIL, kernelType: kernelRequest.type, inputs: kernelRequest.inputs }, signal, + epochNumber, ); } /** * Creates an AVM proof. */ - getAvmProof(inputs: AvmCircuitInputs, signal?: AbortSignal | undefined): Promise { - return this.enqueue( - { - type: ProvingRequestType.PUBLIC_VM, - inputs, - }, - signal, - ); + getAvmProof(inputs: AvmCircuitInputs, signal?: AbortSignal, epochNumber?: number): Promise { + return this.enqueue({ type: ProvingRequestType.PUBLIC_VM, inputs }, signal, epochNumber); } /** diff --git a/yarn-project/pxe/src/pxe_service/pxe_service.ts b/yarn-project/pxe/src/pxe_service/pxe_service.ts index 1b8708d9582..7b6ba12d62c 100644 --- a/yarn-project/pxe/src/pxe_service/pxe_service.ts +++ b/yarn-project/pxe/src/pxe_service/pxe_service.ts @@ -44,8 +44,8 @@ import { encodeArguments, } from '@aztec/foundation/abi'; import { type Fq, Fr, type Point } from '@aztec/foundation/fields'; -import { SerialQueue } from '@aztec/foundation/fifo'; import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log'; +import { SerialQueue } from '@aztec/foundation/queue'; import { type KeyStore } from '@aztec/key-store'; import { ClassRegistererAddress } from '@aztec/protocol-contracts/class-registerer'; import { getCanonicalFeeJuice } from '@aztec/protocol-contracts/fee-juice'; diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.test.ts b/yarn-project/pxe/src/synchronizer/synchronizer.test.ts index 6a55b6cb052..ea2d1d86c39 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.test.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.test.ts @@ -2,7 +2,7 @@ import { type AztecNode, L2Block } from '@aztec/circuit-types'; import { Fr, type Header, INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js'; import { makeHeader } from '@aztec/circuits.js/testing'; import { randomInt } from '@aztec/foundation/crypto'; -import { SerialQueue } from '@aztec/foundation/fifo'; +import { SerialQueue } from '@aztec/foundation/queue'; import { KeyStore } from '@aztec/key-store'; import { openTmpStore } from '@aztec/kv-store/utils'; diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.ts b/yarn-project/pxe/src/synchronizer/synchronizer.ts index 52d7cc3d3a4..0ddfcc191aa 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.ts @@ -1,8 +1,8 @@ import { type AztecNode, type L2Block, MerkleTreeId, type TxHash } from '@aztec/circuit-types'; import { type NoteProcessorCaughtUpStats } from '@aztec/circuit-types/stats'; import { type AztecAddress, type Fr, INITIAL_L2_BLOCK_NUM, type PublicKey } from '@aztec/circuits.js'; -import { type SerialQueue } from '@aztec/foundation/fifo'; import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log'; +import { type SerialQueue } from '@aztec/foundation/queue'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { type KeyStore } from '@aztec/key-store'; diff --git a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts index fd99f6ba9ce..a4512b41048 100644 --- a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts +++ b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts @@ -8,9 +8,9 @@ import { import { type L2BlockHandledStats } from '@aztec/circuit-types/stats'; import { L1_TO_L2_MSG_SUBTREE_HEIGHT } from '@aztec/circuits.js/constants'; import { Fr } from '@aztec/foundation/fields'; -import { SerialQueue } from '@aztec/foundation/fifo'; import { createDebugLogger } from '@aztec/foundation/log'; import { promiseWithResolvers } from '@aztec/foundation/promise'; +import { SerialQueue } from '@aztec/foundation/queue'; import { elapsed } from '@aztec/foundation/timer'; import { type AztecKVStore, type AztecSingleton } from '@aztec/kv-store'; import { openTmpStore } from '@aztec/kv-store/utils'; diff --git a/yarn-project/world-state/src/world-state-db/merkle_trees.ts b/yarn-project/world-state/src/world-state-db/merkle_trees.ts index 0ebd9937bca..8db05dc8119 100644 --- a/yarn-project/world-state/src/world-state-db/merkle_trees.ts +++ b/yarn-project/world-state/src/world-state-db/merkle_trees.ts @@ -30,8 +30,8 @@ import { StateReference, } from '@aztec/circuits.js'; import { padArrayEnd } from '@aztec/foundation/collection'; -import { SerialQueue } from '@aztec/foundation/fifo'; import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log'; +import { SerialQueue } from '@aztec/foundation/queue'; import { type IndexedTreeLeafPreimage } from '@aztec/foundation/trees'; import { type AztecKVStore, type AztecSingleton } from '@aztec/kv-store'; import {