diff --git a/yarn-project/end-to-end/src/e2e_prover_node.test.ts b/yarn-project/end-to-end/src/e2e_prover_node.test.ts
index a2bc44ea239..b300dc8dac2 100644
--- a/yarn-project/end-to-end/src/e2e_prover_node.test.ts
+++ b/yarn-project/end-to-end/src/e2e_prover_node.test.ts
@@ -15,7 +15,7 @@ import {
   sleep,
 } from '@aztec/aztec.js';
 import { StatefulTestContract, TestContract } from '@aztec/noir-contracts.js';
-import { createProverNode } from '@aztec/prover-node';
+import { type ProverNodeConfig, createProverNode } from '@aztec/prover-node';
 
 import { sendL1ToL2Message } from './fixtures/l1_to_l2_messaging.js';
 import {
@@ -127,7 +127,13 @@ describe('e2e_prover_node', () => {
     logger.info(`Creating prover node ${proverId.toString()}`);
     // HACK: We have to use the existing archiver to fetch L2 data, since anvil's chain dump/load used by the
     // snapshot manager does not include events nor txs, so a new archiver would not "see" old blocks.
-    const proverConfig = { ...ctx.aztecNodeConfig, txProviderNodeUrl: undefined, dataDirectory: undefined, proverId };
+    const proverConfig: ProverNodeConfig = {
+      ...ctx.aztecNodeConfig,
+      txProviderNodeUrl: undefined,
+      dataDirectory: undefined,
+      proverId,
+      proverNodeMaxPendingJobs: 100,
+    };
     const archiver = ctx.aztecNode.getBlockSource() as Archiver;
     const proverNode = await createProverNode(proverConfig, { aztecNodeTxProvider: ctx.aztecNode, archiver });
 
diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts
index cc45692f3e4..6dd924eae74 100644
--- a/yarn-project/foundation/src/config/env_var.ts
+++ b/yarn-project/foundation/src/config/env_var.ts
@@ -97,4 +97,6 @@ export type EnvVar =
   | 'BB_SKIP_CLEANUP'
   | 'PXE_PROVER_ENABLED'
   | 'VALIDATOR_PRIVATE_KEY'
-  | 'VALIDATOR_DISABLED';
+  | 'VALIDATOR_DISABLED'
+  | 'PROVER_NODE_DISABLE_AUTOMATIC_PROVING'
+  | 'PROVER_NODE_MAX_PENDING_JOBS';
diff --git a/yarn-project/prover-node/package.json b/yarn-project/prover-node/package.json
index efcd145e763..1ab15127b80 100644
--- a/yarn-project/prover-node/package.json
+++ b/yarn-project/prover-node/package.json
@@ -58,6 +58,7 @@
     "@aztec/sequencer-client": "workspace:^",
     "@aztec/simulator": "workspace:^",
     "@aztec/telemetry-client": "workspace:^",
+    "@aztec/types": "workspace:^",
     "@aztec/world-state": "workspace:^",
     "source-map-support": "^0.5.21",
     "tslib": "^2.4.0"
diff --git a/yarn-project/prover-node/src/config.ts b/yarn-project/prover-node/src/config.ts
index e49192a217c..d7229c80afe 100644
--- a/yarn-project/prover-node/src/config.ts
+++ b/yarn-project/prover-node/src/config.ts
@@ -1,5 +1,10 @@
 import { type ArchiverConfig, archiverConfigMappings, getArchiverConfigFromEnv } from '@aztec/archiver';
-import { type ConfigMappingsType } from '@aztec/foundation/config';
+import {
+  type ConfigMappingsType,
+  booleanConfigHelper,
+  getConfigFromMappings,
+  numberConfigHelper,
+} from '@aztec/foundation/config';
 import { type ProverClientConfig, getProverEnvVars, proverClientConfigMappings } from '@aztec/prover-client';
 import {
   type PublisherConfig,
@@ -18,7 +23,25 @@ export type ProverNodeConfig = ArchiverConfig &
   WorldStateConfig &
   PublisherConfig &
   TxSenderConfig &
-  TxProviderConfig;
+  TxProviderConfig & {
+    proverNodeDisableAutomaticProving?: boolean;
+    proverNodeMaxPendingJobs?: number;
+  };
+
+const specificProverNodeConfigMappings: ConfigMappingsType<
+  Pick<ProverNodeConfig, 'proverNodeDisableAutomaticProving' | 'proverNodeMaxPendingJobs'>
+> = {
+  proverNodeDisableAutomaticProving: {
+    env: 'PROVER_NODE_DISABLE_AUTOMATIC_PROVING',
+    description: 'Whether to disable automatic proving of pending blocks seen on L1',
+    ...booleanConfigHelper(false),
+  },
+  proverNodeMaxPendingJobs: {
+    env: 'PROVER_NODE_MAX_PENDING_JOBS',
+    description: 'The maximum number of pending jobs for the prover node',
+    ...numberConfigHelper(100),
+  },
+};
 
 export const proverNodeConfigMappings: ConfigMappingsType<ProverNodeConfig> = {
   ...archiverConfigMappings,
@@ -27,6 +50,7 @@ export const proverNodeConfigMappings: ConfigMappingsType<ProverNodeConfig> = {
   ...getPublisherConfigMappings('PROVER'),
   ...getTxSenderConfigMappings('PROVER'),
   ...txProviderConfigMappings,
+  ...specificProverNodeConfigMappings,
 };
 
 export function getProverNodeConfigFromEnv(): ProverNodeConfig {
@@ -37,5 +61,6 @@ export function getProverNodeConfigFromEnv(): ProverNodeConfig {
     ...getPublisherConfigFromEnv('PROVER'),
     ...getTxSenderConfigFromEnv('PROVER'),
     ...getTxProviderConfigFromEnv(),
+    ...getConfigFromMappings(specificProverNodeConfigMappings),
   };
 }
diff --git a/yarn-project/prover-node/src/factory.ts b/yarn-project/prover-node/src/factory.ts
index f1e966de091..ae6235af9d1 100644
--- a/yarn-project/prover-node/src/factory.ts
+++ b/yarn-project/prover-node/src/factory.ts
@@ -59,5 +59,9 @@ export async function createProverNode(
     txProvider,
     simulationProvider,
     telemetry,
+    {
+      disableAutomaticProving: config.proverNodeDisableAutomaticProving,
+      maxPendingJobs: config.proverNodeMaxPendingJobs,
+    },
   );
 }
diff --git a/yarn-project/prover-node/src/job/block-proving-job.ts b/yarn-project/prover-node/src/job/block-proving-job.ts
index dbf9d4bf9fe..25fe23ed990 100644
--- a/yarn-project/prover-node/src/job/block-proving-job.ts
+++ b/yarn-project/prover-node/src/job/block-proving-job.ts
@@ -14,6 +14,8 @@ import { createDebugLogger } from '@aztec/foundation/log';
 import { type L1Publisher } from '@aztec/sequencer-client';
 import { type PublicProcessor, type PublicProcessorFactory } from '@aztec/simulator';
 
+import * as crypto from 'node:crypto';
+
 /**
  * Job that grabs a range of blocks from the unfinalised chain from L1, gets their txs given their hashes,
  * re-executes their public calls, generates a rollup proof, and submits it to L1. This job will update the
@@ -22,6 +24,7 @@ import { type PublicProcessor, type PublicProcessorFactory } from '@aztec/simula
 export class BlockProvingJob {
   private state: BlockProvingJobState = 'initialized';
   private log = createDebugLogger('aztec:block-proving-job');
+  private uuid: string;
 
   constructor(
     private prover: BlockProver,
@@ -30,8 +33,14 @@
     private l2BlockSource: L2BlockSource,
     private l1ToL2MessageSource: L1ToL2MessageSource,
     private txProvider: TxProvider,
-    private cleanUp: () => Promise<void> = () => Promise.resolve(),
-  ) {}
+    private cleanUp: (job: BlockProvingJob) => Promise<void> = () => Promise.resolve(),
+  ) {
+    this.uuid = crypto.randomUUID();
+  }
+
+  public getId(): string {
+    return this.uuid;
+  }
 
   public getState(): BlockProvingJobState {
     return this.state;
@@ -42,7 +51,7 @@
       throw new Error(`Block ranges are not yet supported`);
     }
 
-    this.log.info(`Starting block proving job`, { fromBlock, toBlock });
+    this.log.info(`Starting block proving job`, { fromBlock, toBlock, uuid: this.uuid });
     this.state = 'processing';
     try {
       let historicalHeader = (await this.l2BlockSource.getBlock(fromBlock - 1))?.header;
@@ -61,6 +70,7 @@
         nullifierTreeRoot: block.header.state.partial.nullifierTree.root,
         publicDataTreeRoot: block.header.state.partial.publicDataTree.root,
         historicalHeader: historicalHeader?.hash(),
+        uuid: this.uuid,
         ...globalVariables,
       });
 
@@ -75,6 +85,7 @@
         this.log.verbose(`Processed all txs for block`, {
           blockNumber: block.number,
           blockHash: block.hash().toString(),
+          uuid: this.uuid,
         });
 
         await this.prover.setBlockCompleted();
@@ -90,7 +101,7 @@
       }
 
       const { block, aggregationObject, proof } = await this.prover.finaliseBlock();
-      this.log.info(`Finalised proof for block range`, { fromBlock, toBlock });
+      this.log.info(`Finalised proof for block range`, { fromBlock, toBlock, uuid: this.uuid });
 
       this.state = 'publishing-proof';
       await this.publisher.submitProof(
@@ -100,17 +111,21 @@
         aggregationObject,
         proof,
       );
-      this.log.info(`Submitted proof for block range`, { fromBlock, toBlock });
+      this.log.info(`Submitted proof for block range`, { fromBlock, toBlock, uuid: this.uuid });
 
       this.state = 'completed';
     } catch (err) {
-      this.log.error(`Error running block prover job: ${err}`);
+      this.log.error(`Error running block prover job`, err, { uuid: this.uuid });
       this.state = 'failed';
     } finally {
-      await this.cleanUp();
+      await this.cleanUp(this);
     }
   }
 
+  public stop() {
+    this.prover.cancelBlock();
+  }
+
   private async getBlock(blockNumber: number): Promise<L2Block> {
     const block = await this.l2BlockSource.getBlock(blockNumber);
     if (!block) {
diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts
new file mode 100644
index 00000000000..65f24b5a2ff
--- /dev/null
+++ b/yarn-project/prover-node/src/prover-node.test.ts
@@ -0,0 +1,179 @@
+import {
+  type L1ToL2MessageSource,
+  type L2BlockSource,
+  type MerkleTreeOperations,
+  type ProverClient,
+  type TxProvider,
+} from '@aztec/circuit-types';
+import { type L1Publisher } from '@aztec/sequencer-client';
+import { type PublicProcessorFactory, type SimulationProvider } from '@aztec/simulator';
+import { type TelemetryClient } from '@aztec/telemetry-client';
+import { type ContractDataSource } from '@aztec/types/contracts';
+import { WorldStateRunningState, type WorldStateSynchronizer } from '@aztec/world-state';
+
+import { type MockProxy, mock } from 'jest-mock-extended';
+
+import { type BlockProvingJob } from './job/block-proving-job.js';
+import { ProverNode } from './prover-node.js';
+
+describe('prover-node', () => {
+  let prover: MockProxy<ProverClient>;
+  let publisher: MockProxy<L1Publisher>;
+  let l2BlockSource: MockProxy<L2BlockSource>;
+  let l1ToL2MessageSource: MockProxy<L1ToL2MessageSource>;
+  let contractDataSource: MockProxy<ContractDataSource>;
+  let worldState: MockProxy<WorldStateSynchronizer>;
+  let txProvider: MockProxy<TxProvider>;
+  let simulator: MockProxy<SimulationProvider>;
+  let telemetryClient: MockProxy<TelemetryClient>;
+
+  let proverNode: TestProverNode;
+
+  // List of all jobs ever created by the test prover node and their dependencies
+  let jobs: {
+    job: MockProxy<BlockProvingJob>;
+    cleanUp: (job: BlockProvingJob) => Promise<void>;
+    db: MerkleTreeOperations;
+  }[];
+
+  beforeEach(() => {
+    prover = mock<ProverClient>();
+    publisher = mock<L1Publisher>();
+    l2BlockSource = mock<L2BlockSource>();
+    l1ToL2MessageSource = mock<L1ToL2MessageSource>();
+    contractDataSource = mock<ContractDataSource>();
+    worldState = mock<WorldStateSynchronizer>();
+    txProvider = mock<TxProvider>();
+    simulator = mock<SimulationProvider>();
+    telemetryClient = mock<TelemetryClient>();
+
+    // World state returns a new mock db every time it is asked to fork
+    worldState.syncImmediateAndFork.mockImplementation(() => Promise.resolve(mock<MerkleTreeOperations>()));
+
+    jobs = [];
+    proverNode = new TestProverNode(
+      prover,
+      publisher,
+      l2BlockSource,
+      l1ToL2MessageSource,
+      contractDataSource,
+      worldState,
+      txProvider,
+      simulator,
+      telemetryClient,
+      { maxPendingJobs: 3, pollingIntervalMs: 10 },
+    );
+  });
+
+  afterEach(async () => {
+    await proverNode.stop();
+  });
+
+  const setBlockNumbers = (blockNumber: number, provenBlockNumber: number) => {
+    l2BlockSource.getBlockNumber.mockResolvedValue(blockNumber);
+    l2BlockSource.getProvenBlockNumber.mockResolvedValue(provenBlockNumber);
+    worldState.status.mockResolvedValue({ syncedToL2Block: provenBlockNumber, state: WorldStateRunningState.RUNNING });
+  };
+
+  it('proves pending blocks', async () => {
+    setBlockNumbers(5, 3);
+
+    await proverNode.work();
+    await proverNode.work();
+    await proverNode.work();
+
+    expect(jobs.length).toEqual(2);
+    expect(jobs[0].job.run).toHaveBeenCalledWith(4, 4);
+    expect(jobs[1].job.run).toHaveBeenCalledWith(5, 5);
+  });
+
+  it('stops proving when maximum jobs are reached', async () => {
+    setBlockNumbers(10, 3);
+
+    await proverNode.work();
+    await proverNode.work();
+    await proverNode.work();
+    await proverNode.work();
+
+    expect(jobs.length).toEqual(3);
+    expect(jobs[0].job.run).toHaveBeenCalledWith(4, 4);
+    expect(jobs[1].job.run).toHaveBeenCalledWith(5, 5);
+    expect(jobs[2].job.run).toHaveBeenCalledWith(6, 6);
+  });
+
+  it('reports on pending jobs', async () => {
+    setBlockNumbers(5, 3);
+
+    await proverNode.work();
+    await proverNode.work();
+
+    expect(jobs.length).toEqual(2);
+    expect(proverNode.getJobs().length).toEqual(2);
+    expect(proverNode.getJobs()).toEqual([
+      { uuid: '0', status: 'processing' },
+      { uuid: '1', status: 'processing' },
+    ]);
+  });
+
+  it('cleans up jobs when completed', async () => {
+    setBlockNumbers(10, 3);
+
+    await proverNode.work();
+    await proverNode.work();
+    await proverNode.work();
+    await proverNode.work();
+
+    expect(jobs.length).toEqual(3);
+    expect(jobs[0].job.run).toHaveBeenCalledWith(4, 4);
+    expect(jobs[1].job.run).toHaveBeenCalledWith(5, 5);
+    expect(jobs[2].job.run).toHaveBeenCalledWith(6, 6);
+
+    expect(proverNode.getJobs().length).toEqual(3);
+
+    // Clean up the first job
+    await jobs[0].cleanUp(jobs[0].job);
+    expect(proverNode.getJobs().length).toEqual(2);
+    expect(jobs[0].db.delete).toHaveBeenCalled();
+
+    // Request another job to run and ensure it gets pushed
+    await proverNode.work();
+
+    expect(jobs.length).toEqual(4);
+    expect(jobs[3].job.run).toHaveBeenCalledWith(7, 7);
+    expect(proverNode.getJobs().length).toEqual(3);
+    expect(proverNode.getJobs().map(({ uuid }) => uuid)).toEqual(['1', '2', '3']);
+  });
+
+  it('moves forward when proving fails', async () => {
+    setBlockNumbers(10, 3);
+
+    // We trigger an error by setting world state past the block that the prover node will try proving
+    worldState.status.mockResolvedValue({ syncedToL2Block: 5, state: WorldStateRunningState.RUNNING });
+
+    // These two calls should return in failures
+    await proverNode.work();
+    await proverNode.work();
+    expect(jobs.length).toEqual(0);
+
+    // But now the prover node should move forward
+    await proverNode.work();
+    expect(jobs.length).toEqual(1);
+    expect(jobs[0].job.run).toHaveBeenCalledWith(6, 6);
+  });
+
+  class TestProverNode extends ProverNode {
+    protected override doCreateBlockProvingJob(
+      db: MerkleTreeOperations,
+      _publicProcessorFactory: PublicProcessorFactory,
+      cleanUp: (job: BlockProvingJob) => Promise<void>,
+    ): BlockProvingJob {
+      const job = mock<BlockProvingJob>({ getState: () => 'processing' });
+      job.getId.mockReturnValue(jobs.length.toString());
+      jobs.push({ job, cleanUp, db });
+      return job;
+    }
+
+    public override work() {
+      return super.work();
+    }
+  }
+});
diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts
index 8c9f7a21fb1..ebae8b234f1 100644
--- a/yarn-project/prover-node/src/prover-node.ts
+++ b/yarn-project/prover-node/src/prover-node.ts
@@ -1,13 +1,19 @@
-import { type L1ToL2MessageSource, type L2BlockSource, type ProverClient, type TxProvider } from '@aztec/circuit-types';
+import {
+  type L1ToL2MessageSource,
+  type L2BlockSource,
+  type MerkleTreeOperations,
+  type ProverClient,
+  type TxProvider,
+} from '@aztec/circuit-types';
 import { createDebugLogger } from '@aztec/foundation/log';
 import { RunningPromise } from '@aztec/foundation/running-promise';
 import { type L1Publisher } from '@aztec/sequencer-client';
 import { PublicProcessorFactory, type SimulationProvider } from '@aztec/simulator';
 import { type TelemetryClient } from '@aztec/telemetry-client';
+import { type ContractDataSource } from '@aztec/types/contracts';
 import { type WorldStateSynchronizer } from '@aztec/world-state';
 
-import { type ContractDataSource } from '../../types/src/contracts/contract_data_source.js';
-import { BlockProvingJob } from './job/block-proving-job.js';
+import { BlockProvingJob, type BlockProvingJobState } from './job/block-proving-job.js';
 
 /**
  * An Aztec Prover Node is a standalone process that monitors the unfinalised chain on L1 for unproven blocks,
@@ -18,6 +24,8 @@ export class ProverNode {
   private log = createDebugLogger('aztec:prover-node');
   private runningPromise: RunningPromise | undefined;
   private latestBlockWeAreProving: number | undefined;
+  private jobs: Map<string, BlockProvingJob> = new Map();
+  private options: { pollingIntervalMs: number; disableAutomaticProving: boolean; maxPendingJobs: number };
 
   constructor(
     private prover: ProverClient,
@@ -29,11 +37,15 @@
     private txProvider: TxProvider,
     private simulator: SimulationProvider,
     private telemetryClient: TelemetryClient,
-    private options: { pollingIntervalMs: number; disableAutomaticProving: boolean } = {
+    options: { pollingIntervalMs?: number; disableAutomaticProving?: boolean; maxPendingJobs?: number } = {},
+  ) {
+    this.options = {
       pollingIntervalMs: 1_000,
       disableAutomaticProving: false,
-    },
-  ) {}
+      maxPendingJobs: 100,
+      ...options,
+    };
+  }
 
   /**
    * Starts the prover node so it periodically checks for unproven blocks in the unfinalised chain from L1 and proves them.
@@ -54,8 +66,8 @@
     await this.prover.stop();
     await this.l2BlockSource.stop();
     this.publisher.interrupt();
+    this.jobs.forEach(job => job.stop());
     this.log.info('Stopped ProverNode');
-    // TODO(palla/prover-node): Keep a reference to all ongoing ProvingJobs and stop them.
   }
 
   /**
@@ -68,6 +80,14 @@
         return;
       }
 
+      if (!this.checkMaximumPendingJobs()) {
+        this.log.debug(`Maximum pending proving jobs reached. Skipping work.`, {
+          maxPendingJobs: this.options.maxPendingJobs,
+          pendingJobs: this.jobs.size,
+        });
+        return;
+      }
+
       const [latestBlockNumber, latestProvenBlockNumber] = await Promise.all([
         this.l2BlockSource.getBlockNumber(),
         this.l2BlockSource.getProvenBlockNumber(),
@@ -88,8 +108,13 @@
       const fromBlock = latestProven + 1;
       const toBlock = fromBlock; // We only prove one block at a time for now
 
-      await this.startProof(fromBlock, toBlock);
-      this.latestBlockWeAreProving = toBlock;
+      try {
+        await this.startProof(fromBlock, toBlock);
+      } finally {
+        // If we fail to create a proving job for the given block, skip it instead of getting stuck on it.
+        this.log.verbose(`Setting ${toBlock} as latest block we are proving`);
+        this.latestBlockWeAreProving = toBlock;
+      }
     } catch (err) {
       this.log.error(`Error in prover node work`, err);
     }
@@ -118,7 +143,23 @@
     return this.prover;
   }
 
+  /**
+   * Returns an array of jobs being processed.
+   */
+  public getJobs(): { uuid: string; status: BlockProvingJobState }[] {
+    return Array.from(this.jobs.entries()).map(([uuid, job]) => ({ uuid, status: job.getState() }));
+  }
+
+  private checkMaximumPendingJobs() {
+    const { maxPendingJobs } = this.options;
+    return maxPendingJobs === 0 || this.jobs.size < maxPendingJobs;
+  }
+
   private async createProvingJob(fromBlock: number) {
+    if (!this.checkMaximumPendingJobs()) {
+      throw new Error(`Maximum pending proving jobs ${this.options.maxPendingJobs} reached. Cannot create new job.`);
+    }
+
     if ((await this.worldState.status()).syncedToL2Block >= fromBlock) {
       throw new Error(`Cannot create proving job for block ${fromBlock} as it is behind the current world state`);
     }
@@ -134,6 +175,22 @@
       this.telemetryClient,
     );
 
+    const cleanUp = async () => {
+      await db.delete();
+      this.jobs.delete(job.getId());
+    };
+
+    const job = this.doCreateBlockProvingJob(db, publicProcessorFactory, cleanUp);
+    this.jobs.set(job.getId(), job);
+    return job;
+  }
+
+  /** Extracted for testing purposes. */
+  protected doCreateBlockProvingJob(
+    db: MerkleTreeOperations,
+    publicProcessorFactory: PublicProcessorFactory,
+    cleanUp: () => Promise<void>,
+  ) {
     return new BlockProvingJob(
       this.prover.createBlockProver(db),
       publicProcessorFactory,
@@ -141,7 +198,7 @@
       this.l2BlockSource,
       this.l1ToL2MessageSource,
       this.txProvider,
-      () => db.delete(),
+      cleanUp,
     );
   }
 }
diff --git a/yarn-project/prover-node/tsconfig.json b/yarn-project/prover-node/tsconfig.json
index 9f241158c41..ed7c2e5772e 100644
--- a/yarn-project/prover-node/tsconfig.json
+++ b/yarn-project/prover-node/tsconfig.json
@@ -33,6 +33,9 @@
     {
       "path": "../telemetry-client"
     },
+    {
+      "path": "../types"
+    },
     {
       "path": "../world-state"
     }
diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock
index 760f8200f69..4050f3da274 100644
--- a/yarn-project/yarn.lock
+++ b/yarn-project/yarn.lock
@@ -934,6 +934,7 @@ __metadata:
     "@aztec/sequencer-client": "workspace:^"
     "@aztec/simulator": "workspace:^"
     "@aztec/telemetry-client": "workspace:^"
+    "@aztec/types": "workspace:^"
     "@aztec/world-state": "workspace:^"
     "@jest/globals": ^29.5.0
     "@types/jest": ^29.5.0