Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[loader] Generation counters to keep global module cache in sync #15167

Open
wants to merge 3 commits into
base: george/enable-loader-v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput {
pub struct BlockAptosVM;

impl BlockAptosVM {
fn execute_block_on_thread_pool<
pub fn execute_block_on_thread_pool_with_module_cache<
S: StateView + Sync,
L: TransactionCommitHook<Output = AptosTransactionOutput>,
>(
Expand Down Expand Up @@ -520,7 +520,8 @@ impl BlockAptosVM {
}
}

pub fn execute_block_on_thread_pool_without_global_module_cache<
/// Uses shared global module cache to execute blocks.
pub fn execute_block_on_thread_pool<
S: StateView + Sync,
L: TransactionCommitHook<Output = AptosTransactionOutput>,
>(
Expand All @@ -530,11 +531,11 @@ impl BlockAptosVM {
config: BlockExecutorConfig,
transaction_commit_listener: Option<L>,
) -> Result<BlockOutput<TransactionOutput>, VMStatus> {
Self::execute_block_on_thread_pool::<S, L>(
Self::execute_block_on_thread_pool_with_module_cache::<S, L>(
executor_thread_pool,
signature_verified_block,
state_view,
Arc::new(ImmutableModuleCache::empty()),
Arc::clone(&GLOBAL_MODULE_CACHE),
config,
transaction_commit_listener,
)
Expand All @@ -550,7 +551,7 @@ impl BlockAptosVM {
config: BlockExecutorConfig,
transaction_commit_listener: Option<L>,
) -> Result<BlockOutput<TransactionOutput>, VMStatus> {
Self::execute_block_on_thread_pool::<S, L>(
Self::execute_block_on_thread_pool_with_module_cache::<S, L>(
Arc::clone(&RAYON_EXEC_POOL),
signature_verified_block,
state_view,
Expand All @@ -564,15 +565,21 @@ impl BlockAptosVM {
#[cfg(test)]
mod test {
use super::*;
use aptos_block_executor::code_cache_global::ImmutableModuleCache;
use aptos_block_executor::{
code_cache_global::ImmutableModuleCache, txn_commit_hook::NoOpTransactionCommitHook,
};
use aptos_crypto::HashValue;
use aptos_language_e2e_tests::data_store::FakeDataStore;
use aptos_types::on_chain_config::{FeatureFlag, Features};
use aptos_types::{
on_chain_config::{FeatureFlag, Features},
transaction::Transaction,
};
use aptos_vm_environment::environment::AptosEnvironment;
use claims::assert_ok;
use move_vm_types::code::mock_verified_code;

#[test]
fn test_cross_block_module_cache_flush() {
fn test_global_module_cache_flushed_when_features_change() {
let global_module_cache = ImmutableModuleCache::empty();

global_module_cache.insert(0, mock_verified_code(0, None));
Expand Down Expand Up @@ -603,4 +610,36 @@ mod test {
assert!(env_old != env_new);
assert_eq!(global_module_cache.size(), 0);
}

#[test]
fn global_module_cache_generation_is_incremented_per_block() {
let global_module_cache = Arc::new(ImmutableModuleCache::empty());
assert_eq!(global_module_cache.generation(), 0);

let concurrency_level = num_cpus::get();
let executor_thread_pool = Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(concurrency_level)
.build()
.unwrap(),
);
let config = BlockExecutorConfig::new_no_block_limit(concurrency_level);

// Put at least one transaction in the block so that the block executor does not return
// early.
let block = vec![SignatureVerifiedTransaction::Invalid(
Transaction::StateCheckpoint(HashValue::zero()),
)];

let result = BlockAptosVM::execute_block_on_thread_pool_with_module_cache(
executor_thread_pool,
&block,
&FakeDataStore::default(),
global_module_cache.clone(),
config,
None::<NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>>,
);
assert_ok!(result);
assert_eq!(global_module_cache.generation(), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
ExecutorShardCommand,
},
};
use aptos_block_executor::code_cache_global::ImmutableModuleCache;
use aptos_logger::{info, trace};
use aptos_types::{
block_executor::{
Expand Down Expand Up @@ -135,10 +136,15 @@ impl<S: StateView + Sync + Send + 'static> ShardedExecutorService<S> {
);
});
s.spawn(move |_| {
let ret = BlockAptosVM::execute_block_on_thread_pool_without_global_module_cache(
// Use empty global module cache to avoid undefined behaviour when it is mutated
// concurrently.
let empty_module_cache = Arc::new(ImmutableModuleCache::empty());

let ret = BlockAptosVM::execute_block_on_thread_pool_with_module_cache(
executor_thread_pool,
&signature_verified_transactions,
aggr_overridden_state_view.as_ref(),
empty_module_cache,
config,
cross_shard_commit_sender,
)
Expand Down
44 changes: 34 additions & 10 deletions aptos-move/block-executor/src/code_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,28 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> ModuleCache
> {
// First, look up the module in the cross-block global module cache. Record the read for
// later validation in case the read module is republished.
if let Some(module) = self.global_module_cache.get(key) {
match &self.latest_view {
ViewState::Sync(state) => state
.captured_reads
.borrow_mut()
.capture_global_cache_read(key.clone()),
ViewState::Unsync(state) => {
state.read_set.borrow_mut().capture_module_read(key.clone())
},
if let Some((module, needs_validation)) = self.global_module_cache.get(key) {
// If we do not need to validate global cache, return early.
if !needs_validation {
self.capture_global_cache_read(key);
return Ok(Some(module.clone()));
}

// Otherwise, this is the first time this module gets accessed in this block. We need
// to validate it is the same as in the state. We do it only once on the first access.
let is_valid = builder
.build(key)?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this builds the whole thing from scratch? in theory we only need to check if the hash/bytes matches raw data from storage and skip the actual build part?

.is_some_and(|m| m.extension() == module.extension());
if is_valid {
// This module is valid for this block, mark as so.
self.global_module_cache.set_generation(key);
self.capture_global_cache_read(key);
return Ok(Some(module.clone()));
}
return Ok(Some(module.clone()));

// Validation failed, global cache is not consistent with the state! Mark the entry as
// invalid and fall-back to slow path via sync module cache.
self.global_module_cache.mark_invalid(key);
}

// Global cache miss: check module cache in versioned/unsync maps.
Expand Down Expand Up @@ -241,4 +252,17 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> LatestView<
ViewState::Unsync(state) => state.unsync_map.module_cache(),
}
}

/// Captures the read from global module cache.
fn capture_global_cache_read(&self, key: &ModuleId) {
match &self.latest_view {
ViewState::Sync(state) => state
.captured_reads
.borrow_mut()
.capture_global_cache_read(key.clone()),
ViewState::Unsync(state) => {
state.read_set.borrow_mut().capture_module_read(key.clone())
},
}
}
}
Loading
Loading