Skip to content

Commit

Permalink
chore(re_execute): use rayon instead of explicit threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
kkovaacs authored and sistemd committed Feb 13, 2024
1 parent aafd588 commit a6e5a03
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 84 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/pathfinder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ zstd = { workspace = true }
[dev-dependencies]
assert_matches = { workspace = true }
const-decoder = "0.3.0"
crossbeam-channel = "0.5.8"
flate2 = { workspace = true }
http = { workspace = true }
mockall = "0.11.4"
Expand Down
148 changes: 66 additions & 82 deletions crates/pathfinder/examples/re_execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use pathfinder_common::transaction::Transaction;
use pathfinder_common::{BlockHeader, BlockNumber, ChainId};
use pathfinder_executor::ExecutionState;
use pathfinder_storage::{BlockId, JournalMode, Storage};
use rayon::prelude::*;

// Due to the amount of JSON parsing that gets done during execution we use an alternate
// allocator here: mimalloc. According to benchmarks re_execute performs roughly 25% better
Expand All @@ -28,7 +29,7 @@ fn main() -> anyhow::Result<()> {
.compact()
.init();

let n_cpus = std::thread::available_parallelism().unwrap().get();
let n_cpus = rayon::current_num_threads();

let database_path = std::env::args().nth(1).unwrap();
let storage = Storage::migrate(database_path.into(), JournalMode::WAL, 1)?
Expand All @@ -53,50 +54,36 @@ fn main() -> anyhow::Result<()> {
.map(|s| str::parse(&s).unwrap())
.unwrap_or(latest_block);

let (tx, rx) = crossbeam_channel::bounded::<Work>(10);

let executors = (0..n_cpus)
.map(|_| {
let storage = storage.clone();
let rx = rx.clone();
std::thread::spawn(move || execute(storage, chain_id, rx))
})
.collect::<Vec<_>>();

tracing::info!(%first_block, %last_block, "Re-executing blocks");

let start_time = std::time::Instant::now();
let mut num_transactions: usize = 0;

for block_number in first_block..=last_block {
let transaction = db.transaction().unwrap();
let block_id = BlockId::Number(BlockNumber::new_or_panic(block_number));
let block_header = transaction.block_header(block_id)?.unwrap();
let transactions_and_receipts = transaction
.transaction_data_for_block(block_id)?
.context("Getting transactions for block")?;
drop(transaction);

let (transactions, receipts): (Vec<_>, Vec<_>) =
transactions_and_receipts.into_iter().unzip();

num_transactions += transactions.len();

tracing::debug!(%block_number, num_transactions=%transactions.len(), "Submitting block");

tx.send(Work {
header: block_header,
transactions,
receipts,
(first_block..=last_block)
.map(|block_number| {
let transaction = db.transaction().unwrap();
let block_id = BlockId::Number(BlockNumber::new_or_panic(block_number));
let block_header = transaction.block_header(block_id).unwrap().unwrap();
let transactions_and_receipts = transaction
.transaction_data_for_block(block_id)
.unwrap()
.context("Getting transactions for block")
.unwrap();
drop(transaction);

let (transactions, receipts): (Vec<_>, Vec<_>) =
transactions_and_receipts.into_iter().unzip();

num_transactions += transactions.len();

Work {
header: block_header,
transactions,
receipts,
}
})
.unwrap();
}

drop(tx);

for executor in executors {
executor.join().expect("Executor expected to shut down");
}
.par_bridge()
.for_each_with(storage, |storage, block| execute(storage, chain_id, block));

let elapsed = start_time.elapsed().as_millis();

Expand Down Expand Up @@ -132,64 +119,61 @@ struct Work {
receipts: Vec<Receipt>,
}

fn execute(storage: Storage, chain_id: ChainId, rx: crossbeam_channel::Receiver<Work>) {
while let Ok(work) = rx.recv() {
let start_time = std::time::Instant::now();
let num_transactions = work.transactions.len();
fn execute(storage: &mut Storage, chain_id: ChainId, work: Work) {
let start_time = std::time::Instant::now();
let num_transactions = work.transactions.len();

let mut connection = storage.connection().unwrap();
let mut connection = storage.connection().unwrap();

let db_tx = connection.transaction().expect("Create transaction");
let db_tx = connection.transaction().expect("Create transaction");

let execution_state = ExecutionState::trace(&db_tx, chain_id, work.header.clone(), None);
let execution_state = ExecutionState::trace(&db_tx, chain_id, work.header.clone(), None);

let transactions = work
.transactions
.into_iter()
.map(|tx| pathfinder_rpc::compose_executor_transaction(&tx, &db_tx))
.collect::<Result<Vec<_>, _>>();
let transactions = work
.transactions
.into_iter()
.map(|tx| pathfinder_rpc::compose_executor_transaction(&tx.into(), &db_tx))
.collect::<Result<Vec<_>, _>>();

let transactions = match transactions {
Ok(transactions) => transactions,
Err(error) => {
tracing::error!(block_number=%work.header.number, %error, "Transaction conversion failed");
continue;
}
};
let transactions = match transactions {
Ok(transactions) => transactions,
Err(error) => {
tracing::error!(block_number=%work.header.number, %error, "Transaction conversion failed");
return;
}
};

match pathfinder_executor::estimate(execution_state, transactions, false) {
Ok(fee_estimates) => {
for (estimate, receipt) in fee_estimates.iter().zip(work.receipts.iter()) {
if let Some(actual_fee) = receipt.actual_fee {
let actual_fee = u128::from_be_bytes(
actual_fee.0.to_be_bytes()[16..].try_into().unwrap(),
);
match pathfinder_executor::estimate(execution_state, transactions, false) {
Ok(fee_estimates) => {
for (estimate, receipt) in fee_estimates.iter().zip(work.receipts.iter()) {
if let Some(actual_fee) = receipt.actual_fee {
let actual_fee =
u128::from_be_bytes(actual_fee.0.to_be_bytes()[16..].try_into().unwrap());

// L1 handler transactions have a fee of zero in the receipt.
if actual_fee == 0 {
continue;
}
// L1 handler transactions have a fee of zero in the receipt.
if actual_fee == 0 {
continue;
}

let gas_price = work.header.eth_l1_gas_price.0;
let actual_gas_consumed = actual_fee / gas_price.max(1);
let gas_price = work.header.eth_l1_gas_price.0;
let actual_gas_consumed = actual_fee / gas_price.max(1);

let estimated_gas_consumed = estimate.gas_consumed.as_u128();
let estimated_gas_consumed = estimate.gas_consumed.as_u128();

let diff = actual_gas_consumed.abs_diff(estimated_gas_consumed);
let diff = actual_gas_consumed.abs_diff(estimated_gas_consumed);

if diff > (actual_gas_consumed * 2 / 10) {
tracing::warn!(block_number=%work.header.number, transaction_hash=%receipt.transaction_hash, %estimated_gas_consumed, %actual_gas_consumed, estimated_fee=%estimate.overall_fee, %actual_fee, "Estimation mismatch");
}
if diff > (actual_gas_consumed * 2 / 10) {
tracing::warn!(block_number=%work.header.number, transaction_hash=%receipt.transaction_hash, %estimated_gas_consumed, %actual_gas_consumed, estimated_fee=%estimate.overall_fee, %actual_fee, "Estimation mismatch");
}
}
}
Err(error) => {
tracing::error!(block_number=%work.header.number, ?error, "Transaction re-execution failed");
}
}
Err(error) => {
tracing::error!(block_number=%work.header.number, ?error, "Transaction re-execution failed");
}
}

let elapsed = start_time.elapsed().as_millis();
let elapsed = start_time.elapsed().as_millis();

tracing::debug!(block_number=%work.header.number, %num_transactions, %elapsed, "Re-executed block");
}
tracing::debug!(block_number=%work.header.number, %num_transactions, %elapsed, "Re-executed block");
}

0 comments on commit a6e5a03

Please sign in to comment.