diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index 1cf17bc8400..07ee4140602 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -218,7 +218,7 @@ async fn init_tasks(
             .await
             .context("failed to build a connection pool for BatchStatusUpdater")?,
     )
-    .await;
+    .context("failed initializing batch status updater")?;
 
     // Run the components.
     let tree_stop_receiver = stop_receiver.clone();
diff --git a/core/lib/dal/.sqlx/query-4cdc90ed409b37b3c1c57bbcca9f82918afa1b0ac410325e4d00cd1c4fdd1e8b.json b/core/lib/dal/.sqlx/query-4cdc90ed409b37b3c1c57bbcca9f82918afa1b0ac410325e4d00cd1c4fdd1e8b.json
deleted file mode 100644
index 6210b70e4d6..00000000000
--- a/core/lib/dal/.sqlx/query-4cdc90ed409b37b3c1c57bbcca9f82918afa1b0ac410325e4d00cd1c4fdd1e8b.json
+++ /dev/null
@@ -1,134 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n            SELECT\n                number,\n                l1_tx_count,\n                l2_tx_count,\n                timestamp,\n                is_finished,\n                fee_account_address,\n                l2_to_l1_logs,\n                l2_to_l1_messages,\n                bloom,\n                priority_ops_onchain_data,\n                used_contract_hashes,\n                base_fee_per_gas,\n                l1_gas_price,\n                l2_fair_gas_price,\n                bootloader_code_hash,\n                default_aa_code_hash,\n                protocol_version,\n                compressed_state_diffs,\n                system_logs,\n                pubdata_input\n            FROM\n                l1_batches\n            ORDER BY\n                number DESC\n            LIMIT\n                1\n            ",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "number",
-        "type_info": "Int8"
-      },
-      {
-        "ordinal": 1,
-        "name": "l1_tx_count",
-        "type_info": "Int4"
-      },
-      {
-        "ordinal": 2,
-        "name": "l2_tx_count",
-        "type_info": "Int4"
-      },
-      {
-        "ordinal": 3,
-        "name": "timestamp",
-        "type_info": "Int8"
-      },
-      {
-        "ordinal": 4,
-        "name": "is_finished",
-        "type_info": "Bool"
-      },
-      {
-        "ordinal": 5,
-        "name": "fee_account_address",
-        "type_info": "Bytea"
-      },
-      {
-        "ordinal": 6,
-        "name": "l2_to_l1_logs",
-        "type_info": "ByteaArray"
-      },
-      {
-        "ordinal": 7,
-        "name": "l2_to_l1_messages",
-        "type_info": "ByteaArray"
-      },
-      {
-        "ordinal": 8,
-        "name": "bloom",
-        "type_info": "Bytea"
-      },
-      {
-        "ordinal": 9,
-        "name": "priority_ops_onchain_data",
-        "type_info": "ByteaArray"
-      },
-      {
-        "ordinal": 10,
-        "name": "used_contract_hashes",
-        "type_info": "Jsonb"
-      },
-      {
-        "ordinal": 11,
-        "name": "base_fee_per_gas",
-        "type_info": "Numeric"
-      },
-      {
-        "ordinal": 12,
-        "name": "l1_gas_price",
-        "type_info": "Int8"
-      },
-      {
-        "ordinal": 13,
-        "name": "l2_fair_gas_price",
-        "type_info": "Int8"
-      },
-      {
-        "ordinal": 14,
-        "name": "bootloader_code_hash",
-        "type_info": "Bytea"
-      },
-      {
-        "ordinal": 15,
-        "name": "default_aa_code_hash",
-        "type_info": "Bytea"
-      },
-      {
-        "ordinal": 16,
-        "name": "protocol_version",
-        "type_info": "Int4"
-      },
-      {
-        "ordinal": 17,
-        "name": "compressed_state_diffs",
-        "type_info": "Bytea"
-      },
-      {
-        "ordinal": 18,
-        "name": "system_logs",
-        "type_info": "ByteaArray"
-      },
-      {
-        "ordinal": 19,
-        "name": "pubdata_input",
-        "type_info": "Bytea"
-      }
-    ],
-    "parameters": {
-      "Left": []
-    },
-    "nullable": [
-      false,
-      false,
-      false,
-      false,
-      false,
-      false,
-      false,
-      false,
-      false,
-      false,
-      false,
-      false,
-      false,
-      false,
-      true,
-      true,
-      true,
-      true,
-      false,
-      true
-    ]
-  },
-  "hash": "4cdc90ed409b37b3c1c57bbcca9f82918afa1b0ac410325e4d00cd1c4fdd1e8b"
-}
diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs
index 9d581adc763..cf9cb03f3d4 100644
--- a/core/lib/dal/src/blocks_dal.rs
+++ b/core/lib/dal/src/blocks_dal.rs
@@ -1746,46 +1746,6 @@ impl BlocksDal<'_, '_> {
         Ok(Some((H256::from_slice(&hash), row.timestamp as u64)))
     }
 
-    pub async fn get_newest_l1_batch_header(&mut self) -> sqlx::Result<L1BatchHeader> {
-        let last_l1_batch = sqlx::query_as!(
-            StorageL1BatchHeader,
-            r#"
-            SELECT
-                number,
-                l1_tx_count,
-                l2_tx_count,
-                timestamp,
-                is_finished,
-                fee_account_address,
-                l2_to_l1_logs,
-                l2_to_l1_messages,
-                bloom,
-                priority_ops_onchain_data,
-                used_contract_hashes,
-                base_fee_per_gas,
-                l1_gas_price,
-                l2_fair_gas_price,
-                bootloader_code_hash,
-                default_aa_code_hash,
-                protocol_version,
-                compressed_state_diffs,
-                system_logs,
-                pubdata_input
-            FROM
-                l1_batches
-            ORDER BY
-                number DESC
-            LIMIT
-                1
-            "#
-        )
-        .instrument("get_newest_l1_batch_header")
-        .fetch_one(self.storage.conn())
-        .await?;
-
-        Ok(last_l1_batch.into())
-    }
-
     pub async fn get_l1_batch_metadata(
         &mut self,
         number: L1BatchNumber,
diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs
index b14403ad294..83d68a812d4 100644
--- a/core/lib/zksync_core/src/consensus/testonly.rs
+++ b/core/lib/zksync_core/src/consensus/testonly.rs
@@ -183,11 +183,20 @@ impl StateKeeper {
             .await
             .context("ensure_genesis_state()")?;
         }
-        let last_batch = storage
+
+        let last_l1_batch_number = storage
+            .blocks_dal()
+            .get_sealed_l1_batch_number()
+            .await
+            .context("get_sealed_l1_batch_number()")?
+            .context("no L1 batches in storage")?;
+        let last_miniblock_header = storage
             .blocks_dal()
-            .get_newest_l1_batch_header()
+            .get_last_sealed_miniblock_header()
             .await
-            .context("get_newest_l1_batch_header()")?;
+            .context("get_last_sealed_miniblock_header()")?
+            .context("no miniblocks in storage")?;
+
         let pending_batch = storage
             .blocks_dal()
             .pending_batch_exists()
@@ -196,13 +205,9 @@ impl StateKeeper {
         let (actions_sender, actions_queue) = ActionQueue::new();
         Ok((
             Self {
-                last_batch: last_batch.number + if pending_batch { 1 } else { 0 },
-                last_block: storage
-                    .blocks_dal()
-                    .get_sealed_miniblock_number()
-                    .await
-                    .context("get_sealed_miniblock_number()")?,
-                last_timestamp: last_batch.timestamp,
+                last_batch: last_l1_batch_number + if pending_batch { 1 } else { 0 },
+                last_block: last_miniblock_header.number,
+                last_timestamp: last_miniblock_header.timestamp,
                 batch_sealed: !pending_batch,
                 fee_per_gas: 10,
                 gas_per_pubdata: 100,
diff --git a/core/lib/zksync_core/src/state_keeper/io/mempool.rs b/core/lib/zksync_core/src/state_keeper/io/mempool.rs
index d9644577a08..20af1b9b221 100644
--- a/core/lib/zksync_core/src/state_keeper/io/mempool.rs
+++ b/core/lib/zksync_core/src/state_keeper/io/mempool.rs
@@ -415,11 +415,13 @@ impl MempoolIO {
         );
 
         let mut storage = pool.access_storage_tagged("state_keeper").await.unwrap();
-        let last_sealed_l1_batch_header = storage
+        // TODO (PLA-703): Support no L1 batches / miniblocks in the storage
+        let last_sealed_l1_batch_number = storage
             .blocks_dal()
-            .get_newest_l1_batch_header()
+            .get_sealed_l1_batch_number()
             .await
-            .unwrap();
+            .unwrap()
+            .expect("No L1 batches sealed");
         let last_miniblock_number = storage
             .blocks_dal()
             .get_sealed_miniblock_number()
             .await
@@ -435,7 +437,7 @@ impl MempoolIO {
             timeout_sealer: TimeoutSealer::new(config),
             filter: L2TxFilter::default(),
             // ^ Will be initialized properly on the first newly opened batch
-            current_l1_batch_number: last_sealed_l1_batch_header.number + 1,
+            current_l1_batch_number: last_sealed_l1_batch_number + 1,
             miniblock_sealer_handle,
             current_miniblock_number: last_miniblock_number + 1,
             fee_account: config.fee_account_addr,
diff --git a/core/lib/zksync_core/src/sync_layer/batch_status_updater.rs b/core/lib/zksync_core/src/sync_layer/batch_status_updater.rs
deleted file mode 100644
index f3027c1d65e..00000000000
--- a/core/lib/zksync_core/src/sync_layer/batch_status_updater.rs
+++ /dev/null
@@ -1,365 +0,0 @@
-use std::time::Duration;
-
-use chrono::{DateTime, Utc};
-use tokio::sync::watch::Receiver;
-use zksync_dal::ConnectionPool;
-use zksync_types::{
-    aggregated_operations::AggregatedActionType, api::BlockDetails, L1BatchNumber, MiniblockNumber,
-    H256,
-};
-use zksync_web3_decl::{
-    jsonrpsee::http_client::{HttpClient, HttpClientBuilder},
-    namespaces::ZksNamespaceClient,
-    RpcResult,
-};
-
-use super::metrics::{FetchStage, L1BatchStage, FETCHER_METRICS};
-use crate::metrics::EN_METRICS;
-
-/// Represents a change in the batch status.
-/// It may be a batch being committed, proven or executed.
-#[derive(Debug)]
-pub(crate) struct BatchStatusChange {
-    pub(crate) number: L1BatchNumber,
-    pub(crate) l1_tx_hash: H256,
-    pub(crate) happened_at: DateTime<Utc>,
-}
-
-#[derive(Debug, Default)]
-struct StatusChanges {
-    commit: Vec<BatchStatusChange>,
-    prove: Vec<BatchStatusChange>,
-    execute: Vec<BatchStatusChange>,
-}
-
-impl StatusChanges {
-    fn new() -> Self {
-        Self::default()
-    }
-
-    /// Returns true if there are no status changes.
-    fn is_empty(&self) -> bool {
-        self.commit.is_empty() && self.prove.is_empty() && self.execute.is_empty()
-    }
-}
-
-/// Module responsible for fetching the batch status changes, i.e. one that monitors whether the
-/// locally applied batch was committed, proven or executed on L1.
-///
-/// In essence, it keeps track of the last batch number per status, and periodically polls the main
-/// node on these batches in order to see whether the status has changed. If some changes were picked up,
-/// the module updates the database to mirror the state observable from the main node.
-#[derive(Debug)]
-pub struct BatchStatusUpdater {
-    client: HttpClient,
-    pool: ConnectionPool,
-
-    last_executed_l1_batch: L1BatchNumber,
-    last_proven_l1_batch: L1BatchNumber,
-    last_committed_l1_batch: L1BatchNumber,
-}
-
-impl BatchStatusUpdater {
-    pub async fn new(main_node_url: &str, pool: ConnectionPool) -> Self {
-        let client = HttpClientBuilder::default()
-            .build(main_node_url)
-            .expect("Unable to create a main node client");
-
-        let mut storage = pool.access_storage_tagged("sync_layer").await.unwrap();
-        let last_executed_l1_batch = storage
-            .blocks_dal()
-            .get_number_of_last_l1_batch_executed_on_eth()
-            .await
-            .unwrap()
-            .unwrap_or_default();
-        let last_proven_l1_batch = storage
-            .blocks_dal()
-            .get_number_of_last_l1_batch_proven_on_eth()
-            .await
-            .unwrap()
-            .unwrap_or_default();
-        let last_committed_l1_batch = storage
-            .blocks_dal()
-            .get_number_of_last_l1_batch_committed_on_eth()
-            .await
-            .unwrap()
-            .unwrap_or_default();
-        drop(storage);
-
-        Self {
-            client,
-            pool,
-
-            last_committed_l1_batch,
-            last_proven_l1_batch,
-            last_executed_l1_batch,
-        }
-    }
-
-    pub async fn run(mut self, stop_receiver: Receiver<bool>) -> anyhow::Result<()> {
-        loop {
-            if *stop_receiver.borrow() {
-                tracing::info!("Stop signal received, exiting the batch status updater routine");
-                return Ok(());
-            }
-            // Status changes are created externally, so that even if we will receive a network error
-            // while requesting the changes, we will be able to process what we already fetched.
-            let mut status_changes = StatusChanges::new();
-            if let Err(err) = self.get_status_changes(&mut status_changes).await {
-                tracing::warn!("Failed to get status changes from the database: {err}");
-            };
-
-            if status_changes.is_empty() {
-                const DELAY_INTERVAL: Duration = Duration::from_secs(5);
-                tokio::time::sleep(DELAY_INTERVAL).await;
-                continue;
-            }
-
-            self.apply_status_changes(status_changes).await;
-        }
-    }
-
-    /// Goes through the already fetched batches trying to update their statuses.
-    /// Returns a collection of the status updates grouped by the operation type.
-    ///
-    /// Fetched changes are capped by the last locally applied batch number, so
-    /// it's safe to assume that every status change can safely be applied (no status
-    /// changes "from the future").
-    async fn get_status_changes(&self, status_changes: &mut StatusChanges) -> RpcResult<()> {
-        let total_latency = EN_METRICS.update_batch_statuses.start();
-        let last_sealed_batch = self
-            .pool
-            .access_storage_tagged("sync_layer")
-            .await
-            .unwrap()
-            .blocks_dal()
-            .get_newest_l1_batch_header()
-            .await
-            .unwrap()
-            .number;
-
-        let mut last_committed_l1_batch = self.last_committed_l1_batch;
-        let mut last_proven_l1_batch = self.last_proven_l1_batch;
-        let mut last_executed_l1_batch = self.last_executed_l1_batch;
-
-        let mut batch = last_executed_l1_batch.next();
-        // In this loop we try to progress on the batch statuses, utilizing the same request to the node to potentially
-        // update all three statuses (e.g. if the node is still syncing), but also skipping the gaps in the statuses
-        // (e.g. if the last executed batch is 10, but the last proven is 20, we don't need to check the batches 11-19).
-        while batch <= last_sealed_batch {
-            // While we may receive `None` for the `self.current_l1_batch`, it's OK: open batch is guaranteed to not
-            // be sent to L1.
-            let request_latency = FETCHER_METRICS.requests[&FetchStage::GetMiniblockRange].start();
-            let Some((start_miniblock, _)) = self.client.get_miniblock_range(batch).await? else {
-                return Ok(());
-            };
-            request_latency.observe();
-
-            // We could have used any miniblock from the range, all of them share the same info.
-            let request_latency = FETCHER_METRICS.requests[&FetchStage::GetBlockDetails].start();
-            let Some(batch_info) = self
-                .client
-                .get_block_details(MiniblockNumber(start_miniblock.as_u32()))
-                .await?
-            else {
-                // We cannot recover from an external API inconsistency.
-                panic!(
-                    "Node API is inconsistent: miniblock {} was reported to be a part of {} L1 batch, \
-                    but API has no information about this miniblock", start_miniblock, batch
-                );
-            };
-            request_latency.observe();
-
-            Self::update_committed_batch(status_changes, &batch_info, &mut last_committed_l1_batch);
-            Self::update_proven_batch(status_changes, &batch_info, &mut last_proven_l1_batch);
-            Self::update_executed_batch(status_changes, &batch_info, &mut last_executed_l1_batch);
-
-            // Check whether we can skip a part of the range.
-            if batch_info.base.commit_tx_hash.is_none() {
-                // No committed batches after this one.
-                break;
-            } else if batch_info.base.prove_tx_hash.is_none() && batch < last_committed_l1_batch {
-                // The interval between this batch and the last committed one is not proven.
-                batch = last_committed_l1_batch.next();
-            } else if batch_info.base.executed_at.is_none() && batch < last_proven_l1_batch {
-                // The interval between this batch and the last proven one is not executed.
-                batch = last_proven_l1_batch.next();
-            } else {
-                batch += 1;
-            }
-        }
-
-        total_latency.observe();
-        Ok(())
-    }
-
-    fn update_committed_batch(
-        status_changes: &mut StatusChanges,
-        batch_info: &BlockDetails,
-        last_committed_l1_batch: &mut L1BatchNumber,
-    ) {
-        if batch_info.base.commit_tx_hash.is_some()
-            && batch_info.l1_batch_number == last_committed_l1_batch.next()
-        {
-            assert!(
-                batch_info.base.committed_at.is_some(),
-                "Malformed API response: batch is committed, but has no commit timestamp"
-            );
-            status_changes.commit.push(BatchStatusChange {
-                number: batch_info.l1_batch_number,
-                l1_tx_hash: batch_info.base.commit_tx_hash.unwrap(),
-                happened_at: batch_info.base.committed_at.unwrap(),
-            });
-            tracing::info!("Batch {}: committed", batch_info.l1_batch_number);
-            FETCHER_METRICS.l1_batch[&L1BatchStage::Committed]
-                .set(batch_info.l1_batch_number.0.into());
-            *last_committed_l1_batch += 1;
-        }
-    }
-
-    fn update_proven_batch(
-        status_changes: &mut StatusChanges,
-        batch_info: &BlockDetails,
-        last_proven_l1_batch: &mut L1BatchNumber,
-    ) {
-        if batch_info.base.prove_tx_hash.is_some()
-            && batch_info.l1_batch_number == last_proven_l1_batch.next()
-        {
-            assert!(
-                batch_info.base.proven_at.is_some(),
-                "Malformed API response: batch is proven, but has no prove timestamp"
-            );
-            status_changes.prove.push(BatchStatusChange {
-                number: batch_info.l1_batch_number,
-                l1_tx_hash: batch_info.base.prove_tx_hash.unwrap(),
-                happened_at: batch_info.base.proven_at.unwrap(),
-            });
-            tracing::info!("Batch {}: proven", batch_info.l1_batch_number);
-            FETCHER_METRICS.l1_batch[&L1BatchStage::Proven]
-                .set(batch_info.l1_batch_number.0.into());
-            *last_proven_l1_batch += 1;
-        }
-    }
-
-    fn update_executed_batch(
-        status_changes: &mut StatusChanges,
-        batch_info: &BlockDetails,
-        last_executed_l1_batch: &mut L1BatchNumber,
-    ) {
-        if batch_info.base.execute_tx_hash.is_some()
-            && batch_info.l1_batch_number == last_executed_l1_batch.next()
-        {
-            assert!(
-                batch_info.base.executed_at.is_some(),
-                "Malformed API response: batch is executed, but has no execute timestamp"
-            );
-            status_changes.execute.push(BatchStatusChange {
-                number: batch_info.l1_batch_number,
-                l1_tx_hash: batch_info.base.execute_tx_hash.unwrap(),
-                happened_at: batch_info.base.executed_at.unwrap(),
-            });
-            tracing::info!("Batch {}: executed", batch_info.l1_batch_number);
-            FETCHER_METRICS.l1_batch[&L1BatchStage::Executed]
-                .set(batch_info.l1_batch_number.0.into());
-            *last_executed_l1_batch += 1;
-        }
-    }
-
-    /// Inserts the provided status changes into the database.
-    /// The status changes are applied to the database by inserting bogus confirmed transactions (with
-    /// some fields missing/substituted) only to satisfy API needs; this component doesn't expect the updated
-    /// tables to be ever accessed by the `eth_sender` module.
-    async fn apply_status_changes(&mut self, changes: StatusChanges) {
-        let total_latency = EN_METRICS.batch_status_updater_loop_iteration.start();
-        let mut connection = self.pool.access_storage_tagged("sync_layer").await.unwrap();
-
-        let mut transaction = connection.start_transaction().await.unwrap();
-
-        let last_sealed_batch = transaction
-            .blocks_dal()
-            .get_newest_l1_batch_header()
-            .await
-            .unwrap()
-            .number;
-
-        for change in changes.commit.into_iter() {
-            tracing::info!(
-                "Commit status change: number {}, hash {}, happened at {}",
-                change.number,
-                change.l1_tx_hash,
-                change.happened_at
-            );
-
-            assert!(
-                change.number <= last_sealed_batch,
-                "Incorrect update state: unknown batch marked as committed"
-            );
-
-            transaction
-                .eth_sender_dal()
-                .insert_bogus_confirmed_eth_tx(
-                    change.number,
-                    AggregatedActionType::Commit,
-                    change.l1_tx_hash,
-                    change.happened_at,
-                )
-                .await
-                .unwrap();
-            self.last_committed_l1_batch = change.number;
-        }
-        for change in changes.prove.into_iter() {
-            tracing::info!(
-                "Prove status change: number {}, hash {}, happened at {}",
-                change.number,
-                change.l1_tx_hash,
-                change.happened_at
-            );
-
-            assert!(
-                change.number <= self.last_committed_l1_batch,
-                "Incorrect update state: proven batch must be committed"
-            );
-
-            transaction
-                .eth_sender_dal()
-                .insert_bogus_confirmed_eth_tx(
-                    change.number,
-                    AggregatedActionType::PublishProofOnchain,
-                    change.l1_tx_hash,
-                    change.happened_at,
-                )
-                .await
-                .unwrap();
-            self.last_proven_l1_batch = change.number;
-        }
-        for change in changes.execute.into_iter() {
-            tracing::info!(
-                "Execute status change: number {}, hash {}, happened at {}",
-                change.number,
-                change.l1_tx_hash,
-                change.happened_at
-            );
-
-            assert!(
-                change.number <= self.last_proven_l1_batch,
-                "Incorrect update state: executed batch must be proven"
-            );
-
-            transaction
-                .eth_sender_dal()
-                .insert_bogus_confirmed_eth_tx(
-                    change.number,
-                    AggregatedActionType::Execute,
-                    change.l1_tx_hash,
-                    change.happened_at,
-                )
-                .await
-                .unwrap();
-            self.last_executed_l1_batch = change.number;
-        }
-
-        transaction.commit().await.unwrap();
-
-        total_latency.observe();
-    }
-}
diff --git a/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs b/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs
new file mode 100644
index 00000000000..4a670349723
--- /dev/null
+++ b/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs
@@ -0,0 +1,470 @@
+//! Component responsible for updating L1 batch status.
+
+use std::{fmt, time::Duration};
+
+use anyhow::Context as _;
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+#[cfg(test)]
+use tokio::sync::mpsc;
+use tokio::sync::watch;
+use zksync_dal::{ConnectionPool, StorageProcessor};
+use zksync_types::{
+    aggregated_operations::AggregatedActionType, api, L1BatchNumber, MiniblockNumber, H256,
+};
+use zksync_web3_decl::{
+    jsonrpsee::{
+        core::ClientError,
+        http_client::{HttpClient, HttpClientBuilder},
+    },
+    namespaces::ZksNamespaceClient,
+};
+
+use super::metrics::{FetchStage, FETCHER_METRICS};
+use crate::{metrics::EN_METRICS, utils::projected_first_l1_batch};
+
+#[cfg(test)]
+mod tests;
+
+fn l1_batch_stage_to_action_str(stage: AggregatedActionType) -> &'static str {
+    match stage {
+        AggregatedActionType::Commit => "committed",
+        AggregatedActionType::PublishProofOnchain => "proven",
+        AggregatedActionType::Execute => "executed",
+    }
+}
+
+/// Represents a change in the batch status.
+/// It may be a batch being committed, proven or executed.
+#[derive(Debug)]
+struct BatchStatusChange {
+    number: L1BatchNumber,
+    l1_tx_hash: H256,
+    happened_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Default)]
+struct StatusChanges {
+    commit: Vec<BatchStatusChange>,
+    prove: Vec<BatchStatusChange>,
+    execute: Vec<BatchStatusChange>,
+}
+
+impl StatusChanges {
+    /// Returns true if there are no status changes.
+    fn is_empty(&self) -> bool {
+        self.commit.is_empty() && self.prove.is_empty() && self.execute.is_empty()
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum UpdaterError {
+    #[error("JSON-RPC error communicating with main node")]
+    Web3(#[from] ClientError),
+    #[error("Internal error")]
+    Internal(#[from] anyhow::Error),
+}
+
+impl From<zksync_dal::SqlxError> for UpdaterError {
+    fn from(err: zksync_dal::SqlxError) -> Self {
+        Self::Internal(err.into())
+    }
+}
+
+#[async_trait]
+trait MainNodeClient: fmt::Debug + Send + Sync {
+    /// Returns any miniblock in the specified L1 batch.
+    async fn resolve_l1_batch_to_miniblock(
+        &self,
+        number: L1BatchNumber,
+    ) -> Result<Option<MiniblockNumber>, ClientError>;
+
+    async fn block_details(
+        &self,
+        number: MiniblockNumber,
+    ) -> Result<Option<api::BlockDetails>, ClientError>;
+}
+
+#[async_trait]
+impl MainNodeClient for HttpClient {
+    async fn resolve_l1_batch_to_miniblock(
+        &self,
+        number: L1BatchNumber,
+    ) -> Result<Option<MiniblockNumber>, ClientError> {
+        let request_latency = FETCHER_METRICS.requests[&FetchStage::GetMiniblockRange].start();
+        let number = self
+            .get_miniblock_range(number)
+            .await?
+            .map(|(start, _)| MiniblockNumber(start.as_u32()));
+        request_latency.observe();
+        Ok(number)
+    }
+
+    async fn block_details(
+        &self,
+        number: MiniblockNumber,
+    ) -> Result<Option<api::BlockDetails>, ClientError> {
+        let request_latency = FETCHER_METRICS.requests[&FetchStage::GetBlockDetails].start();
+        let details = self.get_block_details(number).await?;
+        request_latency.observe();
+        Ok(details)
+    }
+}
+
+/// Cursors for the last executed / proven / committed L1 batch numbers.
+#[derive(Debug, Clone, Copy, PartialEq)]
+struct UpdaterCursor {
+    last_executed_l1_batch: L1BatchNumber,
+    last_proven_l1_batch: L1BatchNumber,
+    last_committed_l1_batch: L1BatchNumber,
+}
+
+impl UpdaterCursor {
+    async fn new(storage: &mut StorageProcessor<'_>) -> anyhow::Result<Self> {
+        let first_l1_batch_number = projected_first_l1_batch(storage).await?;
+        // Use the snapshot L1 batch, or the genesis batch if we are not using a snapshot. Technically, the snapshot L1 batch
+        // is not necessarily proven / executed yet, but since it and earlier batches are not stored, it serves as
+        // a natural lower boundary for the cursor.
+        let starting_l1_batch_number = L1BatchNumber(first_l1_batch_number.saturating_sub(1));
+
+        let last_executed_l1_batch = storage
+            .blocks_dal()
+            .get_number_of_last_l1_batch_executed_on_eth()
+            .await?
+            .unwrap_or(starting_l1_batch_number);
+        let last_proven_l1_batch = storage
+            .blocks_dal()
+            .get_number_of_last_l1_batch_proven_on_eth()
+            .await?
+            .unwrap_or(starting_l1_batch_number);
+        let last_committed_l1_batch = storage
+            .blocks_dal()
+            .get_number_of_last_l1_batch_committed_on_eth()
+            .await?
+            .unwrap_or(starting_l1_batch_number);
+        Ok(Self {
+            last_executed_l1_batch,
+            last_proven_l1_batch,
+            last_committed_l1_batch,
+        })
+    }
+
+    fn extract_tx_hash_and_timestamp(
+        batch_info: &api::BlockDetails,
+        stage: AggregatedActionType,
+    ) -> (Option<H256>, Option<DateTime<Utc>>) {
+        match stage {
+            AggregatedActionType::Commit => {
+                (batch_info.base.commit_tx_hash, batch_info.base.committed_at)
+            }
+            AggregatedActionType::PublishProofOnchain => {
+                (batch_info.base.prove_tx_hash, batch_info.base.proven_at)
+            }
+            AggregatedActionType::Execute => {
+                (batch_info.base.execute_tx_hash, batch_info.base.executed_at)
+            }
+        }
+    }
+
+    fn update(
+        &mut self,
+        status_changes: &mut StatusChanges,
+        batch_info: &api::BlockDetails,
+    ) -> anyhow::Result<()> {
+        for stage in [
+            AggregatedActionType::Commit,
+            AggregatedActionType::PublishProofOnchain,
+            AggregatedActionType::Execute,
+        ] {
+            self.update_stage(status_changes, batch_info, stage)?;
+        }
+        Ok(())
+    }
+
+    fn update_stage(
+        &mut self,
+        status_changes: &mut StatusChanges,
+        batch_info: &api::BlockDetails,
+        stage: AggregatedActionType,
+    ) -> anyhow::Result<()> {
+        let (l1_tx_hash, happened_at) = Self::extract_tx_hash_and_timestamp(batch_info, stage);
+        let (last_l1_batch, changes_to_update) = match stage {
+            AggregatedActionType::Commit => (
+                &mut self.last_committed_l1_batch,
+                &mut status_changes.commit,
+            ),
+            AggregatedActionType::PublishProofOnchain => {
+                (&mut self.last_proven_l1_batch, &mut status_changes.prove)
+            }
+            AggregatedActionType::Execute => (
+                &mut self.last_executed_l1_batch,
+                &mut status_changes.execute,
+            ),
+        };
+
+        // Check whether we have all data for the update.
+        let Some(l1_tx_hash) = l1_tx_hash else {
+            return Ok(());
+        };
+        if batch_info.l1_batch_number != last_l1_batch.next() {
+            return Ok(());
+        }
+
+        let action_str = l1_batch_stage_to_action_str(stage);
+        let happened_at = happened_at.with_context(|| {
+            format!("Malformed API response: batch is {action_str}, but has no relevant timestamp")
+        })?;
+        changes_to_update.push(BatchStatusChange {
+            number: batch_info.l1_batch_number,
+            l1_tx_hash,
+            happened_at,
+        });
+        tracing::info!("Batch {}: {action_str}", batch_info.l1_batch_number);
+        FETCHER_METRICS.l1_batch[&stage.into()].set(batch_info.l1_batch_number.0.into());
+        *last_l1_batch += 1;
+        Ok(())
+    }
+}
+
+/// Component responsible for fetching the batch status changes, i.e. one that monitors whether the
+/// locally applied batch was committed, proven or executed on L1.
+///
+/// In essence, it keeps track of the last batch number per status, and periodically polls the main
+/// node on these batches in order to see whether the status has changed. If some changes were picked up,
+/// the module updates the database to mirror the state observable from the main node. This is required for other components
+/// (e.g., the API server and the consistency checker) to function properly. E.g., the API server returns commit / prove / execute
+/// L1 transaction information in `zks_getBlockDetails` and `zks_getL1BatchDetails` RPC methods.
+#[derive(Debug)]
+pub struct BatchStatusUpdater {
+    client: Box<dyn MainNodeClient>,
+    pool: ConnectionPool,
+    sleep_interval: Duration,
+    /// Test-only sender of status changes each time they are produced and applied to the storage.
+    #[cfg(test)]
+    changes_sender: mpsc::UnboundedSender<StatusChanges>,
+}
+
+impl BatchStatusUpdater {
+    const DEFAULT_SLEEP_INTERVAL: Duration = Duration::from_secs(5);
+
+    pub fn new(main_node_url: &str, pool: ConnectionPool) -> anyhow::Result<Self> {
+        let client = HttpClientBuilder::default()
+            .build(main_node_url)
+            .context("Unable to create a main node client")?;
+        Ok(Self::from_parts(
+            Box::new(client),
+            pool,
+            Self::DEFAULT_SLEEP_INTERVAL,
+        ))
+    }
+
+    fn from_parts(
+        client: Box<dyn MainNodeClient>,
+        pool: ConnectionPool,
+        sleep_interval: Duration,
+    ) -> Self {
+        Self {
+            client,
+            pool,
+            sleep_interval,
+            #[cfg(test)]
+            changes_sender: mpsc::unbounded_channel().0,
+        }
+    }
+
+    pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
+        let mut storage = self.pool.access_storage_tagged("sync_layer").await?;
+        let mut cursor = UpdaterCursor::new(&mut storage).await?;
+        drop(storage);
+        tracing::info!("Initialized batch status updater cursor: {cursor:?}");
+
+        loop {
+            if *stop_receiver.borrow() {
+                tracing::info!("Stop signal received, exiting the batch status updater routine");
+                return Ok(());
+            }
+
+            // Status changes are created externally, so that even if we will receive a network error
+            // while requesting the changes, we will be able to process what we already fetched.
+            let mut status_changes = StatusChanges::default();
+            // Note that we don't update `cursor` here (it is copied), but rather only in `apply_status_changes`.
+            match self.get_status_changes(&mut status_changes, cursor).await {
+                Ok(()) => { /* everything went smoothly */ }
+                Err(UpdaterError::Web3(err)) => {
+                    tracing::warn!("Failed to get status changes from the main node: {err}");
+                }
+                Err(UpdaterError::Internal(err)) => return Err(err),
+            }
+
+            if status_changes.is_empty() {
+                tokio::time::sleep(self.sleep_interval).await;
+            } else {
+                self.apply_status_changes(&mut cursor, status_changes)
+                    .await?;
+            }
+        }
+    }
+
+    /// Goes through the already fetched batches trying to update their statuses.
+    ///
+    /// Fetched changes are capped by the last locally applied batch number, so
+    /// it's safe to assume that every status change can safely be applied (no status
+    /// changes "from the future").
+    async fn get_status_changes(
+        &self,
+        status_changes: &mut StatusChanges,
+        mut cursor: UpdaterCursor,
+    ) -> Result<(), UpdaterError> {
+        let total_latency = EN_METRICS.update_batch_statuses.start();
+        let Some(last_sealed_batch) = self
+            .pool
+            .access_storage_tagged("sync_layer")
+            .await?
+            .blocks_dal()
+            .get_sealed_l1_batch_number()
+            .await?
+        else {
+            return Ok(()); // No L1 batches in the storage yet; do nothing.
+        };
+
+        let mut batch = cursor.last_executed_l1_batch.next();
+        // In this loop we try to progress on the batch statuses, utilizing the same request to the node to potentially
+        // update all three statuses (e.g. if the node is still syncing), but also skipping the gaps in the statuses
+        // (e.g. if the last executed batch is 10, but the last proven is 20, we don't need to check the batches 11-19).
+        while batch <= last_sealed_batch {
+            // While we may receive `None` for the `self.current_l1_batch`, it's OK: open batch is guaranteed to not
+            // be sent to L1.
+            let miniblock_number = self.client.resolve_l1_batch_to_miniblock(batch).await?;
+            let Some(miniblock_number) = miniblock_number else {
+                return Ok(());
+            };
+
+            let Some(batch_info) = self.client.block_details(miniblock_number).await? else {
+                // We cannot recover from an external API inconsistency.
+                let err = anyhow::anyhow!(
+                    "Node API is inconsistent: miniblock {miniblock_number} was reported to be a part of {batch} L1 batch, \
+                    but API has no information about this miniblock",
+                );
+                return Err(err.into());
+            };
+
+            cursor.update(status_changes, &batch_info)?;
+
+            // Check whether we can skip a part of the range.
+            if batch_info.base.commit_tx_hash.is_none() {
+                // No committed batches after this one.
+                break;
+            } else if batch_info.base.prove_tx_hash.is_none()
+                && batch < cursor.last_committed_l1_batch
+            {
+                // The interval between this batch and the last committed one is not proven.
+                batch = cursor.last_committed_l1_batch.next();
+            } else if batch_info.base.executed_at.is_none() && batch < cursor.last_proven_l1_batch {
+                // The interval between this batch and the last proven one is not executed.
+                batch = cursor.last_proven_l1_batch.next();
+            } else {
+                batch += 1;
+            }
+        }
+
+        total_latency.observe();
+        Ok(())
+    }
+
+    /// Inserts the provided status changes into the database.
+    /// The status changes are applied to the database by inserting bogus confirmed transactions (with
+    /// some fields missing/substituted) only to satisfy API needs; this component doesn't expect the updated
+    /// tables to be ever accessed by the `eth_sender` module.
+    async fn apply_status_changes(
+        &self,
+        cursor: &mut UpdaterCursor,
+        changes: StatusChanges,
+    ) -> anyhow::Result<()> {
+        let total_latency = EN_METRICS.batch_status_updater_loop_iteration.start();
+        let mut connection = self.pool.access_storage_tagged("sync_layer").await?;
+        let mut transaction = connection.start_transaction().await?;
+        let last_sealed_batch = transaction
+            .blocks_dal()
+            .get_sealed_l1_batch_number()
+            .await?
+            .context("L1 batches disappeared from Postgres")?;
+
+        for change in &changes.commit {
+            tracing::info!(
+                "Commit status change: number {}, hash {}, happened at {}",
+                change.number,
+                change.l1_tx_hash,
+                change.happened_at
+            );
+            anyhow::ensure!(
+                change.number <= last_sealed_batch,
+                "Incorrect update state: unknown batch marked as committed"
+            );
+
+            transaction
+                .eth_sender_dal()
+                .insert_bogus_confirmed_eth_tx(
+                    change.number,
+                    AggregatedActionType::Commit,
+                    change.l1_tx_hash,
+                    change.happened_at,
+                )
+                .await?;
+            cursor.last_committed_l1_batch = change.number;
+        }
+
+        for change in &changes.prove {
+            tracing::info!(
+                "Prove status change: number {}, hash {}, happened at {}",
+                change.number,
+                change.l1_tx_hash,
+                change.happened_at
+            );
+            anyhow::ensure!(
+                change.number <= cursor.last_committed_l1_batch,
+                "Incorrect update state: proven batch must be committed"
+            );
+
+            transaction
+                .eth_sender_dal()
+                .insert_bogus_confirmed_eth_tx(
+                    change.number,
+                    AggregatedActionType::PublishProofOnchain,
+                    change.l1_tx_hash,
+                    change.happened_at,
+                )
+                .await?;
+            cursor.last_proven_l1_batch = change.number;
+        }
+
+        for change in &changes.execute {
+            tracing::info!(
+                "Execute status change: number {}, hash {}, happened at {}",
+                change.number,
+                change.l1_tx_hash,
+                change.happened_at
+            );
+            anyhow::ensure!(
+                change.number <= cursor.last_proven_l1_batch,
+                "Incorrect update state: executed batch must be proven"
+            );
+
+            transaction
+                .eth_sender_dal()
+                .insert_bogus_confirmed_eth_tx(
+                    change.number,
+                    AggregatedActionType::Execute,
+                    change.l1_tx_hash,
+                    change.happened_at,
+                )
+                .await?;
+            cursor.last_executed_l1_batch = change.number;
+        }
+        transaction.commit().await?;
+        total_latency.observe();
+
+        #[cfg(test)]
+        self.changes_sender.send(changes).ok();
+        Ok(())
+    }
+}
diff --git a/core/lib/zksync_core/src/sync_layer/batch_status_updater/tests.rs b/core/lib/zksync_core/src/sync_layer/batch_status_updater/tests.rs
new file mode 100644
index 00000000000..7ca6e73c37c
--- /dev/null
+++ b/core/lib/zksync_core/src/sync_layer/batch_status_updater/tests.rs
@@ -0,0 +1,442 @@
+//! Tests for batch status updater.
+
+use std::{future, sync::Arc};
+
+use chrono::TimeZone;
+use test_casing::{test_casing, Product};
+use tokio::sync::{watch, Mutex};
+use zksync_contracts::BaseSystemContractsHashes;
+use zksync_types::{block::BlockGasCount, Address, L2ChainId, ProtocolVersionId};
+
+use super::*;
+use crate::{
+    genesis::{ensure_genesis_state, GenesisParams},
+    sync_layer::metrics::L1BatchStage,
+    utils::testonly::{create_l1_batch, create_miniblock, prepare_empty_recovery_snapshot},
+};
+
+async fn seal_l1_batch(storage: &mut StorageProcessor<'_>, number: L1BatchNumber) {
+    let mut storage = storage.start_transaction().await.unwrap();
+    // Insert a mock miniblock so that `get_block_details()` will return values.
+    let miniblock = create_miniblock(number.0);
+    storage
+        .blocks_dal()
+        .insert_miniblock(&miniblock)
+        .await
+        .unwrap();
+
+    let l1_batch = create_l1_batch(number.0);
+    storage
+        .blocks_dal()
+        .insert_l1_batch(&l1_batch, &[], BlockGasCount::default(), &[], &[], 0)
+        .await
+        .unwrap();
+    storage
+        .blocks_dal()
+        .mark_miniblocks_as_executed_in_l1_batch(number)
+        .await
+        .unwrap();
+    storage.commit().await.unwrap();
+}
+
+/// Mapping `L1BatchNumber` -> `L1BatchStage` for a continuous range of numbers.
+#[derive(Debug, Clone, Default, PartialEq)]
+struct L1BatchStagesMap {
+    first_batch_number: L1BatchNumber,
+    stages: Vec<L1BatchStage>,
+}
+
+impl L1BatchStagesMap {
+    fn empty(first_batch_number: L1BatchNumber, len: usize) -> Self {
+        Self {
+            first_batch_number,
+            stages: vec![L1BatchStage::Open; len],
+        }
+    }
+
+    fn new(first_batch_number: L1BatchNumber, stages: Vec<L1BatchStage>) -> Self {
+        assert!(stages.windows(2).all(|window| {
+            let [prev, next] = window else { unreachable!() };
+            prev >= next
+        }));
+        Self {
+            first_batch_number,
+            stages,
+        }
+    }
+
+    fn get(&self, number: L1BatchNumber) -> Option<L1BatchStage> {
+        let Some(index) = number.0.checked_sub(self.first_batch_number.0) else {
+            return None;
+        };
+        self.stages.get(index as usize).copied()
+    }
+
+    fn iter(&self) -> impl Iterator<Item = (L1BatchNumber, L1BatchStage)> + '_ {
+        self.stages
+            .iter()
+            .enumerate()
+            .map(|(i, &stage)| (self.first_batch_number + i as u32, stage))
+    }
+
+    fn update(&mut self, changes: &StatusChanges) {
+        self.update_to_stage(&changes.commit, L1BatchStage::Committed);
+        self.update_to_stage(&changes.prove, L1BatchStage::Proven);
+        self.update_to_stage(&changes.execute, L1BatchStage::Executed);
+    }
+
+    fn update_to_stage(&mut self, batch_changes: &[BatchStatusChange], target: L1BatchStage) {
+        for change in batch_changes {
+            let number = change.number;
+            let index = number
+                .0
+                .checked_sub(self.first_batch_number.0)
+                .unwrap_or_else(|| panic!("stage is missing for L1 batch #{number}"));
+            let stage = self
+                .stages
+                .get_mut(index as usize)
+                .unwrap_or_else(|| panic!("stage is missing for L1 batch #{number}"));
+            assert!(
+                *stage < target,
+                "Invalid update for L1 batch #{number}: {stage:?} -> {target:?}"
+            );
+            *stage = target;
+        }
+    }
+
+    async fn assert_storage(&self, storage: &mut StorageProcessor<'_>) {
+        for (number, stage) in self.iter() {
+            let local_details = storage
+                .blocks_web3_dal()
+                .get_block_details(MiniblockNumber(number.0), Address::zero())
+                .await
+                .unwrap()
+                .unwrap_or_else(|| panic!("no details for block #{number}"));
+            let expected_details = mock_block_details(number.0, stage);
+
+            assert_eq!(
+                local_details.base.commit_tx_hash,
+                expected_details.base.commit_tx_hash
+            );
+            assert_eq!(
+                local_details.base.committed_at,
+                expected_details.base.committed_at
+            );
+            assert_eq!(
+                local_details.base.prove_tx_hash,
+                expected_details.base.prove_tx_hash
+            );
+            assert_eq!(
+                local_details.base.proven_at,
+                expected_details.base.proven_at
+            );
+            assert_eq!(
+                local_details.base.execute_tx_hash,
+                expected_details.base.execute_tx_hash
+            );
+            assert_eq!(
+                local_details.base.executed_at,
+                expected_details.base.executed_at
+            );
+        }
+    }
+}
+
+fn mock_block_details(number: u32, stage: L1BatchStage) -> api::BlockDetails {
+    api::BlockDetails {
+        number: MiniblockNumber(number),
+        l1_batch_number: L1BatchNumber(number),
+        base: api::BlockDetailsBase {
+            timestamp: number.into(),
+            l1_tx_count: 0,
+            l2_tx_count: 0,
+            root_hash: Some(H256::zero()),
+            status: api::BlockStatus::Sealed,
+            commit_tx_hash: (stage >= L1BatchStage::Committed).then(|| H256::repeat_byte(1)),
+            committed_at: (stage >= L1BatchStage::Committed)
+                .then(|| Utc.timestamp_opt(100, 0).unwrap()),
+            prove_tx_hash: (stage >= L1BatchStage::Proven).then(|| H256::repeat_byte(2)),
+            proven_at: (stage >= L1BatchStage::Proven).then(|| Utc.timestamp_opt(200, 0).unwrap()),
+            execute_tx_hash: (stage >= L1BatchStage::Executed).then(|| H256::repeat_byte(3)),
+            executed_at: (stage >= L1BatchStage::Executed)
+                .then(|| Utc.timestamp_opt(300, 0).unwrap()),
+            l1_gas_price: 1,
+            l2_fair_gas_price: 2,
+            base_system_contracts_hashes: BaseSystemContractsHashes::default(),
+        },
+        operator_address: Address::zero(),
+        protocol_version: Some(ProtocolVersionId::default()),
+    }
+}
+
+#[derive(Debug, Default)]
+struct MockMainNodeClient(Arc<Mutex<L1BatchStagesMap>>);
+
+impl From<L1BatchStagesMap> for MockMainNodeClient {
+    fn from(map: L1BatchStagesMap) -> Self {
+        Self(Arc::new(Mutex::new(map)))
+    }
+}
+
+#[async_trait]
+impl MainNodeClient for MockMainNodeClient {
+    async fn resolve_l1_batch_to_miniblock(
+        &self,
+        number: L1BatchNumber,
+    ) -> Result<Option<MiniblockNumber>, ClientError> {
+        let map = self.0.lock().await;
+        Ok(map
+            .get(number)
+            .is_some()
+            .then_some(MiniblockNumber(number.0)))
+    }
+
+    async fn block_details(
+        &self,
+        number: MiniblockNumber,
+    ) -> Result<Option<api::BlockDetails>, ClientError> {
+        let map = self.0.lock().await;
+        let Some(stage) = map.get(L1BatchNumber(number.0)) else {
+            return Ok(None);
+        };
+        Ok(Some(mock_block_details(number.0, stage)))
+    }
+}
+
+fn mock_change(number: L1BatchNumber) -> BatchStatusChange {
+    BatchStatusChange {
+        number,
+        l1_tx_hash: H256::zero(),
+        happened_at: DateTime::default(),
+    }
+}
+
+fn mock_updater(
+    client: MockMainNodeClient,
+    pool: ConnectionPool,
+) -> (BatchStatusUpdater, mpsc::UnboundedReceiver<StatusChanges>) {
+    let (changes_sender, changes_receiver) = mpsc::unbounded_channel();
+    let mut updater =
+        BatchStatusUpdater::from_parts(Box::new(client), pool, Duration::from_millis(10));
+    updater.changes_sender = changes_sender;
+    (updater, changes_receiver)
+}
+
+#[tokio::test]
+async fn updater_cursor_for_storage_with_genesis_block() {
+    let pool = ConnectionPool::test_pool().await;
+    let mut storage = pool.access_storage().await.unwrap();
+    ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock())
+        .await
+        .unwrap();
+    for number in [1, 2] {
+        seal_l1_batch(&mut storage, L1BatchNumber(number)).await;
+    }
+
+    let mut cursor = UpdaterCursor::new(&mut storage).await.unwrap();
+    assert_eq!(cursor.last_committed_l1_batch, L1BatchNumber(0));
+    assert_eq!(cursor.last_proven_l1_batch, L1BatchNumber(0));
+    assert_eq!(cursor.last_executed_l1_batch, L1BatchNumber(0));
+
+    let (updater, _) = mock_updater(MockMainNodeClient::default(), pool.clone());
+    let changes = StatusChanges {
+        commit: vec![mock_change(L1BatchNumber(1)), mock_change(L1BatchNumber(2))],
+        prove: vec![mock_change(L1BatchNumber(1))],
+        execute: vec![],
+    };
+    updater
+        .apply_status_changes(&mut cursor, changes)
+        .await
+        .unwrap();
+
+    assert_eq!(cursor.last_committed_l1_batch, L1BatchNumber(2));
+    assert_eq!(cursor.last_proven_l1_batch, L1BatchNumber(1));
+    assert_eq!(cursor.last_executed_l1_batch, L1BatchNumber(0));
+
+    let restored_cursor = UpdaterCursor::new(&mut storage).await.unwrap();
+    assert_eq!(restored_cursor, cursor);
+}
+
+#[tokio::test]
+async fn updater_cursor_after_snapshot_recovery() {
+    let pool = ConnectionPool::test_pool().await;
+    let mut storage = pool.access_storage().await.unwrap();
+    prepare_empty_recovery_snapshot(&mut storage, 23).await;
+
+    let cursor = UpdaterCursor::new(&mut storage).await.unwrap();
+    assert_eq!(cursor.last_committed_l1_batch, L1BatchNumber(23));
+    assert_eq!(cursor.last_proven_l1_batch, L1BatchNumber(23));
+    assert_eq!(cursor.last_executed_l1_batch, L1BatchNumber(23));
+}
+
+#[test_casing(4, Product(([false, true], [false, true])))]
+#[tokio::test]
+async fn normal_updater_operation(snapshot_recovery: bool, async_batches: bool) {
+    let pool = ConnectionPool::test_pool().await;
+    let mut storage = pool.access_storage().await.unwrap();
+    let first_batch_number = if snapshot_recovery {
+        prepare_empty_recovery_snapshot(&mut storage, 23).await;
+        L1BatchNumber(24)
+    } else {
+        ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock())
+            .await
+            .unwrap();
+        L1BatchNumber(1)
+    };
+
+    let target_batch_stages = L1BatchStagesMap::new(
+        first_batch_number,
+        vec![
+            L1BatchStage::Executed,
+            L1BatchStage::Proven,
+            L1BatchStage::Proven,
+            L1BatchStage::Committed,
+            L1BatchStage::Committed,
+            L1BatchStage::Open,
+        ],
+    );
+    let batch_numbers: Vec<_> = target_batch_stages
+        .iter()
+        .map(|(number, _)| number)
+        .collect();
+
+    if !async_batches {
+        // Make all L1 batches present in the storage from the start.
+        for &number in &batch_numbers {
+            seal_l1_batch(&mut storage, number).await;
+        }
+    }
+
+    let client = MockMainNodeClient::from(target_batch_stages.clone());
+    let (updater, mut changes_receiver) = mock_updater(client, pool.clone());
+    let (stop_sender, stop_receiver) = watch::channel(false);
+    let updater_task = tokio::spawn(updater.run(stop_receiver));
+
+    let batches_task = if async_batches {
+        let pool = pool.clone();
+        tokio::spawn(async move {
+            let mut storage = pool.access_storage().await.unwrap();
+            for &number in &batch_numbers {
+                seal_l1_batch(&mut storage, number).await;
+                tokio::time::sleep(Duration::from_millis(15)).await;
+            }
+        })
+    } else {
+        tokio::spawn(future::ready(()))
+    };
+
+    let mut observed_batch_stages =
+        L1BatchStagesMap::empty(first_batch_number, target_batch_stages.stages.len());
+    loop {
+        let changes = changes_receiver.recv().await.unwrap();
+        observed_batch_stages.update(&changes);
+        if observed_batch_stages == target_batch_stages {
+            break;
+        }
+    }
+
+    batches_task.await.unwrap();
+    target_batch_stages.assert_storage(&mut storage).await;
+    stop_sender.send_replace(true);
+    updater_task.await.unwrap().expect("updater failed");
+}
+
+#[test_casing(2, [false, true])]
+#[tokio::test]
+async fn updater_with_gradual_main_node_updates(snapshot_recovery: bool) {
+    let pool = ConnectionPool::test_pool().await;
+    let mut storage = pool.access_storage().await.unwrap();
+    let first_batch_number = if snapshot_recovery {
+        prepare_empty_recovery_snapshot(&mut storage, 23).await;
+        L1BatchNumber(24)
+    } else {
+        ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock())
+            .await
+            .unwrap();
+        L1BatchNumber(1)
+    };
+
+    let target_batch_stages = L1BatchStagesMap::new(
+        first_batch_number,
+        vec![
+            L1BatchStage::Executed,
+            L1BatchStage::Proven,
+            L1BatchStage::Proven,
+            L1BatchStage::Committed,
+            L1BatchStage::Committed,
+            L1BatchStage::Open,
+        ],
+    );
+    let mut observed_batch_stages =
+        L1BatchStagesMap::empty(first_batch_number, target_batch_stages.stages.len());
+
+    for (number, _) in target_batch_stages.iter() {
+        seal_l1_batch(&mut storage, number).await;
+    }
+
+    let client = MockMainNodeClient::from(observed_batch_stages.clone());
+
+    // Gradually update information provided by the main node.
+    let client_map = Arc::clone(&client.0);
+    let final_stages = target_batch_stages.clone();
+    let storage_task = tokio::spawn(async move {
+        for max_stage in [
+            L1BatchStage::Committed,
+            L1BatchStage::Proven,
+            L1BatchStage::Executed,
+        ] {
+            let mut client_map = client_map.lock().await;
+            for (stage, &final_stage) in client_map.stages.iter_mut().zip(&final_stages.stages) {
+                *stage = final_stage.min(max_stage);
+            }
+            drop(client_map);
+            tokio::time::sleep(Duration::from_millis(15)).await;
+        }
+    });
+
+    let (updater, mut changes_receiver) = mock_updater(client, pool.clone());
+    let (stop_sender, stop_receiver) = watch::channel(false);
+    let updater_task = tokio::spawn(updater.run(stop_receiver));
+
+    loop {
+        let changes = changes_receiver.recv().await.unwrap();
+        observed_batch_stages.update(&changes);
+        if observed_batch_stages == target_batch_stages {
+            break;
+        }
+    }
+
+    storage_task.await.unwrap();
+    target_batch_stages.assert_storage(&mut storage).await;
+    stop_sender.send_replace(true);
+    updater_task.await.unwrap().expect("updater failed");
+
+    drop(storage);
+    test_resuming_updater(pool, target_batch_stages).await;
+}
+
+async fn test_resuming_updater(pool: ConnectionPool, initial_batch_stages: L1BatchStagesMap) {
+    let target_batch_stages = L1BatchStagesMap::new(
+        initial_batch_stages.first_batch_number,
+        vec![L1BatchStage::Executed; 6],
+    );
+
+    let client = MockMainNodeClient::from(target_batch_stages.clone());
+    let (updater, mut changes_receiver) = mock_updater(client, pool.clone());
+    let (stop_sender, stop_receiver) = watch::channel(false);
+    let updater_task = tokio::spawn(updater.run(stop_receiver));
+
+    let mut observed_batch_stages = initial_batch_stages;
+    loop {
+        let changes = changes_receiver.recv().await.unwrap();
+        observed_batch_stages.update(&changes);
+        if observed_batch_stages == target_batch_stages {
+            break;
+        }
+    }
+
+    let mut storage = pool.access_storage().await.unwrap();
+    target_batch_stages.assert_storage(&mut storage).await;
+    stop_sender.send_replace(true);
+    updater_task.await.unwrap().expect("updater failed");
+}
diff --git a/core/lib/zksync_core/src/sync_layer/external_io.rs b/core/lib/zksync_core/src/sync_layer/external_io.rs
index b082d4cbc79..0b5bef237b6 100644
--- a/core/lib/zksync_core/src/sync_layer/external_io.rs
+++ b/core/lib/zksync_core/src/sync_layer/external_io.rs
@@ -71,11 +71,13 @@ impl ExternalIO {
         chain_id: L2ChainId,
     ) -> Self {
         let mut storage = pool.access_storage_tagged("sync_layer").await.unwrap();
-        let last_sealed_l1_batch_header = storage
+        // TODO (PLA-703): Support no L1 batches / miniblocks in the storage
+        let last_sealed_l1_batch_number = storage
            .blocks_dal()
-            .get_newest_l1_batch_header()
+            .get_sealed_l1_batch_number()
             .await
-            .unwrap();
+            .unwrap()
+            .expect("No L1 batches sealed");
         let last_miniblock_number = storage
             .blocks_dal()
             .get_sealed_miniblock_number()
             .await
@@ -85,7 +87,7 @@ impl ExternalIO {
 
         tracing::info!(
             "Initialized the ExternalIO: current L1 batch number {}, current miniblock number {}",
-            last_sealed_l1_batch_header.number + 1,
+            last_sealed_l1_batch_number + 1,
             last_miniblock_number + 1,
         );
 
@@ -94,7 +96,7 @@ impl ExternalIO {
         Self {
             miniblock_sealer_handle,
             pool,
-            current_l1_batch_number: last_sealed_l1_batch_header.number + 1,
+            current_l1_batch_number: last_sealed_l1_batch_number + 1,
             current_miniblock_number: last_miniblock_number + 1,
             actions,
             sync_state,
diff --git a/core/lib/zksync_core/src/sync_layer/fetcher.rs b/core/lib/zksync_core/src/sync_layer/fetcher.rs
index ca7a483ce83..1f4f7bea810 100644
--- a/core/lib/zksync_core/src/sync_layer/fetcher.rs
+++ b/core/lib/zksync_core/src/sync_layer/fetcher.rs
@@ -80,11 +80,13 @@ pub struct FetcherCursor {
 impl FetcherCursor {
     /// Loads the cursor from Postgres.
     pub async fn new(storage: &mut StorageProcessor<'_>) -> anyhow::Result<Self> {
-        let last_sealed_l1_batch_header = storage
+        // TODO (PLA-703): Support no L1 batches / miniblocks in the storage
+        let last_sealed_l1_batch_number = storage
             .blocks_dal()
-            .get_newest_l1_batch_header()
+            .get_sealed_l1_batch_number()
             .await
-            .context("Failed getting newest L1 batch header")?;
+            .context("Failed getting sealed L1 batch number")?
+            .context("No L1 batches sealed")?;
         let last_miniblock_header = storage
             .blocks_dal()
             .get_last_sealed_miniblock_header()
@@ -106,10 +108,10 @@ impl FetcherCursor {
         // Decide whether the next batch should be explicitly opened or not.
         let l1_batch = if was_new_batch_open {
             // No `OpenBatch` action needed.
-            last_sealed_l1_batch_header.number + 1
+            last_sealed_l1_batch_number + 1
         } else {
             // We need to open the next batch.
-            last_sealed_l1_batch_header.number
+            last_sealed_l1_batch_number
         };
 
         Ok(Self {
diff --git a/core/lib/zksync_core/src/sync_layer/metrics.rs b/core/lib/zksync_core/src/sync_layer/metrics.rs
index 3a431294b25..51c569b7fcc 100644
--- a/core/lib/zksync_core/src/sync_layer/metrics.rs
+++ b/core/lib/zksync_core/src/sync_layer/metrics.rs
@@ -3,6 +3,7 @@
 use std::time::Duration;
 
 use vise::{Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics};
+use zksync_types::aggregated_operations::AggregatedActionType;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
 #[metrics(label = "stage", rename_all = "snake_case")]
@@ -12,7 +13,9 @@ pub(super) enum FetchStage {
     SyncL2Block,
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
+#[derive(
+    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, EncodeLabelValue, EncodeLabelSet,
+)]
 #[metrics(label = "stage", rename_all = "snake_case")]
 pub(super) enum L1BatchStage {
     Open,
@@ -21,6 +24,16 @@ pub(super) enum L1BatchStage {
     Committed,
     Proven,
     Executed,
 }
 
+impl From<AggregatedActionType> for L1BatchStage {
+    fn from(ty: AggregatedActionType) -> Self {
+        match ty {
+            AggregatedActionType::Commit => Self::Committed,
+            AggregatedActionType::PublishProofOnchain => Self::Proven,
+            AggregatedActionType::Execute => Self::Executed,
+        }
+    }
+}
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
 #[metrics(label = "method", rename_all = "snake_case")]
 pub(super) enum CachedMethod {
diff --git a/core/lib/zksync_core/src/utils/mod.rs b/core/lib/zksync_core/src/utils/mod.rs
index e740c8f0a31..7d919d31f88 100644
--- a/core/lib/zksync_core/src/utils/mod.rs
+++ b/core/lib/zksync_core/src/utils/mod.rs
@@ -2,8 +2,9 @@
 
 use std::time::Duration;
 
+use anyhow::Context as _;
 use tokio::sync::watch;
-use zksync_dal::ConnectionPool;
+use zksync_dal::{ConnectionPool, StorageProcessor};
 use zksync_types::L1BatchNumber;
 
 #[cfg(test)]
@@ -73,6 +74,19 @@ pub(crate) async fn wait_for_l1_batch_with_metadata(
     }
 }
 
+/// Returns the projected number of the first locally available L1 batch. The L1 batch is **not**
+/// guaranteed to be present in the storage!
+pub(crate) async fn projected_first_l1_batch(
+    storage: &mut StorageProcessor<'_>,
+) -> anyhow::Result<L1BatchNumber> {
+    let snapshot_recovery = storage
+        .snapshot_recovery_dal()
+        .get_applied_snapshot_status()
+        .await
+        .context("failed getting snapshot recovery status")?;
+    Ok(snapshot_recovery.map_or(L1BatchNumber(0), |recovery| recovery.l1_batch_number + 1))
+}
+
 #[cfg(test)]
 mod tests {
     use zksync_types::L2ChainId;
diff --git a/core/lib/zksync_core/src/utils/testonly.rs b/core/lib/zksync_core/src/utils/testonly.rs
index 7e868a15c1c..c84754f7cd3 100644
--- a/core/lib/zksync_core/src/utils/testonly.rs
+++ b/core/lib/zksync_core/src/utils/testonly.rs
@@ -1,6 +1,7 @@
 //! Test utils.
 
 use zksync_contracts::BaseSystemContractsHashes;
+use zksync_dal::StorageProcessor;
 use zksync_system_constants::ZKPORTER_IS_AVAILABLE;
 use zksync_types::{
     block::{L1BatchHeader, MiniblockHeader},
@@ -8,8 +9,10 @@ use zksync_types::{
     fee::Fee,
     fee_model::BatchFeeInput,
     l2::L2Tx,
+    snapshots::SnapshotRecoveryStatus,
     transaction_request::PaymasterParams,
-    Address, L1BatchNumber, L2ChainId, MiniblockNumber, Nonce, ProtocolVersionId, H256, U256,
+    Address, L1BatchNumber, L2ChainId, MiniblockNumber, Nonce, ProtocolVersion, ProtocolVersionId,
+    H256, U256,
 };
 
 /// Creates a miniblock header with the specified number and deterministic contents.
@@ -92,3 +95,28 @@ pub(crate) fn create_l2_transaction(fee_per_gas: u64, gas_per_pubdata: u32) -> L
     tx.set_input(H256::random().0.to_vec(), H256::random());
     tx
 }
+
+pub(crate) async fn prepare_empty_recovery_snapshot(
+    storage: &mut StorageProcessor<'_>,
+    l1_batch_number: u32,
+) -> SnapshotRecoveryStatus {
+    storage
+        .protocol_versions_dal()
+        .save_protocol_version_with_tx(ProtocolVersion::default())
+        .await;
+
+    let snapshot_recovery = SnapshotRecoveryStatus {
+        l1_batch_number: l1_batch_number.into(),
+        l1_batch_root_hash: H256::zero(),
+        miniblock_number: l1_batch_number.into(),
+        miniblock_root_hash: H256::zero(), // not used
+        last_finished_chunk_id: None,
+        total_chunk_count: 100,
+    };
+    storage
+        .snapshot_recovery_dal()
+        .set_applied_snapshot_status(&snapshot_recovery)
+        .await
+        .unwrap();
+    snapshot_recovery
+}
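
Taken together, the `main.rs` change and the new `new()`/`run()` pair define how the component is wired into the external node. A minimal sketch of that wiring, assuming `BatchStatusUpdater` and `ConnectionPool` from this patch are in scope; the `spawn_batch_status_updater` helper is hypothetical and not part of the diff:

use anyhow::Context as _;
use tokio::sync::watch;

// Hypothetical helper; must be called from within a Tokio runtime.
fn spawn_batch_status_updater(
    main_node_url: &str,
    pool: ConnectionPool,
) -> anyhow::Result<(tokio::task::JoinHandle<anyhow::Result<()>>, watch::Sender<bool>)> {
    // `new()` is now fallible instead of panicking on a malformed URL, which is
    // why `init_tasks` gained the extra `.context(...)?` call in this patch.
    let updater = BatchStatusUpdater::new(main_node_url, pool)
        .context("failed initializing batch status updater")?;
    let (stop_sender, stop_receiver) = watch::channel(false);
    // `run()` loads the cursor from Postgres and polls the main node until
    // `true` is sent over the stop channel.
    let handle = tokio::spawn(updater.run(stop_receiver));
    Ok((handle, stop_sender))
}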
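
The interplay between `projected_first_l1_batch` and the cursor initialization in `UpdaterCursor::new` is easiest to check with concrete numbers. A self-contained model using plain `u32` in place of `L1BatchNumber`; the values mirror the tests above:

/// Stand-in for `L1BatchNumber` to keep the sketch self-contained.
type BatchNumber = u32;

/// Mirrors `projected_first_l1_batch`: the first locally available batch is 0
/// for a genesis node, or the batch right after the snapshot otherwise.
fn projected_first_batch(snapshot_batch: Option<BatchNumber>) -> BatchNumber {
    snapshot_batch.map_or(0, |number| number + 1)
}

/// Mirrors `UpdaterCursor::new`: a missing `last_*_on_eth` value falls back to
/// the batch just below the first locally available one.
fn starting_cursor(
    snapshot_batch: Option<BatchNumber>,
    last_on_eth: Option<BatchNumber>,
) -> BatchNumber {
    let lower_boundary = projected_first_batch(snapshot_batch).saturating_sub(1);
    last_on_eth.unwrap_or(lower_boundary)
}

fn main() {
    // Genesis node, nothing confirmed on L1 yet: the cursor starts at batch 0.
    assert_eq!(starting_cursor(None, None), 0);
    // Node recovered from a snapshot at batch 23: the cursor starts at 23,
    // so polling begins from batch 24 (`cursor.next()` in the real code).
    assert_eq!(starting_cursor(Some(23), None), 23);
    // Explicit progress recorded in the DB always wins over the boundary.
    assert_eq!(starting_cursor(Some(23), Some(40)), 40);
}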
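
The gap-skipping logic in `get_status_changes` is the subtle part of the polling loop: a single `block_details` response can advance all three cursors, and unproven or unexecuted ranges are jumped over rather than polled batch by batch. A simplified, self-contained model of just the batch-number progression; the `Info` struct is a hypothetical stand-in for `api::BlockDetails`:

/// Hypothetical, simplified view of the L1 status flags returned for a batch.
struct Info {
    committed: bool,
    proven: bool,
    executed: bool,
}

/// Mirrors the tail of the polling loop: picks the next batch to query, given
/// the freshly updated cursors. `None` means "stop until the next poll".
fn next_batch(batch: u32, info: &Info, last_committed: u32, last_proven: u32) -> Option<u32> {
    if !info.committed {
        None // No committed batches after this one.
    } else if !info.proven && batch < last_committed {
        Some(last_committed + 1) // The interval up to the last committed batch is not proven.
    } else if !info.executed && batch < last_proven {
        Some(last_proven + 1) // The interval up to the last proven batch is not executed.
    } else {
        Some(batch + 1)
    }
}

fn main() {
    // Batch 11 is committed but not proven, while batches up to 20 are
    // committed overall: jump straight to 21 instead of polling 12..=20.
    let info = Info { committed: true, proven: false, executed: false };
    assert_eq!(next_batch(11, &info, 20, 10), Some(21));
    // An uncommitted batch ends the sweep entirely.
    let open = Info { committed: false, proven: false, executed: false };
    assert_eq!(next_batch(5, &open, 4, 3), None);
}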
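
Finally, the `PartialOrd`/`Ord` derives added to `L1BatchStage` are what make the threshold checks in `mock_block_details` (`stage >= L1BatchStage::Committed`, and so on) possible: the derived order follows declaration order, so `Open < Committed < Proven < Executed`. A self-contained replica of the idea, including the new `From<AggregatedActionType>`-style conversion (local enums stand in for the zksync types):

/// Local replica of the metrics enum; the derived order follows declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Stage {
    Open,
    Committed,
    Proven,
    Executed,
}

/// Local replica of `AggregatedActionType`.
#[derive(Debug, Clone, Copy)]
enum ActionType {
    Commit,
    PublishProofOnchain,
    Execute,
}

impl From<ActionType> for Stage {
    fn from(ty: ActionType) -> Self {
        match ty {
            ActionType::Commit => Self::Committed,
            ActionType::PublishProofOnchain => Self::Proven,
            ActionType::Execute => Self::Executed,
        }
    }
}

fn main() {
    assert!(Stage::Open < Stage::Committed && Stage::Committed < Stage::Executed);
    assert_eq!(Stage::from(ActionType::Commit), Stage::Committed);
    assert_eq!(Stage::from(ActionType::Execute), Stage::Executed);
    // A proven batch passes the "committed" threshold but not the "executed" one,
    // so the mock emits commit/prove tx hashes and omits the execute ones.
    let stage = Stage::from(ActionType::PublishProofOnchain);
    assert!(stage >= Stage::Committed);
    assert!(stage < Stage::Executed);
}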