Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sync lookup custody request state #6257

Merged
merged 4 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6776,6 +6776,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.data_availability_checker.data_availability_boundary()
}

/// Returns true if epoch is within the data availability boundary
pub fn da_check_required_for_epoch(&self, epoch: Epoch) -> bool {
self.data_availability_checker
.da_check_required_for_epoch(epoch)
}

/// Returns true if we should fetch blobs for this block
pub fn should_fetch_blobs(&self, block_epoch: Epoch) -> bool {
self.da_check_required_for_epoch(block_epoch)
&& !self.spec.is_peer_das_enabled_for_epoch(block_epoch)
}

/// Returns true if we should fetch custody columns for this block
pub fn should_fetch_custody_columns(&self, block_epoch: Epoch) -> bool {
self.da_check_required_for_epoch(block_epoch)
&& self.spec.is_peer_das_enabled_for_epoch(block_epoch)
}

pub fn logger(&self) -> &Logger {
&self.log
}
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Return the set of imported custody column indexes for `block_root`. Returns None if there is
/// no block component for `block_root`.
pub fn imported_custody_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| components.get_cached_data_columns_indices())
})
}

/// Get a blob from the availability cache.
pub fn get_blob(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize;
use std::sync::Arc;
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
use types::{BlobSidecar, ChainSpec, ColumnIndex, Epoch, EthSpec, Hash256, SignedBeaconBlock};

/// This represents the components of a partially available block
///
Expand Down Expand Up @@ -108,6 +108,14 @@ impl<E: EthSpec> PendingComponents<E> {
self.verified_data_columns.len()
}

/// Returns the indices of cached custody columns
pub fn get_cached_data_columns_indices(&self) -> Vec<ColumnIndex> {
self.verified_data_columns
.iter()
.map(|d| d.index())
.collect()
}

/// Inserts a block into the cache.
pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock<E>) {
*self.get_cached_block_mut() = Some(block)
Expand Down
9 changes: 8 additions & 1 deletion beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::EnrExt;
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock;
use std::collections::HashSet;
use types::EthSpec;
use types::{ChainSpec, ColumnIndex, EthSpec};

pub struct NetworkGlobals<E: EthSpec> {
/// The current local ENR.
Expand Down Expand Up @@ -110,6 +110,13 @@ impl<E: EthSpec> NetworkGlobals<E> {
std::mem::replace(&mut *self.sync_state.write(), new_state)
}

/// Compute custody data columns the node is assigned to custody.
pub fn custody_columns(&self, _spec: &ChainSpec) -> Vec<ColumnIndex> {
let _enr = self.local_enr();
//TODO(das): implement ENR changes
vec![]
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
Expand Down
66 changes: 54 additions & 12 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, CustodyRequestState, PeerId,
};
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use lighthouse_network::service::api_types::Id;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::SignedBeaconBlock;
use types::{DataColumnSidecarList, SignedBeaconBlock};

use super::single_block_lookup::DownloadResult;
use super::SingleLookupId;
Expand All @@ -17,6 +19,7 @@ use super::SingleLookupId;
pub enum ResponseType {
Block,
Blob,
CustodyColumn,
}

/// This trait unifies common single block lookup functionality across blocks and blobs. This
Expand All @@ -38,7 +41,7 @@ pub trait RequestState<T: BeaconChainTypes> {
&self,
id: Id,
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError>;

Expand Down Expand Up @@ -73,7 +76,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
&self,
id: SingleLookupId,
peer_id: PeerId,
_: Option<usize>,
_: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root)
Expand Down Expand Up @@ -121,16 +124,11 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
&self,
id: Id,
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(
id,
peer_id,
self.block_root,
downloaded_block_expected_blobs,
)
.map_err(LookupRequestError::SendFailedNetwork)
cx.blob_lookup_request(id, peer_id, self.block_root, downloaded_block)
.map_err(LookupRequestError::SendFailedNetwork)
}

fn send_for_processing(
Expand Down Expand Up @@ -161,3 +159,47 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
&mut self.state
}
}

impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
type VerifiedResponseType = DataColumnSidecarList<T::EthSpec>;

fn make_request(
&self,
id: Id,
// TODO(das): consider selecting peers that have custody but are in this set
_peer_id: PeerId,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.custody_lookup_request(id, self.block_root, downloaded_block)
.map_err(LookupRequestError::SendFailedNetwork)
}

fn send_for_processing(
id: Id,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
let DownloadResult {
value,
block_root,
seen_timestamp,
..
} = download_result;
cx.send_custody_columns_for_processing(id, block_root, value, seen_timestamp)
.map_err(LookupRequestError::SendFailedProcessor)
}

fn response_type() -> ResponseType {
ResponseType::CustodyColumn
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.custody_request_state
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
}
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
&mut self.state
}
}
5 changes: 3 additions & 2 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use fnv::FnvHashMap;
use lighthouse_network::service::api_types::SingleLookupReqId;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlobRequestState, BlockRequestState};
pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState};
use slog::{debug, error, warn, Logger};
use std::collections::hash_map::Entry;
use std::sync::Arc;
Expand Down Expand Up @@ -527,7 +527,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// if both components have been processed.
request_state.on_processing_success()?;

