Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(vm-runner): Improve VM runner / VM playground #2840

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/lib/state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use self::{
shadow_storage::ShadowStorage,
storage_factory::{
BatchDiff, CommonStorage, OwnedStorage, ReadStorageFactory, RocksdbWithMemory,
SnapshotStorage,
},
};

Expand Down
37 changes: 37 additions & 0 deletions core/lib/state/src/storage_factory/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::time::Duration;

use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Metrics, Unit};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "stage", rename_all = "snake_case")]
pub(super) enum SnapshotStage {
BatchHeader,
ProtectiveReads,
TouchedSlots,
PreviousValues,
InitialWrites,
Bytecodes,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "kind", rename_all = "snake_case")]
pub(super) enum AccessKind {
ReadValue,
IsWriteInitial,
LoadFactoryDep,
GetEnumerationIndex,
}

#[derive(Debug, Metrics)]
#[metrics(prefix = "state_snapshot")]
pub(super) struct SnapshotMetrics {
/// Latency of loading a batch snapshot split by stage.
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
pub load_latency: Family<SnapshotStage, Histogram<Duration>>,
/// Latency of accessing the fallback storage for a batch snapshot.
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
pub fallback_access_latency: Family<AccessKind, Histogram<Duration>>,
}

#[vise::register]
pub(super) static SNAPSHOT_METRICS: vise::Global<SnapshotMetrics> = vise::Global::new();
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
};
use std::{collections::HashSet, fmt};

use anyhow::Context as _;
use async_trait::async_trait;
Expand All @@ -10,64 +7,18 @@ use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_storage::RocksDB;
use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256};
use zksync_utils::u256_to_h256;
use zksync_vm_interface::storage::{ReadStorage, StorageSnapshot, StorageWithSnapshot};
use zksync_vm_interface::storage::{ReadStorage, StorageSnapshot};

use self::metrics::{SnapshotStage, SNAPSHOT_METRICS};
pub use self::{
rocksdb_with_memory::{BatchDiff, RocksdbWithMemory},
snapshot::SnapshotStorage,
};
use crate::{PostgresStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily};

/// Storage with a static lifetime that can be sent to Tokio tasks etc.
pub type OwnedStorage = CommonStorage<'static>;

/// Factory that can produce storage instances on demand. The storage type is encapsulated as a type param
/// (mostly for testing purposes); the default is [`OwnedStorage`].
#[async_trait]
pub trait ReadStorageFactory<S = OwnedStorage>: Debug + Send + Sync + 'static {
/// Creates a storage instance, e.g. over a Postgres connection or a RocksDB instance.
/// The specific criteria on which one are left up to the implementation.
///
/// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives
/// a stop signal; this is the only case in which `Ok(None)` should be returned.
async fn access_storage(
&self,
stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<S>>;
}

