From 90c30b329bd3dfe57c2ba99765d5ef49fb8cb3a7 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 6 May 2024 22:52:48 +0900 Subject: [PATCH] Notify lookup sync of gossip processing results --- .../src/data_availability_checker.rs | 2 +- .../gossip_methods.rs | 26 ++++++--- .../src/network_beacon_processor/mod.rs | 17 +++--- .../network_beacon_processor/sync_methods.rs | 50 +++++++++-------- .../network/src/sync/block_lookups/common.rs | 35 ++++++------ .../network/src/sync/block_lookups/mod.rs | 47 +++++++++++----- .../sync/block_lookups/single_block_lookup.rs | 56 ++++++++++++------- .../network/src/sync/block_lookups/tests.rs | 6 +- beacon_node/network/src/sync/manager.rs | 25 +++++---- .../network/src/sync/network_context.rs | 53 ++++++++++++------ 10 files changed, 194 insertions(+), 123 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 27ed0ae6d56..bc5b284b581 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -84,7 +84,7 @@ impl DataAvailabilityChecker { }) } - /// Checks if the block root is currenlty in the availability cache awaiting processing because + /// Checks if the block root is currenlty in the availability cache awaiting import because /// of missing components. pub fn has_block(&self, block_root: &Hash256) -> bool { self.availability_cache.has_block(block_root) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index af7f3a53e56..f4da4adea1f 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -2,7 +2,10 @@ use crate::{ metrics, network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor}, service::NetworkMessage, - sync::SyncMessage, + sync::{ + manager::{BlockProcessSource, BlockProcessType}, + SyncMessage, + }, }; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; @@ -1187,19 +1190,18 @@ impl NetworkBeaconProcessor { "block_root" => %block_root, ); } - Err(BlockError::ParentUnknown(block)) => { - // Inform the sync manager to find parents for this block - // This should not occur. It should be checked by `should_forward_block` + Err(BlockError::ParentUnknown(_)) => { + // This should not occur. It should be checked by `should_forward_block`. + // Do not send sync message UnknownParentBlock to prevent conflicts with the + // BlockComponentProcessed message below. If this error ever happens, lookup sync + // can recover by receiving another block / blob / attestation referencing the + // chain that includes this block. error!( self.log, "Block with unknown parent attempted to be processed"; + "block_root" => %block_root, "peer_id" => %peer_id ); - self.send_sync_message(SyncMessage::UnknownParentBlock( - peer_id, - block.clone(), - block_root, - )); } Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { debug!( @@ -1263,6 +1265,12 @@ impl NetworkBeaconProcessor { &self.log, ); } + + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type: BlockProcessType::SingleBlock, + source: BlockProcessSource::Gossip(block_root), + result: result.into(), + }); } pub fn process_gossip_voluntary_exit( diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f10646c7414..64bf3dcd01a 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,6 +1,6 @@ use crate::{ service::NetworkMessage, - sync::{manager::BlockProcessType, SyncMessage}, + sync::{manager::BlockProcessSource, SyncMessage}, }; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain}; @@ -407,13 +407,13 @@ impl NetworkBeaconProcessor { block_root: Hash256, block: RpcBlock, seen_timestamp: Duration, - process_type: BlockProcessType, + source: BlockProcessSource, ) -> Result<(), Error> { let process_fn = self.clone().generate_rpc_beacon_block_process_fn( block_root, block, seen_timestamp, - process_type, + source, ); self.try_send(BeaconWorkEvent { drop_during_sync: false, @@ -428,18 +428,15 @@ impl NetworkBeaconProcessor { block_root: Hash256, blobs: FixedBlobSidecarList, seen_timestamp: Duration, - process_type: BlockProcessType, + source: BlockProcessSource, ) -> Result<(), Error> { let blob_count = blobs.iter().filter(|b| b.is_some()).count(); if blob_count == 0 { return Ok(()); } - let process_fn = self.clone().generate_rpc_blobs_process_fn( - block_root, - blobs, - seen_timestamp, - process_type, - ); + let process_fn = + self.clone() + .generate_rpc_blobs_process_fn(block_root, blobs, seen_timestamp, source); self.try_send(BeaconWorkEvent { drop_during_sync: false, work: Work::RpcBlobs { process_fn }, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index daa9a2cf197..c6eefdf494d 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,5 +1,6 @@ use crate::metrics; use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE}; +use crate::sync::manager::BlockProcessSource; use crate::sync::BatchProcessResult; use crate::sync::{ manager::{BlockProcessType, SyncMessage}, @@ -53,7 +54,7 @@ impl NetworkBeaconProcessor { block_root: Hash256, block: RpcBlock, seen_timestamp: Duration, - process_type: BlockProcessType, + source: BlockProcessSource, ) -> AsyncFn { let process_fn = async move { let reprocess_tx = self.reprocess_tx.clone(); @@ -62,7 +63,7 @@ impl NetworkBeaconProcessor { block_root, block, seen_timestamp, - process_type, + source, reprocess_tx, duplicate_cache, ) @@ -77,20 +78,21 @@ impl NetworkBeaconProcessor { block_root: Hash256, block: RpcBlock, seen_timestamp: Duration, - process_type: BlockProcessType, + source: BlockProcessSource, ) -> (AsyncFn, BlockingFn) { // An async closure which will import the block. let process_fn = self.clone().generate_rpc_beacon_block_process_fn( block_root, block, seen_timestamp, - process_type.clone(), + source.clone(), ); // A closure which will ignore the block. let ignore_fn = move || { // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type, + process_type: BlockProcessType::SingleBlock, + source, result: crate::sync::manager::BlockProcessingResult::Ignored, }); }; @@ -104,7 +106,7 @@ impl NetworkBeaconProcessor { block_root: Hash256, block: RpcBlock, seen_timestamp: Duration, - process_type: BlockProcessType, + source: BlockProcessSource, reprocess_tx: mpsc::Sender, duplicate_cache: DuplicateCache, ) { @@ -115,7 +117,7 @@ impl NetworkBeaconProcessor { "Gossip block is being processed"; "action" => "sending rpc block to reprocessing queue", "block_root" => %block_root, - "process_type" => ?process_type, + "source" => ?source, ); // Send message to work reprocess queue to retry the block @@ -123,7 +125,7 @@ impl NetworkBeaconProcessor { block_root, block, seen_timestamp, - process_type, + source, ); let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock { beacon_block_root: block_root, @@ -148,7 +150,7 @@ impl NetworkBeaconProcessor { "proposer" => block.message().proposer_index(), "slot" => block.slot(), "commitments" => commitments_formatted, - "process_type" => ?process_type, + "source" => ?source, ); let result = self @@ -170,21 +172,20 @@ impl NetworkBeaconProcessor { if reprocess_tx.try_send(reprocess_msg).is_err() { error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash) }; - if matches!(process_type, BlockProcessType::SingleBlock { .. }) { - self.chain.block_times_cache.write().set_time_observed( - hash, - slot, - seen_timestamp, - None, - None, - ); + self.chain.block_times_cache.write().set_time_observed( + hash, + slot, + seen_timestamp, + None, + None, + ); - self.chain.recompute_head_at_current_slot().await; - } + self.chain.recompute_head_at_current_slot().await; } // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type, + process_type: BlockProcessType::SingleBlock, + source, result: result.into(), }); @@ -201,11 +202,11 @@ impl NetworkBeaconProcessor { block_root: Hash256, blobs: FixedBlobSidecarList, seen_timestamp: Duration, - process_type: BlockProcessType, + source: BlockProcessSource, ) -> AsyncFn { let process_fn = async move { self.clone() - .process_rpc_blobs(block_root, blobs, seen_timestamp, process_type) + .process_rpc_blobs(block_root, blobs, seen_timestamp, source) .await; }; Box::pin(process_fn) @@ -217,7 +218,7 @@ impl NetworkBeaconProcessor { block_root: Hash256, blobs: FixedBlobSidecarList, seen_timestamp: Duration, - process_type: BlockProcessType, + source: BlockProcessSource, ) { let Some(slot) = blobs .iter() @@ -298,7 +299,8 @@ impl NetworkBeaconProcessor { // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type, + process_type: BlockProcessType::SingleBlob, + source, result: result.into(), }); } diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 06e00ea6d1e..8f4cc0c07bb 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -4,8 +4,8 @@ use crate::sync::block_lookups::single_block_lookup::{ use crate::sync::block_lookups::{ BlobRequestState, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, }; -use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE}; -use crate::sync::network_context::SyncNetworkContext; +use crate::sync::manager::{Id, SLOT_IMPORT_TOLERANCE}; +use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use std::sync::Arc; @@ -66,11 +66,15 @@ pub trait RequestState { .use_rand_available_peer() .ok_or(LookupRequestError::NoPeers)?; - // make_request returns true only if a request needs to be made - if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { - self.get_state_mut().on_download_start()?; - } else { - self.get_state_mut().on_completed_request()?; + match self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { + LookupRequestResult::RequestSent => self.get_state_mut().on_download_start()?, + LookupRequestResult::NoRequestNeeded => { + self.get_state_mut().on_completed_request()? + } + + LookupRequestResult::AwaitingOtherSource => { + self.get_state_mut().on_processing_from_other_source()? + } } // Otherwise, attempt to progress awaiting processing @@ -98,7 +102,7 @@ pub trait RequestState { peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result; + ) -> Result; /* Response handling methods */ @@ -133,7 +137,7 @@ impl RequestState for BlockRequestState { peer_id: PeerId, _: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { cx.block_lookup_request(id, peer_id, self.requested_block_root) .map_err(LookupRequestError::SendFailed) } @@ -150,10 +154,10 @@ impl RequestState for BlockRequestState { peer_id: _, } = download_result; cx.send_block_for_processing( + id, block_root, RpcBlock::new_without_blobs(Some(block_root), value), seen_timestamp, - BlockProcessType::SingleBlock { id }, ) .map_err(LookupRequestError::SendFailed) } @@ -181,7 +185,7 @@ impl RequestState for BlobRequestState { peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { cx.blob_lookup_request( id, peer_id, @@ -202,13 +206,8 @@ impl RequestState for BlobRequestState { seen_timestamp, peer_id: _, } = download_result; - cx.send_blobs_for_processing( - block_root, - value, - seen_timestamp, - BlockProcessType::SingleBlob { id }, - ) - .map_err(LookupRequestError::SendFailed) + cx.send_blobs_for_processing(id, block_root, value, seen_timestamp) + .map_err(LookupRequestError::SendFailed) } fn response_type() -> ResponseType { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 6852761d8bf..8f2a6adc220 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,7 +1,7 @@ use self::parent_chain::{compute_parent_chains, NodeChain}; pub use self::single_block_lookup::DownloadResult; use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; -use super::manager::{BlockProcessType, BlockProcessingResult}; +use super::manager::{BlockProcessSource, BlockProcessType, BlockProcessingResult}; use super::network_context::{RpcProcessingResult, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; @@ -392,18 +392,35 @@ impl BlockLookups { pub fn on_processing_result( &mut self, process_type: BlockProcessType, + source: BlockProcessSource, result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) { + let id = match source { + BlockProcessSource::Rpc(id) => id, + BlockProcessSource::Gossip(block_root) => { + if let Some(lookup) = self + .single_block_lookups + .iter() + .find(|(_, lookup)| lookup.is_for_block(block_root)) + { + *lookup.0 + } else { + // Ok to ignore gossip process events + return; + } + } + }; + let lookup_result = match process_type { - BlockProcessType::SingleBlock { id } => { + BlockProcessType::SingleBlock => { self.on_processing_result_inner::>(id, result, cx) } - BlockProcessType::SingleBlob { id } => { + BlockProcessType::SingleBlob => { self.on_processing_result_inner::>(id, result, cx) } }; - self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); + self.on_lookup_result(id, lookup_result, "processing_result", cx); } pub fn on_processing_result_inner>( @@ -515,15 +532,19 @@ impl BlockLookups { } other => { debug!(self.log, "Invalid lookup component"; "block_root" => %block_root, "component" => ?R::response_type(), "error" => ?other); - let peer_id = request_state.on_processing_failure()?; - cx.report_peer( - peer_id, - PeerAction::MidToleranceError, - match R::response_type() { - ResponseType::Block => "lookup_block_processing_failure", - ResponseType::Blob => "lookup_blobs_processing_failure", - }, - ); + + // If this request is processing from gossip, on_processing_failure returns + // None. That is ok, as the gossip handler will downscore the peer in case of errors. + if let Some(peer_id) = request_state.on_processing_failure()? { + cx.report_peer( + peer_id, + PeerAction::MidToleranceError, + match R::response_type() { + ResponseType::Block => "lookup_block_processing_failure", + ResponseType::Blob => "lookup_blobs_processing_failure", + }, + ); + } Action::Retry } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index b642ec8e5b2..c8ac8fd6f27 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -150,7 +150,7 @@ impl SingleBlockLookup { } } - /// Wrapper around `RequestState::continue_request` to inject lookup data + /// Potentially makes progress on this request if it's in a progress-able state pub fn continue_request>( &mut self, cx: &mut SyncNetworkContext, @@ -247,12 +247,18 @@ pub struct DownloadResult { pub peer_id: PeerId, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum State { AwaitingDownload, Downloading, AwaitingProcess(DownloadResult), - Processing(DownloadResult), + /// Request is processing: + /// - `Processing(Some)` if lookup sync downloaded and sent to process this request + /// - `Processing(None)` if another source (i.e. gossip) sent this block for processing + Processing(Option>), + /// Request is processed: + /// - `Processed(Some)` if lookup sync downloaded and sent to process this request + /// - `Processed(None)` if another source (i.e. gossip) sent this component for processing Processed(Option), } @@ -312,7 +318,7 @@ impl SingleLookupRequestState { State::AwaitingDownload => None, State::Downloading { .. } => None, State::AwaitingProcess(result) => Some(&result.value), - State::Processing(result) => Some(&result.value), + State::Processing(result) => result.as_ref().map(|r| &r.value), State::Processed { .. } => None, } } @@ -377,7 +383,7 @@ impl SingleLookupRequestState { match &self.state { State::AwaitingProcess(result) => { let result = result.clone(); - self.state = State::Processing(result.clone()); + self.state = State::Processing(Some(result.clone())); Some(result) } _ => None, @@ -388,10 +394,13 @@ impl SingleLookupRequestState { /// processing latter. pub fn revert_to_awaiting_processing(&mut self) -> Result<(), LookupRequestError> { match &self.state { - State::Processing(result) => { + State::Processing(Some(result)) => { self.state = State::AwaitingProcess(result.clone()); Ok(()) } + State::Processing(None) => Err(LookupRequestError::BadState( + "Can not revert to AwaitingProcessing a request sent for processing from another source".to_owned(), + )), other => Err(LookupRequestError::BadState(format!( "Bad state on revert_to_awaiting_processing expected Processing got {other}" ))), @@ -399,10 +408,10 @@ impl SingleLookupRequestState { } /// Registers a failure in processing a block. - pub fn on_processing_failure(&mut self) -> Result { + pub fn on_processing_failure(&mut self) -> Result, LookupRequestError> { match &self.state { State::Processing(result) => { - let peer_id = result.peer_id; + let peer_id = result.as_ref().map(|r| r.peer_id); self.failed_processing = self.failed_processing.saturating_add(1); self.state = State::AwaitingDownload; Ok(peer_id) @@ -413,12 +422,12 @@ impl SingleLookupRequestState { } } - pub fn on_processing_success(&mut self) -> Result { + pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> { match &self.state { State::Processing(result) => { - let peer_id = result.peer_id; - self.state = State::Processed(Some(peer_id)); - Ok(peer_id) + let peer_id = result.as_ref().map(|r| r.peer_id); + self.state = State::Processed(peer_id); + Ok(()) } other => Err(LookupRequestError::BadState(format!( "Bad state on_processing_success expected Processing got {other}" @@ -455,6 +464,21 @@ impl SingleLookupRequestState { } } + /// Mark a request as complete without any download or processing + pub fn on_processing_from_other_source(&mut self) -> Result<(), LookupRequestError> { + match &self.state { + State::AwaitingDownload => { + // Can't track downloaded data if component is processing from gossip. No need to + // either, gossip handler will downscore the peer on error. + self.state = State::Processing(None); + Ok(()) + } + other => Err(LookupRequestError::BadState(format!( + "Bad state on_processing_another_thread expected AwaitingDownload got {other}" + ))), + } + } + /// The total number of failures, whether it be processing or downloading. pub fn failed_attempts(&self) -> u8 { self.failed_processing + self.failed_downloading @@ -496,12 +520,6 @@ impl SingleLookupRequestState { impl std::fmt::Display for State { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - State::AwaitingDownload => write!(f, "AwaitingDownload"), - State::Downloading { .. } => write!(f, "Downloading"), - State::AwaitingProcess { .. } => write!(f, "AwaitingProcessing"), - State::Processing { .. } => write!(f, "Processing"), - State::Processed { .. } => write!(f, "Processed"), - } + write!(f, "{}", Into::<&'static str>::into(self)) } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 302a0489c3b..e950ba9f5c8 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -341,7 +341,8 @@ impl TestRig { fn single_block_component_processed(&mut self, id: Id, result: BlockProcessingResult) { self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type: BlockProcessType::SingleBlock { id }, + process_type: BlockProcessType::SingleBlock, + source: BlockProcessSource::Rpc(id), result, }) } @@ -356,7 +357,8 @@ impl TestRig { fn single_blob_component_processed(&mut self, id: Id, result: BlockProcessingResult) { self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type: BlockProcessType::SingleBlob { id }, + process_type: BlockProcessType::SingleBlob, + source: BlockProcessSource::Rpc(id), result, }) } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 0836d97c49f..77501b71965 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -142,6 +142,7 @@ pub enum SyncMessage { /// Block processed BlockComponentProcessed { process_type: BlockProcessType, + source: BlockProcessSource, result: BlockProcessingResult, }, } @@ -149,16 +150,14 @@ pub enum SyncMessage { /// The type of processing specified for a received block. #[derive(Debug, Clone)] pub enum BlockProcessType { - SingleBlock { id: Id }, - SingleBlob { id: Id }, + SingleBlock, + SingleBlob, } -impl BlockProcessType { - pub fn id(&self) -> Id { - match self { - BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => *id, - } - } +#[derive(Debug, Clone)] +pub enum BlockProcessSource { + Rpc(Id), + Gossip(Hash256), } #[derive(Debug)] @@ -628,10 +627,14 @@ impl SyncManager { } => self.inject_error(peer_id, request_id, error), SyncMessage::BlockComponentProcessed { process_type, + source, result, - } => self - .block_lookups - .on_processing_result(process_type, result, &mut self.network), + } => self.block_lookups.on_processing_result( + process_type, + source, + result, + &mut self.network, + ), SyncMessage::BatchProcessed { sync_type, result } => match sync_type { ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { self.range_sync.handle_block_process_result( diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 88495a5b350..fdcd6184098 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -4,13 +4,13 @@ use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest}; pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest}; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; -use super::manager::{BlockProcessType, Id, RequestId as SyncRequestId}; +use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; -use crate::sync::manager::SingleLookupReqId; +use crate::sync::manager::{BlockProcessSource, SingleLookupReqId}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; @@ -81,6 +81,18 @@ impl From for LookupFailure { } } +pub enum LookupRequestResult { + /// A request is sent. Sync MUST receive an event from the network in the future for either: + /// completed response or failed request + RequestSent, + /// No request is sent, and no further action is necessary to consider this request completed + NoRequestNeeded, + /// No request is sent, but the request is not completed. Request is processing from a different + /// source (i.e. block received from gossip) and sync MUST receive an event with that processing + /// result. + AwaitingOtherSource, +} + /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { /// The network channel to relay messages to the Network service. @@ -305,14 +317,20 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, peer_id: PeerId, block_root: Hash256, - ) -> Result { + ) -> Result { + // da_checker includes block that are execution verified, but are missing components + if self.chain.data_availability_checker.has_block(&block_root) { + return Ok(LookupRequestResult::NoRequestNeeded); + } + + // reqresp_pre_import_cache includes blocks that may not be yet execution verified if self .chain .reqresp_pre_import_cache .read() .contains_key(&block_root) { - return Ok(false); + return Ok(LookupRequestResult::AwaitingOtherSource); } let id = SingleLookupReqId { @@ -340,7 +358,7 @@ impl SyncNetworkContext { self.blocks_by_root_requests .insert(id, ActiveBlocksByRootRequest::new(request)); - Ok(true) + Ok(LookupRequestResult::RequestSent) } /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: @@ -355,7 +373,7 @@ impl SyncNetworkContext { peer_id: PeerId, block_root: Hash256, downloaded_block_expected_blobs: Option, - ) -> Result { + ) -> Result { let expected_blobs = downloaded_block_expected_blobs .or_else(|| { self.chain @@ -387,7 +405,7 @@ impl SyncNetworkContext { if indices.is_empty() { // No blobs required, do not issue any request - return Ok(false); + return Ok(LookupRequestResult::NoRequestNeeded); } let id = SingleLookupReqId { @@ -419,7 +437,7 @@ impl SyncNetworkContext { self.blobs_by_root_requests .insert(id, ActiveBlobsByRootRequest::new(request)); - Ok(true) + Ok(LookupRequestResult::RequestSent) } pub fn is_execution_engine_online(&self) -> bool { @@ -595,19 +613,19 @@ impl SyncNetworkContext { pub fn send_block_for_processing( &self, + id: Id, block_root: Hash256, block: RpcBlock, duration: Duration, - process_type: BlockProcessType, ) -> Result<(), &'static str> { match self.beacon_processor_if_enabled() { Some(beacon_processor) => { - debug!(self.log, "Sending block for processing"; "block" => ?block_root, "process" => ?process_type); + debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id); if let Err(e) = beacon_processor.send_rpc_beacon_block( block_root, block, duration, - process_type, + BlockProcessSource::Rpc(id), ) { error!( self.log, @@ -628,17 +646,20 @@ impl SyncNetworkContext { pub fn send_blobs_for_processing( &self, + id: Id, block_root: Hash256, blobs: FixedBlobSidecarList, duration: Duration, - process_type: BlockProcessType, ) -> Result<(), &'static str> { match self.beacon_processor_if_enabled() { Some(beacon_processor) => { - debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type); - if let Err(e) = - beacon_processor.send_rpc_blobs(block_root, blobs, duration, process_type) - { + debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id); + if let Err(e) = beacon_processor.send_rpc_blobs( + block_root, + blobs, + duration, + BlockProcessSource::Rpc(id), + ) { error!( self.log, "Failed to send sync blobs to processor";