Remove DataAvailabilityView trait from ChildComponents (sigp#5421)
* Remove DataAvailabilityView trait from ChildComponents

* PR reviews

* Update beacon_node/network/src/sync/block_lookups/common.rs

Co-authored-by: realbigsean <seananderson33@GMAIL.com>

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into child_components_independent
dapplion authored Apr 4, 2024
1 parent feb531f commit 053525e
Showing 7 changed files with 73 additions and 96 deletions.
53 changes: 13 additions & 40 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -15,6 +15,7 @@ pub use processing_cache::ProcessingComponents;
use slasher::test_utils::E;
use slog::{debug, error, Logger};
use slot_clock::SlotClock;
use ssz_types::FixedVector;
use std::fmt;
use std::fmt::Debug;
use std::num::NonZeroUsize;
@@ -112,10 +113,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

/// If there's no block, all possible ids will be returned that don't exist in the given blobs.
/// If there are no blobs, all possible ids will be returned.
- pub fn get_missing_blob_ids<V: AvailabilityView<T::EthSpec>>(
+ pub fn get_missing_blob_ids<V>(
&self,
block_root: Hash256,
- availability_view: &V,
+ block: &Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
+ blobs: &FixedVector<Option<V>, <T::EthSpec as EthSpec>::MaxBlobsPerBlock>,
) -> MissingBlobs {
let Some(current_slot) = self.slot_clock.now_or_genesis() else {
error!(
@@ -128,49 +130,20 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());

if self.da_check_required_for_epoch(current_epoch) {
- match availability_view.get_cached_block() {
+ match block {
Some(cached_block) => {
let block_commitments = cached_block.get_commitments();
- let blob_commitments = availability_view.get_cached_blobs();
-
- let num_blobs_expected = block_commitments.len();
- let mut blob_ids = Vec::with_capacity(num_blobs_expected);
-
- // Zip here will always limit the number of iterations to the size of
- // `block_commitment` because `blob_commitments` will always be populated
- // with `Option` values up to `MAX_BLOBS_PER_BLOCK`.
- for (index, (block_commitment, blob_commitment_opt)) in block_commitments
- .into_iter()
- .zip(blob_commitments.iter())
- .enumerate()
- {
- // Always add a missing blob.
- let Some(blob_commitment) = blob_commitment_opt else {
- blob_ids.push(BlobIdentifier {
- block_root,
- index: index as u64,
- });
- continue;
- };
-
- let blob_commitment = *blob_commitment.get_commitment();
-
- // Check for consistency, but this shouldn't happen, an availability view
- // should guaruntee consistency.
- if blob_commitment != block_commitment {
- error!(self.log,
- "Inconsistent availability view";
- "block_root" => ?block_root,
- "block_commitment" => ?block_commitment,
- "blob_commitment" => ?blob_commitment,
- "index" => index
- );
- blob_ids.push(BlobIdentifier {
- block_root,
- index: index as u64,
- });
- }
- }
+ let blob_ids = blobs
+ .iter()
+ .take(block_commitments.len())
+ .enumerate()
+ .filter_map(|(index, blob_commitment_opt)| {
+ blob_commitment_opt.is_none().then_some(BlobIdentifier {
+ block_root,
+ index: index as u64,
+ })
+ })
+ .collect();
MissingBlobs::KnownMissing(blob_ids)
}
None => {
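For illustration only, a standalone sketch of the reworked `get_missing_blob_ids` flow above, with plain `Vec`/`Option` and integer stand-ins for `FixedVector`, `Hash256`, and `BlobIdentifier`: iterate the cached blob slots up to the block's commitment count and collect an id for every empty slot.

```rust
// Sketch only (stand-in types, not Lighthouse code): collect an identifier for
// every blob index the block commits to that has no cached blob yet.

/// Hypothetical stand-in for `BlobIdentifier`.
#[derive(Debug, PartialEq)]
struct BlobId {
    block_root: u64, // stand-in for Hash256
    index: u64,
}

fn missing_blob_ids<B>(block_root: u64, num_commitments: usize, blobs: &[Option<B>]) -> Vec<BlobId> {
    blobs
        .iter()
        // The block's commitment count bounds how many slots are relevant.
        .take(num_commitments)
        .enumerate()
        // Keep only the indices whose slot is still empty.
        .filter_map(|(index, blob)| {
            blob.is_none().then_some(BlobId {
                block_root,
                index: index as u64,
            })
        })
        .collect()
}

fn main() {
    // Block commits to 3 blobs; only index 1 has been downloaded so far.
    let blobs: Vec<Option<&str>> = vec![None, Some("blob-1"), None, None, None, None];
    let missing = missing_blob_ids(0xaa, 3, &blobs);
    assert_eq!(
        missing,
        vec![
            BlobId { block_root: 0xaa, index: 0 },
            BlobId { block_root: 0xaa, index: 2 }
        ]
    );
    println!("missing: {missing:?}");
}
```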
@@ -1,4 +1,3 @@
use super::child_components::ChildComponents;
use super::state_lru_cache::DietAvailabilityPendingExecutedBlock;
use crate::blob_verification::KzgVerifiedBlob;
use crate::block_verification_types::AsBlock;
@@ -195,14 +194,6 @@ impl_availability_view!(
verified_blobs
);

impl_availability_view!(
ChildComponents,
Arc<SignedBeaconBlock<E>>,
Arc<BlobSidecar<E>>,
downloaded_block,
downloaded_blobs
);

pub trait GetCommitments<E: EthSpec> {
fn get_commitments(&self) -> KzgCommitments<E>;
}
@@ -381,23 +372,6 @@ pub mod tests {
(block.into(), blobs, invalid_blobs)
}

type ChildComponentsSetup<E> = (
Arc<SignedBeaconBlock<E>>,
FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
);

pub fn setup_child_components(
block: SignedBeaconBlock<E>,
valid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
invalid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
) -> ChildComponentsSetup<E> {
let blobs = FixedVector::from(valid_blobs.into_iter().cloned().collect::<Vec<_>>());
let invalid_blobs =
FixedVector::from(invalid_blobs.into_iter().cloned().collect::<Vec<_>>());
(Arc::new(block), blobs, invalid_blobs)
}

pub fn assert_cache_consistent<V: AvailabilityView<E>>(cache: V) {
if let Some(cached_block) = cache.get_cached_block() {
let cached_block_commitments = cached_block.get_commitments();
@@ -530,11 +504,4 @@ pub mod tests {
verified_blobs,
setup_pending_components
);
generate_tests!(
child_component_tests,
ChildComponents::<E>,
downloaded_block,
downloaded_blobs,
setup_child_components
);
}
@@ -1,9 +1,8 @@
use crate::block_verification_types::RpcBlock;
- use crate::data_availability_checker::AvailabilityView;
use bls::Hash256;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
- use types::{EthSpec, SignedBeaconBlock};
+ use types::{BlobSidecar, EthSpec, SignedBeaconBlock};

/// For requests triggered by an `UnknownBlockParent` or `UnknownBlobParent`, this struct
/// is used to cache components as they are sent to the network service. We can't use the
@@ -48,6 +47,22 @@ impl<E: EthSpec> ChildComponents<E> {
cache
}

pub fn merge_block(&mut self, block: Arc<SignedBeaconBlock<E>>) {
self.downloaded_block = Some(block);
}

pub fn merge_blob(&mut self, blob: Arc<BlobSidecar<E>>) {
if let Some(blob_ref) = self.downloaded_blobs.get_mut(blob.index as usize) {
*blob_ref = Some(blob);
}
}

pub fn merge_blobs(&mut self, blobs: FixedBlobSidecarList<E>) {
for blob in blobs.iter().flatten() {
self.merge_blob(blob.clone());
}
}

pub fn clear_blobs(&mut self) {
self.downloaded_blobs = FixedBlobSidecarList::default();
}
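For context, a minimal sketch of the merge pattern added above, using stand-in types (a plain array for `FixedBlobSidecarList`, a `String` for `Arc<SignedBeaconBlock<E>>`): blobs land in an index-keyed, fixed-size cache, and out-of-range indices are silently ignored by the `get_mut` guard.

```rust
const MAX_BLOBS_PER_BLOCK: usize = 6; // illustrative constant, not the real spec value

#[derive(Clone, Debug)]
struct Blob {
    index: u64,
    data: Vec<u8>,
}

#[derive(Default)]
struct ChildComponentsSketch {
    downloaded_block: Option<String>, // stand-in for Arc<SignedBeaconBlock<E>>
    downloaded_blobs: [Option<Blob>; MAX_BLOBS_PER_BLOCK], // stand-in for FixedBlobSidecarList<E>
}

impl ChildComponentsSketch {
    fn merge_block(&mut self, block: String) {
        self.downloaded_block = Some(block);
    }

    /// Out-of-range indices fall through the `get_mut` guard and are ignored.
    fn merge_blob(&mut self, blob: Blob) {
        if let Some(slot) = self.downloaded_blobs.get_mut(blob.index as usize) {
            *slot = Some(blob);
        }
    }

    fn merge_blobs(&mut self, blobs: &[Option<Blob>]) {
        for blob in blobs.iter().flatten() {
            self.merge_blob(blob.clone());
        }
    }
}

fn main() {
    let mut cache = ChildComponentsSketch::default();
    cache.merge_block("block".to_string());
    cache.merge_blobs(&[None, None, Some(Blob { index: 2, data: vec![1, 2, 3] })]);
    cache.merge_blob(Blob { index: 99, data: vec![] }); // ignored: out of range

    assert!(cache.downloaded_block.is_some());
    assert_eq!(cache.downloaded_blobs.iter().filter(|b| b.is_some()).count(), 1);
    assert_eq!(cache.downloaded_blobs[2].as_ref().map(|b| b.data.len()), Some(3));
}
```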
34 changes: 21 additions & 13 deletions beacon_node/network/src/sync/block_lookups/common.rs
@@ -8,7 +8,7 @@ use crate::sync::block_lookups::{
use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId};
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::RpcBlock;
- use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents};
+ use beacon_chain::data_availability_checker::ChildComponents;
use beacon_chain::{get_block_root, BeaconChainTypes};
use lighthouse_network::rpc::methods::BlobsByRootRequest;
use lighthouse_network::rpc::BlocksByRootRequest;
@@ -17,7 +17,7 @@ use std::ops::IndexMut;
use std::sync::Arc;
use std::time::Duration;
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
- use types::{BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock};
+ use types::{BlobSidecar, ChainSpec, Hash256, SignedBeaconBlock};

#[derive(Debug, Copy, Clone)]
pub enum ResponseType {
@@ -371,27 +371,35 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,

fn verify_response_inner(
&mut self,
- _expected_block_root: Hash256,
+ expected_block_root: Hash256,
blob: Option<Self::ResponseType>,
peer_id: PeerId,
) -> Result<Option<FixedBlobSidecarList<T::EthSpec>>, LookupVerifyError> {
match blob {
Some(blob) => {
let received_id = blob.id();

if !self.requested_ids.contains(&received_id) {
- self.state.register_failure_downloading();
Err(LookupVerifyError::UnrequestedBlobId(received_id))
+ } else if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) {
+ Err(LookupVerifyError::InvalidInclusionProof)
+ } else if blob.block_root() != expected_block_root {
+ Err(LookupVerifyError::UnrequestedHeader)
} else {
- // State should remain downloading until we receive the stream terminator.
- self.requested_ids.remove(&received_id);
- let blob_index = blob.index;
-
- if blob_index >= T::EthSpec::max_blobs_per_block() as u64 {
- return Err(LookupVerifyError::InvalidIndex(blob.index));
- }
- *self.blob_download_queue.index_mut(blob_index as usize) = Some(blob);
- Ok(None)
+ Ok(())
}
+ .map_err(|e| {
+ self.state.register_failure_downloading();
+ e
+ })?;
+
+ // State should remain downloading until we receive the stream terminator.
+ self.requested_ids.remove(&received_id);
+
+ // The inclusion proof check above ensures `blob.index` is < MAX_BLOBS_PER_BLOCK
+ let blob_index = blob.index;
+ *self.blob_download_queue.index_mut(blob_index as usize) = Some(blob);
+ Ok(None)
}
None => {
self.state.state = State::Processing { peer_id };
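A rough sketch of the reordered verification above, with placeholder types for the sidecar and lookup state (the real code calls `verify_blob_sidecar_inclusion_proof` and compares `blob.block_root()` against the lookup's expected root): unrequested id, bad inclusion proof, and mismatched block root are checked in turn, and every rejection also registers a download failure via `map_err` before the error propagates.

```rust
// Sketch only: stand-in types for the blob sidecar and the failure tracking.
#[derive(Debug, PartialEq)]
enum VerifyError {
    UnrequestedBlobId(u64),
    InvalidInclusionProof,
    UnrequestedHeader,
}

struct BlobSketch {
    id: u64,         // stand-in for BlobIdentifier
    block_root: u64, // stand-in for Hash256
    proof_ok: bool,  // stand-in for verify_blob_sidecar_inclusion_proof()
}

fn verify_blob(
    blob: &BlobSketch,
    requested_ids: &[u64],
    expected_block_root: u64,
    download_failures: &mut u32,
) -> Result<(), VerifyError> {
    let checks = if !requested_ids.contains(&blob.id) {
        Err(VerifyError::UnrequestedBlobId(blob.id))
    } else if !blob.proof_ok {
        Err(VerifyError::InvalidInclusionProof)
    } else if blob.block_root != expected_block_root {
        Err(VerifyError::UnrequestedHeader)
    } else {
        Ok(())
    };
    // Any rejection also counts as a failed download, mirroring `register_failure_downloading`.
    checks.map_err(|e| {
        *download_failures += 1;
        e
    })
}

fn main() {
    let mut failures = 0;
    let good = BlobSketch { id: 7, block_root: 0xaa, proof_ok: true };
    assert_eq!(verify_blob(&good, &[7], 0xaa, &mut failures), Ok(()));

    let wrong_root = BlobSketch { id: 7, block_root: 0xbb, proof_ok: true };
    assert_eq!(
        verify_blob(&wrong_root, &[7], 0xaa, &mut failures),
        Err(VerifyError::UnrequestedHeader)
    );
    assert_eq!(failures, 1);
}
```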
2 changes: 2 additions & 0 deletions beacon_node/network/src/sync/block_lookups/mod.rs
@@ -570,6 +570,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
| ParentVerifyError::NotEnoughBlobsReturned
| ParentVerifyError::ExtraBlocksReturned
| ParentVerifyError::UnrequestedBlobId(_)
| ParentVerifyError::InvalidInclusionProof
| ParentVerifyError::UnrequestedHeader
| ParentVerifyError::ExtraBlobsReturned
| ParentVerifyError::InvalidIndex(_) => {
let e = e.into();
4 changes: 4 additions & 0 deletions beacon_node/network/src/sync/block_lookups/parent_lookup.rs
@@ -38,6 +38,8 @@ pub enum ParentVerifyError {
NotEnoughBlobsReturned,
ExtraBlocksReturned,
UnrequestedBlobId(BlobIdentifier),
InvalidInclusionProof,
UnrequestedHeader,
ExtraBlobsReturned,
InvalidIndex(u64),
PreviousFailure { parent_root: Hash256 },
@@ -244,6 +246,8 @@ impl From<LookupVerifyError> for ParentVerifyError {
E::NoBlockReturned => ParentVerifyError::NoBlockReturned,
E::ExtraBlocksReturned => ParentVerifyError::ExtraBlocksReturned,
E::UnrequestedBlobId(blob_id) => ParentVerifyError::UnrequestedBlobId(blob_id),
E::InvalidInclusionProof => ParentVerifyError::InvalidInclusionProof,
E::UnrequestedHeader => ParentVerifyError::UnrequestedHeader,
E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned,
E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index),
E::NotEnoughBlobsReturned => ParentVerifyError::NotEnoughBlobsReturned,
24 changes: 16 additions & 8 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
@@ -3,10 +3,10 @@ use crate::sync::block_lookups::common::{Lookup, RequestState};
use crate::sync::block_lookups::Id;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::block_verification_types::RpcBlock;
+ use beacon_chain::data_availability_checker::ChildComponents;
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, DataAvailabilityChecker, MissingBlobs,
};
- use beacon_chain::data_availability_checker::{AvailabilityView, ChildComponents};
use beacon_chain::BeaconChainTypes;
use lighthouse_network::PeerAction;
use slog::{trace, Logger};
@@ -32,6 +32,8 @@ pub enum LookupVerifyError {
NoBlockReturned,
ExtraBlocksReturned,
UnrequestedBlobId(BlobIdentifier),
InvalidInclusionProof,
UnrequestedHeader,
ExtraBlobsReturned,
NotEnoughBlobsReturned,
InvalidIndex(u64),
@@ -247,7 +249,7 @@ impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> {
/// Returns `true` if the block has already been downloaded.
pub(crate) fn block_already_downloaded(&self) -> bool {
if let Some(components) = self.child_components.as_ref() {
- components.block_exists()
+ components.downloaded_block.is_some()
} else {
self.da_checker.has_block(&self.block_root())
}
@@ -274,19 +276,25 @@ impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> {
pub(crate) fn missing_blob_ids(&self) -> MissingBlobs {
let block_root = self.block_root();
if let Some(components) = self.child_components.as_ref() {
- self.da_checker.get_missing_blob_ids(block_root, components)
+ self.da_checker.get_missing_blob_ids(
+ block_root,
+ &components.downloaded_block,
+ &components.downloaded_blobs,
+ )
} else {
- let Some(processing_availability_view) =
- self.da_checker.get_processing_components(block_root)
+ let Some(processing_components) = self.da_checker.get_processing_components(block_root)
else {
return MissingBlobs::new_without_block(block_root, self.da_checker.is_deneb());
};
- self.da_checker
- .get_missing_blob_ids(block_root, &processing_availability_view)
+ self.da_checker.get_missing_blob_ids(
+ block_root,
+ &processing_components.block,
+ &processing_components.blob_commitments,
+ )
}
}

/// Penalizes a blob peer if it should have blobs but didn't return them to us.
pub fn penalize_blob_peer(&mut self, cx: &SyncNetworkContext<T>) {
if let Ok(blob_peer) = self.blob_request_state.state.processing_peer() {
cx.report_peer(
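A small sketch (stand-in types, simplified return value) of the caller-side dispatch in `missing_blob_ids` above: a child lookup consults its own cached components, otherwise the checker's processing components are used, and without a cached block every index is treated as missing.

```rust
// Sketch only: `Cache` stands in for both ChildComponents and the processing
// components; the real code returns a MissingBlobs enum rather than raw indices.
#[derive(Default)]
struct Cache {
    block: Option<String>,      // stand-in for the cached block
    blobs: Vec<Option<String>>, // stand-in for the fixed-size blob list
}

fn missing_indices(cache: &Cache, expected: usize) -> Vec<usize> {
    if cache.block.is_none() {
        // Without a block the commitment count is unknown: treat all slots as missing.
        return (0..expected).collect();
    }
    cache
        .blobs
        .iter()
        .take(expected)
        .enumerate()
        .filter_map(|(i, blob)| blob.is_none().then_some(i))
        .collect()
}

fn missing_for_lookup(child: Option<&Cache>, processing: Option<&Cache>, expected: usize) -> Vec<usize> {
    // Prefer the per-lookup child cache; otherwise use the shared processing cache.
    match child.or(processing) {
        Some(cache) => missing_indices(cache, expected),
        None => (0..expected).collect(),
    }
}

fn main() {
    let child = Cache {
        block: Some("block".into()),
        blobs: vec![Some("b0".into()), None, None],
    };
    assert_eq!(missing_for_lookup(Some(&child), None, 3), vec![1, 2]);
    assert_eq!(missing_for_lookup(None, None, 2), vec![0, 1]);
}
```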
