Skip to content

Commit

Permalink
refactor: archiver identifies prune
Browse files Browse the repository at this point in the history
  • Loading branch information
LHerskind committed Sep 20, 2024
1 parent 8bfc769 commit 708eb75
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 154 deletions.
79 changes: 76 additions & 3 deletions yarn-project/archiver/src/archiver/archiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ describe('Archiver', () => {

blocks = blockNumbers.map(x => L2Block.random(x, 4, x, x + 1, 2, 2));

rollupRead = mock<MockRollupContractRead>({
archiveAt: (args: readonly [bigint]) => Promise.resolve(blocks[Number(args[0] - 1n)].archive.root.toString()),
});
rollupRead = mock<MockRollupContractRead>();
rollupRead.archiveAt.mockImplementation((args: readonly [bigint]) =>
Promise.resolve(blocks[Number(args[0] - 1n)].archive.root.toString()),
);

((archiver as any).rollup as any).read = rollupRead;
});
Expand Down Expand Up @@ -275,6 +276,78 @@ describe('Archiver', () => {
expect(loggerSpy).toHaveBeenNthCalledWith(2, `No blocks to retrieve from ${1n} to ${50n}`);
}, 10_000);

it('Handle L2 reorg', async () => {
const loggerSpy = jest.spyOn((archiver as any).log, 'verbose');

let latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(0);

const numL2BlocksInTest = 2;

const rollupTxs = blocks.map(makeRollupTx);

publicClient.getBlockNumber.mockResolvedValueOnce(50n).mockResolvedValueOnce(100n).mockResolvedValueOnce(150n);

// We will return status at first to have an empty round, then as if we have 2 pending blocks, and finally
// Just a single pending block returning a "failure" for the expected pending block
rollupRead.status
.mockResolvedValueOnce([0n, GENESIS_ROOT, 0n, GENESIS_ROOT, GENESIS_ROOT])
.mockResolvedValueOnce([0n, GENESIS_ROOT, 2n, blocks[1].archive.root.toString(), GENESIS_ROOT])
.mockResolvedValueOnce([0n, GENESIS_ROOT, 1n, blocks[0].archive.root.toString(), Fr.ZERO.toString()]);

rollupRead.archiveAt
.mockResolvedValueOnce(blocks[0].archive.root.toString())
.mockResolvedValueOnce(blocks[1].archive.root.toString())
.mockResolvedValueOnce(Fr.ZERO.toString());

// This can look slightly odd, but we will need to do an empty request for the messages, and will entirely skip
// a call to the proposed blocks because of changes with status.
mockGetLogs({
messageSent: [],
});
mockGetLogs({
messageSent: [makeMessageSentEvent(66n, 1n, 0n), makeMessageSentEvent(68n, 1n, 1n)],
L2BlockProposed: [
makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()),
makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()),
],
});
mockGetLogs({
messageSent: [],
});
mockGetLogs({
messageSent: [],
});

rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));

await archiver.start(false);

while ((await archiver.getBlockNumber()) !== numL2BlocksInTest) {
await sleep(100);
}

latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(numL2BlocksInTest);

// For some reason, this is 1-indexed.
expect(loggerSpy).toHaveBeenNthCalledWith(
1,
`Retrieved no new L1 -> L2 messages between L1 blocks ${1n} and ${50}.`,
);
expect(loggerSpy).toHaveBeenNthCalledWith(2, `No blocks to retrieve from ${1n} to ${50n}`);

// Lets take a look to see if we can find re-org stuff!
await sleep(1000);

expect(loggerSpy).toHaveBeenNthCalledWith(6, `L2 prune have occurred, unwind state`);
expect(loggerSpy).toHaveBeenNthCalledWith(7, `Unwinding 1 block from block 2`);

// Should also see the block number be reduced
latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(numL2BlocksInTest - 1);
}, 10_000);

