Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Flush sequencer #8050

Merged
merged 3 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,14 @@ export class AztecNodeService implements AztecNode {
return this.contractDataSource.addContractArtifact(address, artifact);
}

public flushTxs(): Promise<void> {
if (!this.sequencer) {
throw new Error(`Sequencer is not initialized`);
}
this.sequencer.flush();
return Promise.resolve();
}

/**
* Returns an instance of MerkleTreeOperations having first ensured the world state is fully synched
* @param blockNumber - The block number at which to get the data.
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/circuit-types/src/interfaces/aztec-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,4 +341,7 @@ export interface AztecNode {
* @param address - Address of the deployed contract.
*/
getContract(address: AztecAddress): Promise<ContractInstanceWithAddress | undefined>;

/** Forces the next block to be built bypassing all time and pending checks. Useful for testing. */
flushTxs(): Promise<void>;
}
5 changes: 5 additions & 0 deletions yarn-project/sequencer-client/src/client/sequencer-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ export class SequencerClient {
await this.sequencer.stop();
}

/** Forces the sequencer to bypass all time and tx count checks for the next block and build anyway. */
public flush() {
this.sequencer.flush();
}

/**
* Restarts the sequencer after being stopped.
*/
Expand Down
184 changes: 184 additions & 0 deletions yarn-project/sequencer-client/src/sequencer/sequencer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
IS_DEV_NET,
NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP,
} from '@aztec/circuits.js';
import { times } from '@aztec/foundation/collection';
import { randomBytes } from '@aztec/foundation/crypto';
import { type Writeable } from '@aztec/foundation/types';
import { type P2P, P2PClientState } from '@aztec/p2p';
Expand Down Expand Up @@ -376,6 +377,189 @@ describe('sequencer', () => {
expect(blockSimulator.cancelBlock).toHaveBeenCalledTimes(0);
});

it('builds a block once it reaches the minimum number of transactions', async () => {
const txs = times(8, i => {
const tx = mockTxForRollup(i * 0x10000);
tx.data.constants.txContext.chainId = chainId;
return tx;
});
const block = L2Block.random(lastBlockNumber + 1);
const result: ProvingSuccess = {
status: PROVING_STATUS.SUCCESS,
};
const ticket: ProvingTicket = {
provingPromise: Promise.resolve(result),
};

blockSimulator.startNewBlock.mockResolvedValueOnce(ticket);
blockSimulator.finaliseBlock.mockResolvedValue({ block });
publisher.processL2Block.mockResolvedValueOnce(true);

const mockedGlobalVariables = new GlobalVariables(
chainId,
version,
new Fr(lastBlockNumber + 1),
block.header.globalVariables.slotNumber,
Fr.ZERO,
coinbase,
feeRecipient,
gasFees,
);

globalVariableBuilder.buildGlobalVariables.mockResolvedValueOnce(mockedGlobalVariables);

await sequencer.initialSync();

sequencer.updateConfig({ minTxsPerBlock: 4 });

// block is not built with 0 txs
p2p.getTxs.mockReturnValueOnce([]);
//p2p.getTxs.mockReturnValueOnce(txs.slice(0, 4));
await sequencer.work();
expect(blockSimulator.startNewBlock).toHaveBeenCalledTimes(0);

// block is not built with 3 txs
p2p.getTxs.mockReturnValueOnce(txs.slice(0, 3));
await sequencer.work();
expect(blockSimulator.startNewBlock).toHaveBeenCalledTimes(0);

// block is built with 4 txs
p2p.getTxs.mockReturnValueOnce(txs.slice(0, 4));
await sequencer.work();
expect(blockSimulator.startNewBlock).toHaveBeenCalledWith(
4,
mockedGlobalVariables,
Array(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP).fill(new Fr(0n)),
);
expect(publisher.processL2Block).toHaveBeenCalledTimes(1);
expect(publisher.processL2Block).toHaveBeenCalledWith(block, getAttestations());
expect(blockSimulator.cancelBlock).toHaveBeenCalledTimes(0);
});

