From d161ddcb2e67b886e5d8c76eabb758373c5d79c0 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 6 May 2024 19:38:13 +1000 Subject: [PATCH 01/12] add basic vm runner output handler --- Cargo.lock | 1 + Cargo.toml | 1 + core/lib/zksync_core/Cargo.toml | 1 + .../src/state_keeper/io/output_handler.rs | 10 +- .../src/state_keeper/io/persistence.rs | 6 +- .../zksync_core/src/state_keeper/keeper.rs | 4 +- .../zksync_core/src/sync_layer/sync_state.rs | 5 +- core/lib/zksync_core/src/vm_runner/mod.rs | 4 + .../src/vm_runner/output_handler.rs | 199 ++++++++++++++++++ core/lib/zksync_core/src/vm_runner/storage.rs | 6 + 10 files changed, 230 insertions(+), 7 deletions(-) create mode 100644 core/lib/zksync_core/src/vm_runner/output_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 590e715a90e..7f2257c3535 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8416,6 +8416,7 @@ dependencies = [ "backon", "chrono", "ctrlc", + "dashmap", "futures 0.3.28", "governor", "hex", diff --git a/Cargo.toml b/Cargo.toml index d2e935b3c6f..87b7e6a7052 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,7 @@ clap = "4.2.2" codegen = "0.2.0" criterion = "0.4.0" ctrlc = "3.1" +dashmap = "5.5.3" derive_more = "=1.0.0-beta.6" envy = "0.4" ethabi = "18.0.0" diff --git a/core/lib/zksync_core/Cargo.toml b/core/lib/zksync_core/Cargo.toml index e1ba5f66c49..3424b150fef 100644 --- a/core/lib/zksync_core/Cargo.toml +++ b/core/lib/zksync_core/Cargo.toml @@ -90,6 +90,7 @@ axum = { workspace = true, features = [ "tokio", ] } once_cell.workspace = true +dashmap.workspace = true tracing.workspace = true diff --git a/core/lib/zksync_core/src/state_keeper/io/output_handler.rs b/core/lib/zksync_core/src/state_keeper/io/output_handler.rs index d9a6797c0e9..5b47b29bea0 100644 --- a/core/lib/zksync_core/src/state_keeper/io/output_handler.rs +++ b/core/lib/zksync_core/src/state_keeper/io/output_handler.rs @@ -1,6 +1,7 @@ //! Handling outputs produced by the state keeper. 
use std::fmt; +use std::sync::Arc; use anyhow::Context as _; use async_trait::async_trait; @@ -20,7 +21,10 @@ pub trait StateKeeperOutputHandler: 'static + Send + fmt::Debug { async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()>; /// Handles an L1 batch produced by the state keeper. - async fn handle_l1_batch(&mut self, _updates_manager: &UpdatesManager) -> anyhow::Result<()> { + async fn handle_l1_batch( + &mut self, + _updates_manager: Arc, + ) -> anyhow::Result<()> { Ok(()) } } @@ -81,11 +85,11 @@ impl OutputHandler { pub(crate) async fn handle_l1_batch( &mut self, - updates_manager: &UpdatesManager, + updates_manager: Arc, ) -> anyhow::Result<()> { for handler in &mut self.inner { handler - .handle_l1_batch(updates_manager) + .handle_l1_batch(updates_manager.clone()) .await .with_context(|| { format!( diff --git a/core/lib/zksync_core/src/state_keeper/io/persistence.rs b/core/lib/zksync_core/src/state_keeper/io/persistence.rs index 65028906c54..2cdabce2283 100644 --- a/core/lib/zksync_core/src/state_keeper/io/persistence.rs +++ b/core/lib/zksync_core/src/state_keeper/io/persistence.rs @@ -1,5 +1,6 @@ //! State keeper persistence logic. +use std::sync::Arc; use std::time::Instant; use anyhow::Context as _; @@ -155,7 +156,10 @@ impl StateKeeperOutputHandler for StateKeeperPersistence { Ok(()) } - async fn handle_l1_batch(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { + async fn handle_l1_batch( + &mut self, + updates_manager: Arc, + ) -> anyhow::Result<()> { // We cannot start sealing an L1 batch until we've sealed all L2 blocks included in it. 
self.wait_for_all_commands().await; diff --git a/core/lib/zksync_core/src/state_keeper/keeper.rs b/core/lib/zksync_core/src/state_keeper/keeper.rs index 0ebac71db87..91ba523e7f6 100644 --- a/core/lib/zksync_core/src/state_keeper/keeper.rs +++ b/core/lib/zksync_core/src/state_keeper/keeper.rs @@ -176,8 +176,9 @@ impl ZkSyncStateKeeper { let finished_batch = batch_executor.finish_batch().await; let sealed_batch_protocol_version = updates_manager.protocol_version(); updates_manager.finish_batch(finished_batch); + let mut next_cursor = updates_manager.io_cursor(); self.output_handler - .handle_l1_batch(&updates_manager) + .handle_l1_batch(Arc::new(updates_manager)) .await .with_context(|| format!("failed sealing L1 batch {l1_batch_env:?}"))?; @@ -187,7 +188,6 @@ impl ZkSyncStateKeeper { l1_batch_seal_delta = Some(Instant::now()); // Start the new batch. - let mut next_cursor = updates_manager.io_cursor(); next_cursor.l1_batch += 1; (system_env, l1_batch_env) = self.wait_for_new_batch_env(&next_cursor).await?; updates_manager = UpdatesManager::new(&l1_batch_env, &system_env); diff --git a/core/lib/zksync_core/src/sync_layer/sync_state.rs b/core/lib/zksync_core/src/sync_layer/sync_state.rs index 68d5e4be51c..bd01e90cd43 100644 --- a/core/lib/zksync_core/src/sync_layer/sync_state.rs +++ b/core/lib/zksync_core/src/sync_layer/sync_state.rs @@ -120,7 +120,10 @@ impl StateKeeperOutputHandler for SyncState { Ok(()) } - async fn handle_l1_batch(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { + async fn handle_l1_batch( + &mut self, + updates_manager: Arc, + ) -> anyhow::Result<()> { let sealed_block_number = updates_manager.l2_block.number; self.set_local_block(sealed_block_number); Ok(()) diff --git a/core/lib/zksync_core/src/vm_runner/mod.rs b/core/lib/zksync_core/src/vm_runner/mod.rs index 01eb3541fca..004edef1d40 100644 --- a/core/lib/zksync_core/src/vm_runner/mod.rs +++ b/core/lib/zksync_core/src/vm_runner/mod.rs @@ -1,6 +1,10 @@ +mod output_handler; 
mod storage; #[cfg(test)] mod tests; +pub use output_handler::{ + ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory, +}; pub use storage::{BatchExecuteData, VmRunnerStorage, VmRunnerStorageLoader}; diff --git a/core/lib/zksync_core/src/vm_runner/output_handler.rs b/core/lib/zksync_core/src/vm_runner/output_handler.rs new file mode 100644 index 00000000000..6ad512af025 --- /dev/null +++ b/core/lib/zksync_core/src/vm_runner/output_handler.rs @@ -0,0 +1,199 @@ +use crate::state_keeper::updates::UpdatesManager; +use crate::state_keeper::StateKeeperOutputHandler; +use crate::vm_runner::VmRunnerStorageLoader; +use async_trait::async_trait; +use dashmap::DashMap; +use futures::future::BoxFuture; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{oneshot, watch}; +use zksync_dal::Core; +use zksync_db_connection::connection_pool::ConnectionPool; +use zksync_types::L1BatchNumber; + +#[async_trait] +pub trait OutputHandlerFactory: Debug + Send { + async fn create_handler( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result>; +} + +pub struct ConcurrentOutputHandlerFactory { + pool: ConnectionPool, + state: Arc>>>>, + loader: L, + factory: F, +} + +impl Debug + for ConcurrentOutputHandlerFactory +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ConcurrentOutputHandlerFactory") + .field("pool", &self.pool) + .field("loader", &self.loader) + .field("factory", &self.factory) + .finish() + } +} + +impl + ConcurrentOutputHandlerFactory +{ + pub fn new( + pool: ConnectionPool, + loader: L, + factory: F, + ) -> (Self, ConcurrentOutputHandlerFactoryTask) { + let state = Arc::new(DashMap::new()); + let task = ConcurrentOutputHandlerFactoryTask { + pool: pool.clone(), + loader: loader.clone(), + state: state.clone(), + }; + ( + Self { + pool, + state, + loader, + factory, + }, + task, + ) + } +} + +#[async_trait] +impl OutputHandlerFactory + for 
ConcurrentOutputHandlerFactory +{ + async fn create_handler( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result> { + let mut conn = self.pool.connection_tagged(L::name()).await?; + let latest_processed_batch = self.loader.latest_processed_batch(&mut conn).await?; + let last_processable_batch = self.loader.last_ready_to_be_loaded_batch(&mut conn).await?; + anyhow::ensure!( + l1_batch_number > latest_processed_batch, + "Cannot handle an already processed batch #{} (latest is #{})", + l1_batch_number, + latest_processed_batch + ); + anyhow::ensure!( + l1_batch_number <= last_processable_batch, + "Cannot handle batch #{} as it is too far away from latest batch #{} (last processable batch is #{})", + l1_batch_number, + latest_processed_batch, + last_processable_batch + ); + + let handler = self.factory.create_handler(l1_batch_number).await?; + let (sender, receiver) = oneshot::channel(); + self.state.insert(l1_batch_number, receiver); + Ok(Box::new(AsyncOutputHandler { + internal: Some(OutputHandlerState::Running { handler, sender }), + })) + } +} + +enum OutputHandlerState { + Running { + handler: Box, + sender: oneshot::Sender>>, + }, + Finished, +} + +impl Debug for OutputHandlerState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OutputHandlerState").finish() + } +} + +#[derive(Debug)] +struct AsyncOutputHandler { + internal: Option, +} + +#[async_trait] +impl StateKeeperOutputHandler for AsyncOutputHandler { + async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { + match &mut self.internal { + Some(OutputHandlerState::Running { handler, .. 
}) => { + handler.handle_l2_block(updates_manager).await + } + Some(OutputHandlerState::Finished) => { + Err(anyhow::anyhow!("Cannot handle any more L2 blocks")) + } + None => Err(anyhow::anyhow!( + "Unexpected state, missing output handler state" + )), + } + } + + async fn handle_l1_batch( + &mut self, + updates_manager: Arc, + ) -> anyhow::Result<()> { + let state = self.internal.take(); + match state { + Some(OutputHandlerState::Running { + mut handler, + sender, + }) => { + self.internal = Some(OutputHandlerState::Finished); + sender + .send(Box::pin(async move { + handler.handle_l1_batch(updates_manager).await + })) + .ok(); + Ok(()) + } + Some(OutputHandlerState::Finished) => { + self.internal = state; + Err(anyhow::anyhow!("Cannot handle any more L1 batches")) + } + None => Err(anyhow::anyhow!( + "Unexpected state, missing output handler state" + )), + } + } +} + +pub struct ConcurrentOutputHandlerFactoryTask { + pool: ConnectionPool, + loader: L, + state: Arc>>>>, +} + +impl ConcurrentOutputHandlerFactoryTask { + pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + let mut conn = self.pool.connection_tagged(L::name()).await?; + let mut latest_processed_batch = self.loader.latest_processed_batch(&mut conn).await?; + drop(conn); + loop { + if *stop_receiver.borrow() { + tracing::info!("`ConcurrentOutputHandlerFactoryTask` was interrupted"); + return Ok(()); + } + match self.state.remove(&(latest_processed_batch + 1)) { + None => { + tokio::time::sleep(Duration::from_millis(50)).await; + } + Some((_, receiver)) => { + let future = receiver.await?; + future.await?; + latest_processed_batch += 1; + let mut conn = self.pool.connection_tagged(L::name()).await?; + self.loader + .mark_l1_batch_as_completed(&mut conn, latest_processed_batch) + .await?; + drop(conn); + } + } + } + } +} diff --git a/core/lib/zksync_core/src/vm_runner/storage.rs b/core/lib/zksync_core/src/vm_runner/storage.rs index 019d5f6e80b..b96a457a50d 100644 --- 
a/core/lib/zksync_core/src/vm_runner/storage.rs +++ b/core/lib/zksync_core/src/vm_runner/storage.rs @@ -63,6 +63,12 @@ pub trait VmRunnerStorageLoader: Debug + Send + Sync + 'static { &self, conn: &mut Connection<'_, Core>, ) -> anyhow::Result; + + async fn mark_l1_batch_as_completed( + &self, + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()>; } /// Abstraction for VM runner's storage layer that provides two main features: From 16438ac8098052617d4e7fc26d3b631d3bf3f9ac Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Mon, 6 May 2024 19:47:07 +1000 Subject: [PATCH 02/12] fmt + lint --- .../src/state_keeper/io/output_handler.rs | 3 +-- .../src/state_keeper/io/persistence.rs | 8 +++++--- .../src/state_keeper/tests/tester.rs | 7 +++++-- .../zksync_core/src/vm_runner/output_handler.rs | 17 +++++++++++------ core/lib/zksync_core/src/vm_runner/tests/mod.rs | 8 ++++++++ 5 files changed, 30 insertions(+), 13 deletions(-) diff --git a/core/lib/zksync_core/src/state_keeper/io/output_handler.rs b/core/lib/zksync_core/src/state_keeper/io/output_handler.rs index 5b47b29bea0..17773af1c33 100644 --- a/core/lib/zksync_core/src/state_keeper/io/output_handler.rs +++ b/core/lib/zksync_core/src/state_keeper/io/output_handler.rs @@ -1,7 +1,6 @@ //! Handling outputs produced by the state keeper. -use std::fmt; -use std::sync::Arc; +use std::{fmt, sync::Arc}; use anyhow::Context as _; use async_trait::async_trait; diff --git a/core/lib/zksync_core/src/state_keeper/io/persistence.rs b/core/lib/zksync_core/src/state_keeper/io/persistence.rs index 3aa4a6b8697..0438b14ad6d 100644 --- a/core/lib/zksync_core/src/state_keeper/io/persistence.rs +++ b/core/lib/zksync_core/src/state_keeper/io/persistence.rs @@ -1,7 +1,6 @@ //! State keeper persistence logic. 
-use std::sync::Arc; -use std::time::Instant; +use std::{sync::Arc, time::Instant}; use anyhow::Context as _; use async_trait::async_trait; @@ -360,7 +359,10 @@ mod tests { .map(|query| query.log_query) .collect(); updates.finish_batch(batch_result); - persistence.handle_l1_batch(&updates).await.unwrap(); + persistence + .handle_l1_batch(Arc::new(updates)) + .await + .unwrap(); tx_hash } diff --git a/core/lib/zksync_core/src/state_keeper/tests/tester.rs b/core/lib/zksync_core/src/state_keeper/tests/tester.rs index c57bea73312..0d5943deb5a 100644 --- a/core/lib/zksync_core/src/state_keeper/tests/tester.rs +++ b/core/lib/zksync_core/src/state_keeper/tests/tester.rs @@ -566,13 +566,16 @@ impl StateKeeperOutputHandler for TestPersistence { Ok(()) } - async fn handle_l1_batch(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { + async fn handle_l1_batch( + &mut self, + updates_manager: Arc, + ) -> anyhow::Result<()> { let action = self.pop_next_item("seal_l1_batch"); let ScenarioItem::BatchSeal(_, check_fn) = action else { anyhow::bail!("Unexpected action: {:?}", action); }; if let Some(check_fn) = check_fn { - check_fn(updates_manager); + check_fn(&updates_manager); } Ok(()) } diff --git a/core/lib/zksync_core/src/vm_runner/output_handler.rs b/core/lib/zksync_core/src/vm_runner/output_handler.rs index 6ad512af025..d3c4c503794 100644 --- a/core/lib/zksync_core/src/vm_runner/output_handler.rs +++ b/core/lib/zksync_core/src/vm_runner/output_handler.rs @@ -1,17 +1,22 @@ -use crate::state_keeper::updates::UpdatesManager; -use crate::state_keeper::StateKeeperOutputHandler; -use crate::vm_runner::VmRunnerStorageLoader; +use std::{ + fmt::{Debug, Formatter}, + sync::Arc, + time::Duration, +}; + use async_trait::async_trait; use dashmap::DashMap; use futures::future::BoxFuture; -use std::fmt::{Debug, Formatter}; -use std::sync::Arc; -use std::time::Duration; use tokio::sync::{oneshot, watch}; use zksync_dal::Core; use 
zksync_db_connection::connection_pool::ConnectionPool; use zksync_types::L1BatchNumber; +use crate::{ + state_keeper::{updates::UpdatesManager, StateKeeperOutputHandler}, + vm_runner::VmRunnerStorageLoader, +}; + #[async_trait] pub trait OutputHandlerFactory: Debug + Send { async fn create_handler( diff --git a/core/lib/zksync_core/src/vm_runner/tests/mod.rs b/core/lib/zksync_core/src/vm_runner/tests/mod.rs index bc56ead5d9d..6d32ceb8c16 100644 --- a/core/lib/zksync_core/src/vm_runner/tests/mod.rs +++ b/core/lib/zksync_core/src/vm_runner/tests/mod.rs @@ -53,6 +53,14 @@ impl VmRunnerStorageLoader for Arc> { ) -> anyhow::Result { Ok(self.read().await.max) } + + async fn mark_l1_batch_as_completed( + &self, + _conn: &mut Connection<'_, Core>, + _l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + Ok(()) + } } #[derive(Debug)] From 86c4005eb216682b2e4ecb5c79e9138ca817050d Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 7 May 2024 15:35:55 +1000 Subject: [PATCH 03/12] add some comments + sleep constant --- .../lib/zksync_core/src/vm_runner/output_handler.rs | 13 ++++++++++++- core/lib/zksync_core/src/vm_runner/storage.rs | 4 +++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/core/lib/zksync_core/src/vm_runner/output_handler.rs b/core/lib/zksync_core/src/vm_runner/output_handler.rs index d3c4c503794..11ec205136a 100644 --- a/core/lib/zksync_core/src/vm_runner/output_handler.rs +++ b/core/lib/zksync_core/src/vm_runner/output_handler.rs @@ -81,6 +81,7 @@ impl OutputHandlerFactory let mut conn = self.pool.connection_tagged(L::name()).await?; let latest_processed_batch = self.loader.latest_processed_batch(&mut conn).await?; let last_processable_batch = self.loader.last_ready_to_be_loaded_batch(&mut conn).await?; + drop(conn); anyhow::ensure!( l1_batch_number > latest_processed_batch, "Cannot handle an already processed batch #{} (latest is #{})", @@ -176,6 +177,8 @@ pub struct ConcurrentOutputHandlerFactoryTask { impl 
ConcurrentOutputHandlerFactoryTask { pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + const SLEEP_INTERVAL: Duration = Duration::from_millis(50); + let mut conn = self.pool.connection_tagged(L::name()).await?; let mut latest_processed_batch = self.loader.latest_processed_batch(&mut conn).await?; drop(conn); @@ -186,10 +189,18 @@ impl ConcurrentOutputHandlerFactoryTask { } match self.state.remove(&(latest_processed_batch + 1)) { None => { - tokio::time::sleep(Duration::from_millis(50)).await; + tracing::debug!( + "Output handler for batch #{} has not been created yet", + latest_processed_batch + 1 + ); + tokio::time::sleep(SLEEP_INTERVAL).await; } Some((_, receiver)) => { + // Wait until the future is sent through the receiver, happens when + // `handle_l1_batch` is called on the corresponding output handler let future = receiver.await?; + // Wait until the future is completed, meaning that the `handle_l1_batch` + // computation has finished, and we can consider this batch to be completed future.await?; latest_processed_batch += 1; let mut conn = self.pool.connection_tagged(L::name()).await?; diff --git a/core/lib/zksync_core/src/vm_runner/storage.rs b/core/lib/zksync_core/src/vm_runner/storage.rs index 973652557f1..89caaf81fc2 100644 --- a/core/lib/zksync_core/src/vm_runner/storage.rs +++ b/core/lib/zksync_core/src/vm_runner/storage.rs @@ -277,6 +277,8 @@ impl StorageSyncTask { } pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + const SLEEP_INTERVAL: Duration = Duration::from_millis(50); + self.catchup_task.run(stop_receiver.clone()).await?; let rocksdb = self.rocksdb_cell.get().ok_or_else(|| { anyhow::anyhow!("Expected RocksDB to be initialized by `AsyncCatchupTask`") @@ -300,7 +302,7 @@ impl StorageSyncTask { // No need to do anything, killing time until last processed batch is updated. 
drop(conn); drop(state); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(SLEEP_INTERVAL).await; continue; } } From e3fec2326490cabcf74a3dd78e9eee06b632a862 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 7 May 2024 15:49:57 +1000 Subject: [PATCH 04/12] rename `VmRunnerStorageLoader` to `VmRunnerIo` --- core/lib/zksync_core/src/vm_runner/io.rs | 45 +++++++++++ core/lib/zksync_core/src/vm_runner/mod.rs | 4 +- .../src/vm_runner/output_handler.rs | 48 ++++++------ core/lib/zksync_core/src/vm_runner/storage.rs | 78 ++++++------------- 4 files changed, 92 insertions(+), 83 deletions(-) create mode 100644 core/lib/zksync_core/src/vm_runner/io.rs diff --git a/core/lib/zksync_core/src/vm_runner/io.rs b/core/lib/zksync_core/src/vm_runner/io.rs new file mode 100644 index 00000000000..f9093339d51 --- /dev/null +++ b/core/lib/zksync_core/src/vm_runner/io.rs @@ -0,0 +1,45 @@ +use async_trait::async_trait; +use std::fmt::Debug; +use zksync_dal::Core; +use zksync_db_connection::connection::Connection; +use zksync_types::L1BatchNumber; + +/// Functionality to fetch/save data about processed/unprocessed batches for a particular VM runner +/// instance. +#[async_trait] +pub trait VmRunnerIo: Debug + Send + Sync + 'static { + /// Unique name of the VM runner instance. + fn name() -> &'static str; + + /// Returns the last L1 batch number that has been processed by this VM runner instance. + /// + /// # Errors + /// + /// Propagates DB errors. + async fn latest_processed_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result; + + /// Returns the last L1 batch number that is ready to be loaded by this VM runner instance. + /// + /// # Errors + /// + /// Propagates DB errors. + async fn last_ready_to_be_loaded_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result; + + /// Marks the specified batch as the latest completed batch. All earlier batches are considered + /// to be completed too. 
No guarantees about later batches. + /// + /// # Errors + /// + /// Propagates DB errors. + async fn mark_l1_batch_as_completed( + &self, + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()>; +} diff --git a/core/lib/zksync_core/src/vm_runner/mod.rs b/core/lib/zksync_core/src/vm_runner/mod.rs index 004edef1d40..eea5c90c3ed 100644 --- a/core/lib/zksync_core/src/vm_runner/mod.rs +++ b/core/lib/zksync_core/src/vm_runner/mod.rs @@ -1,10 +1,12 @@ +mod io; mod output_handler; mod storage; #[cfg(test)] mod tests; +pub use io::VmRunnerIo; pub use output_handler::{ ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory, }; -pub use storage::{BatchExecuteData, VmRunnerStorage, VmRunnerStorageLoader}; +pub use storage::{BatchExecuteData, VmRunnerStorage}; diff --git a/core/lib/zksync_core/src/vm_runner/output_handler.rs b/core/lib/zksync_core/src/vm_runner/output_handler.rs index 11ec205136a..4ec18e86c38 100644 --- a/core/lib/zksync_core/src/vm_runner/output_handler.rs +++ b/core/lib/zksync_core/src/vm_runner/output_handler.rs @@ -14,7 +14,7 @@ use zksync_types::L1BatchNumber; use crate::{ state_keeper::{updates::UpdatesManager, StateKeeperOutputHandler}, - vm_runner::VmRunnerStorageLoader, + vm_runner::VmRunnerIo, }; #[async_trait] @@ -25,44 +25,40 @@ pub trait OutputHandlerFactory: Debug + Send { ) -> anyhow::Result>; } -pub struct ConcurrentOutputHandlerFactory { +pub struct ConcurrentOutputHandlerFactory { pool: ConnectionPool, state: Arc>>>>, - loader: L, + io: Io, factory: F, } -impl Debug - for ConcurrentOutputHandlerFactory -{ +impl Debug for ConcurrentOutputHandlerFactory { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("ConcurrentOutputHandlerFactory") .field("pool", &self.pool) - .field("loader", &self.loader) + .field("io", &self.io) .field("factory", &self.factory) .finish() } } -impl - ConcurrentOutputHandlerFactory -{ +impl ConcurrentOutputHandlerFactory { pub 
fn new( pool: ConnectionPool, - loader: L, + io: Io, factory: F, - ) -> (Self, ConcurrentOutputHandlerFactoryTask) { + ) -> (Self, ConcurrentOutputHandlerFactoryTask) { let state = Arc::new(DashMap::new()); let task = ConcurrentOutputHandlerFactoryTask { pool: pool.clone(), - loader: loader.clone(), + io: io.clone(), state: state.clone(), }; ( Self { pool, state, - loader, + io, factory, }, task, @@ -71,16 +67,16 @@ impl } #[async_trait] -impl OutputHandlerFactory - for ConcurrentOutputHandlerFactory +impl OutputHandlerFactory + for ConcurrentOutputHandlerFactory { async fn create_handler( &mut self, l1_batch_number: L1BatchNumber, ) -> anyhow::Result> { - let mut conn = self.pool.connection_tagged(L::name()).await?; - let latest_processed_batch = self.loader.latest_processed_batch(&mut conn).await?; - let last_processable_batch = self.loader.last_ready_to_be_loaded_batch(&mut conn).await?; + let mut conn = self.pool.connection_tagged(Io::name()).await?; + let latest_processed_batch = self.io.latest_processed_batch(&mut conn).await?; + let last_processable_batch = self.io.last_ready_to_be_loaded_batch(&mut conn).await?; drop(conn); anyhow::ensure!( l1_batch_number > latest_processed_batch, @@ -169,18 +165,18 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { } } -pub struct ConcurrentOutputHandlerFactoryTask { +pub struct ConcurrentOutputHandlerFactoryTask { pool: ConnectionPool, - loader: L, + io: Io, state: Arc>>>>, } -impl ConcurrentOutputHandlerFactoryTask { +impl ConcurrentOutputHandlerFactoryTask { pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { const SLEEP_INTERVAL: Duration = Duration::from_millis(50); - let mut conn = self.pool.connection_tagged(L::name()).await?; - let mut latest_processed_batch = self.loader.latest_processed_batch(&mut conn).await?; + let mut conn = self.pool.connection_tagged(Io::name()).await?; + let mut latest_processed_batch = self.io.latest_processed_batch(&mut conn).await?; drop(conn); loop { 
if *stop_receiver.borrow() { @@ -203,8 +199,8 @@ impl ConcurrentOutputHandlerFactoryTask { // computation has finished, and we can consider this batch to be completed future.await?; latest_processed_batch += 1; - let mut conn = self.pool.connection_tagged(L::name()).await?; - self.loader + let mut conn = self.pool.connection_tagged(Io::name()).await?; + self.io .mark_l1_batch_as_completed(&mut conn, latest_processed_batch) .await?; drop(conn); diff --git a/core/lib/zksync_core/src/vm_runner/storage.rs b/core/lib/zksync_core/src/vm_runner/storage.rs index 89caaf81fc2..59eaa2862dc 100644 --- a/core/lib/zksync_core/src/vm_runner/storage.rs +++ b/core/lib/zksync_core/src/vm_runner/storage.rs @@ -6,6 +6,7 @@ use std::{ time::Duration, }; +use crate::vm_runner::VmRunnerIo; use anyhow::Context as _; use async_trait::async_trait; use multivm::{interface::L1BatchEnv, vm_1_4_2::SystemEnv}; @@ -37,40 +38,6 @@ struct BatchData { diff: BatchDiff, } -/// Functionality to fetch data about processed/unprocessed batches for a particular VM runner -/// instance. -#[async_trait] -pub trait VmRunnerStorageLoader: Debug + Send + Sync + 'static { - /// Unique name of the VM runner instance. - fn name() -> &'static str; - - /// Returns the last L1 batch number that has been processed by this VM runner instance. - /// - /// # Errors - /// - /// Propagates DB errors. - async fn latest_processed_batch( - &self, - conn: &mut Connection<'_, Core>, - ) -> anyhow::Result; - - /// Returns the last L1 batch number that is ready to be loaded by this VM runner instance. - /// - /// # Errors - /// - /// Propagates DB errors. - async fn last_ready_to_be_loaded_batch( - &self, - conn: &mut Connection<'_, Core>, - ) -> anyhow::Result; - - async fn mark_l1_batch_as_completed( - &self, - conn: &mut Connection<'_, Core>, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result<()>; -} - /// Abstraction for VM runner's storage layer that provides two main features: /// /// 1. 
A [`ReadStorageFactory`] implementation backed by either Postgres or RocksDB (if it's @@ -81,12 +48,12 @@ pub trait VmRunnerStorageLoader: Debug + Send + Sync + 'static { /// Users of `VmRunnerStorage` are not supposed to retain storage access to batches that are less /// than `L::latest_processed_batch`. Holding one is considered to be an undefined behavior. #[derive(Debug)] -pub struct VmRunnerStorage { +pub struct VmRunnerStorage { pool: ConnectionPool, l1_batch_params_provider: L1BatchParamsProvider, chain_id: L2ChainId, state: Arc>, - _marker: PhantomData, + _marker: PhantomData, } #[derive(Debug)] @@ -103,15 +70,15 @@ impl State { } } -impl VmRunnerStorage { +impl VmRunnerStorage { /// Creates a new VM runner storage using provided Postgres pool and RocksDB path. pub async fn new( pool: ConnectionPool, rocksdb_path: String, - loader: L, + io: Io, chain_id: L2ChainId, - ) -> anyhow::Result<(Self, StorageSyncTask)> { - let mut conn = pool.connection_tagged(L::name()).await?; + ) -> anyhow::Result<(Self, StorageSyncTask)> { + let mut conn = pool.connection_tagged(Io::name()).await?; let l1_batch_params_provider = L1BatchParamsProvider::new(&mut conn) .await .context("Failed initializing L1 batch params provider")?; @@ -122,8 +89,7 @@ impl VmRunnerStorage { storage: BTreeMap::new(), })); let task = - StorageSyncTask::new(pool.clone(), chain_id, rocksdb_path, loader, state.clone()) - .await?; + StorageSyncTask::new(pool.clone(), chain_id, rocksdb_path, io, state.clone()).await?; Ok(( Self { pool, @@ -193,8 +159,8 @@ impl VmRunnerStorage { ) -> anyhow::Result> { let state = self.state.read().await; if state.rocksdb.is_none() { - let mut conn = self.pool.connection_tagged(L::name()).await?; - return StorageSyncTask::::load_batch_execute_data( + let mut conn = self.pool.connection_tagged(Io::name()).await?; + return StorageSyncTask::::load_batch_execute_data( &mut conn, l1_batch_number, &self.l1_batch_params_provider, @@ -218,7 +184,7 @@ impl VmRunnerStorage { } 
#[async_trait] -impl ReadStorageFactory for VmRunnerStorage { +impl ReadStorageFactory for VmRunnerStorage { async fn access_storage( &self, stop_receiver: &watch::Receiver, @@ -234,25 +200,25 @@ impl ReadStorageFactory for VmRunnerStorage { /// In the meanwhile, `StorageSyncTask` also loads the next `max_batches_to_load` batches in memory /// so that they are immediately accessible by [`VmRunnerStorage`]. #[derive(Debug)] -pub struct StorageSyncTask { +pub struct StorageSyncTask { pool: ConnectionPool, l1_batch_params_provider: L1BatchParamsProvider, chain_id: L2ChainId, rocksdb_cell: Arc>>, - loader: L, + io: Io, state: Arc>, catchup_task: AsyncCatchupTask, } -impl StorageSyncTask { +impl StorageSyncTask { async fn new( pool: ConnectionPool, chain_id: L2ChainId, rocksdb_path: String, - loader: L, + io: Io, state: Arc>, ) -> anyhow::Result { - let mut conn = pool.connection_tagged(L::name()).await?; + let mut conn = pool.connection_tagged(Io::name()).await?; let l1_batch_params_provider = L1BatchParamsProvider::new(&mut conn) .await .context("Failed initializing L1 batch params provider")?; @@ -262,7 +228,7 @@ impl StorageSyncTask { rocksdb_path, RocksdbStorageOptions::default(), rocksdb_cell.clone(), - Some(loader.latest_processed_batch(&mut conn).await?), + Some(io.latest_processed_batch(&mut conn).await?), ); drop(conn); Ok(Self { @@ -270,7 +236,7 @@ impl StorageSyncTask { l1_batch_params_provider, chain_id, rocksdb_cell, - loader, + io, state, catchup_task, }) @@ -288,8 +254,8 @@ impl StorageSyncTask { tracing::info!("`StorageSyncTask` was interrupted"); return Ok(()); } - let mut conn = self.pool.connection_tagged(L::name()).await?; - let latest_processed_batch = self.loader.latest_processed_batch(&mut conn).await?; + let mut conn = self.pool.connection_tagged(Io::name()).await?; + let latest_processed_batch = self.io.latest_processed_batch(&mut conn).await?; let rocksdb_builder = RocksdbStorageBuilder::from_rocksdb(rocksdb.clone()); if 
rocksdb_builder.l1_batch_number().await == Some(latest_processed_batch + 1) { // RocksDB is already caught up, we might not need to do anything. @@ -297,7 +263,7 @@ impl StorageSyncTask { let state = self.state.read().await; if state .storage - .contains_key(&self.loader.last_ready_to_be_loaded_batch(&mut conn).await?) + .contains_key(&self.io.last_ready_to_be_loaded_batch(&mut conn).await?) { // No need to do anything, killing time until last processed batch is updated. drop(conn); @@ -330,7 +296,7 @@ impl StorageSyncTask { .map(|e| *e.key()) .unwrap_or(latest_processed_batch); drop(state); - let max_desired = self.loader.last_ready_to_be_loaded_batch(&mut conn).await?; + let max_desired = self.io.last_ready_to_be_loaded_batch(&mut conn).await?; for l1_batch_number in max_present.0 + 1..=max_desired.0 { let l1_batch_number = L1BatchNumber(l1_batch_number); let Some(execute_data) = Self::load_batch_execute_data( From c3dc53c5e0df8ef538c21f0c2171ad8db85c1a03 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 7 May 2024 16:04:40 +1000 Subject: [PATCH 05/12] fmt + lint --- core/lib/zksync_core/src/vm_runner/io.rs | 3 +- core/lib/zksync_core/src/vm_runner/storage.rs | 3 +- .../zksync_core/src/vm_runner/tests/mod.rs | 34 +++++++++---------- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/core/lib/zksync_core/src/vm_runner/io.rs b/core/lib/zksync_core/src/vm_runner/io.rs index f9093339d51..aa95248f9d5 100644 --- a/core/lib/zksync_core/src/vm_runner/io.rs +++ b/core/lib/zksync_core/src/vm_runner/io.rs @@ -1,5 +1,6 @@ -use async_trait::async_trait; use std::fmt::Debug; + +use async_trait::async_trait; use zksync_dal::Core; use zksync_db_connection::connection::Connection; use zksync_types::L1BatchNumber; diff --git a/core/lib/zksync_core/src/vm_runner/storage.rs b/core/lib/zksync_core/src/vm_runner/storage.rs index 59eaa2862dc..85778bd4b03 100644 --- a/core/lib/zksync_core/src/vm_runner/storage.rs +++ b/core/lib/zksync_core/src/vm_runner/storage.rs 
@@ -6,7 +6,6 @@ use std::{ time::Duration, }; -use crate::vm_runner::VmRunnerIo; use anyhow::Context as _; use async_trait::async_trait; use multivm::{interface::L1BatchEnv, vm_1_4_2::SystemEnv}; @@ -21,6 +20,8 @@ use zksync_state::{ use zksync_storage::RocksDB; use zksync_types::{block::L2BlockExecutionData, L1BatchNumber, L2ChainId}; +use crate::vm_runner::VmRunnerIo; + /// Data needed to execute an L1 batch. #[derive(Debug, Clone)] pub struct BatchExecuteData { diff --git a/core/lib/zksync_core/src/vm_runner/tests/mod.rs b/core/lib/zksync_core/src/vm_runner/tests/mod.rs index ebb26d76bfb..80bdb1de28d 100644 --- a/core/lib/zksync_core/src/vm_runner/tests/mod.rs +++ b/core/lib/zksync_core/src/vm_runner/tests/mod.rs @@ -20,7 +20,7 @@ use zksync_types::{ StorageLogKind, StorageValue, H160, H256, }; -use super::{BatchExecuteData, VmRunnerStorage, VmRunnerStorageLoader}; +use super::{BatchExecuteData, VmRunnerIo, VmRunnerStorage}; use crate::utils::testonly::{ create_l1_batch_metadata, create_l2_block, create_l2_transaction, execute_l2_transaction, l1_batch_metadata_to_commitment_artifacts, @@ -33,9 +33,9 @@ struct LoaderMock { } #[async_trait] -impl VmRunnerStorageLoader for Arc> { +impl VmRunnerIo for Arc> { fn name() -> &'static str { - "loader_mock" + "io_mock" } async fn latest_processed_batch( @@ -79,12 +79,12 @@ impl VmRunnerTester { async fn create_storage( &mut self, - loader_mock: Arc>, + io_mock: Arc>, ) -> anyhow::Result>>> { let (vm_runner_storage, task) = VmRunnerStorage::new( self.pool.clone(), self.db_dir.path().to_str().unwrap().to_owned(), - loader_mock, + io_mock, L2ChainId::from(270), ) .await?; @@ -97,7 +97,7 @@ impl VmRunnerTester { } } -impl VmRunnerStorage { +impl VmRunnerStorage { async fn load_batch_eventually( &self, number: L1BatchNumber, @@ -268,11 +268,11 @@ async fn rerun_storage_on_existing_data() -> anyhow::Result<()> { .await?; let mut tester = VmRunnerTester::new(connection_pool.clone()); - let loader_mock = 
Arc::new(RwLock::new(LoaderMock { + let io_mock = Arc::new(RwLock::new(LoaderMock { current: 0.into(), max: 10.into(), })); - let storage = tester.create_storage(loader_mock.clone()).await?; + let storage = tester.create_storage(io_mock.clone()).await?; // Check that existing batches are returned in the exact same order with the exact same data for batch in &batches { let batch_data = storage.load_batch_eventually(batch.number).await?; @@ -323,7 +323,7 @@ async fn rerun_storage_on_existing_data() -> anyhow::Result<()> { } // "Mark" these batches as processed - loader_mock.write().await.current += batches.len() as u32; + io_mock.write().await.current += batches.len() as u32; // All old batches should no longer be loadable for batch in batches { @@ -346,8 +346,8 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { drop(conn); let mut tester = VmRunnerTester::new(connection_pool.clone()); - let loader_mock = Arc::new(RwLock::new(LoaderMock::default())); - let storage = tester.create_storage(loader_mock.clone()).await?; + let io_mock = Arc::new(RwLock::new(LoaderMock::default())); + let storage = tester.create_storage(io_mock.clone()).await?; // No batches available yet assert!(storage.load_batch(L1BatchNumber(1)).await?.is_none()); @@ -358,7 +358,7 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { genesis_params.base_system_contracts().hashes(), ) .await?; - loader_mock.write().await.max += 1; + io_mock.write().await.max += 1; // Load batch and mark it as processed assert_eq!( @@ -369,7 +369,7 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { .number, L1BatchNumber(1) ); - loader_mock.write().await.current += 1; + io_mock.write().await.current += 1; // No more batches after that assert!(storage.batch_stays_unloaded(L1BatchNumber(2)).await); @@ -381,7 +381,7 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { genesis_params.base_system_contracts().hashes(), ) .await?; - loader_mock.write().await.max += 
1; + io_mock.write().await.max += 1; // Load batch and mark it as processed @@ -393,7 +393,7 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { .number, L1BatchNumber(2) ); - loader_mock.write().await.current += 1; + io_mock.write().await.current += 1; // No more batches after that assert!(storage.batch_stays_unloaded(L1BatchNumber(3)).await); @@ -433,14 +433,14 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> { let (_sender, receiver) = watch::channel(false); let mut tester = VmRunnerTester::new(connection_pool.clone()); - let loader_mock = Arc::new(RwLock::new(LoaderMock { + let io_mock = Arc::new(RwLock::new(LoaderMock { current: 0.into(), max: 10.into(), })); let rt_handle = Handle::current(); let handle = tokio::task::spawn_blocking(move || { let vm_runner_storage = - rt_handle.block_on(async { tester.create_storage(loader_mock.clone()).await.unwrap() }); + rt_handle.block_on(async { tester.create_storage(io_mock.clone()).await.unwrap() }); for i in 1..=10 { let mut conn = rt_handle.block_on(connection_pool.connection()).unwrap(); let (_, last_l2_block_number) = rt_handle From 5a8be3008e154cb59ee5c6723edb764224f5d4fa Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Wed, 8 May 2024 13:11:08 +1000 Subject: [PATCH 06/12] add some tests --- .../src/vm_runner/output_handler.rs | 292 ++++++++++++++++++ .../zksync_core/src/vm_runner/tests/mod.rs | 14 +- 2 files changed, 299 insertions(+), 7 deletions(-) diff --git a/core/lib/zksync_core/src/vm_runner/output_handler.rs b/core/lib/zksync_core/src/vm_runner/output_handler.rs index 4ec18e86c38..f651afff1f9 100644 --- a/core/lib/zksync_core/src/vm_runner/output_handler.rs +++ b/core/lib/zksync_core/src/vm_runner/output_handler.rs @@ -209,3 +209,295 @@ impl ConcurrentOutputHandlerFactoryTask { } } } + +#[cfg(test)] +mod tests { + use crate::state_keeper::updates::UpdatesManager; + use crate::state_keeper::StateKeeperOutputHandler; + use crate::vm_runner::{ConcurrentOutputHandlerFactory, 
OutputHandlerFactory, VmRunnerIo}; + use async_trait::async_trait; + use backon::{ConstantBuilder, Retryable}; + use multivm::interface::{L1BatchEnv, L2BlockEnv, SystemEnv, TxExecutionMode}; + use std::collections::HashMap; + use std::sync::Arc; + use std::time::Duration; + use tokio::sync::{watch, RwLock}; + use tokio::task::JoinHandle; + use zksync_contracts::{BaseSystemContracts, SystemContractCode}; + use zksync_dal::Core; + use zksync_db_connection::connection::Connection; + use zksync_db_connection::connection_pool::ConnectionPool; + use zksync_types::L1BatchNumber; + + #[derive(Debug, Default)] + struct IoMock { + current: L1BatchNumber, + max: u32, + } + + #[async_trait] + impl VmRunnerIo for Arc> { + fn name() -> &'static str { + "io_mock" + } + + async fn latest_processed_batch( + &self, + _conn: &mut Connection<'_, Core>, + ) -> anyhow::Result { + Ok(self.read().await.current) + } + + async fn last_ready_to_be_loaded_batch( + &self, + _conn: &mut Connection<'_, Core>, + ) -> anyhow::Result { + let io = self.read().await; + Ok(io.current + io.max) + } + + async fn mark_l1_batch_as_completed( + &self, + _conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + self.write().await.current = l1_batch_number; + Ok(()) + } + } + + #[derive(Debug)] + struct TestOutputFactory { + delays: HashMap, + } + + #[async_trait] + impl OutputHandlerFactory for TestOutputFactory { + async fn create_handler( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result> { + let delay = self.delays.get(&l1_batch_number).copied(); + #[derive(Debug)] + struct TestOutputHandler { + delay: Option, + } + #[async_trait] + impl StateKeeperOutputHandler for TestOutputHandler { + async fn handle_l2_block( + &mut self, + _updates_manager: &UpdatesManager, + ) -> anyhow::Result<()> { + Ok(()) + } + + async fn handle_l1_batch( + &mut self, + _updates_manager: Arc, + ) -> anyhow::Result<()> { + if let Some(delay) = self.delay { + 
tokio::time::sleep(delay).await + } + Ok(()) + } + } + Ok(Box::new(TestOutputHandler { delay })) + } + } + + struct OutputHandlerTester { + io: Arc>, + output_factory: ConcurrentOutputHandlerFactory>, TestOutputFactory>, + tasks: Vec>, + stop_sender: watch::Sender, + } + + impl OutputHandlerTester { + fn new( + io: Arc>, + pool: ConnectionPool, + delays: HashMap, + ) -> Self { + let test_factory = TestOutputFactory { delays }; + let (output_factory, task) = + ConcurrentOutputHandlerFactory::new(pool, io.clone(), test_factory); + let (stop_sender, stop_receiver) = watch::channel(false); + let join_handle = + tokio::task::spawn(async move { task.run(stop_receiver).await.unwrap() }); + let tasks = vec![join_handle]; + Self { + io, + output_factory, + tasks, + stop_sender, + } + } + + async fn spawn_test_task(&mut self, l1_batch_number: L1BatchNumber) -> anyhow::Result<()> { + let mut output_handler = self.output_factory.create_handler(l1_batch_number).await?; + let join_handle = tokio::task::spawn(async move { + let l1_batch_env = L1BatchEnv { + previous_batch_hash: None, + number: Default::default(), + timestamp: 0, + fee_input: Default::default(), + fee_account: Default::default(), + enforced_base_fee: None, + first_l2_block: L2BlockEnv { + number: 0, + timestamp: 0, + prev_block_hash: Default::default(), + max_virtual_blocks_to_create: 0, + }, + }; + let system_env = SystemEnv { + zk_porter_available: false, + version: Default::default(), + base_system_smart_contracts: BaseSystemContracts { + bootloader: SystemContractCode { + code: vec![], + hash: Default::default(), + }, + default_aa: SystemContractCode { + code: vec![], + hash: Default::default(), + }, + }, + bootloader_gas_limit: 0, + execution_mode: TxExecutionMode::VerifyExecute, + default_validation_computational_gas_limit: 0, + chain_id: Default::default(), + }; + let updates_manager = UpdatesManager::new(&l1_batch_env, &system_env); + output_handler + .handle_l2_block(&updates_manager) + .await + 
.unwrap(); + output_handler + .handle_l1_batch(Arc::new(updates_manager)) + .await + .unwrap(); + }); + self.tasks.push(join_handle); + Ok(()) + } + + async fn wait_for_batch( + &self, + l1_batch_number: L1BatchNumber, + timeout: Duration, + ) -> anyhow::Result<()> { + const RETRY_INTERVAL: Duration = Duration::from_millis(500); + + let max_tries = (timeout.as_secs_f64() / RETRY_INTERVAL.as_secs_f64()).ceil() as u64; + (|| async { + let current = self.io.read().await.current; + anyhow::ensure!( + current == l1_batch_number, + "Batch #{} has not been processed yet (current is #{})", + l1_batch_number, + current + ); + Ok(()) + }) + .retry( + &ConstantBuilder::default() + .with_delay(RETRY_INTERVAL) + .with_max_times(max_tries as usize), + ) + .await + } + + async fn wait_for_batch_progressively( + &self, + l1_batch_number: L1BatchNumber, + timeout: Duration, + ) -> anyhow::Result<()> { + const SLEEP_INTERVAL: Duration = Duration::from_millis(500); + + let mut current = self.io.read().await.current; + let max_tries = (timeout.as_secs_f64() / SLEEP_INTERVAL.as_secs_f64()).ceil() as u64; + let mut try_num = 0; + loop { + tokio::time::sleep(SLEEP_INTERVAL).await; + try_num += 1; + if try_num >= max_tries { + anyhow::bail!("Timeout"); + } + let new_current = self.io.read().await.current; + // Ensure we did not go back in latest processed batch + if new_current < current { + anyhow::bail!( + "Latest processed batch regressed to #{} back from #{}", + new_current, + current + ); + } + current = new_current; + if current >= l1_batch_number { + return Ok(()); + } + } + } + + async fn stop_and_wait_for_all_tasks(self) -> anyhow::Result<()> { + self.stop_sender.send(true)?; + futures::future::join_all(self.tasks).await; + Ok(()) + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 10)] + async fn monotonically_progress_processed_batches() -> anyhow::Result<()> { + let pool = ConnectionPool::::test_pool().await; + let io = Arc::new(RwLock::new(IoMock { + current: 
0.into(), + max: 10, + })); + // Distribute progressively higher delays for higher batches so that we can observe + // each batch being marked as processed. In other words, batch 1 would be marked as processed, + // then there will be a minimum 1 sec of delay (more in <10 thread environments), then batch + // 2 would be marked as processed etc. + let delays = (1..10) + .map(|i| (L1BatchNumber(i), Duration::from_secs(i as u64))) + .collect(); + let mut tester = OutputHandlerTester::new(io.clone(), pool, delays); + for i in 1..10 { + tester.spawn_test_task(i.into()).await?; + } + assert_eq!(io.read().await.current, L1BatchNumber(0)); + for i in 1..10 { + tester + .wait_for_batch(i.into(), Duration::from_secs(10)) + .await?; + } + tester.stop_and_wait_for_all_tasks().await?; + assert_eq!(io.read().await.current, L1BatchNumber(9)); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 10)] + async fn do_not_progress_with_gaps() -> anyhow::Result<()> { + let pool = ConnectionPool::::test_pool().await; + let io = Arc::new(RwLock::new(IoMock { + current: 0.into(), + max: 10, + })); + // Distribute progressively lower delays for higher batches so that we can observe last + // processed batch not move until the first batch (with longest delay) is processed. 
+ let delays = (1..10) + .map(|i| (L1BatchNumber(i), Duration::from_secs(10 - i as u64))) + .collect(); + let mut tester = OutputHandlerTester::new(io.clone(), pool, delays); + for i in 1..10 { + tester.spawn_test_task(i.into()).await?; + } + assert_eq!(io.read().await.current, L1BatchNumber(0)); + tester + .wait_for_batch_progressively(L1BatchNumber(9), Duration::from_secs(60)) + .await?; + tester.stop_and_wait_for_all_tasks().await?; + assert_eq!(io.read().await.current, L1BatchNumber(9)); + Ok(()) + } +} diff --git a/core/lib/zksync_core/src/vm_runner/tests/mod.rs b/core/lib/zksync_core/src/vm_runner/tests/mod.rs index 80bdb1de28d..9e837037709 100644 --- a/core/lib/zksync_core/src/vm_runner/tests/mod.rs +++ b/core/lib/zksync_core/src/vm_runner/tests/mod.rs @@ -27,13 +27,13 @@ use crate::utils::testonly::{ }; #[derive(Debug, Default)] -struct LoaderMock { +struct IoMock { current: L1BatchNumber, max: L1BatchNumber, } #[async_trait] -impl VmRunnerIo for Arc> { +impl VmRunnerIo for Arc> { fn name() -> &'static str { "io_mock" } @@ -79,8 +79,8 @@ impl VmRunnerTester { async fn create_storage( &mut self, - io_mock: Arc>, - ) -> anyhow::Result>>> { + io_mock: Arc>, + ) -> anyhow::Result>>> { let (vm_runner_storage, task) = VmRunnerStorage::new( self.pool.clone(), self.db_dir.path().to_str().unwrap().to_owned(), @@ -268,7 +268,7 @@ async fn rerun_storage_on_existing_data() -> anyhow::Result<()> { .await?; let mut tester = VmRunnerTester::new(connection_pool.clone()); - let io_mock = Arc::new(RwLock::new(LoaderMock { + let io_mock = Arc::new(RwLock::new(IoMock { current: 0.into(), max: 10.into(), })); @@ -346,7 +346,7 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { drop(conn); let mut tester = VmRunnerTester::new(connection_pool.clone()); - let io_mock = Arc::new(RwLock::new(LoaderMock::default())); + let io_mock = Arc::new(RwLock::new(IoMock::default())); let storage = tester.create_storage(io_mock.clone()).await?; // No batches available yet 
assert!(storage.load_batch(L1BatchNumber(1)).await?.is_none()); @@ -433,7 +433,7 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> { let (_sender, receiver) = watch::channel(false); let mut tester = VmRunnerTester::new(connection_pool.clone()); - let io_mock = Arc::new(RwLock::new(LoaderMock { + let io_mock = Arc::new(RwLock::new(IoMock { current: 0.into(), max: 10.into(), })); From b0d136827408b37a78071018c7fc0c05fdc2ccc3 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Wed, 8 May 2024 13:57:08 +1000 Subject: [PATCH 07/12] fmt --- core/node/vm_runner/src/output_handler.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index 33b2bff4e30..a6a8cb08843 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -209,20 +209,22 @@ impl ConcurrentOutputHandlerFactoryTask { #[cfg(test)] mod tests { - use crate::{ConcurrentOutputHandlerFactory, OutputHandlerFactory, VmRunnerIo}; + use std::{collections::HashMap, sync::Arc, time::Duration}; + use async_trait::async_trait; use backon::{ConstantBuilder, Retryable}; use multivm::interface::{L1BatchEnv, L2BlockEnv, SystemEnv, TxExecutionMode}; - use std::collections::HashMap; - use std::sync::Arc; - use std::time::Duration; - use tokio::sync::{watch, RwLock}; - use tokio::task::JoinHandle; + use tokio::{ + sync::{watch, RwLock}, + task::JoinHandle, + }; use zksync_contracts::{BaseSystemContracts, SystemContractCode}; use zksync_core::state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_dal::{Connection, ConnectionPool, Core}; use zksync_types::L1BatchNumber; + use crate::{ConcurrentOutputHandlerFactory, OutputHandlerFactory, VmRunnerIo}; + #[derive(Debug, Default)] struct IoMock { current: L1BatchNumber, From 032007ca4100d5b260274264f208b5d6f267988e Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Thu, 9 May 2024 16:08:25 +1000 
Subject: [PATCH 08/12] ensure the crate has documentation --- core/node/vm_runner/src/lib.rs | 5 +++ core/node/vm_runner/src/output_handler.rs | 42 +++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/core/node/vm_runner/src/lib.rs b/core/node/vm_runner/src/lib.rs index eea5c90c3ed..44db8564450 100644 --- a/core/node/vm_runner/src/lib.rs +++ b/core/node/vm_runner/src/lib.rs @@ -1,3 +1,8 @@ +//! VM Runner is a framework to build batch processor components, i.e. components that would re-run +//! batches in VM independently from state keeper and handle some output as a result. + +#![warn(missing_debug_implementations, missing_docs)] + mod io; mod output_handler; mod storage; diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index a6a8cb08843..d9c07bdf7d4 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -14,14 +14,36 @@ use zksync_types::L1BatchNumber; use crate::VmRunnerIo; +/// Functionality to produce a [`StateKeeperOutputHandler`] implementation for a specific L1 batch. +/// +/// The idea behind this trait is that often handling output data is independent of the order of the +/// batch that data belongs to. In other words, one could be handling output of batch #100 and #1000 +/// simultaneously. Implementing this trait signifies that this property is held for the data the +/// implementation is responsible for. #[async_trait] pub trait OutputHandlerFactory: Debug + Send { + /// Creates a [`StateKeeperOutputHandler`] implementation for the provided L1 batch. Only + /// supposed to be used for the L1 batch data it was created against. Using it for anything else + /// is undefined behavior. + /// + /// # Errors + /// + /// Propagates DB errors. 
async fn create_handler( &mut self, l1_batch_number: L1BatchNumber, ) -> anyhow::Result>; } +/// A delegator factory that requires an underlying factory `F` that does the actual work, however +/// this struct is orchestrated such that any output handler it produces has a non-blocking +/// `handle_l1_batch` implementation (where the heaviest work is expected to happen). +/// +/// Once the asynchronous work done in `handle_l1_batch` finishes it is also guaranteed to mark the +/// batch as processed by `Io`. It is guaranteed, however, that for any processed batch all batches +/// preceding it are also processed. No guarantees about subsequent batches. For example, if +/// batches #1, #2, #3, #5, #9, #100 are processed then only batches #{1-3} will be marked as +/// processed and #3 would be the latest processed batch as defined in [`VmRunnerIo`]. pub struct ConcurrentOutputHandlerFactory { pool: ConnectionPool, state: Arc>>>>, @@ -40,6 +62,10 @@ impl Debug for ConcurrentOutputHandlerF } impl ConcurrentOutputHandlerFactory { + /// Creates a new concurrent delegator factory using provided Postgres pool, VM runner IO + /// and underlying output handler factory. + /// + /// Returns a [`ConcurrentOutputHandlerFactoryTask`] which is supposed to be run by the caller. pub fn new( pool: ConnectionPool, io: Io, @@ -162,13 +188,29 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { } } +/// A runnable task that continually awaits for the very next unprocessed batch to be processed and +/// marks it as such using `Io`. 
pub struct ConcurrentOutputHandlerFactoryTask { pool: ConnectionPool, io: Io, state: Arc>>>>, } +impl Debug for ConcurrentOutputHandlerFactoryTask { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ConcurrentOutputHandlerFactoryTask") + .field("pool", &self.pool) + .field("io", &self.io) + .finish() + } +} + impl ConcurrentOutputHandlerFactoryTask { + /// Starts running the task which is supposed to last until the end of the node's lifetime. + /// + /// # Errors + /// + /// Propagates DB errors. pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { const SLEEP_INTERVAL: Duration = Duration::from_millis(50); From f7142571cdaa2e8e367504c6d1dffd1b09c93da3 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Fri, 10 May 2024 17:06:58 +1000 Subject: [PATCH 09/12] fix spellcheck --- checks-config/era.dic | 1 + 1 file changed, 1 insertion(+) diff --git a/checks-config/era.dic b/checks-config/era.dic index 28b326ce281..0ad16050566 100644 --- a/checks-config/era.dic +++ b/checks-config/era.dic @@ -957,3 +957,4 @@ RECURSION_TIP_ARITY empty_proof hyperchain storages +delegator From 1d8a80880db64a8a54545b4f979a72d98e65304e Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Wed, 15 May 2024 22:18:17 +1000 Subject: [PATCH 10/12] replace futures with handles --- core/node/vm_runner/src/output_handler.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index d9c07bdf7d4..167db8aff50 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -6,8 +6,8 @@ use std::{ use async_trait::async_trait; use dashmap::DashMap; -use futures::future::BoxFuture; use tokio::sync::{oneshot, watch}; +use tokio::task::JoinHandle; use zksync_core::state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_dal::{ConnectionPool, Core}; use zksync_types::L1BatchNumber; 
@@ -46,7 +46,7 @@ pub trait OutputHandlerFactory: Debug + Send { /// processed and #3 would be the latest processed batch as defined in [`VmRunnerIo`]. pub struct ConcurrentOutputHandlerFactory { pool: ConnectionPool, - state: Arc>>>>, + state: Arc>>>>, io: Io, factory: F, } @@ -127,7 +127,7 @@ impl OutputHandlerFactory enum OutputHandlerState { Running { handler: Box, - sender: oneshot::Sender>>, + sender: oneshot::Sender>>, }, Finished, } @@ -171,7 +171,7 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { }) => { self.internal = Some(OutputHandlerState::Finished); sender - .send(Box::pin(async move { + .send(tokio::task::spawn(async move { handler.handle_l1_batch(updates_manager).await })) .ok(); @@ -193,7 +193,7 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { pub struct ConcurrentOutputHandlerFactoryTask { pool: ConnectionPool, io: Io, - state: Arc>>>>, + state: Arc>>>>, } impl Debug for ConcurrentOutputHandlerFactoryTask { @@ -231,12 +231,12 @@ impl ConcurrentOutputHandlerFactoryTask { tokio::time::sleep(SLEEP_INTERVAL).await; } Some((_, receiver)) => { - // Wait until the future is sent through the receiver, happens when + // Wait until the `JoinHandle` is sent through the receiver, happens when // `handle_l1_batch` is called on the corresponding output handler - let future = receiver.await?; - // Wait until the future is completed, meaning that the `handle_l1_batch` + let handle = receiver.await?; + // Wait until the handle is resolved, meaning that the `handle_l1_batch` // computation has finished, and we can consider this batch to be completed - future.await?; + handle.await??; latest_processed_batch += 1; let mut conn = self.pool.connection_tagged(Io::name()).await?; self.io From 1d9c102b463bf2cb907a82bb33ae05a8c13668f9 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Wed, 15 May 2024 22:35:49 +1000 Subject: [PATCH 11/12] a bunch of minor improvements --- core/node/vm_runner/Cargo.toml | 2 +- core/node/vm_runner/src/output_handler.rs 
| 68 +++++++++++------------ 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/core/node/vm_runner/Cargo.toml b/core/node/vm_runner/Cargo.toml index b701e7fb4b3..0c754321674 100644 --- a/core/node/vm_runner/Cargo.toml +++ b/core/node/vm_runner/Cargo.toml @@ -24,12 +24,12 @@ anyhow.workspace = true async-trait.workspace = true once_cell.workspace = true tracing.workspace = true -futures = { workspace = true, features = ["compat"] } dashmap.workspace = true [dev-dependencies] zksync_node_test_utils.workspace = true zksync_node_genesis.workspace = true backon.workspace = true +futures = { workspace = true, features = ["compat"] } rand.workspace = true tempfile.workspace = true diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index 167db8aff50..f026a348e4d 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -1,19 +1,25 @@ use std::{ fmt::{Debug, Formatter}, + mem, sync::Arc, time::Duration, }; +use anyhow::Context; use async_trait::async_trait; use dashmap::DashMap; -use tokio::sync::{oneshot, watch}; -use tokio::task::JoinHandle; +use tokio::{ + sync::{oneshot, watch}, + task::JoinHandle, +}; use zksync_core::state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_dal::{ConnectionPool, Core}; use zksync_types::L1BatchNumber; use crate::VmRunnerIo; +type BatchReceiver = oneshot::Receiver>>; + /// Functionality to produce a [`StateKeeperOutputHandler`] implementation for a specific L1 batch. /// /// The idea behind this trait is that often handling output data is independent of the order of the @@ -24,7 +30,7 @@ use crate::VmRunnerIo; pub trait OutputHandlerFactory: Debug + Send { /// Creates a [`StateKeeperOutputHandler`] implementation for the provided L1 batch. Only /// supposed to be used for the L1 batch data it was created against. Using it for anything else - /// is undefined behavior. + /// will lead to errors. 
/// /// # Errors /// @@ -46,7 +52,7 @@ pub trait OutputHandlerFactory: Debug + Send { /// processed and #3 would be the latest processed batch as defined in [`VmRunnerIo`]. pub struct ConcurrentOutputHandlerFactory { pool: ConnectionPool, - state: Arc>>>>, + state: Arc>, io: Io, factory: F, } @@ -118,13 +124,11 @@ impl OutputHandlerFactory let handler = self.factory.create_handler(l1_batch_number).await?; let (sender, receiver) = oneshot::channel(); self.state.insert(l1_batch_number, receiver); - Ok(Box::new(AsyncOutputHandler { - internal: Some(OutputHandlerState::Running { handler, sender }), - })) + Ok(Box::new(AsyncOutputHandler::Running { handler, sender })) } } -enum OutputHandlerState { +enum AsyncOutputHandler { Running { handler: Box, sender: oneshot::Sender>>, @@ -132,30 +136,28 @@ enum OutputHandlerState { Finished, } -impl Debug for OutputHandlerState { +impl Debug for AsyncOutputHandler { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("OutputHandlerState").finish() + match self { + AsyncOutputHandler::Running { handler, .. } => f + .debug_struct("AsyncOutputHandler::Running") + .field("handler", handler) + .finish(), + AsyncOutputHandler::Finished => f.debug_struct("AsyncOutputHandler::Finished").finish(), + } } } -#[derive(Debug)] -struct AsyncOutputHandler { - internal: Option, -} - #[async_trait] impl StateKeeperOutputHandler for AsyncOutputHandler { async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { - match &mut self.internal { - Some(OutputHandlerState::Running { handler, .. }) => { + match self { + AsyncOutputHandler::Running { handler, .. 
} => { handler.handle_l2_block(updates_manager).await } - Some(OutputHandlerState::Finished) => { + AsyncOutputHandler::Finished => { Err(anyhow::anyhow!("Cannot handle any more L2 blocks")) } - None => Err(anyhow::anyhow!( - "Unexpected state, missing output handler state" - )), } } @@ -163,13 +165,12 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { &mut self, updates_manager: Arc, ) -> anyhow::Result<()> { - let state = self.internal.take(); + let state = mem::replace(self, AsyncOutputHandler::Finished); match state { - Some(OutputHandlerState::Running { + AsyncOutputHandler::Running { mut handler, sender, - }) => { - self.internal = Some(OutputHandlerState::Finished); + } => { sender .send(tokio::task::spawn(async move { handler.handle_l1_batch(updates_manager).await @@ -177,13 +178,9 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { .ok(); Ok(()) } - Some(OutputHandlerState::Finished) => { - self.internal = state; + AsyncOutputHandler::Finished => { Err(anyhow::anyhow!("Cannot handle any more L1 batches")) } - None => Err(anyhow::anyhow!( - "Unexpected state, missing output handler state" - )), } } } @@ -193,7 +190,7 @@ impl StateKeeperOutputHandler for AsyncOutputHandler { pub struct ConcurrentOutputHandlerFactoryTask { pool: ConnectionPool, io: Io, - state: Arc>>>>, + state: Arc>, } impl Debug for ConcurrentOutputHandlerFactoryTask { @@ -233,16 +230,19 @@ impl ConcurrentOutputHandlerFactoryTask { Some((_, receiver)) => { // Wait until the `JoinHandle` is sent through the receiver, happens when // `handle_l1_batch` is called on the corresponding output handler - let handle = receiver.await?; + let handle = receiver + .await + .context("handler was dropped before the batch was fully processed")?; // Wait until the handle is resolved, meaning that the `handle_l1_batch` // computation has finished, and we can consider this batch to be completed - handle.await??; + handle + .await + .context("failed to await for batch to be processed")??; 
latest_processed_batch += 1; let mut conn = self.pool.connection_tagged(Io::name()).await?; self.io .mark_l1_batch_as_completed(&mut conn, latest_processed_batch) .await?; - drop(conn); } } } From d8bfb37241eccd75af10aceb0128f5e33e0bad7e Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Thu, 16 May 2024 16:42:38 +1000 Subject: [PATCH 12/12] depend solely on state keeper crate --- Cargo.lock | 2 +- core/node/vm_runner/Cargo.toml | 2 +- core/node/vm_runner/src/output_handler.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a439fc2ca9d..a1a167c2c06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9442,11 +9442,11 @@ dependencies = [ "tracing", "vm_utils", "zksync_contracts", - "zksync_core", "zksync_dal", "zksync_node_genesis", "zksync_node_test_utils", "zksync_state", + "zksync_state_keeper", "zksync_storage", "zksync_types", ] diff --git a/core/node/vm_runner/Cargo.toml b/core/node/vm_runner/Cargo.toml index 0c754321674..94d5fa01443 100644 --- a/core/node/vm_runner/Cargo.toml +++ b/core/node/vm_runner/Cargo.toml @@ -16,7 +16,7 @@ zksync_dal.workspace = true zksync_contracts.workspace = true zksync_state.workspace = true zksync_storage.workspace = true -zksync_core.workspace = true +zksync_state_keeper.workspace = true vm_utils.workspace = true tokio = { workspace = true, features = ["time"] } diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index f026a348e4d..39cb1d33615 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -12,8 +12,8 @@ use tokio::{ sync::{oneshot, watch}, task::JoinHandle, }; -use zksync_core::state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_dal::{ConnectionPool, Core}; +use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_types::L1BatchNumber; use crate::VmRunnerIo; @@ -261,8 +261,8 @@ mod tests { task::JoinHandle, }; use 
zksync_contracts::{BaseSystemContracts, SystemContractCode}; - use zksync_core::state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_dal::{Connection, ConnectionPool, Core}; + use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_types::L1BatchNumber; use crate::{ConcurrentOutputHandlerFactory, OutputHandlerFactory, VmRunnerIo};