Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify lookup sync of gossip processing results #5722

Merged
merged 6 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,26 @@ pub struct BlockImportData<E: EthSpec> {
pub consensus_context: ConsensusContext<E>,
}

impl<E: EthSpec> BlockImportData<E> {
pub fn __new_for_test(
block_root: Hash256,
state: BeaconState<E>,
parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
) -> Self {
Self {
block_root,
state,
parent_block,
parent_eth1_finalization_data: Eth1FinalizationData {
eth1_data: <_>::default(),
eth1_deposit_index: 0,
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
}
}
}

pub type GossipVerifiedBlockContents<E> =
(GossipVerifiedBlock<E>, Option<GossipVerifiedBlobList<E>>);

Expand Down
7 changes: 4 additions & 3 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Checks if the block root is currenlty in the availability cache awaiting processing because
/// Checks if the block root is currenlty in the availability cache awaiting import because
/// of missing components.
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.availability_cache.has_block(block_root)
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
self.availability_cache
.has_execution_valid_block(block_root)
}

/// Return the required blobs `block_root` expects if the block is currenlty in the cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,6 @@ impl<T: BeaconChainTypes> Critical<T> {
Ok(())
}

/// Returns true if the block root is known, without altering the LRU ordering
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.in_memory.peek(block_root).is_some() || self.store_keys.contains(block_root)
}

/// This only checks for the blobs in memory
pub fn peek_blob(
&self,
Expand Down Expand Up @@ -549,8 +544,12 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}

/// Returns true if the block root is known, without altering the LRU ordering
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.critical.read().has_block(block_root)
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
if let Some(pending_components) = self.critical.read().peek_pending_components(block_root) {
pending_components.executed_block.is_some()
} else {
false
}
}

/// Fetch a blob from the cache without affecting the LRU ordering
Expand Down
20 changes: 12 additions & 8 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1187,19 +1187,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"block_root" => %block_root,
);
}
Err(BlockError::ParentUnknown(block)) => {
// Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block`
Err(BlockError::ParentUnknown(_)) => {
// This should not occur. It should be checked by `should_forward_block`.
// Do not send sync message UnknownParentBlock to prevent conflicts with the
// BlockComponentProcessed message below. If this error ever happens, lookup sync
// can recover by receiving another block / blob / attestation referencing the
// chain that includes this block.
error!(
self.log,
"Block with unknown parent attempted to be processed";
"block_root" => %block_root,
"peer_id" => %peer_id
);
self.send_sync_message(SyncMessage::UnknownParentBlock(
peer_id,
block.clone(),
block_root,
));
}
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!(
Expand Down Expand Up @@ -1263,6 +1262,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&self.log,
);
}

self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))),
});
}

pub fn process_gossip_voluntary_exit(
Expand Down
6 changes: 2 additions & 4 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::{
service::NetworkMessage,
sync::{manager::BlockProcessType, SyncMessage},
};
use crate::sync::manager::BlockProcessType;
use crate::{service::NetworkMessage, sync::manager::SyncMessage};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer};
Expand Down
22 changes: 9 additions & 13 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let Some(handle) = duplicate_cache.check_and_insert(block_root) else {
debug!(
self.log,
"Gossip block is being processed";
"Rpc block is being processed";
realbigsean marked this conversation as resolved.
Show resolved Hide resolved
"action" => "sending rpc block to reprocessing queue",
"block_root" => %block_root,
"process_type" => ?process_type,
);

// Send message to work reprocess queue to retry the block
Expand Down Expand Up @@ -148,7 +147,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"proposer" => block.message().proposer_index(),
"slot" => block.slot(),
"commitments" => commitments_formatted,
"process_type" => ?process_type,
);

let result = self
Expand All @@ -170,17 +168,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if reprocess_tx.try_send(reprocess_msg).is_err() {
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
};
if matches!(process_type, BlockProcessType::SingleBlock { .. }) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing this match?

It it redundant because we now only process single blocks here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, blobs are processed in process_rpc_blobs

self.chain.block_times_cache.write().set_time_observed(
hash,
slot,
seen_timestamp,
None,
None,
);
self.chain.block_times_cache.write().set_time_observed(
hash,
slot,
seen_timestamp,
None,
None,
);