if lookup.both_components_processed() {
if lookup.all_components_processed() {
// We don't request for other block components until being sure that the block has
// data. If we request blobs / columns to a peer we are sure those must exist.
// Therefore if all components are processed and we still receive `MissingComponents`
Expand Down Expand Up @@ -599,6 +599,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match R::response_type() {
ResponseType::Block => "lookup_block_processing_failure",
ResponseType::Blob => "lookup_blobs_processing_failure",
ResponseType::CustodyColumn => "lookup_custody_processing_failure",
},
);

Expand Down
49 changes: 35 additions & 14 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::time::{Duration, Instant};
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{EthSpec, SignedBeaconBlock};
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock};

// Dedicated enum for LookupResult to force its usage
#[must_use = "LookupResult must be handled with on_lookup_result"]
Expand Down Expand Up @@ -63,6 +63,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id,
pub block_request_state: BlockRequestState<T::EthSpec>,
pub blob_request_state: BlobRequestState<T::EthSpec>,
pub custody_request_state: CustodyRequestState<T::EthSpec>,
/// Peers that claim to have imported this set of block components
#[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
peers: HashSet<PeerId>,
Expand All @@ -82,6 +83,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
id,
block_request_state: BlockRequestState::new(requested_block_root),
blob_request_state: BlobRequestState::new(requested_block_root),
custody_request_state: CustodyRequestState::new(requested_block_root),
peers: HashSet::from_iter(peers.iter().copied()),
block_root: requested_block_root,
awaiting_parent,
Expand Down Expand Up @@ -138,16 +140,18 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}

/// Returns true if the block has already been downloaded.
pub fn both_components_processed(&self) -> bool {
pub fn all_components_processed(&self) -> bool {
self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
&& self.custody_request_state.state.is_processed()
}

/// Returns true if this request is expecting some event to make progress
pub fn is_awaiting_event(&self) -> bool {
self.awaiting_parent.is_some()
|| self.block_request_state.state.is_awaiting_event()
|| self.blob_request_state.state.is_awaiting_event()
|| self.custody_request_state.state.is_awaiting_event()
}

/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
Expand All @@ -159,13 +163,12 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// TODO: Check what's necessary to download, specially for blobs
self.continue_request::<BlockRequestState<T::EthSpec>>(cx)?;
self.continue_request::<BlobRequestState<T::EthSpec>>(cx)?;
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx)?;

// If all components of this lookup are already processed, there will be no future events
// that can make progress so it must be dropped. Consider the lookup completed.
// This case can happen if we receive the components from gossip during a retry.
if self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
{
if self.all_components_processed() {
Ok(LookupResult::Completed)
} else {
Ok(LookupResult::Pending)
Expand All @@ -179,11 +182,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
) -> Result<(), LookupRequestError> {
let id = self.id;
let awaiting_parent = self.awaiting_parent.is_some();
let downloaded_block_expected_blobs = self
let downloaded_block = self
.block_request_state
.state
.peek_downloaded_data()
.map(|block| block.num_expected_blobs());
.cloned();
let block_is_processed = self.block_request_state.state.is_processed();
let request = R::request_state_mut(self);

Expand All @@ -210,7 +213,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
};

let request = R::request_state_mut(self);
match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
match request.make_request(id, peer_id, downloaded_block, cx)? {
LookupRequestResult::RequestSent(req_id) => {
// Lookup sync event safety: If make_request returns `RequestSent`, we are
// guaranteed that `BlockLookups::on_download_response` will be called exactly
Expand Down Expand Up @@ -289,6 +292,24 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}

/// The state of the block request component of a `SingleBlockLookup`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BlockRequestState<E: EthSpec> {
#[derivative(Debug = "ignore")]
pub requested_block_root: Hash256,
pub state: SingleLookupRequestState<Arc<SignedBeaconBlock<E>>>,
}

impl<E: EthSpec> BlockRequestState<E> {
pub fn new(block_root: Hash256) -> Self {
Self {
requested_block_root: block_root,
state: SingleLookupRequestState::new(),
}
}
}

/// The state of the blob request component of a `SingleBlockLookup`.
#[derive(Derivative)]
#[derivative(Debug)]
Expand All @@ -307,19 +328,19 @@ impl<E: EthSpec> BlobRequestState<E> {
}
}

/// The state of the block request component of a `SingleBlockLookup`.
/// The state of the custody request component of a `SingleBlockLookup`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BlockRequestState<E: EthSpec> {
pub struct CustodyRequestState<E: EthSpec> {
#[derivative(Debug = "ignore")]
pub requested_block_root: Hash256,
pub state: SingleLookupRequestState<Arc<SignedBeaconBlock<E>>>,
pub block_root: Hash256,
pub state: SingleLookupRequestState<DataColumnSidecarList<E>>,
}

impl<E: EthSpec> BlockRequestState<E> {
impl<E: EthSpec> CustodyRequestState<E> {
pub fn new(block_root: Hash256) -> Self {
Self {
requested_block_root: block_root,
block_root,
state: SingleLookupRequestState::new(),
}
}
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,8 @@ impl TestRig {
(ev.work_type() == beacon_processor::RPC_BLOBS).then_some(())
})
.unwrap_or_else(|e| panic!("Expected blobs work event: {e}")),
// TODO(das): remove todo when adding tests for custody sync lookup
ResponseType::CustodyColumn => todo!(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ResponseType::CustodyColumn => todo!(),
ResponseType::CustodyColumn => todo!(), //TODO(das)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment in 96e2363

}
}

Expand Down
Loading