From 547ed1de635fedb4f686841c7ff0ba5cc6d3c110 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 30 Nov 2023 13:49:35 +1100 Subject: [PATCH] Clone state ahead of block production (#4925) * Clone state ahead of block production * Add pruning and fix logging * Don't hold 2 states in mem --- beacon_node/beacon_chain/src/beacon_chain.rs | 64 ++++++++++++---- beacon_node/beacon_chain/src/builder.rs | 1 + .../beacon_chain/src/state_advance_timer.rs | 73 +++++++++++++++++-- 3 files changed, 117 insertions(+), 21 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6b893d6967a..53583390fa3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -482,6 +482,11 @@ pub struct BeaconChain { pub data_availability_checker: Arc>, /// The KZG trusted setup used by this chain. pub kzg: Option>, + /// State with complete tree hash cache, ready for block production. + /// + /// NB: We can delete this once we have tree-states. + #[allow(clippy::type_complexity)] + pub block_production_state: Arc)>>>, } pub enum BeaconBlockResponseType { @@ -4030,7 +4035,16 @@ impl BeaconChain { ); (re_org_state.pre_state, re_org_state.state_root) } - // Normal case: proposing a block atop the current head. Use the snapshot cache. + // Normal case: proposing a block atop the current head using the cache. + else if let Some((_, cached_state)) = self + .block_production_state + .lock() + .take() + .filter(|(cached_block_root, _)| *cached_block_root == head_block_root) + { + (cached_state.pre_state, cached_state.state_root) + } + // Fall back to a direct read of the snapshot cache. else if let Some(pre_state) = self .snapshot_cache .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) @@ -4038,6 +4052,12 @@ impl BeaconChain { snapshot_cache.get_state_for_block_production(head_block_root) }) { + warn!( + self.log, + "Block production cache miss"; + "message" => "falling back to snapshot cache clone", + "slot" => slot + ); (pre_state.pre_state, pre_state.state_root) } else { warn!( @@ -4161,12 +4181,27 @@ impl BeaconChain { drop(proposer_head_timer); let re_org_parent_block = proposer_head.parent_node.root; - // Only attempt a re-org if we hit the snapshot cache. + // Only attempt a re-org if we hit the block production cache or snapshot cache. let pre_state = self - .snapshot_cache - .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .and_then(|snapshot_cache| { - snapshot_cache.get_state_for_block_production(re_org_parent_block) + .block_production_state + .lock() + .take() + .and_then(|(cached_block_root, state)| { + (cached_block_root == re_org_parent_block).then_some(state) + }) + .or_else(|| { + warn!( + self.log, + "Block production cache miss"; + "message" => "falling back to snapshot cache during re-org", + "slot" => slot, + "block_root" => ?re_org_parent_block + ); + self.snapshot_cache + .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .and_then(|snapshot_cache| { + snapshot_cache.get_state_for_block_production(re_org_parent_block) + }) }) .or_else(|| { debug!( @@ -5326,15 +5361,18 @@ impl BeaconChain { /// /// This function will result in a call to `forkchoiceUpdated` on the EL if we're in the /// tail-end of the slot (as defined by `self.config.prepare_payload_lookahead`). + /// + /// Return `Ok(Some(head_block_root))` if this node prepared to propose at the next slot on + /// top of `head_block_root`. pub async fn prepare_beacon_proposer( self: &Arc, current_slot: Slot, - ) -> Result<(), Error> { + ) -> Result, Error> { let prepare_slot = current_slot + 1; // There's no need to run the proposer preparation routine before the bellatrix fork. if self.slot_is_prior_to_bellatrix(prepare_slot) { - return Ok(()); + return Ok(None); } let execution_layer = self @@ -5347,7 +5385,7 @@ impl BeaconChain { if !self.config.always_prepare_payload && !execution_layer.has_any_proposer_preparation_data().await { - return Ok(()); + return Ok(None); } // Load the cached head and its forkchoice update parameters. @@ -5394,7 +5432,7 @@ impl BeaconChain { let Some((forkchoice_update_params, Some(pre_payload_attributes))) = maybe_prep_data else { // Appropriate log messages have already been logged above and in // `get_pre_payload_attributes`. - return Ok(()); + return Ok(None); }; // If the execution layer doesn't have any proposer data for this validator then we assume @@ -5405,7 +5443,7 @@ impl BeaconChain { .has_proposer_preparation_data(proposer) .await { - return Ok(()); + return Ok(None); } // Fetch payload attributes from the execution layer's cache, or compute them from scratch @@ -5500,7 +5538,7 @@ impl BeaconChain { "prepare_slot" => prepare_slot, "validator" => proposer, ); - return Ok(()); + return Ok(None); }; // If we are close enough to the proposal slot, send an fcU, which will have payload @@ -5523,7 +5561,7 @@ impl BeaconChain { .await?; } - Ok(()) + Ok(Some(head_root)) } pub async fn update_execution_engine_forkchoice( diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index bffb23aeb7e..fbd255126ee 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -925,6 +925,7 @@ where .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?, ), kzg, + block_production_state: Arc::new(Mutex::new(None)), }; let head = beacon_chain.head_snapshot(); diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index f3e97168a5c..c04815ebc13 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -45,6 +45,9 @@ const MAX_ADVANCE_DISTANCE: u64 = 4; /// impact whilst having 8 epochs without a block is a comfortable grace period. const MAX_FORK_CHOICE_DISTANCE: u64 = 256; +/// Drop any unused block production state cache after this many slots. +const MAX_BLOCK_PRODUCTION_CACHE_DISTANCE: u64 = 4; + #[derive(Debug)] enum Error { BeaconChain(BeaconChainError), @@ -227,19 +230,73 @@ async fn state_advance_timer( // Prepare proposers so that the node can send payload attributes in the case where // it decides to abandon a proposer boost re-org. - if let Err(e) = beacon_chain.prepare_beacon_proposer(current_slot).await { - warn!( - log, - "Unable to prepare proposer with lookahead"; - "error" => ?e, - "slot" => next_slot, - ); - } + let proposer_head = beacon_chain + .prepare_beacon_proposer(current_slot) + .await + .unwrap_or_else(|e| { + warn!( + log, + "Unable to prepare proposer with lookahead"; + "error" => ?e, + "slot" => next_slot, + ); + None + }); // Use a blocking task to avoid blocking the core executor whilst waiting for locks // in `ForkChoiceSignalTx`. beacon_chain.task_executor.clone().spawn_blocking( move || { + // If we're proposing, clone the head state preemptively so that it isn't on + // the hot path of proposing. We can delete this once we have tree-states. + if let Some(proposer_head) = proposer_head { + let mut cache = beacon_chain.block_production_state.lock(); + + // Avoid holding two states in memory. It's OK to hold the lock because + // we always lock the block production cache before the snapshot cache + // and we prefer for block production to wait for the block production + // cache if a clone is in-progress. + if cache + .as_ref() + .map_or(false, |(cached_head, _)| *cached_head != proposer_head) + { + drop(cache.take()); + } + if let Some(proposer_state) = beacon_chain + .snapshot_cache + .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .and_then(|snapshot_cache| { + snapshot_cache.get_state_for_block_production(proposer_head) + }) + { + *cache = Some((proposer_head, proposer_state)); + debug!( + log, + "Cloned state ready for block production"; + "head_block_root" => ?proposer_head, + "slot" => next_slot + ); + } else { + warn!( + log, + "Block production state missing from snapshot cache"; + "head_block_root" => ?proposer_head, + "slot" => next_slot + ); + } + } else { + // If we aren't proposing, drop any old block production cache to save + // memory. + let mut cache = beacon_chain.block_production_state.lock(); + if let Some((_, state)) = &*cache { + if state.pre_state.slot() + MAX_BLOCK_PRODUCTION_CACHE_DISTANCE + <= current_slot + { + drop(cache.take()); + } + } + } + // Signal block proposal for the next slot (if it happens to be waiting). if let Some(tx) = &beacon_chain.fork_choice_signal_tx { if let Err(e) = tx.notify_fork_choice_complete(next_slot) {