Skip to content

Commit

Permalink
Add cache for parallel HTTP requests (sigp#4879)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul authored Jan 11, 2024
1 parent 8db17da commit 664a778
Show file tree
Hide file tree
Showing 21 changed files with 467 additions and 261 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ members = [
"common/malloc_utils",
"common/oneshot_broadcast",
"common/pretty_reqwest_error",
"common/promise_cache",
"common/sensitive_url",
"common/slot_clock",
"common/system_health",
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ lighthouse_metrics = { workspace = true }
logging = { workspace = true }
lru = { workspace = true }
merkle_proof = { workspace = true }
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
operation_pool = { workspace = true }
parking_lot = { workspace = true }
promise_cache = { path = "../../common/promise_cache" }
proto_array = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
Expand Down
19 changes: 15 additions & 4 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::observed_blob_sidecars::ObservedBlobSidecars;
use crate::observed_block_producers::ObservedBlockProducers;
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::observed_slashable::ObservedSlashable;
use crate::parallel_state_cache::ParallelStateCache;
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
Expand Down Expand Up @@ -460,6 +461,10 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A cache used to track pre-finalization block roots for quick rejection.
pub pre_finalization_block_cache: PreFinalizationBlockCache,
/// A cache used to de-duplicate HTTP state requests.
///
/// The cache is keyed by `state_root`.
pub parallel_state_cache: Arc<RwLock<ParallelStateCache<T::EthSpec>>>,
/// Sender given to tasks, so that if they encounter a state in which execution cannot
/// continue they can request that everything shuts down.
pub shutdown_sender: Sender<ShutdownReason>,
Expand Down Expand Up @@ -3868,7 +3873,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.insert_committee_cache(shuffling_id, committee_cache);
.insert_value(shuffling_id, committee_cache);
}
}
Ok(())
Expand Down Expand Up @@ -6041,7 +6046,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// access.
drop(shuffling_cache);

let committee_cache = cache_item.wait()?;
let committee_cache = cache_item.wait().map_err(Error::ShufflingCacheError)?;
map_fn(&committee_cache, shuffling_id.shuffling_decision_block)
} else {
// Create an entry in the cache that "promises" this value will eventually be computed.
Expand All @@ -6050,7 +6055,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
//
// Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same
// promise from being created twice.
let sender = shuffling_cache.create_promise(shuffling_id.clone())?;
let sender = shuffling_cache
.create_promise(shuffling_id.clone())
.map_err(Error::ShufflingCacheError)?;

// Drop the shuffling cache to avoid holding the lock for any longer than
// required.
Expand Down Expand Up @@ -6144,7 +6151,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.insert_committee_cache(shuffling_id, &committee_cache);
.insert_value(shuffling_id, &committee_cache);

metrics::stop_timer(committee_building_timer);

Expand Down Expand Up @@ -6446,6 +6453,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.data_availability_checker.data_availability_boundary()
}

pub fn logger(&self) -> &Logger {
&self.log
}

/// Gets the `LightClientBootstrap` object for a requested block root.
///
/// Returns `None` when the state or block is not found in the database.
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use futures::channel::mpsc::Sender;
use kzg::{Kzg, TrustedSetup};
use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::{Mutex, RwLock};
use promise_cache::PromiseCache;
use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold};
use slasher::Slasher;
use slog::{crit, debug, error, info, o, Logger};
Expand Down Expand Up @@ -851,6 +852,7 @@ where
let genesis_time = head_snapshot.beacon_state.genesis_time();
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
let shuffling_cache_size = self.chain_config.shuffling_cache_size;
let parallel_state_cache_size = self.chain_config.parallel_state_cache_size;

// Calculate the weak subjectivity point in which to backfill blocks to.
let genesis_backfill_slot = if self.chain_config.genesis_backfill {
Expand Down Expand Up @@ -933,6 +935,11 @@ where
beacon_proposer_cache,
block_times_cache: <_>::default(),
pre_finalization_block_cache: <_>::default(),
parallel_state_cache: Arc::new(RwLock::new(PromiseCache::new(
parallel_state_cache_size,
Default::default(),
log.clone(),
))),
validator_pubkey_cache,
attester_cache: <_>::default(),
early_attester_cache: <_>::default(),
Expand Down
4 changes: 1 addition & 3 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,9 +820,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(head_shuffling_ids) => {
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.map(|mut shuffling_cache| {
shuffling_cache.update_head_shuffling_ids(head_shuffling_ids)
})
.map(|mut shuffling_cache| shuffling_cache.update_protector(head_shuffling_ids))
.unwrap_or_else(|| {
error!(
self.log,
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub const DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR: u32 = 3;
/// Fraction of a slot lookahead for fork choice in the state advance timer (500ms on mainnet).
pub const FORK_CHOICE_LOOKAHEAD_FACTOR: u32 = 24;

/// Cache only a small number of states in the parallel cache by default.
pub const DEFAULT_PARALLEL_STATE_CACHE_SIZE: usize = 2;

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct ChainConfig {
/// Maximum number of slots to skip when importing an attestation.
Expand Down Expand Up @@ -83,6 +86,8 @@ pub struct ChainConfig {
pub progressive_balances_mode: ProgressiveBalancesMode,
/// Number of epochs between each migration of data from the hot database to the freezer.
pub epochs_per_migration: u64,
/// Size of the promise cache for de-duplicating parallel state requests.
pub parallel_state_cache_size: usize,
}

impl Default for ChainConfig {
Expand Down Expand Up @@ -114,6 +119,7 @@ impl Default for ChainConfig {
always_prepare_payload: false,
progressive_balances_mode: ProgressiveBalancesMode::Fast,
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
parallel_state_cache_size: DEFAULT_PARALLEL_STATE_CACHE_SIZE,
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ pub enum BeaconChainError {
},
AttestationHeadNotInForkChoice(Hash256),
MissingPersistedForkChoice,
CommitteePromiseFailed(oneshot_broadcast::Error),
MaxCommitteePromises(usize),
ShufflingCacheError(promise_cache::PromiseCacheError),
BlsToExecutionPriorToCapella,
BlsToExecutionConflictsWithPool,
InconsistentFork(InconsistentFork),
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub mod observed_block_producers;
pub mod observed_operations;
mod observed_slashable;
pub mod otb_verification_service;
mod parallel_state_cache;
mod persisted_beacon_chain;
mod persisted_fork_choice;
mod pre_finalization_cache;
Expand Down
22 changes: 22 additions & 0 deletions beacon_node/beacon_chain/src/parallel_state_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use promise_cache::{PromiseCache, Protect};
use types::{BeaconState, Hash256};

#[derive(Debug, Default)]
pub struct ParallelStateProtector;

impl Protect<Hash256> for ParallelStateProtector {
type SortKey = usize;

/// Evict in arbitrary (hashmap) order by using the same key for every value.
fn sort_key(&self, _: &Hash256) -> Self::SortKey {
0
}

/// We don't care too much about preventing evictions of particular states here. All the states
/// in this cache should be different from the head state.
fn protect_from_eviction(&self, _: &Hash256) -> bool {
false
}
}

pub type ParallelStateCache<E> = PromiseCache<Hash256, BeaconState<E>, ParallelStateProtector>;
Loading

0 comments on commit 664a778

Please sign in to comment.