diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index c358974fb0c..518a7ebb29a 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -4,7 +4,7 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, scope, time}; use zksync_consensus_executor::{self as executor, attestation}; use zksync_consensus_roles::{attester, validator}; -use zksync_consensus_storage::BlockStore; +use zksync_consensus_storage::{BlockStore, PersistentBlockStore as _}; use zksync_dal::consensus_dal; use zksync_node_sync::{fetcher::FetchedBlock, sync_action::ActionQueueSender, SyncState}; use zksync_types::L2BlockNumber; @@ -21,6 +21,10 @@ use crate::{ storage::{self, ConnectionPool}, }; +/// If less than TEMPORARY_FETCHER_THRESHOLD certificates are missing, +/// the temporary fetcher will stop fetching blocks. +pub(crate) const TEMPORARY_FETCHER_THRESHOLD: u64 = 10; + /// External node. pub(super) struct EN { pub(super) pool: ConnectionPool, @@ -120,6 +124,20 @@ impl EN { .wrap("Store::new()")?; s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + // Run the temporary fetcher until the certificates are backfilled. + // Temporary fetcher should be removed once json RPC syncing is fully deprecated. + s.spawn_bg({ + let store = store.clone(); + async { + let store = store; + self.temporary_block_fetcher(ctx, &store).await?; + tracing::info!( + "temporary block fetcher finished, switching to p2p fetching only" + ); + Ok(()) + } + }); + let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) .await .wrap("BlockStore::new()")?; @@ -358,8 +376,42 @@ impl EN { } } + /// Fetches blocks from the main node directly, until the certificates + /// are backfilled. This allows for smooth transition from json RPC to p2p block syncing. + pub(crate) async fn temporary_block_fetcher( + &self, + ctx: &ctx::Ctx, + store: &Store, + ) -> ctx::Result<()> { + const MAX_CONCURRENT_REQUESTS: usize = 30; + scope::run!(ctx, |ctx, s| async { + let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS); + s.spawn(async { + let Some(mut next) = store.next_block(ctx).await? else { + return Ok(()); + }; + while store.persisted().borrow().next().0 + TEMPORARY_FETCHER_THRESHOLD < next.0 { + let n = L2BlockNumber(next.0.try_into().context("overflow")?); + self.sync_state.wait_for_main_node_block(ctx, n).await?; + send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?; + next = next.next(); + } + drop(send); + Ok(()) + }); + while let Ok(block) = recv.recv_or_disconnected(ctx).await? { + store + .queue_next_fetched_block(ctx, block.join(ctx).await?) + .await + .wrap("queue_next_fetched_block()")?; + } + Ok(()) + }) + .await + } + /// Fetches blocks from the main node in range `[cursor.next()..end)`. - pub(super) async fn fetch_blocks( + async fn fetch_blocks( &self, ctx: &ctx::Ctx, queue: &mut storage::PayloadQueue, @@ -373,7 +425,7 @@ impl EN { s.spawn(async { let send = send; while end.map_or(true, |end| next < end) { - let n = L2BlockNumber(next.0.try_into().unwrap()); + let n = L2BlockNumber(next.0.try_into().context("overflow")?); self.sync_state.wait_for_main_node_block(ctx, n).await?; send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?; next = next.next(); diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs index ed83758ba9f..7267d7e1c82 100644 --- a/core/node/consensus/src/storage/store.rs +++ b/core/node/consensus/src/storage/store.rs @@ -111,6 +111,30 @@ impl Store { async fn conn(&self, ctx: &ctx::Ctx) -> ctx::Result { self.pool.connection(ctx).await.wrap("connection") } + + /// Number of the next block to queue. + pub(crate) async fn next_block( + &self, + ctx: &ctx::Ctx, + ) -> ctx::OrCanceled> { + Ok(sync::lock(ctx, &self.block_payloads) + .await? + .as_ref() + .map(|p| p.next())) + } + + /// Queues the next block. + pub(crate) async fn queue_next_fetched_block( + &self, + ctx: &ctx::Ctx, + block: FetchedBlock, + ) -> ctx::Result<()> { + let mut payloads = sync::lock(ctx, &self.block_payloads).await?.into_async(); + if let Some(payloads) = &mut *payloads { + payloads.send(block).await.context("payloads.send()")?; + } + Ok(()) + } } impl PersistedBlockState { diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 2ba961dacc3..4ebcf5c9a61 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -45,7 +45,10 @@ use zksync_types::{ }; use zksync_web3_decl::client::{Client, DynClient, L2}; -use crate::{en, storage::ConnectionPool}; +use crate::{ + en, + storage::{ConnectionPool, Store}, +}; /// Fake StateKeeper for tests. #[derive(Debug)] @@ -417,6 +420,40 @@ impl StateKeeper { .await } + pub async fn run_temporary_fetcher( + self, + ctx: &ctx::Ctx, + client: Box>, + ) -> ctx::Result<()> { + scope::run!(ctx, |ctx, s| async { + let payload_queue = self + .pool + .connection(ctx) + .await + .wrap("connection()")? + .new_payload_queue(ctx, self.actions_sender, self.sync_state.clone()) + .await + .wrap("new_payload_queue()")?; + let (store, runner) = Store::new( + ctx, + self.pool.clone(), + Some(payload_queue), + Some(client.clone()), + ) + .await + .wrap("Store::new()")?; + s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + en::EN { + pool: self.pool.clone(), + client, + sync_state: self.sync_state.clone(), + } + .temporary_block_fetcher(ctx, &store) + .await + }) + .await + } + /// Runs consensus node for the external node. pub async fn run_consensus( self, diff --git a/core/node/consensus/src/tests/mod.rs b/core/node/consensus/src/tests/mod.rs index 94fbcbb90d8..8da17cfba8a 100644 --- a/core/node/consensus/src/tests/mod.rs +++ b/core/node/consensus/src/tests/mod.rs @@ -13,8 +13,10 @@ use zksync_consensus_storage::{BlockStore, PersistentBlockStore}; use zksync_dal::consensus_dal; use zksync_test_account::Account; use zksync_types::ProtocolVersionId; +use zksync_web3_decl::namespaces::EnNamespaceClient as _; use crate::{ + en::TEMPORARY_FETCHER_THRESHOLD, mn::run_main_node, storage::{ConnectionPool, Store}, testonly, @@ -665,6 +667,136 @@ async fn test_p2p_fetcher_backfill_certs( .unwrap(); } +// Test temporary fetcher fetching blocks if a lot of certs are missing. +#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))] +#[tokio::test] +async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.)); + let rng = &mut ctx.rng(); + // We force certs to be missing on EN by having 1 of the validators permanently offline. + // This way no blocks will be finalized at all, so no one will have certs. + let setup = Setup::new(rng, 2); + let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone(); + let node_cfg = validator_cfg.new_fullnode(rng); + let account = &mut Account::random(); + + scope::run!(ctx, |ctx, s| async { + tracing::info!("Spawn validator."); + let validator_pool = ConnectionPool::test(from_snapshot, version).await; + let (mut validator, runner) = + testonly::StateKeeper::new(ctx, validator_pool.clone()).await?; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(run_main_node( + ctx, + validator_cfg.config.clone(), + validator_cfg.secrets.clone(), + validator_pool.clone(), + )); + // API server needs at least 1 L1 batch to start. + validator.seal_batch().await; + let client = validator.connect(ctx).await?; + + // Wait for the consensus to be initialized. + while ctx.wait(client.consensus_global_config()).await??.is_none() { + ctx.sleep(time::Duration::milliseconds(100)).await?; + } + + let node_pool = ConnectionPool::test(from_snapshot, version).await; + + tracing::info!("Run centralized fetcher, so that there is a lot of certs missing."); + scope::run!(ctx, |ctx, s| async { + let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(node.run_fetcher(ctx, client.clone())); + validator + .push_random_blocks(rng, account, TEMPORARY_FETCHER_THRESHOLD as usize + 1) + .await; + node_pool + .wait_for_payload(ctx, validator.last_block()) + .await?; + Ok(()) + }) + .await + .unwrap(); + + tracing::info!( + "Run p2p fetcher. Blocks should be fetched by the temporary fetcher anyway." + ); + scope::run!(ctx, |ctx, s| async { + let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(node.run_consensus(ctx, client.clone(), node_cfg.clone())); + validator.push_random_blocks(rng, account, 5).await; + node_pool + .wait_for_payload(ctx, validator.last_block()) + .await?; + Ok(()) + }) + .await + .unwrap(); + Ok(()) + }) + .await + .unwrap(); +} + +// Test that temporary fetcher terminates once enough blocks have certs. +#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))] +#[tokio::test] +async fn test_temporary_fetcher_termination(from_snapshot: bool, version: ProtocolVersionId) { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.)); + let rng = &mut ctx.rng(); + let setup = Setup::new(rng, 1); + let pregenesis = true; + let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone(); + let node_cfg = validator_cfg.new_fullnode(rng); + let account = &mut Account::random(); + + scope::run!(ctx, |ctx, s| async { + tracing::info!("Spawn validator."); + let validator_pool = ConnectionPool::test(from_snapshot, version).await; + let (mut validator, runner) = + testonly::StateKeeper::new(ctx, validator_pool.clone()).await?; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(run_main_node( + ctx, + validator_cfg.config.clone(), + validator_cfg.secrets.clone(), + validator_pool.clone(), + )); + // API server needs at least 1 L1 batch to start. + validator.seal_batch().await; + let client = validator.connect(ctx).await?; + + let node_pool = ConnectionPool::test(from_snapshot, version).await; + + // Run the EN so the consensus is initialized on EN and wait for it to sync. + scope::run!(ctx, |ctx, s| async { + let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(node.run_consensus(ctx, client.clone(), node_cfg.clone())); + validator.push_random_blocks(rng, account, 5).await; + node_pool + .wait_for_payload(ctx, validator.last_block()) + .await?; + Ok(()) + }) + .await + .unwrap(); + + // Run the temporary fetcher. It should terminate immediately, since EN is synced. + let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?; + s.spawn_bg(runner.run(ctx)); + node.run_temporary_fetcher(ctx, client).await?; + + Ok(()) + }) + .await + .unwrap(); +} + #[test_casing(4, Product((VERSIONS,PREGENESIS)))] #[tokio::test] async fn test_with_pruning(version: ProtocolVersionId, pregenesis: bool) {