diff --git a/Cargo.lock b/Cargo.lock index dbf4ca7e449..327ee846709 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -569,9 +569,9 @@ dependencies = [ "lru", "maplit", "merkle_proof", - "oneshot_broadcast", "operation_pool", "parking_lot 0.12.1", + "promise_cache", "proto_array", "rand 0.8.5", "rayon", @@ -6160,6 +6160,16 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "promise_cache" +version = "0.1.0" +dependencies = [ + "derivative", + "itertools", + "oneshot_broadcast", + "slog", +] + [[package]] name = "proto_array" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index e962ee5e86a..c5d304d73e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ members = [ "common/malloc_utils", "common/oneshot_broadcast", "common/pretty_reqwest_error", + "common/promise_cache", "common/sensitive_url", "common/slot_clock", "common/system_health", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 419918de9d9..aa169f663dc 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -43,9 +43,9 @@ lighthouse_metrics = { workspace = true } logging = { workspace = true } lru = { workspace = true } merkle_proof = { workspace = true } -oneshot_broadcast = { path = "../../common/oneshot_broadcast/" } operation_pool = { workspace = true } parking_lot = { workspace = true } +promise_cache = { path = "../../common/promise_cache" } proto_array = { workspace = true } rand = { workspace = true } rayon = { workspace = true } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7190626c181..2f2723f1f14 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -53,6 +53,7 @@ use crate::observed_blob_sidecars::ObservedBlobSidecars; use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; +use crate::parallel_state_cache::ParallelStateCache; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; @@ -460,6 +461,10 @@ pub struct BeaconChain { pub block_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. pub pre_finalization_block_cache: PreFinalizationBlockCache, + /// A cache used to de-duplicate HTTP state requests. + /// + /// The cache is keyed by `state_root`. + pub parallel_state_cache: Arc>>, /// Sender given to tasks, so that if they encounter a state in which execution cannot /// continue they can request that everything shuts down. pub shutdown_sender: Sender, @@ -3868,7 +3873,7 @@ impl BeaconChain { self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(Error::AttestationCacheLockTimeout)? - .insert_committee_cache(shuffling_id, committee_cache); + .insert_value(shuffling_id, committee_cache); } } Ok(()) @@ -6041,7 +6046,7 @@ impl BeaconChain { // access. drop(shuffling_cache); - let committee_cache = cache_item.wait()?; + let committee_cache = cache_item.wait().map_err(Error::ShufflingCacheError)?; map_fn(&committee_cache, shuffling_id.shuffling_decision_block) } else { // Create an entry in the cache that "promises" this value will eventually be computed. @@ -6050,7 +6055,9 @@ impl BeaconChain { // // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same // promise from being created twice. - let sender = shuffling_cache.create_promise(shuffling_id.clone())?; + let sender = shuffling_cache + .create_promise(shuffling_id.clone()) + .map_err(Error::ShufflingCacheError)?; // Drop the shuffling cache to avoid holding the lock for any longer than // required. @@ -6144,7 +6151,7 @@ impl BeaconChain { self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(Error::AttestationCacheLockTimeout)? - .insert_committee_cache(shuffling_id, &committee_cache); + .insert_value(shuffling_id, &committee_cache); metrics::stop_timer(committee_building_timer); @@ -6446,6 +6453,10 @@ impl BeaconChain { self.data_availability_checker.data_availability_boundary() } + pub fn logger(&self) -> &Logger { + &self.log + } + /// Gets the `LightClientBootstrap` object for a requested block root. /// /// Returns `None` when the state or block is not found in the database. diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 322bfbd208c..3ce8040efe7 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -23,6 +23,7 @@ use futures::channel::mpsc::Sender; use kzg::{Kzg, TrustedSetup}; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::{Mutex, RwLock}; +use promise_cache::PromiseCache; use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold}; use slasher::Slasher; use slog::{crit, debug, error, info, o, Logger}; @@ -851,6 +852,7 @@ where let genesis_time = head_snapshot.beacon_state.genesis_time(); let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot)); let shuffling_cache_size = self.chain_config.shuffling_cache_size; + let parallel_state_cache_size = self.chain_config.parallel_state_cache_size; // Calculate the weak subjectivity point in which to backfill blocks to. let genesis_backfill_slot = if self.chain_config.genesis_backfill { @@ -933,6 +935,11 @@ where beacon_proposer_cache, block_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), + parallel_state_cache: Arc::new(RwLock::new(PromiseCache::new( + parallel_state_cache_size, + Default::default(), + log.clone(), + ))), validator_pubkey_cache, attester_cache: <_>::default(), early_attester_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index e986b0bfeac..e54ea6d49ca 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -820,9 +820,7 @@ impl BeaconChain { Ok(head_shuffling_ids) => { self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) - .map(|mut shuffling_cache| { - shuffling_cache.update_head_shuffling_ids(head_shuffling_ids) - }) + .map(|mut shuffling_cache| shuffling_cache.update_protector(head_shuffling_ids)) .unwrap_or_else(|| { error!( self.log, diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 7bcb764ab0c..79d8f2a4419 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -15,6 +15,9 @@ pub const DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR: u32 = 3; /// Fraction of a slot lookahead for fork choice in the state advance timer (500ms on mainnet). pub const FORK_CHOICE_LOOKAHEAD_FACTOR: u32 = 24; +/// Cache only a small number of states in the parallel cache by default. +pub const DEFAULT_PARALLEL_STATE_CACHE_SIZE: usize = 2; + #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] pub struct ChainConfig { /// Maximum number of slots to skip when importing an attestation. @@ -83,6 +86,8 @@ pub struct ChainConfig { pub progressive_balances_mode: ProgressiveBalancesMode, /// Number of epochs between each migration of data from the hot database to the freezer. pub epochs_per_migration: u64, + /// Size of the promise cache for de-duplicating parallel state requests. + pub parallel_state_cache_size: usize, } impl Default for ChainConfig { @@ -114,6 +119,7 @@ impl Default for ChainConfig { always_prepare_payload: false, progressive_balances_mode: ProgressiveBalancesMode::Fast, epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION, + parallel_state_cache_size: DEFAULT_PARALLEL_STATE_CACHE_SIZE, } } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 1f92bd5b03f..6d2a178faa7 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -211,8 +211,7 @@ pub enum BeaconChainError { }, AttestationHeadNotInForkChoice(Hash256), MissingPersistedForkChoice, - CommitteePromiseFailed(oneshot_broadcast::Error), - MaxCommitteePromises(usize), + ShufflingCacheError(promise_cache::PromiseCacheError), BlsToExecutionPriorToCapella, BlsToExecutionConflictsWithPool, InconsistentFork(InconsistentFork), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 59d006180cc..0cfd9927754 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -42,6 +42,7 @@ pub mod observed_block_producers; pub mod observed_operations; mod observed_slashable; pub mod otb_verification_service; +mod parallel_state_cache; mod persisted_beacon_chain; mod persisted_fork_choice; mod pre_finalization_cache; diff --git a/beacon_node/beacon_chain/src/parallel_state_cache.rs b/beacon_node/beacon_chain/src/parallel_state_cache.rs new file mode 100644 index 00000000000..d568d3248cd --- /dev/null +++ b/beacon_node/beacon_chain/src/parallel_state_cache.rs @@ -0,0 +1,22 @@ +use promise_cache::{PromiseCache, Protect}; +use types::{BeaconState, Hash256}; + +#[derive(Debug, Default)] +pub struct ParallelStateProtector; + +impl Protect for ParallelStateProtector { + type SortKey = usize; + + /// Evict in arbitrary (hashmap) order by using the same key for every value. + fn sort_key(&self, _: &Hash256) -> Self::SortKey { + 0 + } + + /// We don't care too much about preventing evictions of particular states here. All the states + /// in this cache should be different from the head state. + fn protect_from_eviction(&self, _: &Hash256) -> bool { + false + } +} + +pub type ParallelStateCache = PromiseCache, ParallelStateProtector>; diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 956520cdcfc..64b24ccd043 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,240 +1,53 @@ -use std::collections::HashMap; -use std::sync::Arc; - -use itertools::Itertools; +use promise_cache::{PromiseCache, Protect}; use slog::{debug, Logger}; - -use oneshot_broadcast::{oneshot, Receiver, Sender}; use types::{ beacon_state::CommitteeCache, AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, RelativeEpoch, }; -use crate::{metrics, BeaconChainError}; - /// The size of the cache that stores committee caches for quicker verification. /// /// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash + /// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this /// ignores a few extra bytes in the caches that should be insignificant compared to the indices). -pub const DEFAULT_CACHE_SIZE: usize = 16; - -/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this -/// limits the number of concurrent states that can be loaded into memory for the committee cache. -/// This prevents excessive memory usage at the cost of rejecting some attestations. +/// +/// The cache size also determines the maximum number of concurrent committee cache "promises" that +/// can be issued. In effect, this limits the number of concurrent states that can be loaded into +/// memory for the committee cache. This prevents excessive memory usage at the cost of rejecting +/// some attestations. /// /// We set this value to 2 since states can be quite large and have a significant impact on memory /// usage. A healthy network cannot have more than a few committee caches and those caches should /// always be inserted during block import. Unstable networks with a high degree of forking might /// see some attestations dropped due to this concurrency limit, however I propose that this is /// better than low-resource nodes going OOM. -const MAX_CONCURRENT_PROMISES: usize = 2; - -#[derive(Clone)] -pub enum CacheItem { - /// A committee. - Committee(Arc), - /// A promise for a future committee. - Promise(Receiver>), -} - -impl CacheItem { - pub fn is_promise(&self) -> bool { - matches!(self, CacheItem::Promise(_)) - } - - pub fn wait(self) -> Result, BeaconChainError> { - match self { - CacheItem::Committee(cache) => Ok(cache), - CacheItem::Promise(receiver) => receiver - .recv() - .map_err(BeaconChainError::CommitteePromiseFailed), - } - } -} - -/// Provides a cache for `CommitteeCache`. -/// -/// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like -/// a find/replace error. -pub struct ShufflingCache { - cache: HashMap, - cache_size: usize, - head_shuffling_ids: BlockShufflingIds, - logger: Logger, -} - -impl ShufflingCache { - pub fn new(cache_size: usize, head_shuffling_ids: BlockShufflingIds, logger: Logger) -> Self { - Self { - cache: HashMap::new(), - cache_size, - head_shuffling_ids, - logger, - } - } - - pub fn get(&mut self, key: &AttestationShufflingId) -> Option { - match self.cache.get(key) { - // The cache contained the committee cache, return it. - item @ Some(CacheItem::Committee(_)) => { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - item.cloned() - } - // The cache contains a promise for the committee cache. Check to see if the promise has - // already been resolved, without waiting for it. - item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { - // The promise has already been resolved. Replace the entry in the cache with a - // `Committee` entry and then return the committee. - Ok(Some(committee)) => { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); - metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - let ready = CacheItem::Committee(committee); - self.insert_cache_item(key.clone(), ready.clone()); - Some(ready) - } - // The promise has not yet been resolved. Return the promise so the caller can await - // it. - Ok(None) => { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); - metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - item.cloned() - } - // The sender has been dropped without sending a committee. There was most likely an - // error computing the committee cache. Drop the key from the cache and return - // `None` so the caller can recompute the committee. - // - // It's worth noting that this is the only place where we removed unresolved - // promises from the cache. This means unresolved promises will only be removed if - // we try to access them again. This is OK, since the promises don't consume much - // memory. We expect that *all* promises should be resolved, unless there is a - // programming or database error. - Err(oneshot_broadcast::Error::SenderDropped) => { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS); - metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); - self.cache.remove(key); - None - } - }, - // The cache does not have this committee and it's not already promised to be computed. - None => { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); - None - } - } - } - - pub fn contains(&self, key: &AttestationShufflingId) -> bool { - self.cache.contains_key(key) - } - - pub fn insert_committee_cache( - &mut self, - key: AttestationShufflingId, - committee_cache: &C, - ) { - if self - .cache - .get(&key) - // Replace the committee if it's not present or if it's a promise. A bird in the hand is - // worth two in the promise-bush! - .map_or(true, CacheItem::is_promise) - { - self.insert_cache_item( - key, - CacheItem::Committee(committee_cache.to_arc_committee_cache()), - ); - } - } - - /// Prunes the cache first before inserting a new cache item. - fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) { - self.prune_cache(); - self.cache.insert(key, cache_item); - } - - /// Prunes the `cache` to keep the size below the `cache_size` limit, based on the following - /// preferences: - /// - Entries from more recent epochs are preferred over older ones. - /// - Entries with shuffling ids matching the head's previous, current, and future epochs must - /// not be pruned. - fn prune_cache(&mut self) { - let target_cache_size = self.cache_size.saturating_sub(1); - if let Some(prune_count) = self.cache.len().checked_sub(target_cache_size) { - let shuffling_ids_to_prune = self - .cache - .keys() - .sorted_by_key(|key| key.shuffling_epoch) - .filter(|shuffling_id| { - Some(shuffling_id) - != self - .head_shuffling_ids - .id_for_epoch(shuffling_id.shuffling_epoch) - .as_ref() - .as_ref() - }) - .take(prune_count) - .cloned() - .collect::>(); - - for shuffling_id in shuffling_ids_to_prune.iter() { - debug!( - self.logger, - "Removing old shuffling from cache"; - "shuffling_epoch" => shuffling_id.shuffling_epoch, - "shuffling_decision_block" => ?shuffling_id.shuffling_decision_block - ); - self.cache.remove(shuffling_id); - } - } - } +pub const DEFAULT_CACHE_SIZE: usize = 16; - pub fn create_promise( - &mut self, - key: AttestationShufflingId, - ) -> Result>, BeaconChainError> { - let num_active_promises = self - .cache - .iter() - .filter(|(_, item)| item.is_promise()) - .count(); - if num_active_promises >= MAX_CONCURRENT_PROMISES { - return Err(BeaconChainError::MaxCommitteePromises(num_active_promises)); - } +impl Protect for BlockShufflingIds { + type SortKey = Epoch; - let (sender, receiver) = oneshot(); - self.insert_cache_item(key, CacheItem::Promise(receiver)); - Ok(sender) + fn sort_key(&self, k: &AttestationShufflingId) -> Epoch { + k.shuffling_epoch } - /// Inform the cache that the shuffling decision roots for the head has changed. - /// - /// The shufflings for the head's previous, current, and future epochs will never be ejected from - /// the cache during `Self::insert_cache_item`. - pub fn update_head_shuffling_ids(&mut self, head_shuffling_ids: BlockShufflingIds) { - self.head_shuffling_ids = head_shuffling_ids; + fn protect_from_eviction(&self, shuffling_id: &AttestationShufflingId) -> bool { + Some(shuffling_id) == self.id_for_epoch(shuffling_id.shuffling_epoch).as_ref() } -} -/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache. -pub trait ToArcCommitteeCache { - fn to_arc_committee_cache(&self) -> Arc; -} - -impl ToArcCommitteeCache for CommitteeCache { - fn to_arc_committee_cache(&self) -> Arc { - Arc::new(self.clone()) + fn notify_eviction(&self, shuffling_id: &AttestationShufflingId, logger: &Logger) { + debug!( + logger, + "Removing old shuffling from cache"; + "shuffling_epoch" => shuffling_id.shuffling_epoch, + "shuffling_decision_block" => ?shuffling_id.shuffling_decision_block + ); } } -impl ToArcCommitteeCache for Arc { - fn to_arc_committee_cache(&self) -> Arc { - self.clone() - } -} +pub type ShufflingCache = PromiseCache; /// Contains the shuffling IDs for a beacon block. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct BlockShufflingIds { pub current: AttestationShufflingId, pub next: AttestationShufflingId, @@ -294,13 +107,13 @@ impl BlockShufflingIds { #[cfg(not(debug_assertions))] #[cfg(test)] mod test { + use super::*; + use crate::test_utils::EphemeralHarnessType; + use promise_cache::{CacheItem, PromiseCacheError}; + use std::sync::Arc; use task_executor::test_utils::null_logger; use types::*; - use crate::test_utils::EphemeralHarnessType; - - use super::*; - type E = MinimalEthSpec; type TestBeaconChainType = EphemeralHarnessType; type BeaconChainHarness = crate::test_utils::BeaconChainHarness; @@ -372,10 +185,10 @@ mod test { // Ensure the promise has been resolved. let item = cache.get(&id_a).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_a), + matches!(item, CacheItem::Complete(committee) if committee == committee_a), "the promise should be resolved" ); - assert_eq!(cache.cache.len(), 1, "the cache should have one entry"); + assert_eq!(cache.len(), 1, "the cache should have one entry"); } #[test] @@ -399,7 +212,7 @@ mod test { // Ensure the key now indicates an empty slot. assert!(cache.get(&id_a).is_none(), "the slot should be empty"); - assert!(cache.cache.is_empty(), "the cache should be empty"); + assert!(cache.is_empty(), "the cache should be empty"); } #[test] @@ -433,7 +246,7 @@ mod test { // Ensure promise A has been resolved. let item = cache.get(&id_a).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_a), + matches!(item, CacheItem::Complete(committee) if committee == committee_a), "promise A should be resolved" ); @@ -442,41 +255,40 @@ mod test { // Ensure promise B has been resolved. let item = cache.get(&id_b).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_b), + matches!(item, CacheItem::Complete(committee) if committee == committee_b), "promise B should be resolved" ); // Check both entries again. assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a), + matches!(cache.get(&id_a).unwrap(), CacheItem::Complete(committee) if committee == committee_a), "promise A should remain resolved" ); assert!( - matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b), + matches!(cache.get(&id_b).unwrap(), CacheItem::Complete(committee) if committee == committee_b), "promise B should remain resolved" ); - assert_eq!(cache.cache.len(), 2, "the cache should have two entries"); + assert_eq!(cache.len(), 2, "the cache should have two entries"); } #[test] fn too_many_promises() { let mut cache = new_shuffling_cache(); - for i in 0..MAX_CONCURRENT_PROMISES { + for i in 0..cache.max_concurrent_promises() { cache.create_promise(shuffling_id(i as u64)).unwrap(); } // Ensure that the next promise returns an error. It is important for the application to // dump his ass when he can't keep his promises, you're a queen and you deserve better. assert!(matches!( - cache.create_promise(shuffling_id(MAX_CONCURRENT_PROMISES as u64)), - Err(BeaconChainError::MaxCommitteePromises( - MAX_CONCURRENT_PROMISES - )) + cache.create_promise(shuffling_id(cache.max_concurrent_promises() as u64)), + Err(PromiseCacheError::MaxConcurrentPromises(n)) + if n == cache.max_concurrent_promises() )); assert_eq!( - cache.cache.len(), - MAX_CONCURRENT_PROMISES, + cache.len(), + cache.max_concurrent_promises(), "the cache should have two entries" ); } @@ -486,9 +298,9 @@ mod test { let mut cache = new_shuffling_cache(); let id_a = shuffling_id(1); let committee_cache_a = Arc::new(CommitteeCache::default()); - cache.insert_committee_cache(id_a.clone(), &committee_cache_a); + cache.insert_value(id_a.clone(), &committee_cache_a); assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee_cache) if committee_cache == committee_cache_a), + matches!(cache.get(&id_a).unwrap(), CacheItem::Complete(committee_cache) if committee_cache == committee_cache_a), "should insert committee cache" ); } @@ -501,7 +313,7 @@ mod test { .collect::>(); for (shuffling_id, committee_cache) in shuffling_id_and_committee_caches.iter() { - cache.insert_committee_cache(shuffling_id.clone(), committee_cache); + cache.insert_value(shuffling_id.clone(), committee_cache); } for i in 1..(TEST_CACHE_SIZE + 1) { @@ -515,11 +327,7 @@ mod test { !cache.contains(&shuffling_id_and_committee_caches.get(0).unwrap().0), "should not contain oldest epoch shuffling id" ); - assert_eq!( - cache.cache.len(), - cache.cache_size, - "should limit cache size" - ); + assert_eq!(cache.len(), TEST_CACHE_SIZE, "should limit cache size"); } #[test] @@ -534,7 +342,7 @@ mod test { shuffling_epoch: (current_epoch + 1).into(), shuffling_decision_block: Hash256::from_low_u64_be(current_epoch + i as u64), }; - cache.insert_committee_cache(shuffling_id, &committee_cache); + cache.insert_value(shuffling_id, &committee_cache); } // Now, update the head shuffling ids @@ -544,12 +352,12 @@ mod test { previous: Some(shuffling_id(current_epoch - 1)), block_root: Hash256::from_low_u64_le(42), }; - cache.update_head_shuffling_ids(head_shuffling_ids.clone()); + cache.update_protector(head_shuffling_ids.clone()); // Insert head state shuffling ids. Should not be overridden by other shuffling ids. - cache.insert_committee_cache(head_shuffling_ids.current.clone(), &committee_cache); - cache.insert_committee_cache(head_shuffling_ids.next.clone(), &committee_cache); - cache.insert_committee_cache( + cache.insert_value(head_shuffling_ids.current.clone(), &committee_cache); + cache.insert_value(head_shuffling_ids.next.clone(), &committee_cache); + cache.insert_value( head_shuffling_ids.previous.clone().unwrap(), &committee_cache, ); @@ -560,7 +368,7 @@ mod test { shuffling_epoch: Epoch::from(i), shuffling_decision_block: Hash256::from_low_u64_be(i as u64), }; - cache.insert_committee_cache(shuffling_id, &committee_cache); + cache.insert_value(shuffling_id, &committee_cache); } assert!( @@ -575,10 +383,6 @@ mod test { cache.contains(&head_shuffling_ids.previous.unwrap()), "should retain head shuffling id for previous epoch." ); - assert_eq!( - cache.cache.len(), - cache.cache_size, - "should limit cache size" - ); + assert_eq!(cache.len(), TEST_CACHE_SIZE, "should limit cache size"); } } diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 53e51515c3f..d686e6950de 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -408,7 +408,7 @@ fn advance_head( .shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(BeaconChainError::AttestationCacheLockTimeout)? - .insert_committee_cache(shuffling_id.clone(), committee_cache); + .insert_value(shuffling_id.clone(), committee_cache); debug!( log, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index a836f6216cc..3f4d2981efa 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -926,7 +926,7 @@ pub fn serve( .shuffling_cache .try_write_for(std::time::Duration::from_secs(1)) { - cache_write.insert_committee_cache( + cache_write.insert_value( shuffling_id, &possibly_built_cache, ); diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index fdc99fa954e..c4b721f0411 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -1,6 +1,7 @@ use crate::ExecutionOptimistic; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::types::StateId as CoreStateId; +use slog::{debug, warn}; use std::fmt; use std::str::FromStr; use types::{BeaconState, Checkpoint, EthSpec, Fork, Hash256, Slot}; @@ -187,6 +188,49 @@ impl StateId { _ => (self.root(chain)?, None), }; + let mut opt_state_cache = Some(chain.parallel_state_cache.write()); + + // Try the cache. + if let Some(cache_item) = opt_state_cache + .as_mut() + .and_then(|cache| cache.get(&state_root)) + { + drop(opt_state_cache.take()); + match cache_item.wait() { + Ok(state) => { + debug!( + chain.logger(), + "HTTP state cache hit"; + "state_root" => ?state_root, + "slot" => state.slot(), + ); + return Ok(((*state).clone(), execution_optimistic, finalized)); + } + Err(e) => { + warn!( + chain.logger(), + "State promise failed"; + "state_root" => ?state_root, + "outcome" => "re-computing", + "error" => ?e, + ); + } + } + } + + // Re-lock only in case of failed promise. + debug!( + chain.logger(), + "HTTP state cache miss"; + "state_root" => ?state_root + ); + let mut state_cache = opt_state_cache.unwrap_or_else(|| chain.parallel_state_cache.write()); + + let sender = state_cache.create_promise(state_root).map_err(|e| { + warp_utils::reject::custom_server_error(format!("too many concurrent requests: {e:?}")) + })?; + drop(state_cache); + let state = chain .get_state(&state_root, slot_opt) .map_err(warp_utils::reject::beacon_chain_error) @@ -199,6 +243,11 @@ impl StateId { }) })?; + // Fulfil promise (and re-lock again). + let mut state_cache = chain.parallel_state_cache.write(); + state_cache.resolve_promise(sender, state_root, &state); + drop(state_cache); + Ok((state, execution_optimistic, finalized)) } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 9d9b196e9be..957d539c87e 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -2315,7 +2315,7 @@ impl NetworkBeaconProcessor { debug!(self.log, "Attestation for finalized state"; "peer_id" => % peer_id); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } - e @ AttnError::BeaconChainError(BeaconChainError::MaxCommitteePromises(_)) => { + AttnError::BeaconChainError(BeaconChainError::ShufflingCacheError(e)) => { debug!( self.log, "Dropping attestation"; diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 347d37d667d..bc452cef039 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -821,6 +821,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .default_value("0") ) + .arg( + Arg::with_name("parallel-state-cache-size") + .long("parallel-state-cache-size") + .value_name("N") + .help("Set the size of the cache used to de-duplicate requests for the same \ + state. This cache is additional to other state caches within Lighthouse \ + and should be kept small unless a large number of parallel requests for \ + different states are anticipated.") + .takes_value(true) + .default_value("2") + ) /* * Misc. diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 88dc74066f1..ffb0287c522 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -391,6 +391,11 @@ pub fn get_config( if let Some(state_cache_size) = clap_utils::parse_optional(cli_args, "state-cache-size")? { client_config.store.state_cache_size = state_cache_size; } + if let Some(parallel_state_cache_size) = + clap_utils::parse_optional(cli_args, "parallel-state-cache-size")? + { + client_config.chain.parallel_state_cache_size = parallel_state_cache_size; + } if let Some(diff_buffer_cache_size) = clap_utils::parse_optional(cli_args, "diff-buffer-cache-size")? { diff --git a/book/src/help_bn.md b/book/src/help_bn.md index bb4d7a10c64..6dd4a8ee600 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -117,6 +117,8 @@ FLAGS: --eth1` pre-merge --subscribe-all-subnets Subscribe to all subnets regardless of validator count. This will also advertise the beacon node as being long-lived subscribed to all subnets. + --unsafe-and-dangerous-mode Don't use this flag unless you know what you're doing. Go back and + download a stable Lighthouse release --validator-monitor-auto Enables the automatic detection and monitoring of validators connected to the HTTP API and using the subnet subscription endpoint. This generally has the effect of providing additional logging and metrics for locally @@ -195,6 +197,9 @@ OPTIONS: --checkpoint-sync-url-timeout Set the timeout for checkpoint sync calls to remote beacon node HTTP endpoint. [default: 180] + --compression-level + Compression level (-99 to 22) for zstd compression applied to states on disk [default: 1]. You may change + the compression level freely without re-syncing. -d, --datadir Used to specify a custom root data directory for lighthouse keys and databases. Defaults to $HOME/.lighthouse/{network} where network is the value of the `network` flag Note: Users should specify @@ -202,6 +207,9 @@ OPTIONS: --debug-level Specifies the verbosity level used when emitting logs to the terminal. [default: info] [possible values: info, debug, trace, warn, error, crit] + --diff-buffer-cache-size + The maximum number of diff buffers to hold in memory. This cache is used when fetching historic states + [default: 16] --discovery-port The UDP port that discovery will listen on. Defaults to `port` @@ -238,6 +246,10 @@ OPTIONS: --epochs-per-migration The number of epochs to wait between running the migration of data from the hot DB to the cold DB. Less frequent runs can be useful for minimizing disk writes [default: 1] + --epochs-per-state-diff + Number of epochs between state diffs stored in the database. Lower values result in more writes and more + data stored, while higher values result in more block replaying and longer load times in case of cache miss. + [default: 16] --eth1-blocks-per-log-query Specifies the number of blocks that a deposit log query should span. This will reduce the size of responses from the Eth1 endpoint. [default: 1000] @@ -280,6 +292,13 @@ OPTIONS: --graffiti Specify your custom graffiti to be included in blocks. Defaults to the current version and commit, truncated to fit in 32 bytes. + --hierarchy-exponents + Specifies the frequency for storing full state snapshots and hierarchical diffs in the freezer DB. Accepts a + comma-separated list of ascending exponents. Each exponent defines an interval for storing diffs to the + layer above. The last exponent defines the interval for full snapshots. For example, a config of '4,8,12' + would store a full snapshot every 4096 (2^12) slots, first-level diffs every 256 (2^8) slots, and second- + level diffs every 16 (2^4) slots. Cannot be changed after initialization. [default: + 5,9,11,13,16,18,21] --historic-state-cache-size Specifies how many states from the freezer database should cache in memory [default: 1] @@ -376,6 +395,10 @@ OPTIONS: --network-dir Data directory for network keys. Defaults to network/ inside the beacon node dir. + --parallel-state-cache-size + Set the size of the cache used to de-duplicate requests for the same state. This cache is additional to + other state caches within Lighthouse and should be kept small unless a large number of parallel requests for + different states are anticipated. [default: 2] --port The TCP/UDP ports to listen on. There are two UDP ports. The discovery UDP port will be set to this value and the Quic UDP port will be set to this value + 1. The discovery port can be modified by the --discovery- @@ -462,6 +485,9 @@ OPTIONS: --slots-per-restore-point Specifies how often a freezer DB restore point should be stored. Cannot be changed after initialization. [default: 8192 (mainnet) or 64 (minimal)] + --state-cache-size + Specifies how many states the database should cache in memory [default: 128] + --suggested-fee-recipient Emergency fallback fee recipient for use in case the validator client does not have one configured. You should set this flag on the validator client instead of (or in addition to) setting it here. diff --git a/common/promise_cache/Cargo.toml b/common/promise_cache/Cargo.toml new file mode 100644 index 00000000000..b5fa42bd438 --- /dev/null +++ b/common/promise_cache/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "promise_cache" +version = "0.1.0" +edition.workspace = true + +[dependencies] +derivative = { workspace = true } +oneshot_broadcast = { path = "../oneshot_broadcast" } +itertools = { workspace = true } +slog = { workspace = true } diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs new file mode 100644 index 00000000000..36b6bd984f5 --- /dev/null +++ b/common/promise_cache/src/lib.rs @@ -0,0 +1,227 @@ +use derivative::Derivative; +use itertools::Itertools; +use oneshot_broadcast::{oneshot, Receiver, Sender}; +use slog::Logger; +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::Arc; + +#[derive(Debug)] +pub struct PromiseCache +where + K: Hash + Eq + Clone, + P: Protect, +{ + cache: HashMap>, + capacity: usize, + protector: P, + max_concurrent_promises: usize, + logger: Logger, +} + +/// A value implementing `Protect` is capable of preventing keys of type `K` from being evicted. +/// +/// It also dictates an ordering on keys which is used to prioritise evictions. +pub trait Protect { + type SortKey: Ord; + + fn sort_key(&self, k: &K) -> Self::SortKey; + + fn protect_from_eviction(&self, k: &K) -> bool; + + fn notify_eviction(&self, _k: &K, _log: &Logger) {} +} + +#[derive(Derivative)] +#[derivative(Clone(bound = ""))] +pub enum CacheItem { + Complete(Arc), + Promise(Receiver>), +} + +impl std::fmt::Debug for CacheItem { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + match self { + CacheItem::Complete(value) => value.fmt(f), + CacheItem::Promise(_) => "Promise(..)".fmt(f), + } + } +} + +#[derive(Debug)] +pub enum PromiseCacheError { + Failed(oneshot_broadcast::Error), + MaxConcurrentPromises(usize), +} + +pub trait ToArc { + fn to_arc(&self) -> Arc; +} + +impl CacheItem { + pub fn is_promise(&self) -> bool { + matches!(self, CacheItem::Promise(_)) + } + + pub fn wait(self) -> Result, PromiseCacheError> { + match self { + CacheItem::Complete(value) => Ok(value), + CacheItem::Promise(receiver) => receiver.recv().map_err(PromiseCacheError::Failed), + } + } +} + +impl ToArc for Arc { + fn to_arc(&self) -> Arc { + self.clone() + } +} + +impl ToArc for T +where + T: Clone, +{ + fn to_arc(&self) -> Arc { + Arc::new(self.clone()) + } +} + +impl PromiseCache +where + K: Hash + Eq + Clone, + P: Protect, +{ + pub fn new(capacity: usize, protector: P, logger: Logger) -> Self { + // Making the concurrent promises directly configurable is considered overkill for now, + // so we just derive a vaguely sensible value from the cache size. + let max_concurrent_promises = std::cmp::max(2, capacity / 8); + Self { + cache: HashMap::new(), + capacity, + protector, + max_concurrent_promises, + logger, + } + } + + pub fn get(&mut self, key: &K) -> Option> { + match self.cache.get(key) { + // The cache contained the value, return it. + item @ Some(CacheItem::Complete(_)) => item.cloned(), + // The cache contains a promise for the value. Check to see if the promise has already + // been resolved, without waiting for it. + item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { + // The promise has already been resolved. Replace the entry in the cache with a + // `Complete` entry and then return the value. + Ok(Some(value)) => { + let ready = CacheItem::Complete(value); + self.insert_cache_item(key.clone(), ready.clone()); + Some(ready) + } + // The promise has not yet been resolved. Return the promise so the caller can await + // it. + Ok(None) => item.cloned(), + // The sender has been dropped without sending a value. There was most likely an + // error computing the value. Drop the key from the cache and return + // `None` so the caller can recompute the value. + // + // It's worth noting that this is the only place where we removed unresolved + // promises from the cache. This means unresolved promises will only be removed if + // we try to access them again. This is OK, since the promises don't consume much + // memory. We expect that *all* promises should be resolved, unless there is a + // programming or database error. + Err(oneshot_broadcast::Error::SenderDropped) => { + self.cache.remove(key); + None + } + }, + // The cache does not have this value and it's not already promised to be computed. + None => None, + } + } + + pub fn contains(&self, key: &K) -> bool { + self.cache.contains_key(key) + } + + pub fn insert_value>(&mut self, key: K, value: &C) { + if self + .cache + .get(&key) + // Replace the value if it's not present or if it's a promise. A bird in the hand is + // worth two in the promise-bush! + .map_or(true, CacheItem::is_promise) + { + self.insert_cache_item(key, CacheItem::Complete(value.to_arc())); + } + } + + /// Take care of resolving a promise by ensuring the value is made available: + /// + /// 1. To all waiting thread that are holding a `Receiver`. + /// 2. In the cache itself for future callers. + pub fn resolve_promise>(&mut self, sender: Sender>, key: K, value: &C) { + // Use the sender to notify all actively waiting receivers. + let arc_value = value.to_arc(); + sender.send(arc_value.clone()); + + // Re-insert the value into the cache. The promise may have been evicted in the meantime, + // but we probably want to keep this value (which resolved recently) over other older cache + // entries. + self.insert_value(key, &arc_value); + } + + /// Prunes the cache first before inserting a new item. + fn insert_cache_item(&mut self, key: K, cache_item: CacheItem) { + self.prune_cache(); + self.cache.insert(key, cache_item); + } + + pub fn create_promise(&mut self, key: K) -> Result>, PromiseCacheError> { + let num_active_promises = self.cache.values().filter(|item| item.is_promise()).count(); + if num_active_promises >= self.max_concurrent_promises { + return Err(PromiseCacheError::MaxConcurrentPromises( + num_active_promises, + )); + } + + let (sender, receiver) = oneshot(); + self.insert_cache_item(key, CacheItem::Promise(receiver)); + Ok(sender) + } + + fn prune_cache(&mut self) { + let target_cache_size = self.capacity.saturating_sub(1); + if let Some(prune_count) = self.cache.len().checked_sub(target_cache_size) { + let keys_to_prune = self + .cache + .keys() + .filter(|k| !self.protector.protect_from_eviction(*k)) + .sorted_by_key(|k| self.protector.sort_key(k)) + .take(prune_count) + .cloned() + .collect::>(); + + for key in &keys_to_prune { + self.protector.notify_eviction(key, &self.logger); + self.cache.remove(key); + } + } + } + + pub fn update_protector(&mut self, protector: P) { + self.protector = protector; + } + + pub fn len(&self) -> usize { + self.cache.len() + } + + pub fn is_empty(&self) -> bool { + self.cache.is_empty() + } + + pub fn max_concurrent_promises(&self) -> usize { + self.max_concurrent_promises + } +} diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 708773e46a1..bec08264883 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1717,6 +1717,25 @@ fn historic_state_cache_size_default() { }); } #[test] +fn parallel_state_cache_size_flag() { + CommandLineTest::new() + .flag("parallel-state-cache-size", Some("4")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.chain.parallel_state_cache_size, 4_usize)); +} +#[test] +fn parallel_state_cache_size_default() { + use beacon_node::beacon_chain::chain_config::DEFAULT_PARALLEL_STATE_CACHE_SIZE; + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.chain.parallel_state_cache_size, + DEFAULT_PARALLEL_STATE_CACHE_SIZE + ); + }); +} +#[test] fn auto_compact_db_flag() { CommandLineTest::new() .flag("auto-compact-db", Some("false"))