diff --git a/.github/workflows/jerigon-native.yml b/.github/workflows/jerigon-native.yml index 29a380c3a..1ef443d9d 100644 --- a/.github/workflows/jerigon-native.yml +++ b/.github/workflows/jerigon-native.yml @@ -74,14 +74,14 @@ jobs: run: | ETH_RPC_URL="$(kurtosis port print cancun-testnet el-2-erigon-lighthouse ws-rpc)" ulimit -n 8192 - OUTPUT_TO_TERMINAL=true ./scripts/prove_rpc.sh 0x1 0xf $ETH_RPC_URL native true 3000 100 test_only + OUTPUT_TO_TERMINAL=true ./scripts/prove_rpc.sh 1 15 $ETH_RPC_URL native 0 3000 100 test_only echo "Proving blocks in test_only mode finished" - name: Run prove blocks with native tracer in real mode run: | ETH_RPC_URL="$(kurtosis port print cancun-testnet el-2-erigon-lighthouse ws-rpc)" rm -rf proofs/* circuits/* ./proofs.json test.out verify.out leader.out - OUTPUT_TO_TERMINAL=true RUN_VERIFICATION=true ./scripts/prove_rpc.sh 0x4 0x7 $ETH_RPC_URL native true 3000 100 + OUTPUT_TO_TERMINAL=true RUN_VERIFICATION=true ./scripts/prove_rpc.sh 4 7 $ETH_RPC_URL native 3 3000 100 echo "Proving blocks in real mode finished" - name: Shut down network diff --git a/.github/workflows/jerigon-zero.yml b/.github/workflows/jerigon-zero.yml index 216b32f8c..c2e994c80 100644 --- a/.github/workflows/jerigon-zero.yml +++ b/.github/workflows/jerigon-zero.yml @@ -10,7 +10,6 @@ on: branches: - "**" - env: CARGO_TERM_COLOR: always REGISTRY: ghcr.io @@ -26,16 +25,16 @@ jobs: uses: actions/checkout@v4 - name: Checkout test-jerigon-network sources - uses: actions/checkout@v4 + uses: actions/checkout@v4 with: repository: 0xPolygonZero/jerigon-test-network - ref: 'feat/kurtosis-network' + ref: "feat/kurtosis-network" path: jerigon-test-network - uses: actions-rust-lang/setup-rust-toolchain@v1 - + - name: Set up QEMU - uses: docker/setup-qemu-action@v3 + uses: docker/setup-qemu-action@v3 - name: Login to GitHub Container Registry uses: docker/login-action@v2 @@ -57,39 +56,35 @@ jobs: #It is much easier to use cast tool in scripts so install foundry - name: Install Foundry - uses: foundry-rs/foundry-toolchain@v1 + uses: foundry-rs/foundry-toolchain@v1 - name: Run cancun test network run: | docker pull ghcr.io/0xpolygonzero/erigon:feat-zero - kurtosis run --enclave cancun-testnet github.com/ethpandaops/ethereum-package@4.0.0 --args-file jerigon-test-network/network_params.yml + kurtosis run --enclave cancun-testnet github.com/ethpandaops/ethereum-package@4.0.0 --args-file jerigon-test-network/network_params.yml - name: Generate blocks with transactions run: | - ETH_RPC_URL="$(kurtosis port print cancun-testnet el-2-erigon-lighthouse ws-rpc)" - cast rpc eth_blockNumber --rpc-url $ETH_RPC_URL - cd jerigon-test-network && set -a && source .env && set +a - bash ./tests/generate_transactions.sh - + ETH_RPC_URL="$(kurtosis port print cancun-testnet el-2-erigon-lighthouse ws-rpc)" + cast rpc eth_blockNumber --rpc-url $ETH_RPC_URL + cd jerigon-test-network && set -a && source .env && set +a + bash ./tests/generate_transactions.sh + - name: Run prove blocks with zero tracer in test_only mode run: | ETH_RPC_URL="$(kurtosis port print cancun-testnet el-2-erigon-lighthouse ws-rpc)" ulimit -n 8192 - OUTPUT_TO_TERMINAL=true ./scripts/prove_rpc.sh 0x1 0xf $ETH_RPC_URL jerigon true 3000 100 test_only + OUTPUT_TO_TERMINAL=true ./scripts/prove_rpc.sh 1 15 $ETH_RPC_URL jerigon true 3000 100 test_only echo "Proving blocks in test_only mode finished" - - name: Run prove blocks with zero tracer in real mode run: | ETH_RPC_URL="$(kurtosis port print cancun-testnet el-2-erigon-lighthouse ws-rpc)" rm -rf proofs/* circuits/* ./proofs.json test.out verify.out leader.out - OUTPUT_TO_TERMINAL=true RUN_VERIFICATION=true ./scripts/prove_rpc.sh 0x2 0x5 $ETH_RPC_URL jerigon true 3000 100 + OUTPUT_TO_TERMINAL=true RUN_VERIFICATION=true ./scripts/prove_rpc.sh 2 5 $ETH_RPC_URL jerigon true 3000 100 echo "Proving blocks in real mode finished" - + - name: Shut down network run: | kurtosis enclave rm -f cancun-testnet kurtosis engine stop - - - diff --git a/Cargo.lock b/Cargo.lock index ea4fb8060..cb2e30838 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1834,6 +1834,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dunce" version = "1.0.5" @@ -2189,6 +2195,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "funty" version = "2.0.0" @@ -3057,6 +3069,32 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mockall" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c28b3fb6d753d28c20e826cd46ee611fda1cf3cde03a443a974043247c065a" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "341014e7f530314e9a1fdbc7400b244efea7122662c96bfa248c31da5bfb2020" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "mpt_trie" version = "0.4.1" @@ -3789,6 +3827,32 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "predicates" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9086cc7640c29a356d1a29fd134380bee9d8f79a17410aa76e7ad295f42c97" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae8177bee8e75d6846599c6b9ff679ed51e882816914eec639944d7c9aa11931" + +[[package]] +name = "predicates-tree" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41b740d195ed3166cd147c8047ec98db0e22ec019eb8eeb76d343b795304fb13" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "pretty_env_logger" version = "0.5.0" @@ -4814,6 +4878,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "text-size" version = "1.1.1" @@ -5823,6 +5893,7 @@ dependencies = [ "jemallocator", "keccak-hash 0.10.0", "lru", + "mockall", "mpt_trie", "num-traits", "once_cell", diff --git a/scripts/prove_rpc.sh b/scripts/prove_rpc.sh index 49848fdfe..17f529bd0 100755 --- a/scripts/prove_rpc.sh +++ b/scripts/prove_rpc.sh @@ -1,11 +1,11 @@ #!/bin/bash # Args: -# 1 --> Start block idx -# 2 --> End block index (inclusive) +# 1 --> Start block (number or hash) +# 2 --> End block (number or hash, inclusive) # 3 --> Rpc endpoint:port (eg. http://35.246.1.96:8545) # 4 --> Rpc type (eg. jerigon / native) -# 5 --> Ignore previous proofs (boolean) +# 5 --> Checkpoint block (number or hash to ignore previous proofs. empty [""] when specifying start block by number to rely on previous proofs) # 6 --> Backoff in milliseconds (optional [default: 0]) # 7 --> Number of retries (optional [default: 0]) # 8 --> Test run only flag `test_only` (optional) @@ -38,13 +38,12 @@ REPO_ROOT=$(git rev-parse --show-toplevel) PROOF_OUTPUT_DIR="${REPO_ROOT}/proofs" OUT_LOG_PATH="${PROOF_OUTPUT_DIR}/b$1_$2.log" ALWAYS_WRITE_LOGS=0 # Change this to `1` if you always want logs to be written. -TOT_BLOCKS=$(($2-$1+1)) START_BLOCK=$1 END_BLOCK=$2 NODE_RPC_URL=$3 NODE_RPC_TYPE=$4 -IGNORE_PREVIOUS_PROOFS=$5 +CHECKPOINT_BLOCK=$5 BACKOFF=${6:-0} RETRIES=${7:-0} @@ -56,41 +55,27 @@ RUN_VERIFICATION="${RUN_VERIFICATION:-false}" # Recommended soft file handle limit. Will warn if it is set lower. RECOMMENDED_FILE_HANDLE_LIMIT=8192 -mkdir -p $PROOF_OUTPUT_DIR +mkdir -p "$PROOF_OUTPUT_DIR" -if $IGNORE_PREVIOUS_PROOFS ; then - # Set checkpoint height to previous block number for the first block in range - prev_proof_num=$(($1-1)) - PREV_PROOF_EXTRA_ARG="--checkpoint-block-number ${prev_proof_num}" +if [ -n "$CHECKPOINT_BLOCK" ] ; then + # Checkpoint block provided, pass it to the prover as a flag + PREV_PROOF_EXTRA_ARG="--checkpoint-block $CHECKPOINT_BLOCK" else + # Checkpoint block not provided, but is required hash-based start block + if [[ $START_BLOCK == 0x* ]]; then + echo "Checkpoint block is required when specifying blocks by hash" + exit 1 + fi + + # Checkpoint block not provided, deduce proof starting point from the start block if [[ $1 -gt 1 ]]; then prev_proof_num=$(($1-1)) PREV_PROOF_EXTRA_ARG="-f ${PROOF_OUTPUT_DIR}/b${prev_proof_num}.zkproof" fi fi -# Convert hex to decimal parameters -if [[ $START_BLOCK == 0x* ]]; then - START_BLOCK=$((16#${START_BLOCK#"0x"})) -fi -if [[ $END_BLOCK == 0x* ]]; then - END_BLOCK=$((16#${END_BLOCK#"0x"})) -fi - -# Define block interval -if [ $END_BLOCK == '-' ]; then - # Follow from the start block to the end of the chain - BLOCK_INTERVAL=$START_BLOCK.. -elif [ $START_BLOCK == $END_BLOCK ]; then - # Single block - BLOCK_INTERVAL=$START_BLOCK -else - # Block range - BLOCK_INTERVAL=$START_BLOCK..=$END_BLOCK -fi - # Print out a warning if the we're using `native` and our file descriptor limit is too low. Don't bother if we can't find `ulimit`. -if [ $(command -v ulimit) ] && [ $NODE_RPC_TYPE == "native" ] +if [ "$(command -v ulimit)" ] && [ "$NODE_RPC_TYPE" == "native" ] then file_desc_limit=$(ulimit -n) @@ -108,49 +93,76 @@ fi # other non-proving code. if [[ $8 == "test_only" ]]; then # test only run - echo "Proving blocks ${BLOCK_INTERVAL} in a test_only mode now... (Total: ${TOT_BLOCKS})" - command='cargo r --release --package zero --bin leader -- --test-only --runtime in-memory --load-strategy on-demand --proof-output-dir $PROOF_OUTPUT_DIR --block-batch-size $BLOCK_BATCH_SIZE rpc --rpc-type "$NODE_RPC_TYPE" --rpc-url "$NODE_RPC_URL" --block-interval $BLOCK_INTERVAL $PREV_PROOF_EXTRA_ARG --backoff "$BACKOFF" --max-retries "$RETRIES" ' + echo "Proving blocks from ($START_BLOCK) to ($END_BLOCK)" + command="cargo r --release --package zero --bin leader -- \ +--test-only \ +--runtime in-memory \ +--load-strategy on-demand \ +--proof-output-dir $PROOF_OUTPUT_DIR \ +--block-batch-size $BLOCK_BATCH_SIZE \ +rpc \ +--rpc-type $NODE_RPC_TYPE \ +--rpc-url $NODE_RPC_URL \ +--start-block $START_BLOCK \ +--end-block $END_BLOCK \ +--backoff $BACKOFF \ +--max-retries $RETRIES \ +$PREV_PROOF_EXTRA_ARG" + if [ "$OUTPUT_TO_TERMINAL" = true ]; then - eval $command + eval "$command" retVal=$? echo -e "Proof witness generation finished with result: $retVal" exit $retVal else - eval $command > $OUT_LOG_PATH 2>&1 - if grep -q 'All proof witnesses have been generated successfully.' $OUT_LOG_PATH; then + eval "$command" > "$OUT_LOG_PATH" 2>&1 + if grep -q 'All proof witnesses have been generated successfully.' "$OUT_LOG_PATH"; then echo -e "Success - Note this was just a test, not a proof" # Remove the log on success if we don't want to keep it. if [ $ALWAYS_WRITE_LOGS -ne 1 ]; then - rm $OUT_LOG_PATH + rm "$OUT_LOG_PATH" fi exit else - echo "Failed to create proof witnesses. See ${OUT_LOG_PATH} for more details." + echo "Failed to create proof witnesses. See $OUT_LOG_PATH for more details." exit 1 fi fi else # normal run - echo "Proving blocks ${BLOCK_INTERVAL} now... (Total: ${TOT_BLOCKS})" - command='cargo r --release --package zero --bin leader -- --runtime in-memory --load-strategy on-demand --proof-output-dir $PROOF_OUTPUT_DIR --block-batch-size $BLOCK_BATCH_SIZE rpc --rpc-type "$NODE_RPC_TYPE" --rpc-url "$3" --block-interval $BLOCK_INTERVAL $PREV_PROOF_EXTRA_ARG --backoff "$BACKOFF" --max-retries "$RETRIES" ' + echo "Proving blocks from ($START_BLOCK) to ($END_BLOCK)" + command="cargo r --release --package zero --bin leader -- \ +--runtime in-memory \ +--load-strategy on-demand \ +--proof-output-dir $PROOF_OUTPUT_DIR \ +--block-batch-size $BLOCK_BATCH_SIZE \ +rpc \ +--rpc-type $NODE_RPC_TYPE \ +--rpc-url $3 \ +--start-block $START_BLOCK \ +--end-block $END_BLOCK \ +--backoff $BACKOFF \ +--max-retries $RETRIES \ +$PREV_PROOF_EXTRA_ARG " + if [ "$OUTPUT_TO_TERMINAL" = true ]; then - eval $command + eval "$command" echo -e "Proof generation finished with result: $?" else - eval $command > $OUT_LOG_PATH 2>&1 + eval "$command" > "$OUT_LOG_PATH" 2>&1 retVal=$? if [ $retVal -ne 0 ]; then # Some error occurred, display the logs and exit. - cat $OUT_LOG_PATH - echo "Block ${i} errored. See ${OUT_LOG_PATH} for more details." + cat "$OUT_LOG_PATH" + echo "Error occurred. See $OUT_LOG_PATH for more details." exit $retVal else # Remove the log on success if we don't want to keep it. if [ $ALWAYS_WRITE_LOGS -ne 1 ]; then - rm $OUT_LOG_PATH + rm "$OUT_LOG_PATH" fi fi - echo "Successfully generated ${TOT_BLOCKS} proofs!" + echo "Successfully generated proofs!" fi fi @@ -160,15 +172,15 @@ if [ "$RUN_VERIFICATION" = true ]; then echo "Running the verification for the last proof..." proof_file_name=$PROOF_OUTPUT_DIR/b$END_BLOCK.zkproof - echo "Verifying the proof of the latest block in the interval:" $proof_file_name - cargo r --release --package zero --bin verifier -- -f $proof_file_name > $PROOF_OUTPUT_DIR/verify.out 2>&1 + echo "Verifying the proof of the latest block in the interval:" "$proof_file_name" + cargo r --release --package zero --bin verifier -- -f "$proof_file_name" > "$PROOF_OUTPUT_DIR/verify.out" 2>&1 - if grep -q 'All proofs verified successfully!' $PROOF_OUTPUT_DIR/verify.out; then + if grep -q 'All proofs verified successfully!' "$PROOF_OUTPUT_DIR/verify.out"; then echo "$proof_file_name verified successfully!"; - rm $PROOF_OUTPUT_DIR/verify.out + rm "$PROOF_OUTPUT_DIR/verify.out" else # Some error occurred with verification, display the logs and exit. - cat $PROOF_OUTPUT_DIR/verify.out + cat "$PROOF_OUTPUT_DIR/verify.out" echo "There was an issue with proof verification. See $PROOF_OUTPUT_DIR/verify.out for more details."; exit 1 fi diff --git a/zero/Cargo.toml b/zero/Cargo.toml index 7cbf2f351..1a6abfa90 100644 --- a/zero/Cargo.toml +++ b/zero/Cargo.toml @@ -47,11 +47,11 @@ tracing.workspace = true tracing-subscriber.workspace = true url.workspace = true zk_evm_common.workspace = true +mockall = "0.13.0" [target.'cfg(not(target_env = "msvc"))'.dependencies] jemallocator = "0.5.4" - [build-dependencies] anyhow.workspace = true vergen-git2 = { version = "1.0.0", features = ["build"] } diff --git a/zero/src/bin/leader.rs b/zero/src/bin/leader.rs index 6daaf599f..38ca2a3da 100644 --- a/zero/src/bin/leader.rs +++ b/zero/src/bin/leader.rs @@ -5,12 +5,12 @@ use std::sync::Arc; use anyhow::Result; use clap::Parser; use cli::Command; -use client::RpcParams; use paladin::config::Config; use paladin::runtime::Runtime; use tracing::info; use zero::env::load_dotenvy_vars_if_present; use zero::prover::{ProofRuntime, ProverConfig}; +use zero::rpc::retry::build_http_retry_provider; use zero::{ block_interval::BlockInterval, prover_state::persistence::set_circuit_cache_dir_env_if_not_set, }; @@ -103,26 +103,36 @@ async fn main() -> Result<()> { Command::Rpc { rpc_url, rpc_type, - block_interval, - checkpoint_block_number, + checkpoint_block, previous_proof, block_time, + end_block, + start_block, backoff, max_retries, } => { + // Construct the provider. let previous_proof = get_previous_proof(previous_proof)?; - let block_interval = BlockInterval::new(&block_interval)?; + let retry_provider = build_http_retry_provider(rpc_url.clone(), backoff, max_retries)?; + let cached_provider = Arc::new(zero::provider::CachedProvider::new( + retry_provider, + rpc_type, + )); + // Construct the block interval. + let block_interval = + BlockInterval::new(cached_provider.clone(), start_block, end_block).await?; + + // Convert the checkpoint block to a block number. + let checkpoint_block_number = + BlockInterval::block_to_num(cached_provider.clone(), checkpoint_block).await?; + + // Prove the block interval. info!("Proving interval {block_interval}"); client_main( proof_runtime, - RpcParams { - rpc_url, - rpc_type, - backoff, - max_retries, - block_time, - }, + cached_provider, + block_time, block_interval, LeaderConfig { checkpoint_block_number, diff --git a/zero/src/bin/leader/cli.rs b/zero/src/bin/leader/cli.rs index ad9270ee8..6827d5814 100644 --- a/zero/src/bin/leader/cli.rs +++ b/zero/src/bin/leader/cli.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; +use alloy::eips::BlockId; use alloy::transports::http::reqwest::Url; use clap::{Parser, Subcommand, ValueEnum, ValueHint}; use zero::prover::cli::CliProverConfig; @@ -45,6 +46,7 @@ pub enum WorkerRunMode { Default, } +#[allow(clippy::large_enum_variant)] #[derive(Subcommand)] pub(crate) enum Command { /// Deletes all the previously cached circuits. @@ -63,12 +65,15 @@ pub(crate) enum Command { // The node RPC type (jerigon / native). #[arg(long, short = 't', default_value = "jerigon")] rpc_type: RpcType, - /// The block interval for which to generate a proof. - #[arg(long, short = 'i')] - block_interval: String, - /// The checkpoint block number. - #[arg(short, long, default_value_t = 0)] - checkpoint_block_number: u64, + /// The start of the block range to prove (inclusive). + #[arg(long, short = 's')] + start_block: BlockId, + /// The end of the block range to prove (inclusive). + #[arg(long, short = 'e')] + end_block: Option, + /// The checkpoint block. + #[arg(short, long, default_value = "0")] + checkpoint_block: BlockId, /// The previous proof output. #[arg(long, short = 'f', value_hint = ValueHint::FilePath)] previous_proof: Option, diff --git a/zero/src/bin/leader/client.rs b/zero/src/bin/leader/client.rs index 343a2cdcb..a51d17a0d 100644 --- a/zero/src/bin/leader/client.rs +++ b/zero/src/bin/leader/client.rs @@ -1,7 +1,8 @@ use std::sync::Arc; +use alloy::providers::Provider; use alloy::rpc::types::{BlockId, BlockNumberOrTag}; -use alloy::transports::http::reqwest::Url; +use alloy::transports::Transport; use anyhow::{anyhow, Result}; use tokio::sync::mpsc; use tracing::info; @@ -10,19 +11,9 @@ use zero::pre_checks::check_previous_proof_and_checkpoint; use zero::proof_types::GeneratedBlockProof; use zero::prover::{self, BlockProverInput, ProverConfig}; use zero::rpc; -use zero::rpc::{retry::build_http_retry_provider, RpcType}; use crate::ProofRuntime; -#[derive(Debug)] -pub struct RpcParams { - pub rpc_url: Url, - pub rpc_type: RpcType, - pub backoff: u64, - pub max_retries: u32, - pub block_time: u64, -} - #[derive(Debug)] pub struct LeaderConfig { pub checkpoint_block_number: u64, @@ -31,24 +22,21 @@ pub struct LeaderConfig { } /// The main function for the client. -pub(crate) async fn client_main( +pub(crate) async fn client_main( proof_runtime: Arc, - rpc_params: RpcParams, + cached_provider: Arc>, + block_time: u64, block_interval: BlockInterval, mut leader_config: LeaderConfig, -) -> Result<()> { +) -> Result<()> +where + ProviderT: Provider + 'static, + TransportT: Transport + Clone, +{ use futures::StreamExt; let test_only = leader_config.prover_config.test_only; - let cached_provider = Arc::new(zero::provider::CachedProvider::new( - build_http_retry_provider( - rpc_params.rpc_url.clone(), - rpc_params.backoff, - rpc_params.max_retries, - )?, - )); - if !test_only { // For actual proof runs, perform a sanity check on the provided inputs. check_previous_proof_and_checkpoint( @@ -76,7 +64,7 @@ pub(crate) async fn client_main( let mut block_interval_stream: BlockIntervalStream = match block_interval { block_interval @ BlockInterval::FollowFrom { .. } => { block_interval - .into_unbounded_stream(cached_provider.clone(), rpc_params.block_time) + .into_unbounded_stream(cached_provider.clone(), block_time) .await? } _ => block_interval.into_bounded_stream()?, @@ -92,7 +80,6 @@ pub(crate) async fn client_main( cached_provider.clone(), block_id, leader_config.checkpoint_block_number, - rpc_params.rpc_type, ) .await?; block_tx diff --git a/zero/src/bin/rpc.rs b/zero/src/bin/rpc.rs index 164751df2..a8a42a6d4 100644 --- a/zero/src/bin/rpc.rs +++ b/zero/src/bin/rpc.rs @@ -25,7 +25,6 @@ struct FetchParams { pub start_block: u64, pub end_block: u64, pub checkpoint_block_number: Option, - pub rpc_type: RpcType, } #[derive(Args, Clone, Debug)] @@ -98,13 +97,9 @@ where let (block_num, _is_last_block) = block_interval_elem?; let block_id = BlockId::Number(BlockNumberOrTag::Number(block_num)); // Get the prover input for particular block. - let result = rpc::block_prover_input( - cached_provider.clone(), - block_id, - checkpoint_block_number, - params.rpc_type, - ) - .await?; + let result = + rpc::block_prover_input(cached_provider.clone(), block_id, checkpoint_block_number) + .await?; block_prover_inputs.push(result); } @@ -114,11 +109,12 @@ where impl Cli { /// Execute the cli command. pub async fn execute(self) -> anyhow::Result<()> { - let cached_provider = Arc::new(CachedProvider::new(build_http_retry_provider( + let retry_provider = build_http_retry_provider( self.config.rpc_url.clone(), self.config.backoff, self.config.max_retries, - )?)); + )?; + let cached_provider = Arc::new(CachedProvider::new(retry_provider, self.config.rpc_type)); match self.command { Command::Fetch { @@ -130,7 +126,6 @@ impl Cli { start_block, end_block, checkpoint_block_number, - rpc_type: self.config.rpc_type, }; let block_prover_inputs = @@ -156,7 +151,6 @@ impl Cli { start_block: block_number, end_block: block_number, checkpoint_block_number: None, - rpc_type: self.config.rpc_type, }; let block_prover_inputs = diff --git a/zero/src/block_interval.rs b/zero/src/block_interval.rs index e424076e0..7ca4f2366 100644 --- a/zero/src/block_interval.rs +++ b/zero/src/block_interval.rs @@ -1,16 +1,14 @@ +use std::ops::Range; use std::pin::Pin; use std::sync::Arc; -use alloy::primitives::B256; use alloy::rpc::types::eth::BlockId; -use alloy::{hex, providers::Provider, transports::Transport}; use anyhow::{anyhow, Result}; use async_stream::try_stream; use futures::Stream; use tracing::info; -use crate::parsing; -use crate::provider::CachedProvider; +use crate::provider::BlockProvider; /// The async stream of block numbers. /// The second bool flag indicates if the element is last in the interval. @@ -20,9 +18,9 @@ pub type BlockIntervalStream = Pin), + Range(Range), // Dynamic interval from the start block to the latest network block FollowFrom { // Interval starting block number @@ -31,64 +29,42 @@ pub enum BlockInterval { } impl BlockInterval { - /// Create a new block interval + /// Creates a new block interval. /// - /// A valid block range is of the form: - /// * `block_number` for a single block number - /// * `lhs..rhs`, `lhs..=rhs` as an exclusive/inclusive range - /// * `lhs..` for a range starting from `lhs` to the chain tip. `lhs..=` - /// is also valid format. + /// If end_block is None, the interval is unbounded and will follow from + /// start_block. If start_block == end_block, the interval is a single + /// block. Otherwise the interval is a range from start_block to end_block. /// - /// # Example - /// - /// ```rust - /// # use alloy::rpc::types::eth::BlockId; - /// # use zero::block_interval::BlockInterval; - /// assert_eq!(BlockInterval::new("0..10").unwrap(), BlockInterval::Range(0..10)); - /// assert_eq!(BlockInterval::new("0..=10").unwrap(), BlockInterval::Range(0..11)); - /// assert_eq!(BlockInterval::new("32141").unwrap(), BlockInterval::SingleBlockId(BlockId::Number(32141.into()))); - /// assert_eq!(BlockInterval::new("100..").unwrap(), BlockInterval::FollowFrom{start_block: 100}); - /// ``` - pub fn new(s: &str) -> anyhow::Result { - if (s.starts_with("0x") && s.len() == 66) || s.len() == 64 { - // Try to parse hash - let hash = s - .parse::() - .map_err(|_| anyhow!("invalid block hash '{s}'"))?; - return Ok(BlockInterval::SingleBlockId(BlockId::Hash(hash.into()))); - } - - // First we parse for inclusive range and then for exclusive range, - // because both separators start with `..` - if let Ok(range) = parsing::parse_range_inclusive(s) { - Ok(BlockInterval::Range(range)) - } else if let Ok(range) = parsing::parse_range_exclusive(s) { - Ok(BlockInterval::Range(range)) - } - // Now we look for the follow from range - else if s.contains("..") { - let mut split = s.trim().split("..").filter(|s| *s != "=" && !s.is_empty()); + /// end_block is treated as inclusive because it may have been specified + /// as a block hash. + pub async fn new( + provider: Arc, + start_block: BlockId, + end_block: Option, + ) -> Result { + // Ensure the start block is a valid block number. + let start_block_num = Self::block_to_num(provider.clone(), start_block).await?; - // Any other character after `..` or `..=` is invalid - if split.clone().count() > 1 { - return Err(anyhow!("invalid block interval range '{s}'")); + // Create the block interval. + match end_block { + // Start and end are the same. + Some(end_block) if end_block == start_block => { + Ok(BlockInterval::SingleBlockId(start_block_num)) } - let num = split - .next() - .map(|num| { - num.parse::() - .map_err(|_| anyhow!("invalid block number '{num}'")) - }) - .ok_or(anyhow!("invalid block interval range '{s}'"))??; - return Ok(BlockInterval::FollowFrom { start_block: num }); - } - // Only single block number is left to try to parse - else { - let num: u64 = s - .trim() - .parse() - .map_err(|_| anyhow!("invalid block interval range '{s}'"))?; - return Ok(BlockInterval::SingleBlockId(BlockId::Number(num.into()))); + // Bounded range provided. + Some(end_block) => { + let end_block_num = Self::block_to_num(provider.clone(), end_block).await?; + if end_block_num <= start_block_num { + return Err(anyhow!( + "invalid block interval range ({start_block_num}..{end_block_num})" + )); + } + Ok(BlockInterval::Range(start_block_num..end_block_num + 1)) + } + // Unbounded range provided. + None => Ok(BlockInterval::FollowFrom { + start_block: start_block_num, + }), } } @@ -96,10 +72,7 @@ impl BlockInterval { /// second bool flag indicates if the element is last in the interval. pub fn into_bounded_stream(self) -> Result { match self { - BlockInterval::SingleBlockId(BlockId::Number(num)) => { - let num = num - .as_number() - .ok_or(anyhow!("invalid block number '{num}'"))?; + BlockInterval::SingleBlockId(num) => { let range = (num..num + 1).map(|it| Ok((it, true))).collect::>(); Ok(Box::pin(futures::stream::iter(range))) @@ -110,42 +83,33 @@ impl BlockInterval { range.last_mut().map(|it| it.as_mut().map(|it| it.1 = true)); Ok(Box::pin(futures::stream::iter(range))) } - _ => Err(anyhow!( + BlockInterval::FollowFrom { .. } => Err(anyhow!( "could not create bounded stream from unbounded follow-from interval", )), } } + /// Returns the start block number of the interval. pub fn get_start_block(&self) -> Result { match self { - BlockInterval::SingleBlockId(BlockId::Number(num)) => { - let num_value = num - .as_number() - .ok_or_else(|| anyhow!("invalid block number '{num}'"))?; - Ok(num_value) // Return the valid block number - } + BlockInterval::SingleBlockId(num) => Ok(*num), BlockInterval::Range(range) => Ok(range.start), BlockInterval::FollowFrom { start_block, .. } => Ok(*start_block), - _ => Err(anyhow!("Unknown BlockInterval variant")), // Handle unknown variants } } /// Convert the block interval into an unbounded async stream of block /// numbers. Query the blockchain node for the latest block number. - pub async fn into_unbounded_stream( + pub async fn into_unbounded_stream( self, - cached_provider: Arc>, + provider: Arc, block_time: u64, - ) -> Result - where - ProviderT: Provider + 'static, - TransportT: Transport + Clone, - { + ) -> Result { match self { BlockInterval::FollowFrom { start_block } => Ok(Box::pin(try_stream! { let mut current = start_block; loop { - let last_block_number = cached_provider.get_provider().await?.get_block_number().await.map_err(|e: alloy::transports::RpcError<_>| { + let last_block_number = provider.latest_block_number().await.map_err(|e| { anyhow!("could not retrieve latest block number from the provider: {e}") })?; @@ -166,15 +130,40 @@ impl BlockInterval { )), } } + + /// Converts a [`BlockId`] into a block number by querying the provider. + pub async fn block_to_num( + provider: Arc, + block: BlockId, + ) -> Result { + let block_num = match block { + // Number already provided + BlockId::Number(num) => num + .as_number() + .ok_or_else(|| anyhow!("invalid block number '{num}'"))?, + + // Hash provided, query the provider for the block number. + BlockId::Hash(hash) => { + let block = provider + .get_block_by_id(BlockId::Hash(hash)) + .await + .map_err(|e| { + anyhow!("could not retrieve block number by hash from the provider: {e}") + })?; + block + .ok_or(anyhow!("block not found {hash}"))? + .header + .number + } + }; + Ok(block_num) + } } impl std::fmt::Display for BlockInterval { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { - BlockInterval::SingleBlockId(block_id) => match block_id { - BlockId::Number(it) => f.write_fmt(format_args!("{}", it)), - BlockId::Hash(it) => f.write_fmt(format_args!("0x{}", &hex::encode(it.block_hash))), - }, + BlockInterval::SingleBlockId(num) => f.write_fmt(format_args!("{}", num)), BlockInterval::Range(range) => { write!(f, "{}..{}", range.start, range.end) } @@ -185,92 +174,167 @@ impl std::fmt::Display for BlockInterval { } } -impl std::str::FromStr for BlockInterval { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - BlockInterval::new(s) - } -} - #[cfg(test)] mod test { use alloy::primitives::B256; + use alloy::rpc::types::{Block, Header, Transaction}; + use mockall::predicate::*; use super::*; + use crate::provider::MockBlockProvider; - #[test] - fn can_create_block_interval_from_exclusive_range() { + #[tokio::test] + async fn can_create_block_interval_from_inclusive_range() { assert_eq!( - BlockInterval::new("0..10").unwrap(), - BlockInterval::Range(0..10) + BlockInterval::new( + Arc::new(MockBlockProvider::new()), + BlockId::from(0), + Some(BlockId::from(10)) + ) + .await + .unwrap(), + BlockInterval::Range(0..11) ); } - #[test] - fn can_create_block_interval_from_inclusive_range() { + #[tokio::test] + async fn can_create_follow_from_block_interval() { assert_eq!( - BlockInterval::new("0..=10").unwrap(), - BlockInterval::Range(0..11) + BlockInterval::new(Arc::new(MockBlockProvider::new()), BlockId::from(100), None) + .await + .unwrap(), + BlockInterval::FollowFrom { start_block: 100 } ); } - #[test] - fn can_create_follow_from_block_interval() { + #[tokio::test] + async fn can_create_single_block_interval() { assert_eq!( - BlockInterval::new("100..").unwrap(), - BlockInterval::FollowFrom { start_block: 100 } + BlockInterval::new( + Arc::new(MockBlockProvider::new()), + BlockId::from(123415131), + Some(BlockId::from(123415131)) + ) + .await + .unwrap(), + BlockInterval::SingleBlockId(123415131) ); } - #[test] - fn can_create_single_block_interval() { + #[tokio::test] + async fn cannot_create_invalid_range() { assert_eq!( - BlockInterval::new("123415131").unwrap(), - BlockInterval::SingleBlockId(BlockId::Number(123415131.into())) + BlockInterval::new( + Arc::new(MockBlockProvider::new()), + BlockId::from(123415131), + Some(BlockId::from(0)) + ) + .await + .unwrap_err() + .to_string(), + anyhow!("invalid block interval range (123415131..0)").to_string() ); } - #[test] - fn new_interval_proper_single_block_error() { + #[tokio::test] + async fn can_create_single_block_interval_from_hash() { + // Mock the block for single block interval. + let mut mock = MockBlockProvider::new(); + let block_id = BlockId::Hash( + "0xb51ceca7ba912779ed6721d2b93849758af0d2354683170fb71dead6e439e6cb" + .parse::() + .unwrap() + .into(), + ); + mock_block(&mut mock, block_id, 12345); + + // Create the interval. + let mock = Arc::new(mock); assert_eq!( - BlockInterval::new("113A").err().unwrap().to_string(), - "invalid block interval range '113A'" + BlockInterval::new(mock, block_id, Some(block_id)) + .await + .unwrap(), + BlockInterval::SingleBlockId(12345) ); } - #[test] - fn new_interval_proper_range_error() { + #[tokio::test] + async fn can_create_block_interval_from_inclusive_hash_range() { + // Mock the blocks for the range. + let mut mock = MockBlockProvider::new(); + let start_block_id = BlockId::Hash( + "0xb51ceca7ba912779ed6721d2b93849758af0d2354683170fb71dead6e439e6cb" + .parse::() + .unwrap() + .into(), + ); + mock_block(&mut mock, start_block_id, 12345); + let end_block_id = BlockId::Hash( + "0x351ceca7ba912779ed6721d2b93849758af0d2354683170fb71dead6e439e6cb" + .parse::() + .unwrap() + .into(), + ); + mock_block(&mut mock, end_block_id, 12355); + + // Create the interval. + let mock = Arc::new(mock); assert_eq!( - BlockInterval::new("111...156").err().unwrap().to_string(), - "invalid block interval range '111...156'" + BlockInterval::new(mock, start_block_id, Some(end_block_id)) + .await + .unwrap(), + BlockInterval::Range(12345..12356) ); } - #[test] - fn new_interval_parse_block_hash() { + #[tokio::test] + async fn can_create_follow_from_block_interval_hash() { + // Mock a block for range to start from. + let start_block_id = BlockId::Hash( + "0xb51ceca7ba912779ed6721d2b93849758af0d2354683170fb71dead6e439e6cb" + .parse::() + .unwrap() + .into(), + ); + let mut mock = MockBlockProvider::new(); + mock_block(&mut mock, start_block_id, 12345); + + // Create the interval. + let mock = Arc::new(mock); assert_eq!( - BlockInterval::new( - "0xb51ceca7ba912779ed6721d2b93849758af0d2354683170fb71dead6e439e6cb" - ) - .unwrap(), - BlockInterval::SingleBlockId(BlockId::Hash( - "0xb51ceca7ba912779ed6721d2b93849758af0d2354683170fb71dead6e439e6cb" - .parse::() - .unwrap() - .into() - )) - ) + BlockInterval::new(mock, start_block_id, None) + .await + .unwrap(), + BlockInterval::FollowFrom { start_block: 12345 } + ); + } + + /// Configures the mock to expect a query for a block by id and return the + /// expected block number. + fn mock_block(mock: &mut MockBlockProvider, query_id: BlockId, resulting_block_num: u64) { + let mut block: Block = Block::default(); + block.header.number = resulting_block_num; + mock.expect_get_block_by_id() + .with(eq(query_id)) + .returning(move |_| { + let block = block.clone(); + Box::pin(async move { Ok(Some(block)) }) + }); } #[tokio::test] async fn can_into_bounded_stream() { use futures::StreamExt; let mut result = Vec::new(); - let mut stream = BlockInterval::new("1..10") - .unwrap() - .into_bounded_stream() - .unwrap(); + let mut stream = BlockInterval::new( + Arc::new(MockBlockProvider::new()), + BlockId::from(1), + Some(BlockId::from(9)), + ) + .await + .unwrap() + .into_bounded_stream() + .unwrap(); while let Some(val) = stream.next().await { result.push(val.unwrap()); } @@ -281,13 +345,4 @@ mod test { expected.last_mut().unwrap().1 = true; assert_eq!(result, expected); } - - #[test] - fn can_create_from_string() { - use std::str::FromStr; - assert_eq!( - &format!("{}", BlockInterval::from_str("0..10").unwrap()), - "0..10" - ); - } } diff --git a/zero/src/provider.rs b/zero/src/provider.rs index 876cb270c..3bded7d39 100644 --- a/zero/src/provider.rs +++ b/zero/src/provider.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -5,11 +6,24 @@ use alloy::primitives::BlockHash; use alloy::rpc::types::{Block, BlockId, BlockTransactionsKind}; use alloy::{providers::Provider, transports::Transport}; use anyhow::Context; +use mockall::automock; use tokio::sync::{Mutex, Semaphore, SemaphorePermit}; +use crate::rpc::RpcType; + const CACHE_SIZE: usize = 1024; const MAX_NUMBER_OF_PARALLEL_REQUESTS: usize = 128; +#[automock] +pub trait BlockProvider { + fn get_block_by_id( + &self, + block_id: BlockId, + ) -> impl Future>> + Send; + + fn latest_block_number(&self) -> impl Future> + Send; +} + /// Wrapper around alloy provider to cache blocks and other /// frequently used data. pub struct CachedProvider { @@ -22,6 +36,8 @@ pub struct CachedProvider { blocks_by_number: Arc>>, blocks_by_hash: Arc>>, _phantom: std::marker::PhantomData, + + pub rpc_type: RpcType, } pub struct ProviderGuard<'a, ProviderT> { @@ -48,7 +64,7 @@ where ProviderT: Provider, TransportT: Transport + Clone, { - pub fn new(provider: ProviderT) -> Self { + pub fn new(provider: ProviderT, rpc_type: RpcType) -> Self { Self { provider: provider.into(), semaphore: Arc::new(Semaphore::new(MAX_NUMBER_OF_PARALLEL_REQUESTS)), @@ -58,6 +74,7 @@ where blocks_by_hash: Arc::new(Mutex::new(lru::LruCache::new( std::num::NonZero::new(CACHE_SIZE).unwrap(), ))), + rpc_type, _phantom: std::marker::PhantomData, } } @@ -118,3 +135,20 @@ where } } } + +impl BlockProvider for CachedProvider +where + ProviderT: Provider, + TransportT: Transport + Clone, +{ + async fn get_block_by_id(&self, block_id: BlockId) -> anyhow::Result> { + Ok(Some( + self.get_block(block_id, BlockTransactionsKind::Hashes) + .await?, + )) + } + + async fn latest_block_number(&self) -> anyhow::Result { + Ok(self.provider.get_block_number().await?) + } +} diff --git a/zero/src/rpc/mod.rs b/zero/src/rpc/mod.rs index 007a4fdb2..40e9cd9a7 100644 --- a/zero/src/rpc/mod.rs +++ b/zero/src/rpc/mod.rs @@ -45,13 +45,12 @@ pub async fn block_prover_input( cached_provider: Arc>, block_id: BlockId, checkpoint_block_number: u64, - rpc_type: RpcType, ) -> Result where ProviderT: Provider, TransportT: Transport + Clone, { - match rpc_type { + match cached_provider.rpc_type { RpcType::Jerigon => { jerigon::block_prover_input(cached_provider, block_id, checkpoint_block_number).await }