Add custody by root request to sync #6275

Closed
35 changes: 34 additions & 1 deletion beacon_node/beacon_chain/src/test_utils.rs
@@ -1,4 +1,5 @@
use crate::block_verification_types::{AsBlock, RpcBlock};
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::observed_operations::ObservationOutcome;
pub use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::BeaconBlockResponseWrapper;
@@ -82,6 +83,14 @@ pub static KZG: LazyLock<Arc<Kzg>> = LazyLock::new(|| {
Arc::new(kzg)
});

pub static KZG_PEERDAS: LazyLock<Arc<Kzg>> = LazyLock::new(|| {
let trusted_setup: TrustedSetup = serde_json::from_reader(TRUSTED_SETUP_BYTES)
.map_err(|e| format!("Unable to read trusted setup file: {}", e))
.expect("should have trusted setup");
let kzg = Kzg::new_from_trusted_setup_das_enabled(trusted_setup).expect("should create kzg");
Arc::new(kzg)
});

pub type BaseHarnessType<E, THotStore, TColdStore> =
Witness<TestingSlotClock, CachingEth1Backend<E>, E, THotStore, TColdStore>;

@@ -507,7 +516,13 @@ where
let validator_keypairs = self
.validator_keypairs
.expect("cannot build without validator keypairs");
let kzg = spec.deneb_fork_epoch.map(|_| KZG.clone());
let kzg = if spec.is_peer_das_scheduled() {
Some(KZG_PEERDAS.clone())
} else if spec.deneb_fork_epoch.is_some() {
Some(KZG.clone())
} else {
None
};

let validator_monitor_config = self.validator_monitor_config.unwrap_or_default();

@@ -2690,3 +2705,21 @@ pub fn generate_rand_block_and_blobs<E: EthSpec>(
}
(block, blob_sidecars)
}

#[allow(clippy::type_complexity)]
pub fn generate_rand_block_and_data_columns<E: EthSpec>(
fork_name: ForkName,
num_blobs: NumBlobs,
rng: &mut impl Rng,
kzg: &Kzg,
spec: &ChainSpec,
) -> (
SignedBeaconBlock<E, FullPayload<E>>,
Vec<Arc<DataColumnSidecar<E>>>,
) {
let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng);
let blob: BlobsList<E> = blobs.into_iter().map(|b| b.blob).collect::<Vec<_>>().into();
let data_columns = blobs_to_data_column_sidecars(&blob, &block, kzg, spec).unwrap();

(block, data_columns)
}
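
For reference (not part of this diff), a minimal sketch of how a test might drive the new helper, assuming it lives alongside these utilities so that `KZG_PEERDAS`, `NumBlobs`, `ForkName` and `MinimalEthSpec` are in scope; the blob count, the chosen fork and the `number_of_columns` assertion are illustrative assumptions rather than values taken from this PR.

#[test]
fn rand_block_and_data_columns_sketch() {
    type E = MinimalEthSpec;
    let spec = E::default_spec();
    let mut rng = rand::thread_rng();
    let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
        ForkName::Deneb,
        NumBlobs::Number(1),
        &mut rng,
        &KZG_PEERDAS,
        &spec,
    );
    // With at least one blob, one sidecar per data column is expected.
    assert_eq!(data_columns.len(), spec.number_of_columns as usize);
    // The generated block carries the matching KZG commitments.
    assert!(!block
        .message()
        .body()
        .blob_kzg_commitments()
        .expect("post-Deneb block has commitments")
        .is_empty());
}
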
58 changes: 53 additions & 5 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -1,4 +1,7 @@
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
use crate::discovery::CombinedKey;
use crate::rpc::{MetaData, MetaDataV2};
use crate::EnrExt;
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId};
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use rand::seq::SliceRandom;
@@ -12,7 +15,7 @@ use std::{
fmt::Formatter,
};
use sync_status::SyncStatus;
use types::EthSpec;
use types::{ChainSpec, DataColumnSubnetId, EthSpec};

pub mod client;
pub mod peer_info;
@@ -673,17 +676,62 @@ impl<E: EthSpec> PeerDB<E> {
}

/// Updates the connection state. MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer_testing_only(&mut self, peer_id: &PeerId) -> Option<BanOperation> {
pub fn __add_connected_peer_testing_only(
&mut self,
supernode: bool,
spec: &ChainSpec,
) -> PeerId {
let enr_key = CombinedKey::generate_secp256k1();
let enr = Enr::builder().build(&enr_key).unwrap();
let mut enr = Enr::builder().build(&enr_key).unwrap();
let peer_id = enr.peer_id();
let node_id = enr.node_id().raw().into();

let custody_subnet_count = if supernode {
spec.data_column_sidecar_subnet_count
} else {
spec.custody_requirement
};

enr.insert(
PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
&custody_subnet_count,
&enr_key,
)
.expect("u64 can be encoded");

self.update_connection_state(
peer_id,
&peer_id,
NewConnectionState::Connected {
enr: Some(enr),
seen_address: Multiaddr::empty(),
direction: ConnectionDirection::Outgoing,
},
)
);
let peer = self.peers.get_mut(&peer_id).expect("peer exists");

// Insert empty metadata so the `on_subnet_metadata` condition passes.
peer.set_meta_data(MetaData::V2(MetaDataV2 {
seq_number: 0,
attnets: <_>::default(),
syncnets: <_>::default(),
}));

for subnet in
DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count, spec)
{
// The peer must pass the `PeerInfo` checks:
// - `on_subnet_metadata`: no action needed, `Subnet::DataColumn` returns true
// - `on_subnet_gossipsub`: the `subnets` field must contain the subnet
// - `is_good_gossipsub_peer`: score >= 0, which equals the default score
peer.insert_subnet(Subnet::DataColumn(subnet.into()));
}

peer_id

Review comment (Member):

Is all this change required? Why not just take the latest change from the das branch? The change in #6218 fixes the custody lookup tests.

}
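
For context (not part of this diff), a small sketch of how a sync test might use the reworked helper; the function name and assertion are illustrative assumptions.

fn connect_custody_peers_sketch<E: EthSpec>(peer_db: &mut PeerDB<E>, spec: &ChainSpec) {
    // A supernode custodying every data column subnet...
    let supernode = peer_db.__add_connected_peer_testing_only(true, spec);
    // ...and a node custodying only `custody_requirement` subnets.
    let fullnode = peer_db.__add_connected_peer_testing_only(false, spec);
    assert_ne!(supernode, fullnode);
}
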

/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
35 changes: 30 additions & 5 deletions beacon_node/lighthouse_network/src/types/globals.rs
@@ -1,4 +1,5 @@
//! A collection of variables that are accessible outside of the network thread itself.
use crate::discovery::enr::Eth2Enr;
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV2};
use crate::types::{BackFillState, SyncState};
@@ -7,7 +8,9 @@ use crate::EnrExt;
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock;
use std::collections::HashSet;
use types::{ChainSpec, ColumnIndex, EthSpec};
use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec};

use super::Subnet;

pub struct NetworkGlobals<E: EthSpec> {
/// The current local ENR.
@@ -111,10 +114,32 @@ impl<E: EthSpec> NetworkGlobals<E> {
}

/// Compute the data columns this node is assigned to custody.
pub fn custody_columns(&self, _spec: &ChainSpec) -> Vec<ColumnIndex> {
let _enr = self.local_enr();
//TODO(das): implement ENR changes
vec![]
pub fn custody_columns(&self, spec: &ChainSpec) -> Vec<ColumnIndex> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count, spec)
.collect()
}

/// Returns every peer that:
/// 1. is connected
/// 2. is assigned to custody the column, based on its `custody_subnet_count` from metadata (WIP)
/// 3. has a good score
/// 4. is subscribed to the specified column - this condition can be removed later, so we can
///    identify and penalise peers that are supposed to custody the column.
pub fn custody_peers_for_column(
&self,
column_index: ColumnIndex,
spec: &ChainSpec,
) -> Vec<PeerId> {
self.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(
DataColumnSubnetId::from_column_index::<E>(column_index as usize, spec),
))
.cloned()
.collect::<Vec<_>>()
@jimmygchen (Member), Aug 21, 2024

Why not take the latest change from the das branch and just use good_custody_subnet_peer?
This undoes most of the changes from #6218 - I understand that selecting from the entire peer set is not the right solution, but this doesn't fix it either and just reverts to the previous version.

@jimmygchen (Member), Aug 21, 2024

I'm also happy if you'd like to change how it works on the das branch, but I think it's better to target the rework at the das branch before we finish the merge to unstable.

}
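
For illustration (not part of this diff), a sketch of how sync-side code might combine the two helpers above to find columns with no currently known custody peer; the function name is hypothetical.

fn columns_missing_custody_peers<E: EthSpec>(
    globals: &NetworkGlobals<E>,
    spec: &ChainSpec,
) -> Vec<ColumnIndex> {
    globals
        .custody_columns(spec)
        .into_iter()
        // Keep only the columns for which no good, subscribed peer is currently known.
        .filter(|column_index| globals.custody_peers_for_column(*column_index, spec).is_empty())
        .collect()
}
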

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
7 changes: 6 additions & 1 deletion beacon_node/network/src/sync/block_lookups/mod.rs
@@ -481,9 +481,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessType::SingleBlob { id } => {
self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
}
BlockProcessType::SingleCustodyColumn { id } => {
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
}
};
let id = match process_type {
BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => id,
BlockProcessType::SingleBlock { id }
| BlockProcessType::SingleBlob { id }
| BlockProcessType::SingleCustodyColumn { id } => id,
};
self.on_lookup_result(id, lookup_result, "processing_result", cx);
}
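
For orientation (not part of this diff), the shape of the enum these arms now cover, inferred from the destructuring patterns above; the `Id` field type is assumed to be sync's existing request identifier, and the real definition lives elsewhere in the sync/beacon-processor types.

pub enum BlockProcessType {
    SingleBlock { id: Id },
    SingleBlob { id: Id },
    SingleCustodyColumn { id: Id },
}
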
@@ -220,7 +220,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// with this `req_id`.
request.get_state_mut().on_download_start(req_id)?
}
LookupRequestResult::NoRequestNeeded => {
LookupRequestResult::NoRequestNeeded(_reason) => {
// Lookup sync event safety: Advances this request to the terminal `Processed`
// state. If all requests reach this state, the request is marked as completed
// in `Self::continue_requests`.