/// [`ReadStorageFactory`] producing Postgres-backed storage instances. Hence, it is slower than more advanced
/// alternatives with RocksDB caches and should be used sparingly (e.g., for testing).
#[async_trait]
impl ReadStorageFactory for ConnectionPool<Core> {
async fn access_storage(
&self,
_stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<OwnedStorage>> {
let connection = self.connection().await?;
let storage = OwnedStorage::postgres(connection, l1_batch_number).await?;
Ok(Some(storage.into()))
}
}

/// DB difference introduced by one batch.
#[derive(Debug, Clone)]
pub struct BatchDiff {
/// Storage slots touched by this batch along with new values there.
pub state_diff: HashMap<H256, H256>,
/// Initial write indices introduced by this batch.
pub enum_index_diff: HashMap<H256, u64>,
/// Factory dependencies introduced by this batch.
pub factory_dep_diff: HashMap<H256, Vec<u8>>,
}

/// A RocksDB cache instance with in-memory DB diffs that gives access to DB state at batches `N` to
/// `N + K`, where `K` is the number of diffs.
#[derive(Debug)]
pub struct RocksdbWithMemory {
/// RocksDB cache instance caught up to batch `N`.
pub rocksdb: RocksdbStorage,
/// Diffs for batches `N + 1` to `N + K`.
pub batch_diffs: Vec<BatchDiff>,
}
mod metrics;
mod rocksdb_with_memory;
mod snapshot;

/// Union of all [`ReadStorage`] implementations that are returned by [`ReadStorageFactory`], such as
/// Postgres- and RocksDB-backed storages.
Expand All @@ -83,7 +34,7 @@ pub enum CommonStorage<'a> {
/// Implementation over a RocksDB cache instance with in-memory DB diffs.
RocksdbWithMemory(RocksdbWithMemory),
/// In-memory storage snapshot with the Postgres storage fallback.
Snapshot(StorageWithSnapshot<PostgresStorage<'a>>),
Snapshot(SnapshotStorage<'a>),
/// Generic implementation. Should be used for testing purposes only since it has performance penalty because
/// of the dynamic dispatch.
Boxed(Box<dyn ReadStorage + Send + 'a>),
Expand Down Expand Up @@ -176,6 +127,7 @@ impl CommonStorage<'static> {
connection: &mut Connection<'static, Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<StorageSnapshot>> {
let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::BatchHeader].start();
let Some(header) = connection
.blocks_dal()
.get_l1_batch_header(l1_batch_number)
Expand All @@ -188,8 +140,10 @@ impl CommonStorage<'static> {
.into_iter()
.map(u256_to_h256)
.collect();
latency.observe();

// Check protective reads early on.
let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::ProtectiveReads].start();
let protective_reads = connection
.storage_logs_dedup_dal()
.get_protective_reads_for_l1_batch(l1_batch_number)
Expand All @@ -199,14 +153,18 @@ impl CommonStorage<'static> {
return Ok(None);
}
let protective_reads_len = protective_reads.len();
tracing::debug!("Loaded {protective_reads_len} protective reads");
let latency = latency.observe();
tracing::debug!("Loaded {protective_reads_len} protective reads in {latency:?}");

let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::TouchedSlots].start();
let touched_slots = connection
.storage_logs_dal()
.get_touched_slots_for_l1_batch(l1_batch_number)
.await?;
tracing::debug!("Loaded {} touched keys", touched_slots.len());
let latency = latency.observe();
tracing::debug!("Loaded {} touched keys in {latency:?}", touched_slots.len());

let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::PreviousValues].start();
let all_accessed_keys: Vec<_> = protective_reads
.into_iter()
.map(|key| key.hashed_key())
Expand All @@ -216,21 +174,31 @@ impl CommonStorage<'static> {
.storage_logs_dal()
.get_previous_storage_values(&all_accessed_keys, l1_batch_number)
.await?;
let latency = latency.observe();
tracing::debug!(
"Obtained {} previous values for accessed keys",
"Obtained {} previous values for accessed keys in {latency:?}",
previous_values.len()
);

let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::InitialWrites].start();
let initial_write_info = connection
.storage_logs_dal()
.get_l1_batches_and_indices_for_initial_writes(&all_accessed_keys)
.await?;
tracing::debug!("Obtained initial write info for accessed keys");
let latency = latency.observe();
tracing::debug!("Obtained initial write info for accessed keys in {latency:?}");

let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::Bytecodes].start();
let bytecodes = connection
.factory_deps_dal()
.get_factory_deps(&bytecode_hashes)
.await;
tracing::debug!("Loaded {} bytecodes used in the batch", bytecodes.len());
let latency = latency.observe();
tracing::debug!(
"Loaded {} bytecodes used in the batch in {latency:?}",
bytecodes.len()
);

let factory_deps = bytecodes
.into_iter()
.map(|(hash_u256, words)| {
Expand All @@ -256,54 +224,6 @@ impl CommonStorage<'static> {
}
}

impl ReadStorage for RocksdbWithMemory {
fn read_value(&mut self, key: &StorageKey) -> StorageValue {
let hashed_key = key.hashed_key();
match self
.batch_diffs
.iter()
.rev()
.find_map(|b| b.state_diff.get(&hashed_key))
{
None => self.rocksdb.read_value(key),
Some(value) => *value,
}
}

fn is_write_initial(&mut self, key: &StorageKey) -> bool {
match self
.batch_diffs
.iter()
.find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
{
None => self.rocksdb.is_write_initial(key),
Some(_) => false,
}
}

fn load_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
match self
.batch_diffs
.iter()
.find_map(|b| b.factory_dep_diff.get(&hash))
{
None => self.rocksdb.load_factory_dep(hash),
Some(value) => Some(value.clone()),
}
}

fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
match self
.batch_diffs
.iter()
.find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
{
None => self.rocksdb.get_enumeration_index(key),
Some(value) => Some(*value),
}
}
}

impl ReadStorage for CommonStorage<'_> {
fn read_value(&mut self, key: &StorageKey) -> StorageValue {
match self {
Expand Down Expand Up @@ -358,8 +278,42 @@ impl From<RocksdbStorage> for CommonStorage<'_> {
}
}

impl<'a> From<StorageWithSnapshot<PostgresStorage<'a>>> for CommonStorage<'a> {
fn from(value: StorageWithSnapshot<PostgresStorage<'a>>) -> Self {
impl<'a> From<SnapshotStorage<'a>> for CommonStorage<'a> {
fn from(value: SnapshotStorage<'a>) -> Self {
Self::Snapshot(value)
}
}

/// Storage with a static lifetime that can be sent to Tokio tasks etc.
pub type OwnedStorage = CommonStorage<'static>;

/// Factory that can produce storage instances on demand. The storage type is encapsulated as a type param
/// (mostly for testing purposes); the default is [`OwnedStorage`].
#[async_trait]
pub trait ReadStorageFactory<S = OwnedStorage>: fmt::Debug + Send + Sync + 'static {
/// Creates a storage instance, e.g. over a Postgres connection or a RocksDB instance.
/// The specific criteria on which one are left up to the implementation.
///
/// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives
/// a stop signal; this is the only case in which `Ok(None)` should be returned.
async fn access_storage(
&self,
stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<S>>;
}

/// [`ReadStorageFactory`] producing Postgres-backed storage instances. Hence, it is slower than more advanced
/// alternatives with RocksDB caches and should be used sparingly (e.g., for testing).
#[async_trait]
impl ReadStorageFactory for ConnectionPool<Core> {
async fn access_storage(
&self,
_stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<OwnedStorage>> {
let connection = self.connection().await?;
let storage = OwnedStorage::postgres(connection, l1_batch_number).await?;
Ok(Some(storage.into()))
}
}
75 changes: 75 additions & 0 deletions core/lib/state/src/storage_factory/rocksdb_with_memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::collections::HashMap;

use zksync_types::{StorageKey, StorageValue, H256};
use zksync_vm_interface::storage::ReadStorage;

use crate::RocksdbStorage;

/// DB difference introduced by one batch.
#[derive(Debug, Clone)]
pub struct BatchDiff {
/// Storage slots touched by this batch along with new values there.
pub state_diff: HashMap<H256, H256>,
/// Initial write indices introduced by this batch.
pub enum_index_diff: HashMap<H256, u64>,
/// Factory dependencies introduced by this batch.
pub factory_dep_diff: HashMap<H256, Vec<u8>>,
}

/// A RocksDB cache instance with in-memory DB diffs that gives access to DB state at batches `N` to
/// `N + K`, where `K` is the number of diffs.
#[derive(Debug)]
pub struct RocksdbWithMemory {
/// RocksDB cache instance caught up to batch `N`.
pub rocksdb: RocksdbStorage,
/// Diffs for batches `N + 1` to `N + K`.
pub batch_diffs: Vec<BatchDiff>,
}

impl ReadStorage for RocksdbWithMemory {
fn read_value(&mut self, key: &StorageKey) -> StorageValue {
let hashed_key = key.hashed_key();
match self
.batch_diffs
.iter()
.rev()
.find_map(|b| b.state_diff.get(&hashed_key))
{
None => self.rocksdb.read_value(key),
Some(value) => *value,
}
}

fn is_write_initial(&mut self, key: &StorageKey) -> bool {
match self
.batch_diffs
.iter()
.find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
{
None => self.rocksdb.is_write_initial(key),
Some(_) => false,
}
}

fn load_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
match self
.batch_diffs
.iter()
.find_map(|b| b.factory_dep_diff.get(&hash))
{
None => self.rocksdb.load_factory_dep(hash),
Some(value) => Some(value.clone()),
}
}

fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
match self
.batch_diffs
.iter()
.find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
{
None => self.rocksdb.get_enumeration_index(key),
Some(value) => Some(*value),
}
}
}
Loading
Loading