it('builds a block that contains zero real transactions once flushed', async () => {
const txs = times(8, i => {
const tx = mockTxForRollup(i * 0x10000);
tx.data.constants.txContext.chainId = chainId;
return tx;
});
const block = L2Block.random(lastBlockNumber + 1);
const result: ProvingSuccess = {
status: PROVING_STATUS.SUCCESS,
};
const ticket: ProvingTicket = {
provingPromise: Promise.resolve(result),
};

blockSimulator.startNewBlock.mockResolvedValueOnce(ticket);
blockSimulator.finaliseBlock.mockResolvedValue({ block });
publisher.processL2Block.mockResolvedValueOnce(true);

const mockedGlobalVariables = new GlobalVariables(
chainId,
version,
new Fr(lastBlockNumber + 1),
block.header.globalVariables.slotNumber,
Fr.ZERO,
coinbase,
feeRecipient,
gasFees,
);

globalVariableBuilder.buildGlobalVariables.mockResolvedValueOnce(mockedGlobalVariables);

await sequencer.initialSync();

sequencer.updateConfig({ minTxsPerBlock: 4 });

// block is not built with 0 txs
p2p.getTxs.mockReturnValueOnce([]);
await sequencer.work();
expect(blockSimulator.startNewBlock).toHaveBeenCalledTimes(0);

// block is not built with 3 txs
p2p.getTxs.mockReturnValueOnce(txs.slice(0, 3));
await sequencer.work();
expect(blockSimulator.startNewBlock).toHaveBeenCalledTimes(0);

// flush the sequencer and it should build a block
sequencer.flush();

// block is built with 0 txs
p2p.getTxs.mockReturnValueOnce([]);
await sequencer.work();
expect(blockSimulator.startNewBlock).toHaveBeenCalledTimes(1);
expect(blockSimulator.startNewBlock).toHaveBeenCalledWith(
2,
mockedGlobalVariables,
Array(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP).fill(new Fr(0n)),
);
expect(publisher.processL2Block).toHaveBeenCalledTimes(1);
expect(publisher.processL2Block).toHaveBeenCalledWith(block, getAttestations());
expect(blockSimulator.cancelBlock).toHaveBeenCalledTimes(0);
});

it('builds a block that contains less than the minimum number of transactions once flushed', async () => {
const txs = times(8, i => {
const tx = mockTxForRollup(i * 0x10000);
tx.data.constants.txContext.chainId = chainId;
return tx;
});
const block = L2Block.random(lastBlockNumber + 1);
const result: ProvingSuccess = {
status: PROVING_STATUS.SUCCESS,
};
const ticket: ProvingTicket = {
provingPromise: Promise.resolve(result),
};

blockSimulator.startNewBlock.mockResolvedValueOnce(ticket);
blockSimulator.finaliseBlock.mockResolvedValue({ block });
publisher.processL2Block.mockResolvedValueOnce(true);

const mockedGlobalVariables = new GlobalVariables(
chainId,
version,
new Fr(lastBlockNumber + 1),
block.header.globalVariables.slotNumber,
Fr.ZERO,
coinbase,
feeRecipient,
gasFees,
);

globalVariableBuilder.buildGlobalVariables.mockResolvedValueOnce(mockedGlobalVariables);

await sequencer.initialSync();

sequencer.updateConfig({ minTxsPerBlock: 4 });

// block is not built with 0 txs
p2p.getTxs.mockReturnValueOnce([]);
await sequencer.work();
expect(blockSimulator.startNewBlock).toHaveBeenCalledTimes(0);

// block is not built with 3 txs
p2p.getTxs.mockReturnValueOnce(txs.slice(0, 3));
await sequencer.work();
expect(blockSimulator.startNewBlock).toHaveBeenCalledTimes(0);

// flush the sequencer and it should build a block
sequencer.flush();

// block is built with 3 txs
p2p.getTxs.mockReturnValueOnce(txs.slice(0, 3));
await sequencer.work();
expect(blockSimulator.startNewBlock).toHaveBeenCalledTimes(1);
expect(blockSimulator.startNewBlock).toHaveBeenCalledWith(
3,
mockedGlobalVariables,
Array(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP).fill(new Fr(0n)),
);
expect(publisher.processL2Block).toHaveBeenCalledTimes(1);
expect(publisher.processL2Block).toHaveBeenCalledWith(block, getAttestations());
expect(blockSimulator.cancelBlock).toHaveBeenCalledTimes(0);
});

