Skip to content

Commit

Permalink
feat: Sort proving jobs by epoch number (#7844)
Browse files Browse the repository at this point in the history
The proving jobs queue sorted jobs in FIFO order. When we added support
for proving multiple blocks simultaneously, this led to an inefficiency:
if jobs for two blocks were pushed into the queue, the merge and root
jobs for the first block would not be computed until the base jobs for
the second one were done.

This PR changes the queue so it works by priority, where priority is
based on an epoch number, defined as the block number for simplicity.

This also renames the "fifo" export in foundation to "queue", since we
now support priority in addition to fifo queues.
  • Loading branch information
spalladino authored Aug 12, 2024
1 parent 65583e3 commit 95c14a9
Show file tree
Hide file tree
Showing 24 changed files with 251 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export interface ServerCircuitProver {
getBaseParityProof(
inputs: BaseParityInputs,
signal?: AbortSignal,
epochNumber?: number,
): Promise<RootParityInput<typeof RECURSIVE_PROOF_LENGTH>>;

/**
Expand All @@ -46,6 +47,7 @@ export interface ServerCircuitProver {
getRootParityProof(
inputs: RootParityInputs,
signal?: AbortSignal,
epochNumber?: number,
): Promise<RootParityInput<typeof NESTED_RECURSIVE_PROOF_LENGTH>>;

/**
Expand All @@ -55,6 +57,7 @@ export interface ServerCircuitProver {
getBaseRollupProof(
baseRollupInput: BaseRollupInputs,
signal?: AbortSignal,
epochNumber?: number,
): Promise<PublicInputsAndRecursiveProof<BaseOrMergeRollupPublicInputs>>;

/**
Expand All @@ -64,6 +67,7 @@ export interface ServerCircuitProver {
getTubeProof(
tubeInput: TubeInputs,
signal?: AbortSignal,
epochNumber?: number,
): Promise<{ tubeVK: VerificationKeyData; tubeProof: RecursiveProof<typeof RECURSIVE_PROOF_LENGTH> }>;

/**
Expand All @@ -73,6 +77,7 @@ export interface ServerCircuitProver {
getMergeRollupProof(
input: MergeRollupInputs,
signal?: AbortSignal,
epochNumber?: number,
): Promise<PublicInputsAndRecursiveProof<BaseOrMergeRollupPublicInputs>>;

/**
Expand All @@ -82,6 +87,7 @@ export interface ServerCircuitProver {
getRootRollupProof(
input: RootRollupInputs,
signal?: AbortSignal,
epochNumber?: number,
): Promise<PublicInputsAndRecursiveProof<RootRollupPublicInputs>>;

/**
Expand All @@ -91,6 +97,7 @@ export interface ServerCircuitProver {
getPublicKernelProof(
kernelRequest: PublicKernelNonTailRequest,
signal?: AbortSignal,
epochNumber?: number,
): Promise<PublicInputsAndRecursiveProof<PublicKernelCircuitPublicInputs>>;

/**
Expand All @@ -100,23 +107,26 @@ export interface ServerCircuitProver {
getPublicTailProof(
kernelRequest: PublicKernelTailRequest,
signal?: AbortSignal,
epochNumber?: number,
): Promise<PublicInputsAndRecursiveProof<KernelCircuitPublicInputs>>;

getEmptyPrivateKernelProof(
inputs: PrivateKernelEmptyInputData,
signal?: AbortSignal,
epochNumber?: number,
): Promise<PublicInputsAndRecursiveProof<KernelCircuitPublicInputs>>;

getEmptyTubeProof(
inputs: PrivateKernelEmptyInputData,
signal?: AbortSignal,
epochNumber?: number,
): Promise<PublicInputsAndTubeProof<KernelCircuitPublicInputs>>;

/**
* Create a proof for the AVM circuit.
* @param inputs - Inputs to the AVM circuit.
*/
getAvmProof(inputs: AvmCircuitInputs, signal?: AbortSignal): Promise<ProofAndVerificationKey>;
getAvmProof(inputs: AvmCircuitInputs, signal?: AbortSignal, epochNumber?: number): Promise<ProofAndVerificationKey>;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js/constants';
import { MemoryFifo, Semaphore, SerialQueue } from '@aztec/foundation/fifo';
import { createDebugLogger } from '@aztec/foundation/log';
import { FifoMemoryQueue, Semaphore, SerialQueue } from '@aztec/foundation/queue';
import { InterruptibleSleep } from '@aztec/foundation/sleep';

import { type L2Block } from '../l2_block.js';
Expand All @@ -21,7 +21,7 @@ export class L2BlockDownloader {
private interruptibleSleep = new InterruptibleSleep();
private readonly semaphore: Semaphore;
private readonly jobQueue = new SerialQueue();
private readonly blockQueue = new MemoryFifo<L2Block[]>();
private readonly blockQueue = new FifoMemoryQueue<L2Block[]>();
private readonly proven: boolean;
private readonly pollIntervalMS: number;

Expand Down
2 changes: 1 addition & 1 deletion yarn-project/foundation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"./crypto": "./dest/crypto/index.js",
"./error": "./dest/error/index.js",
"./eth-address": "./dest/eth-address/index.js",
"./fifo": "./dest/fifo/index.js",
"./queue": "./dest/queue/index.js",
"./fs": "./dest/fs/index.js",
"./hash": "./dest/hash/index.js",
"./json-rpc": "./dest/json-rpc/index.js",
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/foundation/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export * as crypto from './crypto/index.js';
export * as errors from './error/index.js';
export * as ethAddress from './eth-address/index.js';
export * as fields from './fields/index.js';
export * as fifo from './fifo/index.js';
export * as fifo from './queue/index.js';
export * as fs from './fs/index.js';
export * as jsonRpc from './json-rpc/index.js';
export * as jsonRpcClient from './json-rpc/client/index.js';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import { TimeoutError } from '../error/index.js';
import { createDebugLogger } from '../log/index.js';

/**
* A simple fifo queue. It can grow unbounded. It can have multiple producers and consumers.
* Putting an item onto the queue always succeeds, unless either end() or cancel() has been called in which case
* the item being pushed is simply discarded.
*/
export class MemoryFifo<T> {
export abstract class BaseMemoryQueue<T> {
private waiting: ((item: T | null) => void)[] = [];
private items: T[] = [];
private flushing = false;

constructor(private log = createDebugLogger('aztec:foundation:memory_fifo')) {}

protected abstract get items(): {
length: number;
get(): T | undefined;
put(item: T): void;
clear: () => void;
};

/**
* Returns the current number of items in the queue.
* The length represents the size of the queue at the time of invocation and may change as new items are added or consumed.
Expand All @@ -36,7 +37,7 @@ export class MemoryFifo<T> {
*/
public get(timeoutSec?: number): Promise<T | null> {
if (this.items.length) {
return Promise.resolve(this.items.shift()!);
return Promise.resolve(this.items.get()!);
}

if (this.items.length === 0 && this.flushing) {
Expand Down Expand Up @@ -78,7 +79,7 @@ export class MemoryFifo<T> {
this.waiting.shift()!(item);
return true;
} else {
this.items.push(item);
this.items.put(item);
return true;
}
}
Expand All @@ -100,7 +101,7 @@ export class MemoryFifo<T> {
*/
public cancel() {
this.flushing = true;
this.items = [];
this.items.clear();
this.waiting.forEach(resolve => resolve(null));
}

Expand Down
39 changes: 39 additions & 0 deletions yarn-project/foundation/src/queue/fifo_memory_queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { type DebugLogger } from '../log/logger.js';
import { BaseMemoryQueue } from './base_memory_queue.js';

/**
* A simple fifo queue. It can grow unbounded. It can have multiple producers and consumers.
* Putting an item onto the queue always succeeds, unless either end() or cancel() has been called in which case
* the item being pushed is simply discarded.
*/
export class FifoMemoryQueue<T> extends BaseMemoryQueue<T> {
private container = new FifoQueue<T>();

constructor(log?: DebugLogger) {
super(log);
}

protected override get items() {
return this.container;
}
}

class FifoQueue<T> {
private items: T[] = [];

public put(item: T): void {
this.items.push(item);
}

public get(): T | undefined {
return this.items.shift();
}

public get length(): number {
return this.items.length;
}

public clear() {
this.items = [];
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './memory_fifo.js';
export * from './fifo_memory_queue.js';
export * from './priority_memory_queue.js';
export * from './serial_queue.js';
export * from './bounded_serial_queue.js';
export * from './semaphore.js';
25 changes: 25 additions & 0 deletions yarn-project/foundation/src/queue/priority_memory_queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { PriorityMemoryQueue } from './priority_memory_queue.js';

describe('PriorityMemoryQueue', () => {
let queue: PriorityMemoryQueue<number>;

beforeEach(() => {
queue = new PriorityMemoryQueue<number>((a, b) => a - b);
});

it('returns items in the correct order', async () => {
expect(queue.put(3)).toBeTruthy();
expect(queue.put(1)).toBeTruthy();
expect(queue.put(2)).toBeTruthy();

expect(queue.length()).toEqual(3);

expect(await queue.get()).toBe(1);
expect(await queue.get()).toBe(2);
expect(await queue.get()).toBe(3);

expect(queue.length()).toEqual(0);

await expect(queue.get(1)).rejects.toThrow(/timeout/i);
});
});
20 changes: 20 additions & 0 deletions yarn-project/foundation/src/queue/priority_memory_queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { BaseMemoryQueue } from './base_memory_queue.js';
import { PriorityQueue } from './priority_queue.js';

/**
* A priority queue. It can grow unbounded. It can have multiple producers and consumers.
* Putting an item onto the queue always succeeds, unless either end() or cancel() has been called in which case
* the item being pushed is simply discarded.
*/
export class PriorityMemoryQueue<T> extends BaseMemoryQueue<T> {
private container: PriorityQueue<T>;

constructor(comparator: (a: T, b: T) => number) {
super();
this.container = new PriorityQueue(comparator);
}

protected override get items() {
return this.container;
}
}
34 changes: 34 additions & 0 deletions yarn-project/foundation/src/queue/priority_queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Priority queue implementation based on a custom comparator.
*/
export class PriorityQueue<T> {
private items: T[];

constructor(private comparator: (a: T, b: T) => number) {
this.items = [];
}

public put(item: T): void {
let i = 0;
while (i < this.items.length && this.comparator(item, this.items[i]) >= 0) {
i++;
}
this.items.splice(i, 0, item);
}

public get(): T | undefined {
return this.items.shift();
}

public peek(): T | undefined {
return this.items[0];
}

public clear() {
this.items = [];
}

public get length(): number {
return this.items.length;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { MemoryFifo } from './memory_fifo.js';
import { FifoMemoryQueue } from './fifo_memory_queue.js';

/**
* Allows the acquiring of up to `size` tokens before calls to acquire block, waiting for a call to release().
*/
export class Semaphore {
private readonly queue = new MemoryFifo<boolean>();
private readonly queue = new FifoMemoryQueue<boolean>();

constructor(size: number) {
new Array(size).fill(true).map(() => this.queue.put(true));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { MemoryFifo } from './memory_fifo.js';
import { FifoMemoryQueue } from './fifo_memory_queue.js';

/**
* A more specialized fifo queue that enqueues functions to execute. Enqueued functions are executed in serial.
*/
export class SerialQueue {
private readonly queue = new MemoryFifo<() => Promise<void>>();
private readonly queue = new FifoMemoryQueue<() => Promise<void>>();
private runningPromise!: Promise<void>;

/**
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/foundation/src/wasm/wasm_module.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Buffer } from 'buffer';

import { randomBytes } from '../crypto/index.js';
import { MemoryFifo } from '../fifo/index.js';
import { type LogFn, createDebugOnlyLogger } from '../log/index.js';
import { FifoMemoryQueue } from '../queue/index.js';
import { getEmptyWasiSdk } from './empty_wasi_sdk.js';

/**
Expand Down Expand Up @@ -47,7 +47,7 @@ export class WasmModule implements IWasmModule {
private memory!: WebAssembly.Memory;
private heap!: Uint8Array;
private instance?: WebAssembly.Instance;
private mutexQ = new MemoryFifo<boolean>();
private mutexQ = new FifoMemoryQueue<boolean>();
private debug: LogFn;

/**
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type Gossipable, type RawGossipMessage, TopicType, TopicTypeMap, Tx } from '@aztec/circuit-types';
import { SerialQueue } from '@aztec/foundation/fifo';
import { createDebugLogger } from '@aztec/foundation/log';
import { SerialQueue } from '@aztec/foundation/queue';
import { RunningPromise } from '@aztec/foundation/running-promise';
import type { AztecKVStore } from '@aztec/kv-store';

Expand Down
Loading

0 comments on commit 95c14a9

Please sign in to comment.