Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clone state ahead of block production #4925

Merged
merged 3 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 48 additions & 13 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub data_availability_checker: Arc<DataAvailabilityChecker<T>>,
/// The KZG trusted setup used by this chain.
pub kzg: Option<Arc<Kzg>>,
#[allow(clippy::type_complexity)]
pub block_production_state: Arc<Mutex<Option<(Hash256, BlockProductionPreState<T::EthSpec>)>>>,
}

pub enum BeaconBlockResponseType<T: EthSpec> {
Expand Down Expand Up @@ -4030,14 +4032,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
(re_org_state.pre_state, re_org_state.state_root)
}
// Normal case: proposing a block atop the current head. Use the snapshot cache.
// Normal case: proposing a block atop the current head using the cache.
else if let Some((_, cached_state)) = self
.block_production_state
.lock()
.take()
.filter(|(cached_block_root, _)| *cached_block_root == head_block_root)
{
(cached_state.pre_state, cached_state.state_root)
}
// Fall back to a direct read of the snapshot cache.
else if let Some(pre_state) = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_state_for_block_production(head_block_root)
})
{
warn!(
self.log,
"Block production cache miss";
"message" => "falling back to snapshot cache clone",
"slot" => slot
);
(pre_state.pre_state, pre_state.state_root)
} else {
warn!(
Expand Down Expand Up @@ -4161,12 +4178,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
drop(proposer_head_timer);
let re_org_parent_block = proposer_head.parent_node.root;

// Only attempt a re-org if we hit the snapshot cache.
// Only attempt a re-org if we hit the block production cache or snapshot cache.
let pre_state = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_state_for_block_production(re_org_parent_block)
.block_production_state
.lock()
.take()
.and_then(|(cached_block_root, state)| {
(cached_block_root == re_org_parent_block).then_some(state)
})
.or_else(|| {
warn!(
self.log,
"Block production cache miss";
"message" => "falling back to snapshot cache during re-org",
"slot" => slot,
"block_root" => ?re_org_parent_block
);
self.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_state_for_block_production(re_org_parent_block)
})
})
.or_else(|| {
debug!(
Expand Down Expand Up @@ -5326,15 +5358,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// This function will result in a call to `forkchoiceUpdated` on the EL if we're in the
/// tail-end of the slot (as defined by `self.config.prepare_payload_lookahead`).
///
/// Return `Ok(Some(head_block_root))` if this node prepared to propose at the next slot on
/// top of `head_block_root`.
pub async fn prepare_beacon_proposer(
self: &Arc<Self>,
current_slot: Slot,
) -> Result<(), Error> {
) -> Result<Option<Hash256>, Error> {
let prepare_slot = current_slot + 1;

// There's no need to run the proposer preparation routine before the bellatrix fork.
if self.slot_is_prior_to_bellatrix(prepare_slot) {
return Ok(());
return Ok(None);
}

let execution_layer = self
Expand All @@ -5347,7 +5382,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if !self.config.always_prepare_payload
&& !execution_layer.has_any_proposer_preparation_data().await
{
return Ok(());
return Ok(None);
}

// Load the cached head and its forkchoice update parameters.
Expand Down Expand Up @@ -5394,7 +5429,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let Some((forkchoice_update_params, Some(pre_payload_attributes))) = maybe_prep_data else {
// Appropriate log messages have already been logged above and in
// `get_pre_payload_attributes`.
return Ok(());
return Ok(None);
};

// If the execution layer doesn't have any proposer data for this validator then we assume
Expand All @@ -5405,7 +5440,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.has_proposer_preparation_data(proposer)
.await
{
return Ok(());
return Ok(None);
}

// Fetch payload attributes from the execution layer's cache, or compute them from scratch
Expand Down Expand Up @@ -5500,7 +5535,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"prepare_slot" => prepare_slot,
"validator" => proposer,
);
return Ok(());
return Ok(None);
};

// If we are close enough to the proposal slot, send an fcU, which will have payload
Expand All @@ -5523,7 +5558,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await?;
}

Ok(())
Ok(Some(head_root))
}

pub async fn update_execution_engine_forkchoice(
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ where
.map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
),
kzg,
block_production_state: Arc::new(Mutex::new(None)),
};

let head = beacon_chain.head_snapshot();
Expand Down
48 changes: 40 additions & 8 deletions beacon_node/beacon_chain/src/state_advance_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,19 +227,51 @@ async fn state_advance_timer<T: BeaconChainTypes>(

// Prepare proposers so that the node can send payload attributes in the case where
// it decides to abandon a proposer boost re-org.
if let Err(e) = beacon_chain.prepare_beacon_proposer(current_slot).await {
warn!(
log,
"Unable to prepare proposer with lookahead";
"error" => ?e,
"slot" => next_slot,
);
}
let proposer_head = beacon_chain
.prepare_beacon_proposer(current_slot)
.await
.unwrap_or_else(|e| {
warn!(
log,
"Unable to prepare proposer with lookahead";
"error" => ?e,
"slot" => next_slot,
);
None
});

// Use a blocking task to avoid blocking the core executor whilst waiting for locks
// in `ForkChoiceSignalTx`.
beacon_chain.task_executor.clone().spawn_blocking(
move || {
// If we're proposing, clone the head state preemptively so that it isn't on the hot
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
// path of proposing. We can delete this once we have tree-states.
if let Some(proposer_head) = proposer_head {
if let Some(proposer_state) = beacon_chain
Copy link
Member

@paulhauner paulhauner Nov 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would be beneficial to drop the block_production_state above this line. It would prevent us from holding the old state in memory whilst we load the new one.

The downside would be that there are some times where we clear the block_production_state when we don't need to (i.e. the state is not in the snapshot cache but would be useful for the next proposal).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I've added that in 6d5d558. It will still hold 2 states in memory if the beacon node has two proposals two slots in a row on the same head block, but I think this is reasonably obscure enough to be tolerated until we get tree-states. I'll try to deploy that change on Holesky in a few hours so I can confirm it didn't regress on the perf front.

.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_state_for_block_production(proposer_head)
})
{
*beacon_chain.block_production_state.lock() =
Some((proposer_head, proposer_state));
debug!(
log,
"Cloned state ready for block production";
"head_block_root" => ?proposer_head,
"slot" => next_slot
);
}
} else {
warn!(
log,
"Block production state missing from snapshot cache";
"head_block_root" => ?proposer_head,
"slot" => next_slot
);
}

// Signal block proposal for the next slot (if it happens to be waiting).
if let Some(tx) = &beacon_chain.fork_choice_signal_tx {
if let Err(e) = tx.notify_fork_choice_complete(next_slot) {
Expand Down
Loading