diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 87a3eeb359e..b120420e54c 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1,4 +1,5 @@ use crate::block_verification_types::{AsBlock, RpcBlock}; +use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_operations::ObservationOutcome; pub use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::BeaconBlockResponseWrapper; @@ -82,6 +83,14 @@ pub static KZG: LazyLock> = LazyLock::new(|| { Arc::new(kzg) }); +pub static KZG_PEERDAS: LazyLock> = LazyLock::new(|| { + let trusted_setup: TrustedSetup = serde_json::from_reader(TRUSTED_SETUP_BYTES) + .map_err(|e| format!("Unable to read trusted setup file: {}", e)) + .expect("should have trusted setup"); + let kzg = Kzg::new_from_trusted_setup_das_enabled(trusted_setup).expect("should create kzg"); + Arc::new(kzg) +}); + pub type BaseHarnessType = Witness, E, THotStore, TColdStore>; @@ -507,7 +516,13 @@ where let validator_keypairs = self .validator_keypairs .expect("cannot build without validator keypairs"); - let kzg = spec.deneb_fork_epoch.map(|_| KZG.clone()); + let kzg = if spec.is_peer_das_scheduled() { + Some(KZG_PEERDAS.clone()) + } else if spec.deneb_fork_epoch.is_some() { + Some(KZG.clone()) + } else { + None + }; let validator_monitor_config = self.validator_monitor_config.unwrap_or_default(); @@ -2690,3 +2705,21 @@ pub fn generate_rand_block_and_blobs( } (block, blob_sidecars) } + +#[allow(clippy::type_complexity)] +pub fn generate_rand_block_and_data_columns( + fork_name: ForkName, + num_blobs: NumBlobs, + rng: &mut impl Rng, + kzg: &Kzg, + spec: &ChainSpec, +) -> ( + SignedBeaconBlock>, + Vec>>, +) { + let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng); + let blob: BlobsList = blobs.into_iter().map(|b| b.blob).collect::>().into(); + let data_columns = blobs_to_data_column_sidecars(&blob, &block, kzg, spec).unwrap(); + + (block, data_columns) +} diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index c3e77ae225e..d602ee2cae3 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,4 +1,7 @@ +use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY; use crate::discovery::CombinedKey; +use crate::rpc::{MetaData, MetaDataV2}; +use crate::EnrExt; use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId}; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; @@ -12,7 +15,7 @@ use std::{ fmt::Formatter, }; use sync_status::SyncStatus; -use types::EthSpec; +use types::{ChainSpec, DataColumnSubnetId, EthSpec}; pub mod client; pub mod peer_info; @@ -673,17 +676,62 @@ impl PeerDB { } /// Updates the connection state. MUST ONLY BE USED IN TESTS. 
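+ /// Adds a connected peer whose ENR advertises a custody subnet count (every data column subnet for a supernode, the minimum `custody_requirement` otherwise) and subscribes the peer to its computed data column subnets, so custody lookups can select it in tests.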
- pub fn __add_connected_peer_testing_only(&mut self, peer_id: &PeerId) -> Option { + pub fn __add_connected_peer_testing_only( + &mut self, + supernode: bool, + spec: &ChainSpec, + ) -> PeerId { let enr_key = CombinedKey::generate_secp256k1(); - let enr = Enr::builder().build(&enr_key).unwrap(); + let mut enr = Enr::builder().build(&enr_key).unwrap(); + let peer_id = enr.peer_id(); + let node_id = enr.node_id().raw().into(); + + let custody_subnet_count = if supernode { + spec.data_column_sidecar_subnet_count + } else { + spec.custody_requirement + }; + + enr.insert( + PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, + &custody_subnet_count, + &enr_key, + ) + .expect("u64 can be encoded"); + self.update_connection_state( - peer_id, + &peer_id, NewConnectionState::Connected { enr: Some(enr), seen_address: Multiaddr::empty(), direction: ConnectionDirection::Outgoing, }, - ) + ); + let peer = self.peers.get_mut(&peer_id).expect("peer exists"); + + // Need to insert an empty metadata to pass the condition `on_subnet_metadata` + peer.set_meta_data(MetaData::V2(MetaDataV2 { + seq_number: 0, + attnets: <_>::default(), + syncnets: <_>::default(), + })); + + for subnet in + DataColumnSubnetId::compute_custody_columns::(node_id, custody_subnet_count, spec) + { + // Need to pass the tests on PeerInfo + // - on_subnet_metadata: no action, `Subnet::DataColumn` returns true + // - on_subnet_gossipsub: subnets field contains subnet + // - is_good_gossipsub_peer: score >= 0, which equals default score + // peer.set_meta_data(crate::rpc::MetaData::V2(crate::rpc::MetaDataV2 { + // seq_number: 0, + // attnets: <_>::default(), + // syncnets: <_>::default(), + // })); + peer.insert_subnet(Subnet::DataColumn(subnet.into())); + } + + peer_id } /// The connection state of the peer has been changed. Modify the peer in the db to ensure all diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 1c7c7f07d0a..ec6b55333f0 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -1,4 +1,5 @@ //! A collection of variables that are accessible outside of the network thread itself. +use crate::discovery::enr::Eth2Enr; use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV2}; use crate::types::{BackFillState, SyncState}; @@ -7,7 +8,9 @@ use crate::EnrExt; use crate::{Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; use std::collections::HashSet; -use types::{ChainSpec, ColumnIndex, EthSpec}; +use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec}; + +use super::Subnet; pub struct NetworkGlobals { /// The current local ENR. @@ -111,10 +114,32 @@ impl NetworkGlobals { } /// Compute custody data columns the node is assigned to custody. - pub fn custody_columns(&self, _spec: &ChainSpec) -> Vec { - let _enr = self.local_enr(); - //TODO(das): implement ENR changes - vec![] + pub fn custody_columns(&self, spec: &ChainSpec) -> Vec { + let enr = self.local_enr(); + let node_id = enr.node_id().raw().into(); + let custody_subnet_count = enr.custody_subnet_count::(spec); + DataColumnSubnetId::compute_custody_columns::(node_id, custody_subnet_count, spec) + .collect() + } + + /// Returns a connected peer that: + /// 1. is connected + /// 2. assigned to custody the column based on it's `custody_subnet_count` from metadata (WIP) + /// 3. has a good score + /// 4. 
subscribed to the specified column - this condition can be removed later, so we can + /// identify and penalise peers that are supposed to custody the column. + pub fn custody_peers_for_column( + &self, + column_index: ColumnIndex, + spec: &ChainSpec, + ) -> Vec { + self.peers + .read() + .good_peers_on_subnet(Subnet::DataColumn( + DataColumnSubnetId::from_column_index::(column_index as usize, spec), + )) + .cloned() + .collect::>() } /// TESTING ONLY. Build a dummy NetworkGlobals instance. diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 7194faa2860..05a72a6ccd7 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -481,9 +481,14 @@ impl BlockLookups { BlockProcessType::SingleBlob { id } => { self.on_processing_result_inner::>(id, result, cx) } + BlockProcessType::SingleCustodyColumn { id } => { + self.on_processing_result_inner::>(id, result, cx) + } }; let id = match process_type { - BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => id, + BlockProcessType::SingleBlock { id } + | BlockProcessType::SingleBlob { id } + | BlockProcessType::SingleCustodyColumn { id } => id, }; self.on_lookup_result(id, lookup_result, "processing_result", cx); } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index b9cd4e3e035..18b5b5fff42 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -220,7 +220,7 @@ impl SingleBlockLookup { // with this `req_id`. request.get_state_mut().on_download_start(req_id)? } - LookupRequestResult::NoRequestNeeded => { + LookupRequestResult::NoRequestNeeded(_reason) => { // Lookup sync event safety: Advances this request to the terminal `Processed` // state. If all requests reach this state, the request is marked as completed // in `Self::continue_requests`. 
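The two `NetworkGlobals` methods introduced above, `custody_columns` and `custody_peers_for_column`, are the primitives the custody lookup below builds on: the first maps the local node id to its assigned column indices, the second maps a column index to good peers subscribed to its subnet. A minimal usage sketch, not part of this diff; the helper name and crate paths are illustrative:

use lighthouse_network::NetworkGlobals;
use types::{ChainSpec, ColumnIndex, EthSpec};

/// Returns the locally-custodied columns that currently have no good custodial peer.
fn columns_without_custody_peers<E: EthSpec>(
    globals: &NetworkGlobals<E>,
    spec: &ChainSpec,
) -> Vec<ColumnIndex> {
    globals
        .custody_columns(spec)
        .into_iter()
        .filter(|column| globals.custody_peers_for_column(*column, spec).is_empty())
        .collect()
}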
diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index fcd0d768b7b..43b73b6e6df 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -14,8 +14,10 @@ use beacon_chain::builder::Witness; use beacon_chain::data_availability_checker::Availability; use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::test_utils::{ - build_log, generate_rand_block_and_blobs, BeaconChainHarness, EphemeralHarnessType, NumBlobs, + build_log, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, + BeaconChainHarness, EphemeralHarnessType, NumBlobs, }; +use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{ AvailabilityPendingExecutedBlock, PayloadVerificationOutcome, PayloadVerificationStatus, }; @@ -33,7 +35,7 @@ use types::{ test_utils::{SeedableRng, XorShiftRng}, BlobSidecar, ForkName, MinimalEthSpec as E, SignedBeaconBlock, Slot, }; -use types::{BeaconState, BeaconStateBase}; +use types::{BeaconState, BeaconStateBase, ColumnIndex, DataColumnSidecar, Epoch}; type T = Witness, E, MemoryStore, MemoryStore>; @@ -85,14 +87,30 @@ struct TestRig { const D: Duration = Duration::new(0, 0); const PARENT_FAIL_TOLERANCE: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS; +type DCByRootIds = Vec; +type DCByRootId = (SyncRequestId, Vec); + +struct TestRigConfig { + peer_das_enabled: bool, +} + impl TestRig { - fn test_setup() -> Self { + fn test_setup_with_config(config: Option) -> Self { let enable_log = cfg!(feature = "test_logger"); let log = build_log(slog::Level::Trace, enable_log); + // Use `fork_from_env` logic to set correct fork epochs + let mut spec = test_spec::(); + + if let Some(config) = config { + if config.peer_das_enabled { + spec.eip7594_fork_epoch = Some(Epoch::new(0)); + } + } + // Initialise a new beacon chain let harness = BeaconChainHarness::>::builder(E) - .default_spec() + .spec(spec) .logger(log.clone()) .deterministic_keypairs(1) .fresh_ephemeral_store() @@ -144,6 +162,10 @@ impl TestRig { } } + fn test_setup() -> Self { + Self::test_setup_with_config(None) + } + fn test_setup_after_deneb() -> Option { let r = Self::test_setup(); if r.after_deneb() { @@ -153,6 +175,17 @@ impl TestRig { } } + fn test_setup_after_peerdas() -> Option { + let r = Self::test_setup_with_config(Some(TestRigConfig { + peer_das_enabled: true, + })); + if r.after_deneb() { + Some(r) + } else { + None + } + } + fn log(&self, msg: &str) { info!(self.log, "TEST_RIG"; "msg" => msg); } @@ -193,6 +226,19 @@ impl TestRig { generate_rand_block_and_blobs::(fork_name, num_blobs, rng) } + fn rand_block_and_data_columns( + &mut self, + ) -> (SignedBeaconBlock, Vec>>) { + let num_blobs = NumBlobs::Number(1); + generate_rand_block_and_data_columns::( + self.fork_name, + num_blobs, + &mut self.rng, + &self.harness.chain.kzg.as_ref().expect("no KZG"), + &self.harness.spec, + ) + } + pub fn rand_block_and_parent( &mut self, ) -> (SignedBeaconBlock, SignedBeaconBlock, Hash256, Hash256) { @@ -311,12 +357,34 @@ impl TestRig { } fn new_connected_peer(&mut self) -> PeerId { - let peer_id = PeerId::random(); self.network_globals .peers .write() - .__add_connected_peer_testing_only(&peer_id); - peer_id + .__add_connected_peer_testing_only(false, &self.harness.spec) + } + + fn new_connected_supernode_peer(&mut self) -> PeerId { + self.network_globals + .peers + .write() + .__add_connected_peer_testing_only(true, &self.harness.spec) + } + + fn 
new_connected_peers_for_peerdas(&mut self) { + // Enough sampling peers with few columns + for _ in 0..100 { + self.new_connected_peer(); + } + // One supernode peer to ensure all columns have at least one peer + self.new_connected_supernode_peer(); + + // ensure we have peers in each subnet + for i in 0..self.harness.spec.data_column_sidecar_subnet_count { + let peers = self + .network_globals + .custody_peers_for_column(i, &self.harness.spec); + println!("peers column {} {:?}", i, peers); + } } fn parent_chain_processed_success( @@ -515,6 +583,100 @@ impl TestRig { self.complete_lookup_block_import_valid(block_root, import) } + fn complete_valid_block_request( + &mut self, + id: SingleLookupReqId, + block: Arc>, + missing_components: bool, + ) { + // Complete download + let peer_id = PeerId::random(); + let slot = block.slot(); + let block_root = block.canonical_root(); + self.single_lookup_block_response(id, peer_id, Some(block)); + self.single_lookup_block_response(id, peer_id, None); + // Expect processing and resolve with import + self.expect_block_process(ResponseType::Block); + self.single_block_component_processed( + id.lookup_id, + if missing_components { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + slot, block_root, + )) + } else { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)) + }, + ) + } + + fn complete_valid_custody_request( + &mut self, + ids: DCByRootIds, + data_columns: Vec>>, + missing_components: bool, + ) { + let lookup_id = if let SyncRequestId::DataColumnsByRoot(_, id) = ids.first().unwrap().0 { + id.lookup_id + } else { + panic!("not a custody requester") + }; + + let first_column = data_columns.first().cloned().unwrap(); + + for id in ids { + self.log(&format!("return valid data column for {id:?}")); + let indices = &id.1; + let columns_to_send = indices + .iter() + .map(|&i| data_columns[i as usize].clone()) + .collect::>(); + self.complete_data_columns_by_root_request(id, &columns_to_send); + } + + // Expect work event + // TODO(das): worth it to append sender id to the work event for stricter assertion? + self.expect_rpc_custody_column_work_event(); + + // Respond with valid result + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type: BlockProcessType::SingleCustodyColumn { id: lookup_id }, + result: if missing_components { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + first_column.slot(), + first_column.block_root(), + )) + } else { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported( + first_column.block_root(), + )) + }, + }); + } + + fn complete_data_columns_by_root_request( + &mut self, + (request_id, _): DCByRootId, + data_columns: &[Arc>], + ) { + let peer_id = PeerId::random(); + for data_column in data_columns { + // Send chunks + self.send_sync_message(SyncMessage::RpcDataColumn { + request_id, + peer_id, + data_column: Some(data_column.clone()), + seen_timestamp: timestamp_now(), + }); + } + // Send stream termination + self.send_sync_message(SyncMessage::RpcDataColumn { + request_id, + peer_id, + data_column: None, + seen_timestamp: timestamp_now(), + }); + } + fn parent_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) { self.send_sync_message(SyncMessage::RpcError { peer_id, @@ -710,6 +872,59 @@ impl TestRig { .unwrap_or_else(|e| panic!("Expected blob parent request for {for_block:?}: {e}")) } + /// Retrieves an unknown number of requests for data columns of `block_root`. 
Because peer ENRs + /// are random, and peer selection is random, the total number of batched requests is unknown. + fn expect_data_columns_by_root_requests( + &mut self, + block_root: Hash256, + count: usize, + ) -> DCByRootIds { + let mut requests: DCByRootIds = vec![]; + loop { + let req = self + .pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id: _, + request: Request::DataColumnsByRoot(request), + request_id: AppRequestId::Sync(id @ SyncRequestId::DataColumnsByRoot { .. }), + } if request + .data_column_ids + .to_vec() + .iter() + .any(|r| r.block_root == block_root) => + { + let indices = request + .data_column_ids + .to_vec() + .iter() + .map(|cid| cid.index) + .collect::>(); + Some((*id, indices)) + } + _ => None, + }) + .unwrap_or_else(|e| { + panic!("Expected more DataColumnsByRoot requests for {block_root:?}: {e}") + }); + requests.push(req); + + // Should never infinite loop because sync does not send requests for 0 columns + if requests.iter().map(|r| r.1.len()).sum::() >= count { + return requests; + } + } + } + + fn expect_only_data_columns_by_root_requests( + &mut self, + for_block: Hash256, + count: usize, + ) -> DCByRootIds { + let ids = self.expect_data_columns_by_root_requests(for_block, count); + self.expect_empty_network(); + ids + } + #[track_caller] fn expect_block_process(&mut self, response_type: ResponseType) { match response_type { @@ -728,6 +943,17 @@ impl TestRig { } } + fn expect_rpc_custody_column_work_event(&mut self) { + self.pop_received_processor_event(|ev| { + if ev.work_type() == beacon_processor::RPC_CUSTODY_COLUMN { + Some(()) + } else { + None + } + }) + .unwrap_or_else(|e| panic!("Expected RPC custody column work: {e}")) + } + fn expect_no_penalty_for(&mut self, peer_id: PeerId) { self.drain_network_rx(); let downscore_events = self @@ -1588,6 +1814,27 @@ fn blobs_in_da_checker_skip_download() { r.expect_no_active_lookups(); } +#[test] +fn custody_lookup_happy_path() { + let Some(mut r) = TestRig::test_setup_after_peerdas() else { + return; + }; + let spec = E::default_spec(); + r.new_connected_peers_for_peerdas(); + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); + r.trigger_unknown_block_from_attestation(block_root, peer_id); + // Should not request blobs + let id = r.expect_block_lookup_request(block.canonical_root()); + r.complete_valid_block_request(id, block.into(), true); + let custody_column_count = spec.custody_requirement * spec.data_columns_per_subnet() as u64; + let custody_ids = + r.expect_only_data_columns_by_root_requests(block_root, custody_column_count as usize); + r.complete_valid_custody_request(custody_ids, data_columns, false); + r.expect_no_active_lookups(); +} + mod deneb_only { use super::*; use beacon_chain::{ diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index e494f1f94fc..ae1216be9eb 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -34,7 +34,7 @@ //! search for the block and subsequently search for parents if needed. 
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; -use super::block_lookups::BlockLookups; +use super::block_lookups::{BlockLookups, CustodyRequestState}; use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; @@ -155,6 +155,7 @@ pub enum SyncMessage { pub enum BlockProcessType { SingleBlock { id: Id }, SingleBlob { id: Id }, + SingleCustodyColumn { id: Id }, } #[derive(Debug)] @@ -965,15 +966,31 @@ impl SyncManager { fn on_data_columns_by_root_response( &mut self, req_id: DataColumnsByRootRequestId, - _requester: SingleLookupReqId, + requester: SingleLookupReqId, peer_id: PeerId, rpc_event: RpcEvent>>, ) { - if let Some(_resp) = self + if let Some(resp) = self .network .on_data_columns_by_root_response(req_id, peer_id, rpc_event) { - // TODO(das): pass data_columns_by_root result to consumer + if let Some(custody_columns) = self + .network + .on_custody_by_root_response(requester, req_id, peer_id, resp) + { + // TODO(das): get proper timestamp + let seen_timestamp = timestamp_now(); + self.block_lookups + .on_download_response::>( + requester, + // TODO(das): this is not the correct peer to attribute to a response. It's + // just the last peer of multiple potential requests. We should switch to a + // concept of "peer group" to attribute fault to the correct peer. + peer_id, + custody_columns.map(|custody_columns| (custody_columns, seen_timestamp)), + &mut self.network, + ); + } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index fa9159f7f8e..6bd3be6dfd0 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -13,6 +13,7 @@ use crate::sync::block_lookups::SingleLookupId; use crate::sync::manager::BlockProcessType; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; +use custody_by_root::ActiveCustodyRequest; use fnv::FnvHashMap; use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; @@ -29,9 +30,11 @@ use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - BlobSidecar, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock, + BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, + SignedBeaconBlock, }; +mod custody_by_root; mod requests; pub struct BlocksAndBlobsByRangeResponse { @@ -60,15 +63,20 @@ pub enum RpcEvent { pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; +#[derive(Debug)] pub enum RpcResponseError { RpcError(RPCError), VerifyError(LookupVerifyError), + // TODO(das): handle this nested error better + CustodyRequestError(custody_by_root::Error), } #[derive(Debug, PartialEq, Eq)] pub enum RpcRequestSendError { /// Network channel send failed NetworkSendError, + // TODO(das): handle this nested error better + CustodyRequestError(custody_by_root::Error), } #[derive(Debug, PartialEq, Eq)] @@ -82,6 +90,7 @@ impl std::fmt::Display for RpcResponseError { match self { RpcResponseError::RpcError(e) => write!(f, "RPC Error: {:?}", e), RpcResponseError::VerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), + RpcResponseError::CustodyRequestError(e) => write!(f, "Custody Send Error: 
{:?}", e), } } } @@ -106,7 +115,7 @@ pub enum LookupRequestResult { /// completed response or failed request RequestSent(I), /// No request is sent, and no further action is necessary to consider this request completed - NoRequestNeeded, + NoRequestNeeded(&'static str), /// No request is sent, but the request is not completed. Sync MUST receive some future event /// that makes progress on the request. For example: request is processing from a different /// source (i.e. block received from gossip) and sync MUST receive an event with that processing @@ -132,6 +141,9 @@ pub struct SyncNetworkContext { data_columns_by_root_requests: FnvHashMap>, + /// Mapping of active custody column requests for a block root + custody_by_root_requests: FnvHashMap>, + /// BlocksByRange requests paired with BlobsByRange range_blocks_and_blobs_requests: FnvHashMap)>, @@ -181,6 +193,7 @@ impl SyncNetworkContext { blocks_by_root_requests: <_>::default(), blobs_by_root_requests: <_>::default(), data_columns_by_root_requests: <_>::default(), + custody_by_root_requests: <_>::default(), range_blocks_and_blobs_requests: FnvHashMap::default(), network_beacon_processor, chain, @@ -232,6 +245,9 @@ impl SyncNetworkContext { } }); + // No need to fail custody by root requests as those only include items already + // inside data_columns_by_root_requests + failed_range_ids .chain(failed_block_ids) .chain(failed_blob_ids) @@ -243,6 +259,11 @@ impl SyncNetworkContext { &self.network_beacon_processor.network_globals } + pub fn get_custodial_peers(&self, column_index: ColumnIndex) -> Vec { + self.network_globals() + .custody_peers_for_column(column_index, &self.chain.spec) + } + /// Returns the Client type of the peer if known pub fn client_type(&self, peer_id: &PeerId) -> Client { self.network_globals() @@ -419,7 +440,9 @@ impl SyncNetworkContext { // Block is fully validated. If it's not yet imported it's waiting for missing block // components. Consider this request completed and do nothing. BlockProcessStatus::ExecutionValidated { .. 
} => { - return Ok(LookupRequestResult::NoRequestNeeded) + return Ok(LookupRequestResult::NoRequestNeeded( + "block execution validated", + )) } } @@ -499,7 +522,7 @@ impl SyncNetworkContext { // Check if we are into peerdas if !self.chain.should_fetch_blobs(block_epoch) { - return Ok(LookupRequestResult::NoRequestNeeded); + return Ok(LookupRequestResult::NoRequestNeeded("no data")); } let imported_blob_indexes = self @@ -514,7 +537,7 @@ impl SyncNetworkContext { if indices.is_empty() { // No blobs required, do not issue any request - return Ok(LookupRequestResult::NoRequestNeeded); + return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch")); } let req_id = self.next_id(); @@ -612,12 +635,14 @@ impl SyncNetworkContext { // Check if we are into peerdas if !self.chain.should_fetch_custody_columns(block_epoch) { - return Ok(LookupRequestResult::NoRequestNeeded); + return Ok(LookupRequestResult::NoRequestNeeded( + "outside peerdas da window", + )); } // No data required for this block if expected_blobs == 0 { - return Ok(LookupRequestResult::NoRequestNeeded); + return Ok(LookupRequestResult::NoRequestNeeded("no data")); } let custody_indexes_imported = self @@ -637,7 +662,7 @@ impl SyncNetworkContext { if custody_indexes_to_fetch.is_empty() { // No indexes required, do not issue any request - return Ok(LookupRequestResult::NoRequestNeeded); + return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch")); } let req_id = self.next_id(); @@ -651,10 +676,27 @@ impl SyncNetworkContext { "id" => ?id ); - // TODO(das): Issue a custody request with `id` for the set of columns - // `custody_indexes_to_fetch` and block `block_root`. + let mut request = ActiveCustodyRequest::new( + block_root, + // TODO(das): should attach a unique req_id here? + id, + &custody_indexes_to_fetch, + self.log.clone(), + ); - Ok(LookupRequestResult::RequestSent(req_id)) + // Potentially trigger N data_columns_by_root requests. + // Note that you can only send, but not handle a response here + match request.continue_requests(self) { + Ok(_) => { + // Ignoring the result of `continue_requests` is okay. A request that has just been + // created cannot return data immediately, it must send some request to the network + // first. And there must exist some request, `custody_indexes_to_fetch` is not empty. + self.custody_by_root_requests.insert(id, request); + Ok(LookupRequestResult::RequestSent(req_id)) + } + // TODO(das): handle this error properly + Err(e) => Err(RpcRequestSendError::CustodyRequestError(e)), + } } pub fn is_execution_engine_online(&self) -> bool { @@ -897,6 +939,53 @@ impl SyncNetworkContext { } } + /// Insert a downloaded column into an active custody request. Then make progress on the + /// entire request. + /// + /// ### Returns + /// + /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result. + /// - `None`: Request still active, requester should do no action + #[allow(clippy::type_complexity)] + pub fn on_custody_by_root_response( + &mut self, + requester: SingleLookupReqId, + req_id: DataColumnsByRootRequestId, + peer_id: PeerId, + resp: RpcResponseResult>>>, + ) -> Option, RpcResponseError>> { + // Note: need to remove the request to borrow self again below. 
Otherwise we can't + // do nested requests + let Some(mut request) = self.custody_by_root_requests.remove(&requester) else { + // TOOD(das): This log can happen if the request is error'ed early and dropped + debug!(self.log, "Custody column downloaded event for unknown request"; "id" => ?requester); + return None; + }; + + let result = request + .on_data_column_downloaded(peer_id, req_id, resp, self) + .map_err(RpcResponseError::CustodyRequestError) + .transpose(); + + // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to + // an Option first to use in an `if let Some() { act on result }` block. + if let Some(result) = result { + match result.as_ref() { + Ok(columns) => { + debug!(self.log, "Custody request success, removing"; "id" => ?requester, "count" => columns.len()) + } + Err(e) => { + debug!(self.log, "Custody request failure, removing"; "id" => ?requester, "error" => ?e) + } + } + + Some(result) + } else { + self.custody_by_root_requests.insert(requester, request); + None + } + } + pub fn send_block_for_processing( &self, id: Id, @@ -963,20 +1052,31 @@ impl SyncNetworkContext { &self, id: Id, block_root: Hash256, - _custody_columns: DataColumnSidecarList, - _duration: Duration, + custody_columns: DataColumnSidecarList, + duration: Duration, ) -> Result<(), SendErrorProcessor> { - let _beacon_processor = self + let beacon_processor = self .beacon_processor_if_enabled() .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; debug!(self.log, "Sending custody columns for processing"; "block" => ?block_root, "id" => id); - // Lookup sync event safety: If `beacon_processor.send_rpc_custody_columns` returns Ok() sync // must receive a single `SyncMessage::BlockComponentProcessed` event with this process type - // - // TODO(das): After merging processor import PR, actually send columns to beacon processor. 
- Ok(()) + beacon_processor + .send_rpc_custody_columns( + block_root, + custody_columns, + duration, + BlockProcessType::SingleCustodyColumn { id }, + ) + .map_err(|e| { + error!( + self.log, + "Failed to send sync custody columns to processor"; + "error" => ?e + ); + SendErrorProcessor::SendError + }) } pub(crate) fn register_metrics(&self) { diff --git a/beacon_node/network/src/sync/network_context/custody_by_root.rs b/beacon_node/network/src/sync/network_context/custody_by_root.rs new file mode 100644 index 00000000000..8d844f54e55 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/custody_by_root.rs @@ -0,0 +1,417 @@ +use beacon_chain::BeaconChainTypes; +use fnv::FnvHashMap; +use lighthouse_network::service::api_types::{DataColumnsByRootRequestId, SingleLookupReqId}; +use lighthouse_network::PeerId; +use lru_cache::LRUTimeCache; +use rand::Rng; +use slog::{debug, warn}; +use std::time::Duration; +use std::{collections::HashMap, marker::PhantomData, sync::Arc}; +use types::EthSpec; +use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256}; + +use super::{ + DataColumnsByRootSingleBlockRequest, LookupRequestResult, RpcResponseResult, SyncNetworkContext, +}; + +const FAILED_PEERS_CACHE_EXPIRY_SECONDS: u64 = 5; + +type DataColumnSidecarVec = Vec>>; + +pub struct ActiveCustodyRequest { + block_root: Hash256, + requester: SingleLookupReqId, + /// List of column indices this request needs to download to complete successfully + column_requests: FnvHashMap>, + /// Active requests for 1 or more columns each + active_batch_columns_requests: + FnvHashMap, + /// Peers that have recently failed to successfully respond to a columns by root request. + /// Having a LRUTimeCache allows this request to not have to track disconnecting peers. + failed_peers: LRUTimeCache, + /// Logger for the `SyncNetworkContext`. + pub log: slog::Logger, + _phantom: PhantomData, +} + +#[derive(Debug, Eq, PartialEq)] +pub enum Error { + SendFailed(&'static str), + TooManyFailures, + BadState(String), + NoPeers(ColumnIndex), + /// Received a download result for a different request id than the in-flight request. + /// There should only exist a single request at a time. Having multiple requests is a bug and + /// can result in undefined state, so it's treated as a hard error and the lookup is dropped. + UnexpectedRequestId { + expected_req_id: DataColumnsByRootRequestId, + req_id: DataColumnsByRootRequestId, + }, +} + +struct ActiveBatchColumnsRequest { + peer_id: PeerId, + indices: Vec, +} + +type CustodyRequestResult = Result>, Error>; + +impl ActiveCustodyRequest { + pub(crate) fn new( + block_root: Hash256, + requester: SingleLookupReqId, + column_indices: &[ColumnIndex], + log: slog::Logger, + ) -> Self { + Self { + block_root, + requester, + column_requests: HashMap::from_iter( + column_indices + .iter() + .map(|index| (*index, ColumnRequest::new())), + ), + active_batch_columns_requests: <_>::default(), + failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)), + log, + _phantom: PhantomData, + } + } + + /// Insert a downloaded column into an active custody request. Then make progress on the + /// entire request. 
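+ /// A single response may cover several requested columns; any requested column missing from the response is reset and re-requested by `continue_requests`, de-prioritizing the responding peer.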
+ /// + /// ### Returns + /// + /// - `Err`: Custody request has failed and will be dropped + /// - `Ok(Some)`: Custody request has successfully completed and will be dropped + /// - `Ok(None)`: Custody request still active + pub(crate) fn on_data_column_downloaded( + &mut self, + peer_id: PeerId, + req_id: DataColumnsByRootRequestId, + resp: RpcResponseResult>, + cx: &mut SyncNetworkContext, + ) -> CustodyRequestResult { + // TODO(das): Should downscore peers for verify errors here + + let Some(batch_request) = self.active_batch_columns_requests.get_mut(&req_id) else { + warn!(self.log, + "Received custody column response for unrequested index"; + "id" => ?self.requester, + "block_root" => ?self.block_root, + "req_id" => %req_id, + ); + return Ok(None); + }; + + match resp { + Ok((data_columns, _seen_timestamp)) => { + debug!(self.log, + "Custody column download success"; + "id" => ?self.requester, + "block_root" => ?self.block_root, + "req_id" => %req_id, + "peer" => %peer_id, + "count" => data_columns.len() + ); + + // Map columns by index as an optimization to not loop the returned list on each + // requested index. The worse case is 128 loops over a 128 item vec + mutation to + // drop the consumed columns. + let mut data_columns = HashMap::::from_iter( + data_columns.into_iter().map(|d| (d.index, d)), + ); + // Accumulate columns that the peer does not have to issue a single log per request + let mut missing_column_indexes = vec![]; + + for column_index in &batch_request.indices { + let column_request = self + .column_requests + .get_mut(column_index) + .ok_or(Error::BadState("unknown column_index".to_owned()))?; + + if let Some(data_column) = data_columns.remove(column_index) { + column_request.on_download_success( + req_id, + peer_id, + // Safe to cast, self.column_requests only contains indexes for columns we must custody + data_column, + )?; + } else { + // Peer does not have the requested data. + // TODO(das) do not consider this case a success. We know for sure the block has + // data. However we allow the peer to return empty as we can't attribute fault. + // TODO(das): Should track which columns are missing and eventually give up + // TODO(das): If the peer is in the lookup peer set it claims to have imported + // the block AND its custody columns. So in this case we can downscore + column_request.on_download_error(req_id)?; + missing_column_indexes.push(column_index); + } + } + + // Note: no need to check data_columns is empty, SyncNetworkContext ensures that + // successful responses only contain requested data. + + if !missing_column_indexes.is_empty() { + // Note: Batch logging that columns are missing to not spam logger + debug!(self.log, + "Custody column peer claims to not have some data"; + "id" => ?self.requester, + "block_root" => ?self.block_root, + "req_id" => %req_id, + "peer" => %peer_id, + // TODO(das): this property can become very noisy, being the full range 0..128 + "missing_column_indexes" => ?missing_column_indexes + ); + + self.failed_peers.insert(peer_id); + } + } + Err(err) => { + debug!(self.log, + "Custody column download error"; + "id" => ?self.requester, + "block_root" => ?self.block_root, + "req_id" => ?req_id, + "peer" => %peer_id, + "error" => ?err + ); + + // TODO(das): Should mark peer as failed and try from another peer + for column_index in &batch_request.indices { + self.column_requests + .get_mut(column_index) + .ok_or(Error::BadState("unknown column_index".to_owned()))? 
+ .on_download_error_and_mark_failure(req_id)?; + } + + self.failed_peers.insert(peer_id); + } + }; + + self.continue_requests(cx) + } + + pub(crate) fn continue_requests( + &mut self, + cx: &mut SyncNetworkContext, + ) -> CustodyRequestResult { + if self.column_requests.values().all(|r| r.is_downloaded()) { + // All requests have completed successfully. + let mut peers = HashMap::>::new(); + let columns = std::mem::take(&mut self.column_requests) + .into_values() + .map(|request| { + let (peer, data_column) = request.complete()?; + peers + .entry(peer) + .or_default() + .push(data_column.index as usize); + Ok(data_column) + }) + .collect::, _>>()?; + + return Ok(Some(columns)); + } + + let mut columns_to_request_by_peer = HashMap::>::new(); + + // Need to: + // - track how many active requests a peer has for load balancing + // - which peers have failures to attempt others + // - which peer returned what to have PeerGroup attributability + + for (column_index, request) in self.column_requests.iter_mut() { + if request.is_awaiting_download() { + if request.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS { + return Err(Error::TooManyFailures); + } + + // TODO: When is a fork and only a subset of your peers know about a block, we should only + // query the peers on that fork. Should this case be handled? How to handle it? + let custodial_peers = cx.get_custodial_peers(*column_index); + + // TODO(das): cache this computation in a OneCell or similar to prevent having to + // run it every loop + let mut active_requests_by_peer = HashMap::::new(); + for batch_request in self.active_batch_columns_requests.values() { + *active_requests_by_peer + .entry(batch_request.peer_id) + .or_default() += 1; + } + + let mut priorized_peers = custodial_peers + .iter() + .map(|peer| { + ( + // De-prioritize peers that have failed to successfully respond to + // requests recently + self.failed_peers.contains(peer), + // Prefer peers with less requests to load balance across peers + active_requests_by_peer.get(peer).copied().unwrap_or(0), + // Final random factor to give all peers a shot in each retry + rand::thread_rng().gen::(), + *peer, + ) + }) + .collect::>(); + priorized_peers.sort_unstable(); + + let Some((_, _, _, peer_id)) = priorized_peers.first() else { + // Do not tolerate not having custody peers, hard error. + // TODO(das): we might implement some grace period. The request will pause for X + // seconds expecting the peer manager to find peers before failing the request. 
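+ // Until such a grace period exists, a column with no known custodial peers fails the entire custody request immediately.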
+ return Err(Error::NoPeers(*column_index)); + }; + + columns_to_request_by_peer + .entry(*peer_id) + .or_default() + .push(*column_index); + } + } + + for (peer_id, indices) in columns_to_request_by_peer.into_iter() { + let request_result = cx + .data_column_lookup_request( + self.requester, + peer_id, + DataColumnsByRootSingleBlockRequest { + block_root: self.block_root, + indices: indices.clone(), + }, + ) + .map_err(Error::SendFailed)?; + + match request_result { + LookupRequestResult::RequestSent(req_id) => { + for column_index in &indices { + let column_request = self + .column_requests + .get_mut(column_index) + .ok_or(Error::BadState("unknown column_index".to_owned()))?; + + column_request.on_download_start(req_id)?; + } + + self.active_batch_columns_requests + .insert(req_id, ActiveBatchColumnsRequest { indices, peer_id }); + } + LookupRequestResult::NoRequestNeeded(_) => unreachable!(), + LookupRequestResult::Pending(_) => unreachable!(), + } + } + + Ok(None) + } +} + +/// TODO(das): this attempt count is nested into the existing lookup request count. +const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS: usize = 3; + +struct ColumnRequest { + status: Status, + download_failures: usize, +} + +#[derive(Debug, Clone)] +enum Status { + NotStarted, + Downloading(DataColumnsByRootRequestId), + Downloaded(PeerId, Arc>), +} + +impl ColumnRequest { + fn new() -> Self { + Self { + status: Status::NotStarted, + download_failures: 0, + } + } + + fn is_awaiting_download(&self) -> bool { + match self.status { + Status::NotStarted => true, + Status::Downloading { .. } | Status::Downloaded { .. } => false, + } + } + + fn is_downloaded(&self) -> bool { + match self.status { + Status::NotStarted | Status::Downloading { .. } => false, + Status::Downloaded { .. } => true, + } + } + + fn on_download_start(&mut self, req_id: DataColumnsByRootRequestId) -> Result<(), Error> { + match &self.status { + Status::NotStarted => { + self.status = Status::Downloading(req_id); + Ok(()) + } + other => Err(Error::BadState(format!( + "bad state on_download_start expected NotStarted got {other:?}" + ))), + } + } + + fn on_download_error(&mut self, req_id: DataColumnsByRootRequestId) -> Result<(), Error> { + match &self.status { + Status::Downloading(expected_req_id) => { + if req_id != *expected_req_id { + return Err(Error::UnexpectedRequestId { + expected_req_id: *expected_req_id, + req_id, + }); + } + self.status = Status::NotStarted; + Ok(()) + } + other => Err(Error::BadState(format!( + "bad state on_download_error expected Downloading got {other:?}" + ))), + } + } + + fn on_download_error_and_mark_failure( + &mut self, + req_id: DataColumnsByRootRequestId, + ) -> Result<(), Error> { + // TODO(das): Should track which peers don't have data + self.download_failures += 1; + self.on_download_error(req_id) + } + + fn on_download_success( + &mut self, + req_id: DataColumnsByRootRequestId, + peer_id: PeerId, + data_column: Arc>, + ) -> Result<(), Error> { + match &self.status { + Status::Downloading(expected_req_id) => { + if req_id != *expected_req_id { + return Err(Error::UnexpectedRequestId { + expected_req_id: *expected_req_id, + req_id, + }); + } + self.status = Status::Downloaded(peer_id, data_column); + Ok(()) + } + other => Err(Error::BadState(format!( + "bad state on_download_success expected Downloading got {other:?}" + ))), + } + } + + fn complete(self) -> Result<(PeerId, Arc>), Error> { + match self.status { + Status::Downloaded(peer_id, data_column) => Ok((peer_id, data_column)), + other => 
Err(Error::BadState(format!( + "bad state complete expected Downloaded got {other:?}" + ))), + } + } +}
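Peer selection in `continue_requests` relies on the lexicographic ordering of a `(recently_failed, active_request_count, random, peer_id)` tuple: healthy peers sort before recently-failed ones, lightly-loaded peers before busy ones, and a random component spreads retries across equally-ranked peers. A standalone sketch of that ordering, not part of this diff, with `u64` standing in for `PeerId`:

use rand::Rng;
use std::collections::{HashMap, HashSet};

/// Returns the preferred peer for a column, or `None` if there are no candidates.
fn pick_peer(
    candidates: &[u64],
    recently_failed: &HashSet<u64>,
    active_requests: &HashMap<u64, usize>,
) -> Option<u64> {
    let mut prioritized = candidates
        .iter()
        .map(|peer| {
            (
                recently_failed.contains(peer),                  // healthy peers first (false < true)
                active_requests.get(peer).copied().unwrap_or(0), // then fewer in-flight requests
                rand::thread_rng().gen::<u32>(),                 // then a random tie-break
                *peer,
            )
        })
        .collect::<Vec<_>>();
    prioritized.sort_unstable();
    prioritized.first().map(|&(_, _, _, peer)| peer)
}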