// logs should be created in order of how archiver syncs.
const mockGetLogs = (logs: {
messageSent?: ReturnType<typeof makeMessageSentEvent>[];
Expand Down
120 changes: 78 additions & 42 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import { type ArchiverDataStore } from './archiver_store.js';
import { type ArchiverConfig } from './config.js';
import { retrieveBlockFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js';
import { ArchiverInstrumentation } from './instrumentation.js';
import { type SingletonDataRetrieval } from './structs/data_retrieval.js';

/**
* Helper interface to combine all sources this archiver implementation provides.
Expand Down Expand Up @@ -199,11 +198,8 @@ export class Archiver implements ArchiveSource {
*
* This code does not handle reorgs.
*/
const {
blocksSynchedTo = this.l1StartBlock,
messagesSynchedTo = this.l1StartBlock,
provenLogsSynchedTo = this.l1StartBlock,
} = await this.store.getSynchPoint();
const { blocksSynchedTo = this.l1StartBlock, messagesSynchedTo = this.l1StartBlock } =
await this.store.getSynchPoint();
const currentL1BlockNumber = await this.publicClient.getBlockNumber();

// ********** Ensuring Consistency of data pulled from L1 **********
Expand All @@ -225,10 +221,7 @@ export class Archiver implements ArchiveSource {
* in future but for the time being it should give us the guarantees that we need
*/

await this.updateLastProvenL2Block(provenLogsSynchedTo, currentL1BlockNumber);

// ********** Events that are processed per L1 block **********

await this.handleL1ToL2Messages(blockUntilSynced, messagesSynchedTo, currentL1BlockNumber);

// ********** Events that are processed per L2 block **********
Expand Down Expand Up @@ -268,46 +261,84 @@ export class Archiver implements ArchiveSource {
);
}

private async updateLastProvenL2Block(provenSynchedTo: bigint, currentL1BlockNumber: bigint) {
if (currentL1BlockNumber <= provenSynchedTo) {
return;
}

const provenBlockNumber = await this.rollup.read.getProvenBlockNumber();
if (provenBlockNumber) {
await this.store.setProvenL2BlockNumber({
retrievedData: Number(provenBlockNumber),
lastProcessedL1BlockNumber: currentL1BlockNumber,
});
}
}

private async handleL2blocks(blockUntilSynced: boolean, blocksSynchedTo: bigint, currentL1BlockNumber: bigint) {
if (currentL1BlockNumber <= blocksSynchedTo) {
return;
}

const lastBlock = await this.getBlock(-1);

const [, , pendingBlockNumber, pendingArchive, archiveOfMyBlock] = await this.rollup.read.status([
BigInt(lastBlock?.number ?? 0),
]);

const noBlocksButInitial = lastBlock === undefined && pendingBlockNumber == 0n;
const noBlockSinceLast =
lastBlock &&
pendingBlockNumber === BigInt(lastBlock.number) &&
pendingArchive === lastBlock.archive.root.toString();
const localPendingBlockNumber = BigInt(await this.getBlockNumber());
const [provenBlockNumber, provenArchive, pendingBlockNumber, pendingArchive, archiveForLocalPendingBlockNumber] =
await this.rollup.read.status([localPendingBlockNumber]);

const updateProvenBlock = async () => {
// Only update the proven block number if we are behind. And only if we have the state
if (provenBlockNumber > BigInt(await this.getProvenBlockNumber())) {
const localBlockForDestinationProvenBlockNumber = await this.getBlock(Number(provenBlockNumber));
if (
localBlockForDestinationProvenBlockNumber &&
provenArchive === localBlockForDestinationProvenBlockNumber.archive.root.toString()
) {
this.log.info(`Updating the proven block number to ${provenBlockNumber}`);
await this.store.setProvenL2BlockNumber(Number(provenBlockNumber));
}
}
};

if (noBlocksButInitial || noBlockSinceLast) {
// This is an edge case that we only hit if there are no proposed blocks.
const noBlocks = localPendingBlockNumber === 0n && pendingBlockNumber === 0n;
if (noBlocks) {
await this.store.setBlockSynchedL1BlockNumber(currentL1BlockNumber);
this.log.verbose(`No blocks to retrieve from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
return;
}

if (lastBlock && archiveOfMyBlock !== lastBlock.archive.root.toString()) {
// @todo Either `prune` have been called, or L1 have re-orged deep enough to remove a block.
// Issue#8620 and Issue#8621
// Related to the L2 reorgs of the pending chain. We are only interested in actually addressing a reorg if there
// are any state that could be impacted by it. If we have no blocks, there is no impact.
if (localPendingBlockNumber > 0) {
const localPendingBlock = await this.getBlock(Number(localPendingBlockNumber));
if (localPendingBlock === undefined) {
throw new Error(`Missing block ${localPendingBlockNumber}`);
}

const noBlockSinceLast = localPendingBlock && pendingArchive === localPendingBlock.archive.root.toString();
if (noBlockSinceLast) {
// While there have been no L2 blocks, there might have been a proof, so we will update if needed.
await updateProvenBlock();
await this.store.setBlockSynchedL1BlockNumber(currentL1BlockNumber);
this.log.verbose(`No blocks to retrieve from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
return;
}

const localPendingBlockInChain = archiveForLocalPendingBlockNumber === localPendingBlock.archive.root.toString();
if (!localPendingBlockInChain) {
// If our local pending block tip is not in the chain on L1 a "prune" must have happened
// or the L1 have reorged.
// In any case, we have to figure out how far into the past the action will take us.
// For simplicity here, we will simply rewind until we end in a block that is also on the chain on L1.
this.log.verbose(`L2 prune have occurred, unwind state`);

let tipAfterUnwind = localPendingBlockNumber;
while (true) {
const candidateBlock = await this.getBlock(Number(tipAfterUnwind));
if (candidateBlock === undefined) {
break;
}

const archiveAtContract = await this.rollup.read.archiveAt([BigInt(candidateBlock.number)]);

if (archiveAtContract === candidateBlock.archive.root.toString()) {
break;
}
tipAfterUnwind--;
}

const blocksToUnwind = localPendingBlockNumber - tipAfterUnwind;
this.log.verbose(
`Unwinding ${blocksToUnwind} block${blocksToUnwind > 1n ? 's' : ''} from block ${localPendingBlockNumber}`,
);

await this.store.unwindBlocks(Number(localPendingBlockNumber), Number(blocksToUnwind));
}
}

this.log.debug(`Retrieving blocks from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
Expand All @@ -321,7 +352,7 @@ export class Archiver implements ArchiveSource {
);

if (retrievedBlocks.length === 0) {
await this.store.setBlockSynchedL1BlockNumber(currentL1BlockNumber);
// We are not calling `setBlockSynchedL1BlockNumber` because it may cause sync issues if based off infura.
this.log.verbose(`Retrieved no new blocks from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
return;
}
Expand Down Expand Up @@ -365,6 +396,8 @@ export class Archiver implements ArchiveSource {

const timer = new Timer();
await this.store.addBlocks(retrievedBlocks);
// Important that we update AFTER inserting the blocks.
await updateProvenBlock();
this.instrumentation.processNewBlocks(
timer.ms() / retrievedBlocks.length,
retrievedBlocks.map(b => b.data),
Expand Down Expand Up @@ -476,14 +509,17 @@ export class Archiver implements ArchiveSource {

/**
* Gets an l2 block.
* @param number - The block number to return (inclusive).
* @param number - The block number to return.
* @returns The requested L2 block.
*/
public async getBlock(number: number): Promise<L2Block | undefined> {
// If the number provided is -ve, then return the latest block.
if (number < 0) {
number = await this.store.getSynchedL2BlockNumber();
}
if (number == 0) {
return undefined;
}
const blocks = await this.store.getBlocks(number, 1);
return blocks.length === 0 ? undefined : blocks[0].data;
}
Expand Down Expand Up @@ -554,8 +590,8 @@ export class Archiver implements ArchiveSource {
}

/** Forcefully updates the last proven block number. Use for testing. */
public setProvenBlockNumber(block: SingletonDataRetrieval<number>): Promise<void> {
return this.store.setProvenL2BlockNumber(block);
public setProvenBlockNumber(blockNumber: number): Promise<void> {
return this.store.setProvenL2BlockNumber(blockNumber);
}

public getContractClass(id: Fr): Promise<ContractClassPublic | undefined> {
Expand Down
13 changes: 11 additions & 2 deletions yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
type UnconstrainedFunctionWithMembershipProof,
} from '@aztec/types/contracts';

import { type DataRetrieval, type SingletonDataRetrieval } from './structs/data_retrieval.js';
import { type DataRetrieval } from './structs/data_retrieval.js';
import { type L1Published } from './structs/published.js';

/**
Expand All @@ -50,6 +50,15 @@ export interface ArchiverDataStore {
*/
addBlocks(blocks: L1Published<L2Block>[]): Promise<boolean>;

/**
* Unwinds blocks from the database
* @param from - The tip of the chain, passed for verification purposes,
* ensuring that we don't end up deleting something we did not intend
* @param blocksToUnwind - The number of blocks we are to unwind
* @returns True if the operation is successful
*/
unwindBlocks(from: number, blocksToUnwind: number): Promise<boolean>;

/**
* Gets up to `limit` amount of L2 blocks starting from `from`.
* @param from - Number of the first block to return (inclusive).
Expand Down Expand Up @@ -145,7 +154,7 @@ export interface ArchiverDataStore {
* Stores the number of the latest proven L2 block processed.
* @param l2BlockNumber - The number of the latest proven L2 block processed.
*/
setProvenL2BlockNumber(l2BlockNumber: SingletonDataRetrieval<number>): Promise<void>;
setProvenL2BlockNumber(l2BlockNumber: number): Promise<void>;

/**
* Stores the l1 block number that blocks have been synched until
Expand Down
16 changes: 2 additions & 14 deletions yarn-project/archiver/src/archiver/archiver_store_test_suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
await expect(store.getBlocks(1, 0)).rejects.toThrow('Invalid limit: 0');
});

it('resets `from` to the first block if it is out of range', async () => {
await expect(store.getBlocks(INITIAL_L2_BLOCK_NUM - 100, 1)).resolves.toEqual(blocks.slice(0, 1));
it('throws an error if `from` it is out of range', async () => {
await expect(store.getBlocks(INITIAL_L2_BLOCK_NUM - 100, 1)).rejects.toThrow('Invalid start: -99');
});
});

Expand All @@ -90,7 +90,6 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: undefined,
messagesSynchedTo: undefined,
provenLogsSynchedTo: undefined,
} satisfies ArchiverL1SynchPoint);
});

Expand All @@ -99,7 +98,6 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: 19n,
messagesSynchedTo: undefined,
provenLogsSynchedTo: undefined,
} satisfies ArchiverL1SynchPoint);
});

Expand All @@ -111,16 +109,6 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: undefined,
messagesSynchedTo: 1n,
provenLogsSynchedTo: undefined,
} satisfies ArchiverL1SynchPoint);
});

it('returns the L1 block number that most recently logged a proven block', async () => {
await store.setProvenL2BlockNumber({ lastProcessedL1BlockNumber: 3n, retrievedData: 5 });
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: undefined,
messagesSynchedTo: undefined,
provenLogsSynchedTo: 3n,
} satisfies ArchiverL1SynchPoint);
});
});
Expand Down
Loading

0 comments on commit 708eb75

Please sign in to comment.