Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(vm-runner): implement output handler for VM runner #1856

Merged
merged 20 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ clap = "4.2.2"
codegen = "0.2.0"
criterion = "0.4.0"
ctrlc = "3.1"
dashmap = "5.5.3"
derive_more = "=1.0.0-beta.6"
envy = "0.4"
ethabi = "18.0.0"
Expand Down
1 change: 1 addition & 0 deletions checks-config/era.dic
Original file line number Diff line number Diff line change
Expand Up @@ -960,3 +960,4 @@ storages
vec
zksync_merkle_tree
TreeMetadata
delegator
1 change: 1 addition & 0 deletions core/lib/zksync_core_leftovers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ axum = { workspace = true, features = [
"tokio",
] }
once_cell.workspace = true
dashmap.workspace = true

tracing.workspace = true

Expand Down
5 changes: 4 additions & 1 deletion core/node/node_sync/src/sync_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ impl StateKeeperOutputHandler for SyncState {
Ok(())
}

async fn handle_l1_batch(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> {
async fn handle_l1_batch(
&mut self,
updates_manager: Arc<UpdatesManager>,
) -> anyhow::Result<()> {
let sealed_block_number = updates_manager.l2_block.number;
self.set_local_block(sealed_block_number);
Ok(())
Expand Down
11 changes: 7 additions & 4 deletions core/node/state_keeper/src/io/output_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Handling outputs produced by the state keeper.

use std::fmt;
use std::{fmt, sync::Arc};

use anyhow::Context as _;
use async_trait::async_trait;
Expand All @@ -20,7 +20,10 @@ pub trait StateKeeperOutputHandler: 'static + Send + fmt::Debug {
async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()>;

/// Handles an L1 batch produced by the state keeper.
async fn handle_l1_batch(&mut self, _updates_manager: &UpdatesManager) -> anyhow::Result<()> {
async fn handle_l1_batch(
&mut self,
_updates_manager: Arc<UpdatesManager>,
) -> anyhow::Result<()> {
Ok(())
}
}
Expand Down Expand Up @@ -81,11 +84,11 @@ impl OutputHandler {

pub(crate) async fn handle_l1_batch(
&mut self,
updates_manager: &UpdatesManager,
updates_manager: Arc<UpdatesManager>,
) -> anyhow::Result<()> {
for handler in &mut self.inner {
handler
.handle_l1_batch(updates_manager)
.handle_l1_batch(updates_manager.clone())
.await
.with_context(|| {
format!(
Expand Down
12 changes: 9 additions & 3 deletions core/node/state_keeper/src/io/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! State keeper persistence logic.

use std::time::Instant;
use std::{sync::Arc, time::Instant};

use anyhow::Context as _;
use async_trait::async_trait;
Expand Down Expand Up @@ -162,7 +162,10 @@ impl StateKeeperOutputHandler for StateKeeperPersistence {
Ok(())
}

async fn handle_l1_batch(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> {
async fn handle_l1_batch(
&mut self,
updates_manager: Arc<UpdatesManager>,
) -> anyhow::Result<()> {
// We cannot start sealing an L1 batch until we've sealed all L2 blocks included in it.
self.wait_for_all_commands().await;

Expand Down Expand Up @@ -372,7 +375,10 @@ mod tests {
);

updates.finish_batch(batch_result);
persistence.handle_l1_batch(&updates).await.unwrap();
persistence
.handle_l1_batch(Arc::new(updates))
.await
.unwrap();

tx_hash
}
Expand Down
4 changes: 2 additions & 2 deletions core/node/state_keeper/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ impl ZkSyncStateKeeper {
let finished_batch = batch_executor.finish_batch().await;
let sealed_batch_protocol_version = updates_manager.protocol_version();
updates_manager.finish_batch(finished_batch);
let mut next_cursor = updates_manager.io_cursor();
self.output_handler
.handle_l1_batch(&updates_manager)
.handle_l1_batch(Arc::new(updates_manager))
.await
.with_context(|| format!("failed sealing L1 batch {l1_batch_env:?}"))?;

Expand All @@ -187,7 +188,6 @@ impl ZkSyncStateKeeper {
l1_batch_seal_delta = Some(Instant::now());

// Start the new batch.
let mut next_cursor = updates_manager.io_cursor();
next_cursor.l1_batch += 1;
(system_env, l1_batch_env) = self.wait_for_new_batch_env(&next_cursor).await?;
updates_manager = UpdatesManager::new(&l1_batch_env, &system_env);
Expand Down
1 change: 1 addition & 0 deletions core/node/state_keeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use self::{
seal_criteria::SequencerSealer,
state_keeper_storage::AsyncRocksdbCache,
types::MempoolGuard,
updates::UpdatesManager,
};

mod batch_executor;
Expand Down
7 changes: 5 additions & 2 deletions core/node/state_keeper/src/testonly/test_batch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,13 +532,16 @@ impl StateKeeperOutputHandler for TestPersistence {
Ok(())
}

async fn handle_l1_batch(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> {
async fn handle_l1_batch(
&mut self,
updates_manager: Arc<UpdatesManager>,
) -> anyhow::Result<()> {
let action = self.pop_next_item("seal_l1_batch");
let ScenarioItem::BatchSeal(_, check_fn) = action else {
anyhow::bail!("Unexpected action: {:?}", action);
};
if let Some(check_fn) = check_fn {
check_fn(updates_manager);
check_fn(&updates_manager);
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion core/node/state_keeper/src/updates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct UpdatesManager {
}

impl UpdatesManager {
pub(crate) fn new(l1_batch_env: &L1BatchEnv, system_env: &SystemEnv) -> Self {
pub fn new(l1_batch_env: &L1BatchEnv, system_env: &SystemEnv) -> Self {
let protocol_version = system_env.version;
Self {
batch_timestamp: l1_batch_env.timestamp,
Expand Down
3 changes: 3 additions & 0 deletions core/node/vm_runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ zksync_dal.workspace = true
zksync_contracts.workspace = true
zksync_state.workspace = true
zksync_storage.workspace = true
zksync_state_keeper.workspace = true
vm_utils.workspace = true

tokio = { workspace = true, features = ["time"] }
anyhow.workspace = true
async-trait.workspace = true
once_cell.workspace = true
tracing.workspace = true
dashmap.workspace = true

[dev-dependencies]
zksync_node_test_utils.workspace = true
zksync_node_genesis.workspace = true
backon.workspace = true
futures = { workspace = true, features = ["compat"] }
rand.workspace = true
tempfile.workspace = true
45 changes: 45 additions & 0 deletions core/node/vm_runner/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::fmt::Debug;

use async_trait::async_trait;
use zksync_dal::{Connection, Core};
use zksync_types::L1BatchNumber;

/// Functionality to fetch/save data about processed/unprocessed batches for a particular VM runner
/// instance.
#[async_trait]
pub trait VmRunnerIo: Debug + Send + Sync + 'static {
/// Unique name of the VM runner instance.
fn name() -> &'static str;

/// Returns the last L1 batch number that has been processed by this VM runner instance.
///
/// # Errors
///
/// Propagates DB errors.
async fn latest_processed_batch(
&self,
conn: &mut Connection<'_, Core>,
) -> anyhow::Result<L1BatchNumber>;

/// Returns the last L1 batch number that is ready to be loaded by this VM runner instance.
///
/// # Errors
///
/// Propagates DB errors.
async fn last_ready_to_be_loaded_batch(
&self,
conn: &mut Connection<'_, Core>,
) -> anyhow::Result<L1BatchNumber>;

/// Marks the specified batch as the latest completed batch. All earlier batches are considered
/// to be completed too. No guarantees about later batches.
///
/// # Errors
///
/// Propagates DB errors.
async fn mark_l1_batch_as_completed(
&self,
conn: &mut Connection<'_, Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()>;
}
13 changes: 12 additions & 1 deletion core/node/vm_runner/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
//! VM Runner is a framework to build batch processor components, i.e. components that would re-run
//! batches in VM independently from state keeper and handle some output as a result.

#![warn(missing_debug_implementations, missing_docs)]

mod io;
mod output_handler;
mod storage;

#[cfg(test)]
mod tests;

pub use storage::{BatchExecuteData, VmRunnerStorage, VmRunnerStorageLoader};
pub use io::VmRunnerIo;
pub use output_handler::{
ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory,
};
pub use storage::{BatchExecuteData, VmRunnerStorage};
Loading
Loading