From 05bea70300e541755ed86e4e0bd26811e6c7c1ee Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 14 Aug 2024 11:47:56 +0300 Subject: [PATCH 1/3] Add sync lookup custody request state --- beacon_node/beacon_chain/src/beacon_chain.rs | 21 ++++ .../src/data_availability_checker.rs | 9 ++ .../overflow_lru_cache.rs | 10 +- .../lighthouse_network/src/types/globals.rs | 8 +- .../network/src/sync/block_lookups/common.rs | 70 +++++++++-- .../network/src/sync/block_lookups/mod.rs | 5 +- .../sync/block_lookups/single_block_lookup.rs | 49 +++++--- .../network/src/sync/block_lookups/tests.rs | 1 + .../network/src/sync/network_context.rs | 115 +++++++++++++++++- 9 files changed, 253 insertions(+), 35 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3bf75284779..eaf20168a64 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6777,6 +6777,27 @@ impl BeaconChain { self.data_availability_checker.data_availability_boundary() } + /// Returns true if epoch is within the data availability boundary + pub fn is_within_data_availability_boundary(&self, epoch: Epoch) -> bool { + if let Some(boundary) = self.data_availability_boundary() { + epoch > boundary + } else { + false + } + } + + /// Returns true if we should fetch blobs for this block + pub fn should_fetch_blobs(&self, block_epoch: Epoch) -> bool { + self.is_within_data_availability_boundary(block_epoch) + && !self.spec.is_peer_das_enabled_for_epoch(block_epoch) + } + + /// Returns true if we should fetch custody columns for this block + pub fn should_fetch_custody_columns(&self, block_epoch: Epoch) -> bool { + self.is_within_data_availability_boundary(block_epoch) + && self.spec.is_peer_das_enabled_for_epoch(block_epoch) + } + pub fn logger(&self) -> &Logger { &self.log } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index b4336a054e2..81f914d0182 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -148,6 +148,15 @@ impl DataAvailabilityChecker { }) } + /// Return the set of imported custody column indexes for `block_root`. Returns None if there is + /// no block component for `block_root`. + pub fn imported_custody_column_indexes(&self, block_root: &Hash256) -> Option> { + self.availability_cache + .peek_pending_components(block_root, |components| { + components.map(|components| components.get_cached_data_columns_indices()) + }) + } + /// Get a blob from the availability cache. 
pub fn get_blob( &self, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 6c9964bdf86..019b69eea76 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -14,7 +14,7 @@ use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; -use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; +use types::{BlobSidecar, ChainSpec, ColumnIndex, Epoch, EthSpec, Hash256, SignedBeaconBlock}; /// This represents the components of a partially available block /// @@ -109,6 +109,14 @@ impl PendingComponents { self.verified_data_columns.len() } + /// Returns the indices of cached custody columns + pub fn get_cached_data_columns_indices(&self) -> Vec { + self.verified_data_columns + .iter() + .map(|d| d.index()) + .collect() + } + /// Inserts a block into the cache. pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock) { *self.get_cached_block_mut() = Some(block) diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index f9ed2c9f740..2662993bff3 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -7,7 +7,7 @@ use crate::EnrExt; use crate::{Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; use std::collections::HashSet; -use types::EthSpec; +use types::{ChainSpec, ColumnIndex, EthSpec}; pub struct NetworkGlobals { /// The current local ENR. @@ -110,6 +110,12 @@ impl NetworkGlobals { std::mem::replace(&mut *self.sync_state.write(), new_state) } + /// Compute custody data columns the node is assigned to custody. + pub fn custody_columns(&self, _spec: &ChainSpec) -> Vec { + let _enr = self.local_enr(); + todo!("implement ENR changes"); + } + /// TESTING ONLY. Build a dummy NetworkGlobals instance. 
pub fn new_test_globals(trusted_peers: Vec, log: &slog::Logger) -> NetworkGlobals { use crate::CombinedKeyExt; diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index e94e9589c0a..93789f08e34 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -1,14 +1,18 @@ use crate::sync::block_lookups::single_block_lookup::{ LookupRequestError, SingleBlockLookup, SingleLookupRequestState, }; -use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId}; -use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; +use crate::sync::block_lookups::{ + BlobRequestState, BlockRequestState, CustodyRequestState, PeerId, +}; +use crate::sync::network_context::{ + DownloadedBlockSummary, LookupRequestResult, SyncNetworkContext, +}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use lighthouse_network::service::api_types::Id; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; -use types::SignedBeaconBlock; +use types::{DataColumnSidecarList, SignedBeaconBlock}; use super::single_block_lookup::DownloadResult; use super::SingleLookupId; @@ -17,6 +21,7 @@ use super::SingleLookupId; pub enum ResponseType { Block, Blob, + CustodyColumn, } /// This trait unifies common single block lookup functionality across blocks and blobs. This @@ -38,7 +43,7 @@ pub trait RequestState { &self, id: Id, peer_id: PeerId, - downloaded_block_expected_blobs: Option, + downloaded_block: Option, cx: &mut SyncNetworkContext, ) -> Result; @@ -73,7 +78,7 @@ impl RequestState for BlockRequestState { &self, id: SingleLookupId, peer_id: PeerId, - _: Option, + _: Option, cx: &mut SyncNetworkContext, ) -> Result { cx.block_lookup_request(id, peer_id, self.requested_block_root) @@ -121,16 +126,11 @@ impl RequestState for BlobRequestState { &self, id: Id, peer_id: PeerId, - downloaded_block_expected_blobs: Option, + downloaded_block: Option, cx: &mut SyncNetworkContext, ) -> Result { - cx.blob_lookup_request( - id, - peer_id, - self.block_root, - downloaded_block_expected_blobs, - ) - .map_err(LookupRequestError::SendFailedNetwork) + cx.blob_lookup_request(id, peer_id, self.block_root, downloaded_block) + .map_err(LookupRequestError::SendFailedNetwork) } fn send_for_processing( @@ -161,3 +161,47 @@ impl RequestState for BlobRequestState { &mut self.state } } + +impl RequestState for CustodyRequestState { + type VerifiedResponseType = DataColumnSidecarList; + + fn make_request( + &self, + id: Id, + // TODO(das): consider selecting peers that have custody but are in this set + _peer_id: PeerId, + downloaded_block: Option, + cx: &mut SyncNetworkContext, + ) -> Result { + cx.custody_lookup_request(id, self.block_root, downloaded_block) + .map_err(LookupRequestError::SendFailedNetwork) + } + + fn send_for_processing( + id: Id, + download_result: DownloadResult, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + let DownloadResult { + value, + block_root, + seen_timestamp, + .. 
+ } = download_result; + cx.send_custody_columns_for_processing(id, block_root, value, seen_timestamp) + .map_err(LookupRequestError::SendFailedProcessor) + } + + fn response_type() -> ResponseType { + ResponseType::CustodyColumn + } + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { + &mut request.custody_request_state + } + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { + &mut self.state + } +} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 3b93b8072c3..7194faa2860 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -36,7 +36,7 @@ use fnv::FnvHashMap; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::{BlobRequestState, BlockRequestState}; +pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; use slog::{debug, error, warn, Logger}; use std::collections::hash_map::Entry; use std::sync::Arc; @@ -527,7 +527,7 @@ impl BlockLookups { // if both components have been processed. request_state.on_processing_success()?; - if lookup.both_components_processed() { + if lookup.all_components_processed() { // We don't request for other block components until being sure that the block has // data. If we request blobs / columns to a peer we are sure those must exist. // Therefore if all components are processed and we still receive `MissingComponents` @@ -599,6 +599,7 @@ impl BlockLookups { match R::response_type() { ResponseType::Block => "lookup_block_processing_failure", ResponseType::Blob => "lookup_blobs_processing_failure", + ResponseType::CustodyColumn => "lookup_custody_processing_failure", }, ); diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 0466636fb7d..c5fa4eb5034 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -15,7 +15,7 @@ use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::FixedBlobSidecarList; -use types::{EthSpec, SignedBeaconBlock}; +use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock}; // Dedicated enum for LookupResult to force its usage #[must_use = "LookupResult must be handled with on_lookup_result"] @@ -63,6 +63,7 @@ pub struct SingleBlockLookup { pub id: Id, pub block_request_state: BlockRequestState, pub blob_request_state: BlobRequestState, + pub custody_request_state: CustodyRequestState, /// Peers that claim to have imported this set of block components #[derivative(Debug(format_with = "fmt_peer_set_as_len"))] peers: HashSet, @@ -82,6 +83,7 @@ impl SingleBlockLookup { id, block_request_state: BlockRequestState::new(requested_block_root), blob_request_state: BlobRequestState::new(requested_block_root), + custody_request_state: CustodyRequestState::new(requested_block_root), peers: HashSet::from_iter(peers.iter().copied()), block_root: requested_block_root, awaiting_parent, @@ -138,9 +140,10 @@ impl SingleBlockLookup { } /// Returns true if the block has already been downloaded. 
- pub fn both_components_processed(&self) -> bool { + pub fn all_components_processed(&self) -> bool { self.block_request_state.state.is_processed() && self.blob_request_state.state.is_processed() + && self.custody_request_state.state.is_processed() } /// Returns true if this request is expecting some event to make progress @@ -148,6 +151,7 @@ impl SingleBlockLookup { self.awaiting_parent.is_some() || self.block_request_state.state.is_awaiting_event() || self.blob_request_state.state.is_awaiting_event() + || self.custody_request_state.state.is_awaiting_event() } /// Makes progress on all requests of this lookup. Any error is not recoverable and must result @@ -159,13 +163,12 @@ impl SingleBlockLookup { // TODO: Check what's necessary to download, specially for blobs self.continue_request::>(cx)?; self.continue_request::>(cx)?; + self.continue_request::>(cx)?; // If all components of this lookup are already processed, there will be no future events // that can make progress so it must be dropped. Consider the lookup completed. // This case can happen if we receive the components from gossip during a retry. - if self.block_request_state.state.is_processed() - && self.blob_request_state.state.is_processed() - { + if self.all_components_processed() { Ok(LookupResult::Completed) } else { Ok(LookupResult::Pending) @@ -179,11 +182,11 @@ impl SingleBlockLookup { ) -> Result<(), LookupRequestError> { let id = self.id; let awaiting_parent = self.awaiting_parent.is_some(); - let downloaded_block_expected_blobs = self + let downloaded_block_summary = self .block_request_state .state .peek_downloaded_data() - .map(|block| block.num_expected_blobs()); + .map(|block| (block.num_expected_blobs(), block.slot())); let block_is_processed = self.block_request_state.state.is_processed(); let request = R::request_state_mut(self); @@ -210,7 +213,7 @@ impl SingleBlockLookup { }; let request = R::request_state_mut(self); - match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { + match request.make_request(id, peer_id, downloaded_block_summary, cx)? { LookupRequestResult::RequestSent(req_id) => { // Lookup sync event safety: If make_request returns `RequestSent`, we are // guaranteed that `BlockLookups::on_download_response` will be called exactly @@ -289,6 +292,24 @@ impl SingleBlockLookup { } } +/// The state of the block request component of a `SingleBlockLookup`. +#[derive(Derivative)] +#[derivative(Debug)] +pub struct BlockRequestState { + #[derivative(Debug = "ignore")] + pub requested_block_root: Hash256, + pub state: SingleLookupRequestState>>, +} + +impl BlockRequestState { + pub fn new(block_root: Hash256) -> Self { + Self { + requested_block_root: block_root, + state: SingleLookupRequestState::new(), + } + } +} + /// The state of the blob request component of a `SingleBlockLookup`. #[derive(Derivative)] #[derivative(Debug)] @@ -307,19 +328,19 @@ impl BlobRequestState { } } -/// The state of the block request component of a `SingleBlockLookup`. +/// The state of the blob request component of a `SingleBlockLookup`. 
#[derive(Derivative)] #[derivative(Debug)] -pub struct BlockRequestState { +pub struct CustodyRequestState { #[derivative(Debug = "ignore")] - pub requested_block_root: Hash256, - pub state: SingleLookupRequestState>>, + pub block_root: Hash256, + pub state: SingleLookupRequestState>, } -impl BlockRequestState { +impl CustodyRequestState { pub fn new(block_root: Hash256) -> Self { Self { - requested_block_root: block_root, + block_root, state: SingleLookupRequestState::new(), } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index ef2822fe563..bac76ccb849 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -723,6 +723,7 @@ impl TestRig { (ev.work_type() == beacon_processor::RPC_BLOBS).then_some(()) }) .unwrap_or_else(|e| panic!("Expected blobs work event: {e}")), + ResponseType::CustodyColumn => todo!(), } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index df8be9f6d59..5f938546ef2 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock}; +use types::{BlobSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock, Slot}; mod requests; @@ -46,6 +46,10 @@ pub enum RangeRequestId { }, } +/// 0: expected blob count +/// 1: block slot +pub type DownloadedBlockSummary = (/* expected blob count */ usize, Slot); + #[derive(Debug)] pub enum RpcEvent { StreamTermination, @@ -447,16 +451,18 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, peer_id: PeerId, block_root: Hash256, - downloaded_block_expected_blobs: Option, + downloaded_block: Option, ) -> Result { - let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| { + let Some((expected_blobs, block_slot)) = downloaded_block.or_else(|| { // If the block is already being processed or fully validated, retrieve how many blobs // it expects. Consider any stage of the block. If the block root has been validated, we // can assert that this is the correct value of `blob_kzg_commitments_count`. match self.chain.get_block_process_status(&block_root) { BlockProcessStatus::Unknown => None, BlockProcessStatus::NotValidated(block) - | BlockProcessStatus::ExecutionValidated(block) => Some(block.num_expected_blobs()), + | BlockProcessStatus::ExecutionValidated(block) => { + Some((block.num_expected_blobs(), block.slot())) + } } }) else { // Wait to download the block before downloading blobs. 
Then we can be sure that the @@ -474,6 +480,14 @@ impl SyncNetworkContext { return Ok(LookupRequestResult::Pending("waiting for block download")); }; + // Check if we are into peerdas + if !self + .chain + .should_fetch_blobs(block_slot.epoch(T::EthSpec::slots_per_epoch())) + { + return Ok(LookupRequestResult::NoRequestNeeded); + } + let imported_blob_indexes = self .chain .data_availability_checker @@ -522,6 +536,79 @@ impl SyncNetworkContext { Ok(LookupRequestResult::RequestSent(req_id)) } + pub fn custody_lookup_request( + &mut self, + lookup_id: SingleLookupId, + block_root: Hash256, + downloaded_block: Option, + ) -> Result { + let Some((expected_blobs, block_slot)) = + downloaded_block.or_else(|| match self.chain.get_block_process_status(&block_root) { + BlockProcessStatus::Unknown => None, + BlockProcessStatus::NotValidated(block) + | BlockProcessStatus::ExecutionValidated(block) => { + Some((block.num_expected_blobs(), block.slot())) + } + }) + else { + // Wait to download the block before downloading columns. Then we can be sure that the + // block has data, so there's no need to do "blind" requests for all possible columns and + // latter handle the case where if the peer sent no columns, penalize. + // - if `downloaded_block_expected_blobs` is Some = block is downloading or processing. + // - if `num_expected_blobs` returns Some = block is processed. + return Ok(LookupRequestResult::Pending("waiting for block download")); + }; + + // Check if we are into peerdas + if !self + .chain + .should_fetch_custody_columns(block_slot.epoch(T::EthSpec::slots_per_epoch())) + { + return Ok(LookupRequestResult::NoRequestNeeded); + } + + // No data required for this block + if expected_blobs == 0 { + return Ok(LookupRequestResult::NoRequestNeeded); + } + + let custody_indexes_imported = self + .chain + .data_availability_checker + .imported_custody_column_indexes(&block_root) + .unwrap_or_default(); + + // TODO(das): figure out how to pass block.slot if we end up doing rotation + let custody_indexes_duty = self.network_globals().custody_columns(&self.chain.spec); + + // Include only the blob indexes not yet imported (received through gossip) + let custody_indexes_to_fetch = custody_indexes_duty + .into_iter() + .filter(|index| !custody_indexes_imported.contains(index)) + .collect::>(); + + if custody_indexes_to_fetch.is_empty() { + // No indexes required, do not issue any request + return Ok(LookupRequestResult::NoRequestNeeded); + } + + let req_id = self.next_id(); + let id = SingleLookupReqId { lookup_id, req_id }; + + debug!( + self.log, + "Starting custody columns request"; + "block_root" => ?block_root, + "indices" => ?custody_indexes_to_fetch, + "id" => ?id + ); + + // TODO(das): Issue a custody request with `id` for the set of columns + // `custody_indexes_to_fetch` and block `block_root`. 
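// Editorial sketch, not part of this patch: once a by-root custody request type exists on
// `SyncNetworkContext`, the TODO above could resolve to something along the lines of
// `self.send_custody_by_root_request(id, block_root, custody_indexes_to_fetch)?`, where
// `send_custody_by_root_request` is a hypothetical helper that splits the requested
// indices across peers advertising custody of those columns.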
+ + Ok(LookupRequestResult::RequestSent(req_id)) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -776,6 +863,26 @@ impl SyncNetworkContext { }) } + pub fn send_custody_columns_for_processing( + &self, + id: Id, + block_root: Hash256, + _custody_columns: DataColumnSidecarList, + _duration: Duration, + ) -> Result<(), SendErrorProcessor> { + let _beacon_processor = self + .beacon_processor_if_enabled() + .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; + + debug!(self.log, "Sending custody columns for processing"; "block" => ?block_root, "id" => id); + + // Lookup sync event safety: If `beacon_processor.send_rpc_custody_columns` returns Ok() sync + // must receive a single `SyncMessage::BlockComponentProcessed` event with this process type + // + // TODO(das): After merging processor import PR, actually send columns to beacon processor. + Ok(()) + } + pub(crate) fn register_metrics(&self) { metrics::set_gauge_vec( &metrics::SYNC_ACTIVE_NETWORK_REQUESTS, From 96e2363d721f55d79f66f98f15fd61a854c7dc62 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 15 Aug 2024 09:36:30 +0200 Subject: [PATCH 2/3] Review PR --- beacon_node/beacon_chain/src/beacon_chain.rs | 13 +++---- .../lighthouse_network/src/types/globals.rs | 3 +- .../network/src/sync/block_lookups/common.rs | 12 +++---- .../sync/block_lookups/single_block_lookup.rs | 8 ++--- .../network/src/sync/block_lookups/tests.rs | 1 + .../network/src/sync/network_context.rs | 34 +++++++------------ 6 files changed, 29 insertions(+), 42 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index eaf20168a64..8757ca0713e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6778,23 +6778,20 @@ impl BeaconChain { } /// Returns true if epoch is within the data availability boundary - pub fn is_within_data_availability_boundary(&self, epoch: Epoch) -> bool { - if let Some(boundary) = self.data_availability_boundary() { - epoch > boundary - } else { - false - } + pub fn da_check_required_for_epoch(&self, epoch: Epoch) -> bool { + self.data_availability_checker + .da_check_required_for_epoch(epoch) } /// Returns true if we should fetch blobs for this block pub fn should_fetch_blobs(&self, block_epoch: Epoch) -> bool { - self.is_within_data_availability_boundary(block_epoch) + self.da_check_required_for_epoch(block_epoch) && !self.spec.is_peer_das_enabled_for_epoch(block_epoch) } /// Returns true if we should fetch custody columns for this block pub fn should_fetch_custody_columns(&self, block_epoch: Epoch) -> bool { - self.is_within_data_availability_boundary(block_epoch) + self.da_check_required_for_epoch(block_epoch) && self.spec.is_peer_das_enabled_for_epoch(block_epoch) } diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 2662993bff3..1c7c7f07d0a 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -113,7 +113,8 @@ impl NetworkGlobals { /// Compute custody data columns the node is assigned to custody. pub fn custody_columns(&self, _spec: &ChainSpec) -> Vec { let _enr = self.local_enr(); - todo!("implement ENR changes"); + //TODO(das): implement ENR changes + vec![] } /// TESTING ONLY. Build a dummy NetworkGlobals instance. 
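Patch 2 leaves `NetworkGlobals::custody_columns` as a placeholder that returns an empty `Vec` until the ENR changes land. As a rough, self-contained illustration of the direction (not this PR's implementation, and not the spec's hash-based `get_custody_columns` derivation from the node id), the sketch below expands a set of custody subnet ids into the column indices those subnets carry; the constants and the `column_index % subnet_count` mapping are assumptions for illustration only.

const NUMBER_OF_COLUMNS: u64 = 128; // illustrative value
const DATA_COLUMN_SIDECAR_SUBNET_COUNT: u64 = 32; // illustrative value

/// Expand custody subnet ids into the data column indices carried by those subnets,
/// assuming columns map to subnets via `column_index % subnet_count`.
fn columns_for_custody_subnets(custody_subnets: &[u64]) -> Vec<u64> {
    let mut columns: Vec<u64> = custody_subnets
        .iter()
        .flat_map(|&subnet| {
            // Every column whose index maps onto this subnet is custodied.
            (0..NUMBER_OF_COLUMNS)
                .filter(move |col| *col % DATA_COLUMN_SIDECAR_SUBNET_COUNT == subnet)
        })
        .collect();
    columns.sort_unstable();
    columns
}

fn main() {
    // With 32 subnets and 128 columns, each custody subnet carries 4 columns.
    let columns = columns_for_custody_subnets(&[0, 7]);
    assert_eq!(columns, vec![0, 7, 32, 39, 64, 71, 96, 103]);
    println!("{columns:?}");
}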
diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 93789f08e34..a7be72556e2 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -4,9 +4,7 @@ use crate::sync::block_lookups::single_block_lookup::{ use crate::sync::block_lookups::{ BlobRequestState, BlockRequestState, CustodyRequestState, PeerId, }; -use crate::sync::network_context::{ - DownloadedBlockSummary, LookupRequestResult, SyncNetworkContext, -}; +use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use lighthouse_network::service::api_types::Id; @@ -43,7 +41,7 @@ pub trait RequestState { &self, id: Id, peer_id: PeerId, - downloaded_block: Option, + downloaded_block: Option>>, cx: &mut SyncNetworkContext, ) -> Result; @@ -78,7 +76,7 @@ impl RequestState for BlockRequestState { &self, id: SingleLookupId, peer_id: PeerId, - _: Option, + _: Option>>, cx: &mut SyncNetworkContext, ) -> Result { cx.block_lookup_request(id, peer_id, self.requested_block_root) @@ -126,7 +124,7 @@ impl RequestState for BlobRequestState { &self, id: Id, peer_id: PeerId, - downloaded_block: Option, + downloaded_block: Option>>, cx: &mut SyncNetworkContext, ) -> Result { cx.blob_lookup_request(id, peer_id, self.block_root, downloaded_block) @@ -170,7 +168,7 @@ impl RequestState for CustodyRequestState { id: Id, // TODO(das): consider selecting peers that have custody but are in this set _peer_id: PeerId, - downloaded_block: Option, + downloaded_block: Option>>, cx: &mut SyncNetworkContext, ) -> Result { cx.custody_lookup_request(id, self.block_root, downloaded_block) diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index c5fa4eb5034..72fbb674786 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -182,11 +182,11 @@ impl SingleBlockLookup { ) -> Result<(), LookupRequestError> { let id = self.id; let awaiting_parent = self.awaiting_parent.is_some(); - let downloaded_block_summary = self + let downloaded_block = self .block_request_state .state .peek_downloaded_data() - .map(|block| (block.num_expected_blobs(), block.slot())); + .map(|block| block.clone()); let block_is_processed = self.block_request_state.state.is_processed(); let request = R::request_state_mut(self); @@ -213,7 +213,7 @@ impl SingleBlockLookup { }; let request = R::request_state_mut(self); - match request.make_request(id, peer_id, downloaded_block_summary, cx)? { + match request.make_request(id, peer_id, downloaded_block, cx)? { LookupRequestResult::RequestSent(req_id) => { // Lookup sync event safety: If make_request returns `RequestSent`, we are // guaranteed that `BlockLookups::on_download_response` will be called exactly @@ -328,7 +328,7 @@ impl BlobRequestState { } } -/// The state of the blob request component of a `SingleBlockLookup`. +/// The state of the custody request component of a `SingleBlockLookup`. 
#[derive(Derivative)] #[derivative(Debug)] pub struct CustodyRequestState { diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index bac76ccb849..fcd0d768b7b 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -723,6 +723,7 @@ impl TestRig { (ev.work_type() == beacon_processor::RPC_BLOBS).then_some(()) }) .unwrap_or_else(|e| panic!("Expected blobs work event: {e}")), + // TODO(das): remove todo when adding tests for custody sync lookup ResponseType::CustodyColumn => todo!(), } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 5f938546ef2..d648f3b6e98 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -46,10 +46,6 @@ pub enum RangeRequestId { }, } -/// 0: expected blob count -/// 1: block slot -pub type DownloadedBlockSummary = (/* expected blob count */ usize, Slot); - #[derive(Debug)] pub enum RpcEvent { StreamTermination, @@ -451,18 +447,16 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, peer_id: PeerId, block_root: Hash256, - downloaded_block: Option, + downloaded_block: Option>>, ) -> Result { - let Some((expected_blobs, block_slot)) = downloaded_block.or_else(|| { + let Some(block) = downloaded_block.or_else(|| { // If the block is already being processed or fully validated, retrieve how many blobs // it expects. Consider any stage of the block. If the block root has been validated, we // can assert that this is the correct value of `blob_kzg_commitments_count`. match self.chain.get_block_process_status(&block_root) { BlockProcessStatus::Unknown => None, BlockProcessStatus::NotValidated(block) - | BlockProcessStatus::ExecutionValidated(block) => { - Some((block.num_expected_blobs(), block.slot())) - } + | BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()), } }) else { // Wait to download the block before downloading blobs. Then we can be sure that the @@ -479,12 +473,11 @@ impl SyncNetworkContext { // get dropped as completed. return Ok(LookupRequestResult::Pending("waiting for block download")); }; + let expected_blobs = block.num_expected_blobs(); + let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); // Check if we are into peerdas - if !self - .chain - .should_fetch_blobs(block_slot.epoch(T::EthSpec::slots_per_epoch())) - { + if !self.chain.should_fetch_blobs(block_epoch) { return Ok(LookupRequestResult::NoRequestNeeded); } @@ -540,15 +533,13 @@ impl SyncNetworkContext { &mut self, lookup_id: SingleLookupId, block_root: Hash256, - downloaded_block: Option, + downloaded_block: Option>>, ) -> Result { - let Some((expected_blobs, block_slot)) = + let Some(block) = downloaded_block.or_else(|| match self.chain.get_block_process_status(&block_root) { BlockProcessStatus::Unknown => None, BlockProcessStatus::NotValidated(block) - | BlockProcessStatus::ExecutionValidated(block) => { - Some((block.num_expected_blobs(), block.slot())) - } + | BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()), }) else { // Wait to download the block before downloading columns. Then we can be sure that the @@ -558,12 +549,11 @@ impl SyncNetworkContext { // - if `num_expected_blobs` returns Some = block is processed. 
return Ok(LookupRequestResult::Pending("waiting for block download")); }; + let expected_blobs = block.num_expected_blobs(); + let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); // Check if we are into peerdas - if !self - .chain - .should_fetch_custody_columns(block_slot.epoch(T::EthSpec::slots_per_epoch())) - { + if !self.chain.should_fetch_custody_columns(block_epoch) { return Ok(LookupRequestResult::NoRequestNeeded); } From 0059a6c707066b72697606fb84b8b6de6986f2a0 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 15 Aug 2024 16:22:11 +0200 Subject: [PATCH 3/3] clippy --- .../network/src/sync/block_lookups/single_block_lookup.rs | 2 +- beacon_node/network/src/sync/network_context.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 72fbb674786..b9cd4e3e035 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -186,7 +186,7 @@ impl SingleBlockLookup { .block_request_state .state .peek_downloaded_data() - .map(|block| block.clone()); + .cloned(); let block_is_processed = self.block_request_state.state.is_processed(); let request = R::request_state_mut(self); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d648f3b6e98..7bcc8ae9f27 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{BlobSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock}; mod requests;
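Taken together, `should_fetch_blobs` and `should_fetch_custody_columns` gate, per block epoch, whether lookup sync requests blobs (pre-PeerDAS), custody columns (post-PeerDAS), or nothing (outside the data availability window). Below is a minimal standalone sketch of that decision, assuming "PeerDAS enabled" reduces to a comparison against an activation epoch; the names `FetchDecision` and `fetch_decision` are illustrative and not part of the Lighthouse API.

#[derive(Debug, PartialEq)]
enum FetchDecision {
    Blobs,
    CustodyColumns,
    Nothing,
}

fn fetch_decision(
    block_epoch: u64,
    da_boundary_epoch: Option<u64>,
    peerdas_activation_epoch: Option<u64>,
) -> FetchDecision {
    // Outside the data availability window (or pre-Deneb), nothing is fetched.
    let within_da_window = da_boundary_epoch.is_some_and(|boundary| block_epoch >= boundary);
    if !within_da_window {
        return FetchDecision::Nothing;
    }
    // After PeerDAS activation, custody column requests replace blob requests.
    let peerdas_enabled = peerdas_activation_epoch.is_some_and(|epoch| block_epoch >= epoch);
    if peerdas_enabled {
        FetchDecision::CustodyColumns
    } else {
        FetchDecision::Blobs
    }
}

fn main() {
    assert_eq!(fetch_decision(100, Some(90), None), FetchDecision::Blobs);
    assert_eq!(fetch_decision(100, Some(90), Some(95)), FetchDecision::CustodyColumns);
    assert_eq!(fetch_decision(80, Some(90), Some(95)), FetchDecision::Nothing);
    println!("gating sketch ok");
}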