it('aborts building a block if the chain moves underneath it', async () => {
const tx = mockTxForRollup();
tx.data.constants.txContext.chainId = chainId;
Expand Down
36 changes: 32 additions & 4 deletions yarn-project/sequencer-client/src/sequencer/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export class Sequencer {
private allowedInTeardown: AllowedElement[] = [];
private maxBlockSizeInBytes: number = 1024 * 1024;
private metrics: SequencerMetrics;
private isFlushing: boolean = false;

constructor(
private publisher: L1Publisher,
Expand Down Expand Up @@ -194,6 +195,10 @@ export class Sequencer {
return;
}

if (this.isFlushing) {
this.log.verbose(`Flushing all pending txs in new block`);
}

// Compute time elapsed since the previous block
const lastBlockTime = historicalHeader?.globalVariables.timestamp.toNumber() || 0;
const currentTime = Math.floor(Date.now() / 1000);
Expand All @@ -203,7 +208,11 @@ export class Sequencer {
);

// Do not go forward with new block if not enough time has passed since last block
if (this.minSecondsBetweenBlocks > 0 && elapsedSinceLastBlock < this.minSecondsBetweenBlocks) {
if (
!this.isFlushing &&
this.minSecondsBetweenBlocks > 0 &&
elapsedSinceLastBlock < this.minSecondsBetweenBlocks
) {
this.log.debug(
`Not creating block because not enough time ${this.minSecondsBetweenBlocks} has passed since last block`,
);
Expand All @@ -216,7 +225,7 @@ export class Sequencer {
const pendingTxs = this.p2pClient.getTxs('pending');

// If we haven't hit the maxSecondsBetweenBlocks, we need to have at least minTxsPerBLock txs.
if (pendingTxs.length < this.minTxsPerBLock) {
if (!this.isFlushing && pendingTxs.length < this.minTxsPerBLock) {
if (this.skipMinTxsPerBlockCheck(elapsedSinceLastBlock)) {
this.log.debug(
`Creating block with only ${pendingTxs.length} txs as more than ${this.maxSecondsBetweenBlocks}s have passed since last block`,
Expand Down Expand Up @@ -252,7 +261,11 @@ export class Sequencer {
const validTxs = this.takeTxsWithinMaxSize(allValidTxs);

// Bail if we don't have enough valid txs
if (!this.skipMinTxsPerBlockCheck(elapsedSinceLastBlock) && validTxs.length < this.minTxsPerBLock) {
if (
!this.isFlushing &&
!this.skipMinTxsPerBlockCheck(elapsedSinceLastBlock) &&
validTxs.length < this.minTxsPerBLock
) {
this.log.debug(
`Not creating block because not enough valid txs loaded from the pool (got ${validTxs.length} min ${this.minTxsPerBLock})`,
);
Expand Down Expand Up @@ -334,7 +347,12 @@ export class Sequencer {
// less txs than the minimum. But that'd cause the entire block to be aborted and retried. Instead, we should
// go back to the p2p pool and load more txs until we hit our minTxsPerBLock target. Only if there are no txs
// we should bail.
if (processedTxs.length === 0 && !this.skipMinTxsPerBlockCheck(elapsedSinceLastBlock) && this.minTxsPerBLock > 0) {
if (
!this.isFlushing &&
processedTxs.length === 0 &&
!this.skipMinTxsPerBlockCheck(elapsedSinceLastBlock) &&
this.minTxsPerBLock > 0
) {
this.log.verbose('No txs processed correctly to build block. Exiting');
blockBuilder.cancelBlock();
return;
Expand Down Expand Up @@ -374,6 +392,11 @@ export class Sequencer {
} satisfies L2BlockBuiltStats,
);

if (this.isFlushing) {
this.log.verbose(`Flushing completed`);
}
this.isFlushing = false;

try {
const attestations = await this.collectAttestations(block);
await this.publishL2Block(block, attestations);
Expand All @@ -389,6 +412,11 @@ export class Sequencer {
}
}

/** Forces the sequencer to bypass all time and tx count checks for the next block and build anyway. */
public flush() {
this.isFlushing = true;
}

protected async collectAttestations(block: L2Block): Promise<Signature[] | undefined> {
// @todo This should collect attestations properly and fix the ordering of them to make sense
// the current implementation is a PLACEHOLDER and should be nuked from orbit.
Expand Down
Loading