self.chain.recompute_head_at_current_slot().await;
}
self.chain.recompute_head_at_current_slot().await;
}
// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
Expand Down
21 changes: 8 additions & 13 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::manager::{Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
Expand Down Expand Up @@ -45,7 +45,7 @@ pub trait RequestState<T: BeaconChainTypes> {
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError>;
) -> Result<LookupRequestResult, LookupRequestError>;

/* Response handling methods */

Expand Down Expand Up @@ -80,7 +80,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
peer_id: PeerId,
_: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root)
.map_err(LookupRequestError::SendFailed)
}
Expand All @@ -97,10 +97,10 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
peer_id: _,
} = download_result;
cx.send_block_for_processing(
id,
block_root,
RpcBlock::new_without_blobs(Some(block_root), value),
seen_timestamp,
BlockProcessType::SingleBlock { id },
)
.map_err(LookupRequestError::SendFailed)
}
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(
id,
peer_id,
Expand All @@ -149,13 +149,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
seen_timestamp,
peer_id: _,
} = download_result;
cx.send_blobs_for_processing(
block_root,
value,
seen_timestamp,
BlockProcessType::SingleBlob { id },
)
.map_err(LookupRequestError::SendFailed)
cx.send_blobs_for_processing(id, block_root, value, seen_timestamp)
.map_err(LookupRequestError::SendFailed)
}

fn response_type() -> ResponseType {
Expand Down
30 changes: 29 additions & 1 deletion beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
}
};
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
let id = match process_type {
BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => id,
};
self.on_lookup_result(id, lookup_result, "processing_result", cx);
}

pub fn on_processing_result_inner<R: RequestState<T>>(
Expand Down Expand Up @@ -521,6 +524,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
other => {
debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other);

let peer_id = request_state.on_processing_failure()?;
cx.report_peer(
peer_id,
Expand Down Expand Up @@ -561,6 +565,30 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}

pub fn on_external_processing_result(
&mut self,
block_root: Hash256,
imported: bool,
cx: &mut SyncNetworkContext<T>,
) {
let Some((id, lookup)) = self
.single_block_lookups
.iter_mut()
.find(|(_, lookup)| lookup.is_for_block(block_root))
else {
// Ok to ignore gossip process events
return;
};

let lookup_result = if imported {
Ok(LookupResult::Completed)
} else {
lookup.continue_requests(cx)
};
let id = *id;
self.on_lookup_result(id, lookup_result, "external_processing_result", cx);
realbigsean marked this conversation as resolved.
Show resolved Hide resolved
}

/// Makes progress on the immediate children of `block_root`
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
Expand Down
35 changes: 17 additions & 18 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::common::ResponseType;
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::BeaconChainTypes;
use itertools::Itertools;
use rand::seq::IteratorRandom;
Expand Down Expand Up @@ -179,11 +179,13 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;

// make_request returns true only if a request needs to be made
if request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
request.get_state_mut().on_download_start()?;
} else {
request.get_state_mut().on_completed_request()?;
match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
LookupRequestResult::RequestSent => request.get_state_mut().on_download_start()?,
LookupRequestResult::NoRequestNeeded => {
request.get_state_mut().on_completed_request()?
}
// Sync will receive a future event to make progress on the request, do nothing now
LookupRequestResult::Pending => return Ok(()),
}

// Otherwise, attempt to progress awaiting processing
Expand Down Expand Up @@ -262,12 +264,16 @@ pub struct DownloadResult<T: Clone> {
pub peer_id: PeerId,
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum State<T: Clone> {
AwaitingDownload,
Downloading,
AwaitingProcess(DownloadResult<T>),
/// Request is processing, sent by lookup sync
Processing(DownloadResult<T>),
/// Request is processed:
/// - `Processed(Some)` if lookup sync downloaded and sent to process this request
/// - `Processed(None)` if another source (i.e. gossip) sent this component for processing
Processed(Option<PeerId>),
}

Expand Down Expand Up @@ -428,12 +434,11 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}

pub fn on_processing_success(&mut self) -> Result<PeerId, LookupRequestError> {
pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> {
match &self.state {
State::Processing(result) => {
let peer_id = result.peer_id;
self.state = State::Processed(Some(peer_id));
Ok(peer_id)
self.state = State::Processed(Some(result.peer_id));
Ok(())
}
other => Err(LookupRequestError::BadState(format!(
"Bad state on_processing_success expected Processing got {other}"
Expand Down Expand Up @@ -514,12 +519,6 @@ impl<T: Clone> SingleLookupRequestState<T> {

impl<T: Clone> std::fmt::Display for State<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
State::AwaitingDownload => write!(f, "AwaitingDownload"),
State::Downloading { .. } => write!(f, "Downloading"),
State::AwaitingProcess { .. } => write!(f, "AwaitingProcessing"),
State::Processing { .. } => write!(f, "Processing"),
State::Processed { .. } => write!(f, "Processed"),
}
write!(f, "{}", Into::<&'static str>::into(self))
}
}
Loading
Loading