Skip to content

Commit

Permalink
feat: backfill job single block iterator (#9245)
Browse files Browse the repository at this point in the history
  • Loading branch information
greged93 authored Jul 4, 2024
1 parent 4447f65 commit 93f9a85
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 59 deletions.
4 changes: 1 addition & 3 deletions crates/ethereum/evm/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,11 @@ where
type Output = BlockExecutionOutput<Receipt>;
type Error = BlockExecutionError;

/// Executes the block and commits the state changes.
/// Executes the block and commits the changes to the internal state.
///
/// Returns the receipts of the transactions in the block.
///
/// Returns an error if the block could not be executed or failed verification.
///
/// State changes are committed to the database.
fn execute(mut self, input: Self::Input<'_>) -> Result<Self::Output, Self::Error> {
let BlockExecutionInput { block, total_difficulty } = input;
let EthExecuteOutput { receipts, requests, gas_used } =
Expand Down
2 changes: 1 addition & 1 deletion crates/evm/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub trait BatchExecutor<DB> {
/// Contains the state changes, transaction receipts, and total gas used in the block.
///
/// TODO(mattsse): combine with `ExecutionOutcome`
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub struct BlockExecutionOutput<T> {
/// The changed state of the block after execution.
pub state: BundleState,
Expand Down
300 changes: 245 additions & 55 deletions crates/exex/exex/src/backfill.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use reth_db_api::database::Database;
use reth_evm::execute::{BatchExecutor, BlockExecutionError, BlockExecutorProvider};
use reth_evm::execute::{
BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_node_api::FullNodeComponents;
use reth_primitives::{Block, BlockNumber};
use reth_primitives::{Block, BlockNumber, BlockWithSenders, Receipt};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{Chain, FullProvider, ProviderError, TransactionVariant};
use reth_prune_types::PruneModes;
Expand Down Expand Up @@ -195,38 +197,124 @@ where
}
}

impl<E, DB, P> BackfillJob<E, DB, P> {
/// Converts the backfill job into a single block backfill job.
pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, DB, P> {
self.into()
}
}

impl<E, DB, P> From<BackfillJob<E, DB, P>> for SingleBlockBackfillJob<E, DB, P> {
fn from(value: BackfillJob<E, DB, P>) -> Self {
Self {
executor: value.executor,
provider: value.provider,
range: value.range,
_db: PhantomData,
}
}
}

/// Single block Backfill job started for a specific range.
///
/// It implements [`Iterator`] which executes a block each time the
/// iterator is advanced and yields ([`BlockWithSenders`], [`BlockExecutionOutput`])
#[derive(Debug)]
pub struct SingleBlockBackfillJob<E, DB, P> {
executor: E,
provider: P,
range: RangeInclusive<BlockNumber>,
_db: PhantomData<DB>,
}

impl<E, DB, P> Iterator for SingleBlockBackfillJob<E, DB, P>
where
E: BlockExecutorProvider,
DB: Database,
P: FullProvider<DB>,
{
type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;

fn next(&mut self) -> Option<Self::Item> {
self.range.next().map(|block_number| self.execute_block(block_number))
}
}

impl<E, DB, P> SingleBlockBackfillJob<E, DB, P>
where
E: BlockExecutorProvider,
DB: Database,
P: FullProvider<DB>,
{
fn execute_block(
&self,
block_number: u64,
) -> Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError> {
let td = self
.provider
.header_td_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

// Fetch the block with senders for execution.
let block_with_senders = self
.provider
.block_with_senders(block_number.into(), TransactionVariant::WithHash)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

// Configure the executor to use the previous block's state.
let executor = self.executor.executor(StateProviderDatabase::new(
self.provider.history_by_block_number(block_number.saturating_sub(1))?,
));

trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block");

let block_execution_output = executor.execute((&block_with_senders, td).into())?;

Ok((block_with_senders, block_execution_output))
}
}

#[cfg(test)]
mod tests {
use crate::BackfillJobFactory;
use eyre::OptionExt;
use reth_blockchain_tree::noop::NoopBlockchainTree;
use reth_chainspec::{ChainSpecBuilder, EthereumHardfork, MAINNET};
use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET};
use reth_db_common::init::init_genesis;
use reth_evm::execute::{BatchExecutor, BlockExecutorProvider};
use reth_evm::execute::{
BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::{
b256, constants::ETH_TO_WEI, public_key_to_address, Address, Block, Genesis,
GenesisAccount, Header, Transaction, TxEip2930, TxKind, U256,
b256, constants::ETH_TO_WEI, public_key_to_address, Address, Block, BlockWithSenders,
Genesis, GenesisAccount, Header, Receipt, Requests, SealedBlockWithSenders, Transaction,
TxEip2930, TxKind, U256,
};
use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
BlockWriter, LatestStateProviderRef,
BlockWriter, ExecutionOutcome, LatestStateProviderRef, ProviderFactory,
};
use reth_revm::database::StateProviderDatabase;
use reth_testing_utils::generators::{self, sign_tx_with_key_pair};
use secp256k1::Keypair;
use std::sync::Arc;

#[tokio::test]
async fn test_backfill() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

// Create a key pair for the sender
let key_pair = Keypair::new_global(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());
fn to_execution_outcome(
block_number: u64,
block_execution_output: &BlockExecutionOutput<Receipt>,
) -> ExecutionOutcome {
ExecutionOutcome {
bundle: block_execution_output.state.clone(),
receipts: block_execution_output.receipts.clone().into(),
first_block: block_number,
requests: vec![Requests(block_execution_output.requests.clone())],
}
}

// Create a chain spec with a genesis state that contains the sender
let chain_spec = Arc::new(
fn chain_spec(address: Address) -> Arc<ChainSpec> {
// Create a chain spec with a genesis state that contains the
// provided sender
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(Genesis {
Expand All @@ -239,16 +327,53 @@ mod tests {
})
.paris_activated()
.build(),
);
)
}

let executor = EthExecutorProvider::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(provider_factory.clone())?;
let blockchain_db = BlockchainProvider::new(
provider_factory.clone(),
Arc::new(NoopBlockchainTree::default()),
fn execute_block_and_commit_to_database<DB>(
provider_factory: &ProviderFactory<DB>,
chain_spec: Arc<ChainSpec>,
block: &BlockWithSenders,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: reth_db_api::database::Database,
{
let provider = provider_factory.provider()?;

// Execute the block to produce a block execution output
let mut block_execution_output = EthExecutorProvider::ethereum(chain_spec)
.executor(StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
)))
.execute(BlockExecutionInput { block, total_difficulty: U256::ZERO })?;
block_execution_output.state.reverts.sort();

// Convert the block execution output to an execution outcome for committing to the database
let execution_outcome = to_execution_outcome(block.number, &block_execution_output);

// Commit the block's execution outcome to the database
let provider_rw = provider_factory.provider_rw()?;
let block = block.clone().seal_slow();
provider_rw.append_blocks_with_state(
vec![block],
execution_outcome,
Default::default(),
Default::default(),
)?;
provider_rw.commit()?;

Ok(block_execution_output)
}

fn blocks_and_execution_outputs<DB>(
provider_factory: ProviderFactory<DB>,
chain_spec: Arc<ChainSpec>,
key_pair: Keypair,
) -> eyre::Result<Vec<(SealedBlockWithSenders, BlockExecutionOutput<Receipt>)>>
where
DB: reth_db_api::database::Database,
{
// First block has a transaction that transfers some ETH to zero address
let block1 = Block {
header: Header {
Expand Down Expand Up @@ -279,52 +404,69 @@ mod tests {
.with_recovered_senders()
.ok_or_eyre("failed to recover senders")?;

// Second block has no state changes
// Second block resends the same transaction with increased nonce
let block2 = Block {
header: Header {
parent_hash: block1.hash_slow(),
parent_hash: block1.header.hash_slow(),
receipts_root: b256!(
"d3a6acf9a244d78b33831df95d472c4128ea85bf079a1d41e32ed0b7d2244c9e"
),
difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"),
number: 2,
gas_limit: 21000,
gas_used: 21000,
..Default::default()
},
body: vec![sign_tx_with_key_pair(
key_pair,
Transaction::Eip2930(TxEip2930 {
chain_id: chain_spec.chain.id(),
nonce: 1,
gas_limit: 21000,
gas_price: 1_500_000_000,
to: TxKind::Call(Address::ZERO),
value: U256::from(0.1 * ETH_TO_WEI as f64),
..Default::default()
}),
)],
..Default::default()
}
.with_recovered_senders()
.ok_or_eyre("failed to recover senders")?;

let provider = provider_factory.provider()?;
// Execute only the first block on top of genesis state
let mut outcome_single = EthExecutorProvider::ethereum(chain_spec.clone())
.batch_executor(StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
)))
.execute_and_verify_batch([(&block1, U256::ZERO).into()])?;
outcome_single.bundle.reverts.sort();
// Execute both blocks on top of the genesis state
let outcome_batch = EthExecutorProvider::ethereum(chain_spec)
.batch_executor(StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
)))
.execute_and_verify_batch([
(&block1, U256::ZERO).into(),
(&block2, U256::ZERO).into(),
])?;
drop(provider);
let block_output1 =
execute_block_and_commit_to_database(&provider_factory, chain_spec.clone(), &block1)?;
let block_output2 =
execute_block_and_commit_to_database(&provider_factory, chain_spec, &block2)?;

let block1 = block1.seal_slow();
let block2 = block2.seal_slow();

// Update the state with the execution results of both blocks
let provider_rw = provider_factory.provider_rw()?;
provider_rw.append_blocks_with_state(
vec![block1.clone(), block2],
outcome_batch,
Default::default(),
Default::default(),
Ok(vec![(block1, block_output1), (block2, block_output2)])
}

#[test]
fn test_backfill() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

// Create a key pair for the sender
let key_pair = Keypair::new_global(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());

let chain_spec = chain_spec(address);

let executor = EthExecutorProvider::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(provider_factory.clone())?;
let blockchain_db = BlockchainProvider::new(
provider_factory.clone(),
Arc::new(NoopBlockchainTree::default()),
)?;
provider_rw.commit()?;

let blocks_and_execution_outputs =
blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap();
let execution_outcome = to_execution_outcome(block.number, block_execution_output);

// Backfill the first block
let factory = BackfillJobFactory::new(executor, blockchain_db);
Expand All @@ -336,8 +478,56 @@ mod tests {
assert_eq!(chains.len(), 1);
let mut chain = chains.into_iter().next().unwrap();
chain.execution_outcome_mut().bundle.reverts.sort();
assert_eq!(chain.blocks(), &[(1, block1)].into());
assert_eq!(chain.execution_outcome(), &outcome_single);
assert_eq!(chain.blocks(), &[(1, block.clone())].into());
assert_eq!(chain.execution_outcome(), &execution_outcome);

Ok(())
}

#[test]
fn test_single_block_backfill() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

// Create a key pair for the sender
let key_pair = Keypair::new_global(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());

let chain_spec = chain_spec(address);

let executor = EthExecutorProvider::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(provider_factory.clone())?;
let blockchain_db = BlockchainProvider::new(
provider_factory.clone(),
Arc::new(NoopBlockchainTree::default()),
)?;

let blocks_and_execution_outcomes =
blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

// Backfill the first block
let factory = BackfillJobFactory::new(executor, blockchain_db);
let job = factory.backfill(1..=1);
let single_job = job.into_single_blocks();
let block_execution_it = single_job.into_iter();

// Assert that the backfill job only produces a single block
let blocks_and_outcomes = block_execution_it.collect::<Vec<_>>();
assert_eq!(blocks_and_outcomes.len(), 1);

// Assert that the backfill job single block iterator produces the expected output for each
// block
for (i, res) in blocks_and_outcomes.into_iter().enumerate() {
let (block, mut execution_output) = res?;
execution_output.state.reverts.sort();

let sealed_block_with_senders = blocks_and_execution_outcomes[i].0.clone();
let expected_block = sealed_block_with_senders.unseal();
let expected_output = &blocks_and_execution_outcomes[i].1;

assert_eq!(block, expected_block);
assert_eq!(&execution_output, expected_output);
}

Ok(())
}
Expand Down

0 comments on commit 93f9a85

Please sign in to comment.