Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Max pending jobs in prover node #8045

Merged
merged 2 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions yarn-project/end-to-end/src/e2e_prover_node.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
sleep,
} from '@aztec/aztec.js';
import { StatefulTestContract, TestContract } from '@aztec/noir-contracts.js';
import { createProverNode } from '@aztec/prover-node';
import { type ProverNodeConfig, createProverNode } from '@aztec/prover-node';

import { sendL1ToL2Message } from './fixtures/l1_to_l2_messaging.js';
import {
Expand Down Expand Up @@ -127,7 +127,13 @@ describe('e2e_prover_node', () => {
logger.info(`Creating prover node ${proverId.toString()}`);
// HACK: We have to use the existing archiver to fetch L2 data, since anvil's chain dump/load used by the
// snapshot manager does not include events nor txs, so a new archiver would not "see" old blocks.
const proverConfig = { ...ctx.aztecNodeConfig, txProviderNodeUrl: undefined, dataDirectory: undefined, proverId };
const proverConfig: ProverNodeConfig = {
...ctx.aztecNodeConfig,
txProviderNodeUrl: undefined,
dataDirectory: undefined,
proverId,
proverNodeMaxPendingJobs: 100,
};
const archiver = ctx.aztecNode.getBlockSource() as Archiver;
const proverNode = await createProverNode(proverConfig, { aztecNodeTxProvider: ctx.aztecNode, archiver });

Expand Down
4 changes: 3 additions & 1 deletion yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,6 @@ export type EnvVar =
| 'BB_SKIP_CLEANUP'
| 'PXE_PROVER_ENABLED'
| 'VALIDATOR_PRIVATE_KEY'
| 'VALIDATOR_DISABLED';
| 'VALIDATOR_DISABLED'
| 'PROVER_NODE_DISABLE_AUTOMATIC_PROVING'
| 'PROVER_NODE_MAX_PENDING_JOBS';
1 change: 1 addition & 0 deletions yarn-project/prover-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"@aztec/sequencer-client": "workspace:^",
"@aztec/simulator": "workspace:^",
"@aztec/telemetry-client": "workspace:^",
"@aztec/types": "workspace:^",
"@aztec/world-state": "workspace:^",
"source-map-support": "^0.5.21",
"tslib": "^2.4.0"
Expand Down
29 changes: 27 additions & 2 deletions yarn-project/prover-node/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { type ArchiverConfig, archiverConfigMappings, getArchiverConfigFromEnv } from '@aztec/archiver';
import { type ConfigMappingsType } from '@aztec/foundation/config';
import {
type ConfigMappingsType,
booleanConfigHelper,
getConfigFromMappings,
numberConfigHelper,
} from '@aztec/foundation/config';
import { type ProverClientConfig, getProverEnvVars, proverClientConfigMappings } from '@aztec/prover-client';
import {
type PublisherConfig,
Expand All @@ -18,7 +23,25 @@ export type ProverNodeConfig = ArchiverConfig &
WorldStateConfig &
PublisherConfig &
TxSenderConfig &
TxProviderConfig;
TxProviderConfig & {
proverNodeDisableAutomaticProving?: boolean;
proverNodeMaxPendingJobs?: number;
};

const specificProverNodeConfigMappings: ConfigMappingsType<
Pick<ProverNodeConfig, 'proverNodeDisableAutomaticProving' | 'proverNodeMaxPendingJobs'>
> = {
proverNodeDisableAutomaticProving: {
env: 'PROVER_NODE_DISABLE_AUTOMATIC_PROVING',
description: 'Whether to disable automatic proving of pending blocks seen on L1',
...booleanConfigHelper(false),
},
proverNodeMaxPendingJobs: {
env: 'PROVER_NODE_MAX_PENDING_JOBS',
description: 'The maximum number of pending jobs for the prover node',
...numberConfigHelper(100),
},
};

export const proverNodeConfigMappings: ConfigMappingsType<ProverNodeConfig> = {
...archiverConfigMappings,
Expand All @@ -27,6 +50,7 @@ export const proverNodeConfigMappings: ConfigMappingsType<ProverNodeConfig> = {
...getPublisherConfigMappings('PROVER'),
...getTxSenderConfigMappings('PROVER'),
...txProviderConfigMappings,
...specificProverNodeConfigMappings,
};

export function getProverNodeConfigFromEnv(): ProverNodeConfig {
Expand All @@ -37,5 +61,6 @@ export function getProverNodeConfigFromEnv(): ProverNodeConfig {
...getPublisherConfigFromEnv('PROVER'),
...getTxSenderConfigFromEnv('PROVER'),
...getTxProviderConfigFromEnv(),
...getConfigFromMappings(specificProverNodeConfigMappings),
};
}
4 changes: 4 additions & 0 deletions yarn-project/prover-node/src/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,9 @@ export async function createProverNode(
txProvider,
simulationProvider,
telemetry,
{
disableAutomaticProving: config.proverNodeDisableAutomaticProving,
maxPendingJobs: config.proverNodeMaxPendingJobs,
},
);
}
29 changes: 22 additions & 7 deletions yarn-project/prover-node/src/job/block-proving-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { createDebugLogger } from '@aztec/foundation/log';
import { type L1Publisher } from '@aztec/sequencer-client';
import { type PublicProcessor, type PublicProcessorFactory } from '@aztec/simulator';

import * as crypto from 'node:crypto';

/**
* Job that grabs a range of blocks from the unfinalised chain from L1, gets their txs given their hashes,
* re-executes their public calls, generates a rollup proof, and submits it to L1. This job will update the
Expand All @@ -22,6 +24,7 @@ import { type PublicProcessor, type PublicProcessorFactory } from '@aztec/simula
export class BlockProvingJob {
private state: BlockProvingJobState = 'initialized';
private log = createDebugLogger('aztec:block-proving-job');
private uuid: string;

constructor(
private prover: BlockProver,
Expand All @@ -30,8 +33,14 @@ export class BlockProvingJob {
private l2BlockSource: L2BlockSource,
private l1ToL2MessageSource: L1ToL2MessageSource,
private txProvider: TxProvider,
private cleanUp: () => Promise<void> = () => Promise.resolve(),
) {}
private cleanUp: (job: BlockProvingJob) => Promise<void> = () => Promise.resolve(),
) {
this.uuid = crypto.randomUUID();
}

public getId(): string {
return this.uuid;
}

public getState(): BlockProvingJobState {
return this.state;
Expand All @@ -42,7 +51,7 @@ export class BlockProvingJob {
throw new Error(`Block ranges are not yet supported`);
}

this.log.info(`Starting block proving job`, { fromBlock, toBlock });
this.log.info(`Starting block proving job`, { fromBlock, toBlock, uuid: this.uuid });
this.state = 'processing';
try {
let historicalHeader = (await this.l2BlockSource.getBlock(fromBlock - 1))?.header;
Expand All @@ -61,6 +70,7 @@ export class BlockProvingJob {
nullifierTreeRoot: block.header.state.partial.nullifierTree.root,
publicDataTreeRoot: block.header.state.partial.publicDataTree.root,
historicalHeader: historicalHeader?.hash(),
uuid: this.uuid,
...globalVariables,
});

Expand All @@ -75,6 +85,7 @@ export class BlockProvingJob {
this.log.verbose(`Processed all txs for block`, {
blockNumber: block.number,
blockHash: block.hash().toString(),
uuid: this.uuid,
});

await this.prover.setBlockCompleted();
Expand All @@ -90,7 +101,7 @@ export class BlockProvingJob {
}

const { block, aggregationObject, proof } = await this.prover.finaliseBlock();
this.log.info(`Finalised proof for block range`, { fromBlock, toBlock });
this.log.info(`Finalised proof for block range`, { fromBlock, toBlock, uuid: this.uuid });

this.state = 'publishing-proof';
await this.publisher.submitProof(
Expand All @@ -100,17 +111,21 @@ export class BlockProvingJob {
aggregationObject,
proof,
);
this.log.info(`Submitted proof for block range`, { fromBlock, toBlock });
this.log.info(`Submitted proof for block range`, { fromBlock, toBlock, uuid: this.uuid });

this.state = 'completed';
} catch (err) {
this.log.error(`Error running block prover job: ${err}`);
this.log.error(`Error running block prover job`, err, { uuid: this.uuid });
this.state = 'failed';
} finally {
await this.cleanUp();
await this.cleanUp(this);
}
}

public stop() {
this.prover.cancelBlock();
}

private async getBlock(blockNumber: number): Promise<L2Block> {
const block = await this.l2BlockSource.getBlock(blockNumber);
if (!block) {
Expand Down
179 changes: 179 additions & 0 deletions yarn-project/prover-node/src/prover-node.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import {
type L1ToL2MessageSource,
type L2BlockSource,
type MerkleTreeOperations,
type ProverClient,
type TxProvider,
} from '@aztec/circuit-types';
import { type L1Publisher } from '@aztec/sequencer-client';
import { type PublicProcessorFactory, type SimulationProvider } from '@aztec/simulator';
import { type TelemetryClient } from '@aztec/telemetry-client';
import { type ContractDataSource } from '@aztec/types/contracts';
import { WorldStateRunningState, type WorldStateSynchronizer } from '@aztec/world-state';

import { type MockProxy, mock } from 'jest-mock-extended';

import { type BlockProvingJob } from './job/block-proving-job.js';
import { ProverNode } from './prover-node.js';

describe('prover-node', () => {
let prover: MockProxy<ProverClient>;
let publisher: MockProxy<L1Publisher>;
let l2BlockSource: MockProxy<L2BlockSource>;
let l1ToL2MessageSource: MockProxy<L1ToL2MessageSource>;
let contractDataSource: MockProxy<ContractDataSource>;
let worldState: MockProxy<WorldStateSynchronizer>;
let txProvider: MockProxy<TxProvider>;
let simulator: MockProxy<SimulationProvider>;
let telemetryClient: MockProxy<TelemetryClient>;

let proverNode: TestProverNode;

// List of all jobs ever created by the test prover node and their dependencies
let jobs: {
job: MockProxy<BlockProvingJob>;
cleanUp: (job: BlockProvingJob) => Promise<void>;
db: MerkleTreeOperations;
}[];

beforeEach(() => {
prover = mock<ProverClient>();
publisher = mock<L1Publisher>();
l2BlockSource = mock<L2BlockSource>();
l1ToL2MessageSource = mock<L1ToL2MessageSource>();
contractDataSource = mock<ContractDataSource>();
worldState = mock<WorldStateSynchronizer>();
txProvider = mock<TxProvider>();
simulator = mock<SimulationProvider>();
telemetryClient = mock<TelemetryClient>();

// World state returns a new mock db every time it is asked to fork
worldState.syncImmediateAndFork.mockImplementation(() => Promise.resolve(mock<MerkleTreeOperations>()));

jobs = [];
proverNode = new TestProverNode(
prover,
publisher,
l2BlockSource,
l1ToL2MessageSource,
contractDataSource,
worldState,
txProvider,
simulator,
telemetryClient,
{ maxPendingJobs: 3, pollingIntervalMs: 10 },
);
});

afterEach(async () => {
await proverNode.stop();
});

const setBlockNumbers = (blockNumber: number, provenBlockNumber: number) => {
l2BlockSource.getBlockNumber.mockResolvedValue(blockNumber);
l2BlockSource.getProvenBlockNumber.mockResolvedValue(provenBlockNumber);
worldState.status.mockResolvedValue({ syncedToL2Block: provenBlockNumber, state: WorldStateRunningState.RUNNING });
};

it('proves pending blocks', async () => {
setBlockNumbers(5, 3);

await proverNode.work();
await proverNode.work();
await proverNode.work();

expect(jobs.length).toEqual(2);
expect(jobs[0].job.run).toHaveBeenCalledWith(4, 4);
expect(jobs[1].job.run).toHaveBeenCalledWith(5, 5);
});

it('stops proving when maximum jobs are reached', async () => {
setBlockNumbers(10, 3);

await proverNode.work();
await proverNode.work();
await proverNode.work();
await proverNode.work();

expect(jobs.length).toEqual(3);
expect(jobs[0].job.run).toHaveBeenCalledWith(4, 4);
expect(jobs[1].job.run).toHaveBeenCalledWith(5, 5);
expect(jobs[2].job.run).toHaveBeenCalledWith(6, 6);
});

it('reports on pending jobs', async () => {
setBlockNumbers(5, 3);

await proverNode.work();
await proverNode.work();

expect(jobs.length).toEqual(2);
expect(proverNode.getJobs().length).toEqual(2);
expect(proverNode.getJobs()).toEqual([
{ uuid: '0', status: 'processing' },
{ uuid: '1', status: 'processing' },
]);
});

it('cleans up jobs when completed', async () => {
setBlockNumbers(10, 3);

await proverNode.work();
await proverNode.work();
await proverNode.work();
await proverNode.work();

expect(jobs.length).toEqual(3);
expect(jobs[0].job.run).toHaveBeenCalledWith(4, 4);
expect(jobs[1].job.run).toHaveBeenCalledWith(5, 5);
expect(jobs[2].job.run).toHaveBeenCalledWith(6, 6);

expect(proverNode.getJobs().length).toEqual(3);

// Clean up the first job
await jobs[0].cleanUp(jobs[0].job);
expect(proverNode.getJobs().length).toEqual(2);
expect(jobs[0].db.delete).toHaveBeenCalled();

// Request another job to run and ensure it gets pushed
await proverNode.work();
expect(jobs.length).toEqual(4);
expect(jobs[3].job.run).toHaveBeenCalledWith(7, 7);
expect(proverNode.getJobs().length).toEqual(3);
expect(proverNode.getJobs().map(({ uuid }) => uuid)).toEqual(['1', '2', '3']);
});

it('moves forward when proving fails', async () => {
setBlockNumbers(10, 3);

// We trigger an error by setting world state past the block that the prover node will try proving
worldState.status.mockResolvedValue({ syncedToL2Block: 5, state: WorldStateRunningState.RUNNING });

// These two calls should return in failures
await proverNode.work();
await proverNode.work();
expect(jobs.length).toEqual(0);

// But now the prover node should move forward
await proverNode.work();
expect(jobs.length).toEqual(1);
expect(jobs[0].job.run).toHaveBeenCalledWith(6, 6);
});

class TestProverNode extends ProverNode {
protected override doCreateBlockProvingJob(
db: MerkleTreeOperations,
_publicProcessorFactory: PublicProcessorFactory,
cleanUp: (job: BlockProvingJob) => Promise<void>,
): BlockProvingJob {
const job = mock<BlockProvingJob>({ getState: () => 'processing' });
job.getId.mockReturnValue(jobs.length.toString());
jobs.push({ job, cleanUp, db });
return job;
}

public override work() {
return super.work();
}
}
});
Loading
Loading