diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index de16c676676b6..cdb6e31e12258 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -448,7 +448,7 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput { pub struct BlockAptosVM; impl BlockAptosVM { - fn execute_block_on_thread_pool< + pub fn execute_block_on_thread_pool_with_module_cache< S: StateView + Sync, L: TransactionCommitHook, >( @@ -520,7 +520,8 @@ impl BlockAptosVM { } } - pub fn execute_block_on_thread_pool_without_global_module_cache< + /// Uses shared global module cache to execute blocks. + pub fn execute_block_on_thread_pool< S: StateView + Sync, L: TransactionCommitHook, >( @@ -530,11 +531,11 @@ impl BlockAptosVM { config: BlockExecutorConfig, transaction_commit_listener: Option, ) -> Result, VMStatus> { - Self::execute_block_on_thread_pool::( + Self::execute_block_on_thread_pool_with_module_cache::( executor_thread_pool, signature_verified_block, state_view, - Arc::new(ImmutableModuleCache::empty()), + Arc::clone(&GLOBAL_MODULE_CACHE), config, transaction_commit_listener, ) @@ -550,7 +551,7 @@ impl BlockAptosVM { config: BlockExecutorConfig, transaction_commit_listener: Option, ) -> Result, VMStatus> { - Self::execute_block_on_thread_pool::( + Self::execute_block_on_thread_pool_with_module_cache::( Arc::clone(&RAYON_EXEC_POOL), signature_verified_block, state_view, @@ -564,15 +565,21 @@ impl BlockAptosVM { #[cfg(test)] mod test { use super::*; - use aptos_block_executor::code_cache_global::ImmutableModuleCache; + use aptos_block_executor::{ + code_cache_global::ImmutableModuleCache, txn_commit_hook::NoOpTransactionCommitHook, + }; + use aptos_crypto::HashValue; use aptos_language_e2e_tests::data_store::FakeDataStore; - use aptos_types::on_chain_config::{FeatureFlag, Features}; + use aptos_types::{ + on_chain_config::{FeatureFlag, Features}, + transaction::Transaction, + }; use aptos_vm_environment::environment::AptosEnvironment; use claims::assert_ok; use move_vm_types::code::mock_verified_code; #[test] - fn test_cross_block_module_cache_flush() { + fn test_global_module_cache_flushed_when_features_change() { let global_module_cache = ImmutableModuleCache::empty(); global_module_cache.insert(0, mock_verified_code(0, None)); @@ -603,4 +610,36 @@ mod test { assert!(env_old != env_new); assert_eq!(global_module_cache.size(), 0); } + + #[test] + fn global_module_cache_generation_is_incremented_per_block() { + let global_module_cache = Arc::new(ImmutableModuleCache::empty()); + assert_eq!(global_module_cache.generation(), 0); + + let concurrency_level = num_cpus::get(); + let executor_thread_pool = Arc::new( + rayon::ThreadPoolBuilder::new() + .num_threads(concurrency_level) + .build() + .unwrap(), + ); + let config = BlockExecutorConfig::new_no_block_limit(concurrency_level); + + // Put at least one transaction in the block so that the block executor does not return + // early. + let block = vec![SignatureVerifiedTransaction::Invalid( + Transaction::StateCheckpoint(HashValue::zero()), + )]; + + let result = BlockAptosVM::execute_block_on_thread_pool_with_module_cache( + executor_thread_pool, + &block, + &FakeDataStore::default(), + global_module_cache.clone(), + config, + None::>, + ); + assert_ok!(result); + assert_eq!(global_module_cache.generation(), 1); + } } diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs index efe860c37103e..d8fea825176cc 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs @@ -16,6 +16,7 @@ use crate::{ ExecutorShardCommand, }, }; +use aptos_block_executor::code_cache_global::ImmutableModuleCache; use aptos_logger::{info, trace}; use aptos_types::{ block_executor::{ @@ -135,10 +136,15 @@ impl ShardedExecutorService { ); }); s.spawn(move |_| { - let ret = BlockAptosVM::execute_block_on_thread_pool_without_global_module_cache( + // Use empty global module cache to avoid undefined behaviour when it is mutated + // concurrently. + let empty_module_cache = Arc::new(ImmutableModuleCache::empty()); + + let ret = BlockAptosVM::execute_block_on_thread_pool_with_module_cache( executor_thread_pool, &signature_verified_transactions, aggr_overridden_state_view.as_ref(), + empty_module_cache, config, cross_shard_commit_sender, ) diff --git a/aptos-move/block-executor/src/code_cache.rs b/aptos-move/block-executor/src/code_cache.rs index 5f4864621b1f1..11c6c0faa6770 100644 --- a/aptos-move/block-executor/src/code_cache.rs +++ b/aptos-move/block-executor/src/code_cache.rs @@ -145,17 +145,28 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> ModuleCache > { // First, look up the module in the cross-block global module cache. Record the read for // later validation in case the read module is republished. - if let Some(module) = self.global_module_cache.get(key) { - match &self.latest_view { - ViewState::Sync(state) => state - .captured_reads - .borrow_mut() - .capture_global_cache_read(key.clone()), - ViewState::Unsync(state) => { - state.read_set.borrow_mut().capture_module_read(key.clone()) - }, + if let Some((module, needs_validation)) = self.global_module_cache.get(key) { + // If we do not need to validate global cache, return early. + if !needs_validation { + self.capture_global_cache_read(key); + return Ok(Some(module.clone())); + } + + // Otherwise, this is the first time this module gets accessed in this block. We need + // to validate it is the same as in the state. We do it only once on the first access. + let is_valid = builder + .build(key)? + .is_some_and(|m| m.extension() == module.extension()); + if is_valid { + // This module is valid for this block, mark as so. + self.global_module_cache.set_generation(key); + self.capture_global_cache_read(key); + return Ok(Some(module.clone())); } - return Ok(Some(module.clone())); + + // Validation failed, global cache is not consistent with the state! Mark the entry as + // invalid and fall-back to slow path via sync module cache. + self.global_module_cache.mark_invalid(key); } // Global cache miss: check module cache in versioned/unsync maps. @@ -241,4 +252,17 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< ViewState::Unsync(state) => state.unsync_map.module_cache(), } } + + /// Captures the read from global module cache. + fn capture_global_cache_read(&self, key: &ModuleId) { + match &self.latest_view { + ViewState::Sync(state) => state + .captured_reads + .borrow_mut() + .capture_global_cache_read(key.clone()), + ViewState::Unsync(state) => { + state.read_set.borrow_mut().capture_module_read(key.clone()) + }, + } + } } diff --git a/aptos-move/block-executor/src/code_cache_global.rs b/aptos-move/block-executor/src/code_cache_global.rs index 6ff31da656e79..fee17ab534fc2 100644 --- a/aptos-move/block-executor/src/code_cache_global.rs +++ b/aptos-move/block-executor/src/code_cache_global.rs @@ -11,20 +11,22 @@ use std::{ hash::Hash, ops::Deref, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU32, Ordering}, Arc, }, }; /// Module code stored in cross-block module cache. -// TODO(loader_v2): -// We can move this to move-vm-types, but then we also need to have version generic or expose -// transaction index there, and define PanicError in Move (or convert from VMError). struct ImmutableModuleCode { /// True if this code is "valid" within the block execution context (i.e, there has been no /// republishing of this module so far). If false, executor needs to read the module from the /// sync/unsync module caches. valid: CachePadded, + /// Represents the generation of the module cache for which this entry has been validated + /// against the global state. For example, if the generation of the cache is 1, but the entry + /// has generation 0, it should be re-validated against the state and has its generation reset + /// accordingly. + generation: CachePadded, /// Cached verified module. While [ModuleCode] type is used, the following invariants always /// hold: /// 1. Module's version is [None] (storage version). @@ -38,7 +40,10 @@ where { /// Returns a new valid module. Returns a (panic) error if the module is not verified or has /// non-storage version. - fn new(module: Arc>>) -> Result { + fn new( + module: Arc>>, + generation: u32, + ) -> Result { if !module.code().is_verified() || module.version().is_some() { let msg = format!( "Invariant violated for immutable module code : verified ({}), version({:?})", @@ -50,6 +55,7 @@ where Ok(Self { valid: CachePadded::new(AtomicBool::new(true)), + generation: CachePadded::new(AtomicU32::new(generation)), module: CachePadded::new(module), }) } @@ -59,6 +65,16 @@ where self.valid.store(false, Ordering::Release) } + /// Returns the generation of this module. + pub(crate) fn generation(&self) -> u32 { + self.generation.load(Ordering::Acquire) + } + + /// Resets the generation of this module. + fn set_generation(&self, new_generation: u32) { + self.generation.store(new_generation, Ordering::Release); + } + /// Returns true if the module is valid. pub(crate) fn is_valid(&self) -> bool { self.valid.load(Ordering::Acquire) @@ -78,6 +94,9 @@ pub struct ImmutableModuleCache { /// Maximum cache size. If the size is greater than this limit, the cache is flushed. Note that /// this can only be done at block boundaries. capacity: usize, + + /// Represents the generation of this cache. Incremented for every block. + generation: ExplicitSyncWrapper, } impl ImmutableModuleCache @@ -96,6 +115,7 @@ where Self { module_cache: ExplicitSyncWrapper::new(HashMap::new()), capacity, + generation: ExplicitSyncWrapper::new(0), } } @@ -116,12 +136,24 @@ where } } + /// Sets the generation of the module stored at associated key to the generation of the cache. + pub(crate) fn set_generation(&self, key: &K) { + if let Some(module) = self.module_cache.acquire().get(key) { + module.set_generation(*self.generation.acquire()); + } + } + /// Returns the module stored in cache. If the module has not been cached, or it exists but is - /// not valid, [None] is returned. - pub(crate) fn get(&self, key: &K) -> Option>>> { + /// not valid, [None] is returned. Also returns a boolean flag to indicate if the cached module + /// needs validation (i.e., its generation is not equal to the generation of the cache). + pub(crate) fn get( + &self, + key: &K, + ) -> Option<(Arc>>, bool)> { self.module_cache.acquire().get(key).and_then(|module| { if module.is_valid() { - Some(module.inner().clone()) + let needs_validation = *self.generation.acquire() != module.generation(); + Some((module.inner().clone(), needs_validation)) } else { None } @@ -133,8 +165,8 @@ where self.module_cache.acquire().clear(); } - /// Inserts modules into the cache. Should never be called throughout block-execution. Use with - /// caution. + /// Inserts modules into the cache, and increments the generation counter of the cache. Should + /// never be called throughout block-execution. Use with caution. /// /// Notes: /// 1. Only verified modules are inserted. @@ -143,12 +175,13 @@ where /// these constraints are violated, a panic error is returned. /// 4. If the cache size exceeds its capacity after all verified modules have been inserted, /// the cache is flushed. - pub(crate) fn insert_verified_unchecked( + pub(crate) fn insert_verified_and_increment_generation_unchecked( &self, modules: impl Iterator>>)>, ) -> Result<(), PanicError> { use hashbrown::hash_map::Entry::*; + let current_generation = *self.generation.acquire(); let mut guard = self.module_cache.acquire(); let module_cache = guard.dereference_mut(); @@ -167,18 +200,24 @@ where if module.code().is_verified() { let mut module = module.as_ref().clone(); module.set_version(None); - let prev = - module_cache.insert(key.clone(), ImmutableModuleCode::new(Arc::new(module))?); + + let code = ImmutableModuleCode::new(Arc::new(module), current_generation)?; + let prev = module_cache.insert(key.clone(), code); // At this point, we must have removed the entry, or returned a panic error. assert!(prev.is_none()) } } + // In case capacity is exceeded, flush the cache. if module_cache.len() > self.capacity { module_cache.clear(); } + // Increment generation counter to ensure we can later check that module cache is in sync + // with the state. + *self.generation.acquire() = current_generation.wrapping_add(1); + Ok(()) } @@ -187,7 +226,7 @@ where pub fn insert(&self, key: K, module: Arc>>) { self.module_cache .acquire() - .insert(key, ImmutableModuleCode::new(module).unwrap()); + .insert(key, ImmutableModuleCode::new(module, 0).unwrap()); } /// Removes the module from cache. Used for tests only. @@ -201,6 +240,12 @@ where pub fn size(&self) -> usize { self.module_cache.acquire().len() } + + /// Returns the generation counter for the cache. Used for tests only. + #[cfg(any(test, feature = "testing"))] + pub fn generation(&self) -> u32 { + *self.generation.acquire() + } } #[cfg(test)] @@ -211,21 +256,30 @@ mod test { #[test] fn test_immutable_module_code() { - assert!(ImmutableModuleCode::new(mock_deserialized_code(0, None)).is_err()); - assert!(ImmutableModuleCode::new(mock_deserialized_code(0, Some(22))).is_err()); - assert!(ImmutableModuleCode::new(mock_verified_code(0, Some(22))).is_err()); - assert!(ImmutableModuleCode::new(mock_verified_code(0, None)).is_ok()); + assert!(ImmutableModuleCode::new(mock_deserialized_code(0, None), 0).is_err()); + assert!(ImmutableModuleCode::new(mock_deserialized_code(0, Some(22)), 0).is_err()); + assert!(ImmutableModuleCode::new(mock_verified_code(0, Some(22)), 0).is_err()); + assert!(ImmutableModuleCode::new(mock_verified_code(0, None), 0).is_ok()); } #[test] fn test_immutable_module_code_validity() { - let module_code = assert_ok!(ImmutableModuleCode::new(mock_verified_code(0, None))); + let module_code = assert_ok!(ImmutableModuleCode::new(mock_verified_code(0, None), 0)); assert!(module_code.is_valid()); module_code.mark_invalid(); assert!(!module_code.is_valid()); } + #[test] + fn test_immutable_module_code_generation() { + let module_code = assert_ok!(ImmutableModuleCode::new(mock_verified_code(0, None), 7)); + assert_eq!(module_code.generation(), 7); + + module_code.set_generation(10); + assert_eq!(module_code.generation(), 10); + } + #[test] fn test_global_module_cache() { let global_cache = ImmutableModuleCache::empty(); @@ -249,42 +303,61 @@ mod test { fn test_insert_verified_for_global_module_cache() { let capacity = 10; let global_cache = ImmutableModuleCache::with_capacity(capacity); + assert_eq!(global_cache.generation(), 0); let mut new_modules = vec![]; for i in 0..capacity { new_modules.push((i, mock_verified_code(i, Some(i as TxnIndex)))); } - let result = global_cache.insert_verified_unchecked(new_modules.into_iter()); + let result = global_cache + .insert_verified_and_increment_generation_unchecked(new_modules.into_iter()); assert!(result.is_ok()); assert_eq!(global_cache.size(), capacity); + assert_eq!(global_cache.generation(), 1); - // Versions should be set to storage. + // Versions should be set to storage. The returned code needs validation because generation + // has been incremented. for key in 0..capacity { - let code = assert_some!(global_cache.get(&key)); - assert!(code.version().is_none()) + let (code, needs_validation) = assert_some!(global_cache.get(&key)); + assert!(code.version().is_none()); + assert!(needs_validation); } // Too many modules added, the cache should be flushed. let new_modules = vec![(11, mock_verified_code(11, None))]; - let result = global_cache.insert_verified_unchecked(new_modules.into_iter()); + let result = global_cache + .insert_verified_and_increment_generation_unchecked(new_modules.into_iter()); assert!(result.is_ok()); assert_eq!(global_cache.size(), 0); + assert_eq!(global_cache.generation(), 2); // Should not add deserialized code. let deserialized_modules = vec![(0, mock_deserialized_code(0, None))]; - assert_ok!(global_cache.insert_verified_unchecked(deserialized_modules.into_iter())); + assert_ok!(global_cache + .insert_verified_and_increment_generation_unchecked(deserialized_modules.into_iter())); assert_eq!(global_cache.size(), 0); + assert_eq!(global_cache.generation(), 3); // Should not override valid modules. global_cache.insert(0, mock_verified_code(0, None)); let new_modules = vec![(0, mock_verified_code(100, None))]; - assert_err!(global_cache.insert_verified_unchecked(new_modules.into_iter())); + assert_err!(global_cache + .insert_verified_and_increment_generation_unchecked(new_modules.into_iter())); + assert_eq!(global_cache.generation(), 3); // Can override invalid modules. global_cache.mark_invalid(&0); let new_modules = vec![(0, mock_verified_code(100, None))]; - let result = global_cache.insert_verified_unchecked(new_modules.into_iter()); + let result = global_cache + .insert_verified_and_increment_generation_unchecked(new_modules.into_iter()); assert!(result.is_ok()); assert_eq!(global_cache.size(), 1); + assert_eq!(global_cache.generation(), 4); + + // Generation incremented even if there are no code publishes. + let result = + global_cache.insert_verified_and_increment_generation_unchecked(vec![].into_iter()); + assert!(result.is_ok()); + assert_eq!(global_cache.generation(), 5); } } diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index eb7965ace5adb..d800e03b87aaf 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -1169,7 +1169,7 @@ where counters::update_state_counters(versioned_cache.stats(), true); self.global_module_cache - .insert_verified_unchecked(versioned_cache.take_modules_iter()) + .insert_verified_and_increment_generation_unchecked(versioned_cache.take_modules_iter()) .map_err(|err| { alert!("[BlockSTM] Encountered panic error: {:?}", err); })?; @@ -1652,7 +1652,7 @@ where counters::update_state_counters(unsync_map.stats(), false); self.global_module_cache - .insert_verified_unchecked(unsync_map.into_modules_iter())?; + .insert_verified_and_increment_generation_unchecked(unsync_map.into_modules_iter())?; let block_end_info = if self .config diff --git a/aptos-move/e2e-tests/src/executor.rs b/aptos-move/e2e-tests/src/executor.rs index 4c44a7e585c63..e326cc0762cbf 100644 --- a/aptos-move/e2e-tests/src/executor.rs +++ b/aptos-move/e2e-tests/src/executor.rs @@ -636,7 +636,12 @@ impl FakeExecutor { }, onchain: onchain_config, }; - BlockAptosVM::execute_block_on_thread_pool_without_global_module_cache::< + + // Note: + // This uses a shared global module cache, but it is ok because we validate it against + // the state for every block, ensuring consistency of cached data. This way all tests + // implicitly test cache implementation. + BlockAptosVM::execute_block_on_thread_pool::< _, NoOpTransactionCommitHook, >( diff --git a/types/src/state_store/state_value.rs b/types/src/state_store/state_value.rs index d69b8b3fc8ec5..86665fdcd19cc 100644 --- a/types/src/state_store/state_value.rs +++ b/types/src/state_store/state_value.rs @@ -268,6 +268,10 @@ impl StateValue { &self.inner.data } + pub fn metadata(&self) -> &StateValueMetadata { + &self.inner.metadata + } + /// Applies a bytes-to-bytes transformation on the state value contents, /// leaving the state value metadata untouched. pub fn map_bytes anyhow::Result>( diff --git a/types/src/vm/modules.rs b/types/src/vm/modules.rs index 96a4540f4cc5d..a6c0da7923061 100644 --- a/types/src/vm/modules.rs +++ b/types/src/vm/modules.rs @@ -10,36 +10,28 @@ use move_vm_types::{ /// Additional data stored alongside deserialized or verified modules. pub struct AptosModuleExtension { - /// Serialized representation of the module. - bytes: Bytes, /// Module's hash. hash: [u8; 32], - /// The state value metadata associated with the module, when read from or - /// written to storage. - state_value_metadata: StateValueMetadata, + /// The original state value associated with the module, when read from or written to storage. + state_value: StateValue, } impl AptosModuleExtension { /// Creates new extension based on [StateValue]. pub fn new(state_value: StateValue) -> Self { - let (state_value_metadata, bytes) = state_value.unpack(); - let hash = sha3_256(&bytes); - Self { - bytes, - hash, - state_value_metadata, - } + let hash = sha3_256(state_value.bytes()); + Self { hash, state_value } } - /// Returns the state value metadata stored in extension. + /// Returns state value metadata stored in extension. pub fn state_value_metadata(&self) -> &StateValueMetadata { - &self.state_value_metadata + self.state_value.metadata() } } impl WithBytes for AptosModuleExtension { fn bytes(&self) -> &Bytes { - &self.bytes + self.state_value.bytes() } } @@ -48,3 +40,11 @@ impl WithHash for AptosModuleExtension { &self.hash } } + +impl PartialEq for AptosModuleExtension { + fn eq(&self, other: &Self) -> bool { + self.hash.eq(&other.hash) && self.state_value_metadata().eq(other.state_value_metadata()) + } +} + +impl Eq for AptosModuleExtension {}