From 1d4a54ef40ec41aac608a13888d48276a47518ad Mon Sep 17 00:00:00 2001 From: Rob Date: Wed, 24 Jul 2024 15:40:11 -0400 Subject: [PATCH 1/2] topic refactor --- crates/examples/infra/mod.rs | 12 ++-- crates/hotshot/src/lib.rs | 2 +- crates/hotshot/src/traits.rs | 2 +- .../src/traits/election/static_committee.rs | 13 ++++ .../src/traits/networking/combined_network.rs | 10 +-- .../src/traits/networking/libp2p_network.rs | 35 +++++----- .../src/traits/networking/memory_network.rs | 68 +++++++++++++++---- .../src/traits/networking/push_cdn_network.rs | 15 +++- crates/task-impls/src/network.rs | 3 +- crates/testing/src/helpers.rs | 5 ++ crates/testing/src/test_runner.rs | 6 +- crates/testing/tests/tests_1/network_task.rs | 4 ++ .../testing/tests/tests_3/memory_network.rs | 36 +++++----- crates/types/src/traits/election.rs | 6 +- crates/types/src/traits/network.rs | 23 ++++++- 15 files changed, 172 insertions(+), 68 deletions(-) diff --git a/crates/examples/infra/mod.rs b/crates/examples/infra/mod.rs index c8ca5e626d..23350e43bf 100755 --- a/crates/examples/infra/mod.rs +++ b/crates/examples/infra/mod.rs @@ -21,8 +21,8 @@ use futures::StreamExt; use hotshot::{ traits::{ implementations::{ - derive_libp2p_peer_id, CdnMetricsValue, CombinedNetworks, Libp2pMetricsValue, - Libp2pNetwork, PushCdnNetwork, Topic, WrappedSignatureKey, + derive_libp2p_peer_id, CdnMetricsValue, CdnTopic, CombinedNetworks, Libp2pMetricsValue, + Libp2pNetwork, PushCdnNetwork, WrappedSignatureKey, }, BlockPayload, NodeImplementation, }, @@ -52,7 +52,7 @@ use hotshot_types::{ traits::{ block_contents::{BlockHeader, TestableBlock}, election::Membership, - network::ConnectedNetwork, + network::{ConnectedNetwork, Topic}, node_implementation::{ConsensusTime, NodeType}, states::TestableState, }, @@ -376,6 +376,7 @@ pub trait RunDa< let quorum_membership = ::Membership::create_election( known_nodes_with_stake.clone(), known_nodes_with_stake.clone(), + Topic::Global, config.config.fixed_leader_for_gpuvid, ); @@ -384,6 +385,7 @@ pub trait RunDa< let da_membership = ::Membership::create_election( known_nodes_with_stake.clone(), config.config.known_da_nodes.clone(), + Topic::Da, config.config.fixed_leader_for_gpuvid, ); @@ -625,9 +627,9 @@ where }; // See if we should be DA, subscribe to the DA topic if so - let mut topics = vec![Topic::Global]; + let mut topics = vec![CdnTopic::Global]; if config.config.my_own_validator_config.is_da { - topics.push(Topic::Da); + topics.push(CdnTopic::Da); } // Create the network and await the initial connection diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index fa57e2a187..4e47c9d037 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -432,7 +432,7 @@ impl> SystemContext { api .network.broadcast_message( serialized_message, - da_membership.whole_committee(view_number), + da_membership.committee_topic(), BroadcastDelay::None, ), api diff --git a/crates/hotshot/src/traits.rs b/crates/hotshot/src/traits.rs index a70eca75a3..a4d6372a5f 100644 --- a/crates/hotshot/src/traits.rs +++ b/crates/hotshot/src/traits.rs @@ -18,7 +18,7 @@ pub mod implementations { }, memory_network::{MasterMap, MemoryNetwork}, push_cdn_network::{ - CdnMetricsValue, KeyPair, ProductionDef, PushCdnNetwork, TestingDef, Topic, + CdnMetricsValue, KeyPair, ProductionDef, PushCdnNetwork, TestingDef, Topic as CdnTopic, WrappedSignatureKey, }, }; diff --git a/crates/hotshot/src/traits/election/static_committee.rs b/crates/hotshot/src/traits/election/static_committee.rs index 00f6f55b60..3163d3b532 100644 --- a/crates/hotshot/src/traits/election/static_committee.rs +++ b/crates/hotshot/src/traits/election/static_committee.rs @@ -1,6 +1,7 @@ use std::{marker::PhantomData, num::NonZeroU64}; use ethereum_types::U256; +use hotshot_types::traits::network::Topic; // use ark_bls12_381::Parameters as Param381; use hotshot_types::traits::signature_key::StakeTableEntryType; use hotshot_types::{ @@ -26,6 +27,9 @@ pub struct GeneralStaticCommittee { fixed_leader_for_gpuvid: usize, /// Node type phantom _type_phantom: PhantomData, + + /// The network topic of the committee + committee_topic: Topic, } /// static committee using a vrf kp @@ -39,6 +43,7 @@ impl GeneralStaticCommittee { nodes_with_stake: Vec, nodes_without_stake: Vec, fixed_leader_for_gpuvid: usize, + committee_topic: Topic, ) -> Self { Self { all_nodes_with_stake: nodes_with_stake.clone(), @@ -46,6 +51,7 @@ impl GeneralStaticCommittee { committee_nodes_without_stake: nodes_without_stake, fixed_leader_for_gpuvid, _type_phantom: PhantomData, + committee_topic, } } } @@ -60,6 +66,11 @@ where self.committee_nodes_with_stake.clone() } + /// Get the network topic for the committee + fn committee_topic(&self) -> Topic { + self.committee_topic.clone() + } + #[cfg(not(any( feature = "randomized-leader-election", feature = "fixed-leader-election" @@ -115,6 +126,7 @@ where fn create_election( mut all_nodes: Vec>, committee_members: Vec>, + committee_topic: Topic, fixed_leader_for_gpuvid: usize, ) -> Self { let mut committee_nodes_with_stake = Vec::new(); @@ -151,6 +163,7 @@ where committee_nodes_without_stake, fixed_leader_for_gpuvid, _type_phantom: PhantomData, + committee_topic, } } diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index 250d0a1757..7c9b590144 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -32,7 +32,7 @@ use hotshot_types::{ }, data::ViewNumber, traits::{ - network::{BroadcastDelay, ConnectedNetwork, ResponseChannel}, + network::{BroadcastDelay, ConnectedNetwork, ResponseChannel, Topic}, node_implementation::NodeType, }, BoxSyncFuture, @@ -372,24 +372,24 @@ impl ConnectedNetwork for CombinedNetworks async fn broadcast_message( &self, message: Vec, - recipients: BTreeSet, + topic: Topic, broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError> { let primary = self.primary().clone(); let secondary = self.secondary().clone(); let primary_message = message.clone(); let secondary_message = message.clone(); - let primary_recipients = recipients.clone(); + let topic_clone = topic.clone(); self.send_both_networks( message, async move { primary - .broadcast_message(primary_message, primary_recipients, BroadcastDelay::None) + .broadcast_message(primary_message, topic_clone, BroadcastDelay::None) .await }, async move { secondary - .broadcast_message(secondary_message, recipients, BroadcastDelay::None) + .broadcast_message(secondary_message, topic, BroadcastDelay::None) .await }, broadcast_delay, diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index c948202c47..3ab5a48e1a 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -45,7 +45,7 @@ use hotshot_types::{ traits::{ election::Membership, metrics::{Counter, Gauge, Metrics, NoMetrics}, - network::{self, ConnectedNetwork, NetworkError, ResponseMessage}, + network::{self, ConnectedNetwork, NetworkError, ResponseMessage, Topic}, node_implementation::{ConsensusTime, NodeType}, signature_key::SignatureKey, }, @@ -60,8 +60,8 @@ use libp2p_networking::{ behaviours::request_response::{Request, Response}, spawn_network_node, MeshParams, NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg}, - NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeHandleError, - NetworkNodeReceiver, NetworkNodeType, DEFAULT_REPLICATION_FACTOR, + NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver, + NetworkNodeType, DEFAULT_REPLICATION_FACTOR, }, reexport::{Multiaddr, ResponseChannel}, }; @@ -529,6 +529,7 @@ impl Libp2pNetwork { pubkey_pid_map.insert(pk.clone(), network_handle.peer_id()); let mut topic_map = BiHashMap::new(); + topic_map.insert(quorum_public_keys, QC_TOPIC.to_string()); topic_map.insert(da_public_keys, "DA".to_string()); @@ -922,7 +923,7 @@ impl ConnectedNetwork for Libp2pNetwork { async fn broadcast_message( &self, message: Vec, - recipients: BTreeSet, + topic: Topic, _broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError> { // If we're not ready, return an error @@ -931,20 +932,22 @@ impl ConnectedNetwork for Libp2pNetwork { return Err(NetworkError::NotReady); }; + // Get the topic subscribers + let topic = topic.to_string(); let topic_map = self.inner.topic_map.read().await; - let topic = topic_map - .get_by_left(&recipients) - .ok_or_else(|| { - self.inner.metrics.num_failed_messages.add(1); - NetworkError::Libp2p { - source: Box::new(NetworkNodeHandleError::NoSuchTopic), - } - })? - .clone(); - // gossip doesn't broadcast from itself, so special case - if recipients.contains(&self.inner.pk) { - // send to self + // If the topic existed, + if let Some(topic_subscribers) = topic_map.get_by_right(&topic) { + // And we are subscribed, + if topic_subscribers.contains(&self.inner.pk) { + // Short-circuit-send the message to ourselves + self.inner.sender.send(message.clone()).await.map_err(|_| { + self.inner.metrics.num_failed_messages.add(1); + NetworkError::ShutDown + })?; + } + } else { + // Edge case for all nodes being subscribed to the topic self.inner.sender.send(message.clone()).await.map_err(|_| { self.inner.metrics.num_failed_messages.add(1); NetworkError::ShutDown diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index e0b95c0ce8..d1f6c22bfe 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -26,6 +26,7 @@ use hotshot_types::{ traits::{ network::{ AsyncGenerator, BroadcastDelay, ConnectedNetwork, TestableNetworkingImplementation, + Topic, }, node_implementation::NodeType, signature_key::SignatureKey, @@ -46,6 +47,10 @@ pub struct MasterMap { /// The list of `MemoryNetwork`s #[debug(skip)] map: DashMap>, + + /// The list of `MemoryNetwork`s aggregated by topic + subscribed_map: DashMap)>>, + /// The id of this `MemoryNetwork` cluster id: u64, } @@ -56,6 +61,7 @@ impl MasterMap { pub fn new() -> Arc> { Arc::new(MasterMap { map: DashMap::new(), + subscribed_map: DashMap::new(), id: rand::thread_rng().gen(), }) } @@ -102,8 +108,9 @@ impl Debug for MemoryNetwork { impl MemoryNetwork { /// Creates a new `MemoryNetwork` and hooks it up to the group through the provided `MasterMap` pub fn new( - pub_key: K, + pub_key: &K, master_map: &Arc>, + subscribed_topics: &[Topic], reliability_config: Option>, ) -> MemoryNetwork { info!("Attaching new MemoryNetwork"); @@ -142,8 +149,16 @@ impl MemoryNetwork { reliability_config, }), }; - master_map.map.insert(pub_key, mn.clone()); - trace!("Master map updated"); + // Insert our public key into the master map + master_map.map.insert(pub_key.clone(), mn.clone()); + // Insert our subscribed topics into the master map + for topic in subscribed_topics { + master_map + .subscribed_map + .entry(topic.clone()) + .or_default() + .push((pub_key.clone(), mn.clone())); + } mn } @@ -169,7 +184,7 @@ impl TestableNetworkingImplementation _expected_node_count: usize, _num_bootstrap: usize, _network_id: usize, - _da_committee_size: usize, + da_committee_size: usize, _is_da: bool, reliability_config: Option>, _secondary_network_delay: Duration, @@ -179,7 +194,22 @@ impl TestableNetworkingImplementation Box::pin(move |node_id| { let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1; let pubkey = TYPES::SignatureKey::from_private(&privkey); - let net = MemoryNetwork::new(pubkey, &master, reliability_config.clone()); + + // Subscribe to topics based on our index + let subscribed_topics = if node_id < da_committee_size as u64 { + // DA node + vec![Topic::Da, Topic::Global] + } else { + // Non-DA node + vec![Topic::Global] + }; + + let net = MemoryNetwork::new( + &pubkey, + &master, + &subscribed_topics, + reliability_config.clone(), + ); Box::pin(async move { net.into() }) }) } @@ -219,16 +249,20 @@ impl ConnectedNetwork for MemoryNetwork { async fn broadcast_message( &self, message: Vec, - recipients: BTreeSet, + topic: Topic, _broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError> { trace!(?message, "Broadcasting message"); - for node in &self.inner.master_map.map { + for node in self + .inner + .master_map + .subscribed_map + .entry(topic) + .or_default() + .iter() + { // TODO delay/drop etc here - let (key, node) = node.pair(); - if !recipients.contains(key) { - continue; - } + let (key, node) = node; trace!(?key, "Sending message to node"); if let Some(ref config) = &self.inner.reliability_config { { @@ -268,7 +302,17 @@ impl ConnectedNetwork for MemoryNetwork { recipients: BTreeSet, broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError> { - self.broadcast_message(message, recipients, broadcast_delay) + // Iterate over all topics, compare to recipients, and get the `Topic` + let topic = self + .inner + .master_map + .subscribed_map + .iter() + .find(|v| v.value().iter().all(|(k, _)| recipients.contains(k))) + .map(|v| v.key().clone()) + .ok_or(NetworkError::NotFound)?; + + self.broadcast_message(message, topic, broadcast_delay) .await } diff --git a/crates/hotshot/src/traits/networking/push_cdn_network.rs b/crates/hotshot/src/traits/networking/push_cdn_network.rs index 52a62e0b58..9cb0e2b142 100644 --- a/crates/hotshot/src/traits/networking/push_cdn_network.rs +++ b/crates/hotshot/src/traits/networking/push_cdn_network.rs @@ -36,7 +36,7 @@ use hotshot_types::{ data::ViewNumber, traits::{ metrics::{Counter, Metrics, NoMetrics}, - network::{BroadcastDelay, ConnectedNetwork, PushCdnNetworkError}, + network::{BroadcastDelay, ConnectedNetwork, PushCdnNetworkError, Topic as HotShotTopic}, node_implementation::NodeType, signature_key::SignatureKey, }, @@ -457,10 +457,10 @@ impl ConnectedNetwork for PushCdnNetwork, - _recipients: BTreeSet, + topic: HotShotTopic, _broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError> { - self.broadcast_message(message, Topic::Global) + self.broadcast_message(message, topic.into()) .await .map_err(|e| { self.metrics.num_failed_messages.add(1); @@ -565,3 +565,12 @@ impl ConnectedNetwork for PushCdnNetwork for Topic { + fn from(topic: HotShotTopic) -> Self { + match topic { + HotShotTopic::Global => Topic::Global, + HotShotTopic::Da => Topic::Da, + } + } +} diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 3573424af2..26ea2a9249 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -419,6 +419,7 @@ impl< }; let view = message.kind.view_number(); let committee = membership.whole_committee(view); + let committee_topic = membership.committee_topic(); let net = Arc::clone(&self.channel); let storage = Arc::clone(&self.storage); let decided_upgrade_certificate = self.decided_upgrade_certificate.clone(); @@ -455,7 +456,7 @@ impl< net.direct_message(serialized_message, recipient).await } TransmitType::Broadcast => { - net.broadcast_message(serialized_message, committee, broadcast_delay) + net.broadcast_message(serialized_message, committee_topic, broadcast_delay) .await } TransmitType::DaCommitteeBroadcast => { diff --git a/crates/testing/src/helpers.rs b/crates/testing/src/helpers.rs index 66cb0781d8..3516597418 100644 --- a/crates/testing/src/helpers.rs +++ b/crates/testing/src/helpers.rs @@ -25,6 +25,7 @@ use hotshot_types::{ block_contents::vid_commitment, consensus_api::ConsensusApi, election::Membership, + network::Topic, node_implementation::{ConsensusTime, NodeType}, }, utils::{View, ViewInner}, @@ -70,21 +71,25 @@ pub async fn build_system_handle( quorum_membership: ::Membership::create_election( known_nodes_with_stake.clone(), known_nodes_with_stake.clone(), + Topic::Global, config.fixed_leader_for_gpuvid, ), da_membership: ::Membership::create_election( known_nodes_with_stake.clone(), config.known_da_nodes.clone(), + Topic::Da, config.fixed_leader_for_gpuvid, ), vid_membership: ::Membership::create_election( known_nodes_with_stake.clone(), known_nodes_with_stake.clone(), + Topic::Global, config.fixed_leader_for_gpuvid, ), view_sync_membership: ::Membership::create_election( known_nodes_with_stake.clone(), known_nodes_with_stake, + Topic::Global, config.fixed_leader_for_gpuvid, ), }; diff --git a/crates/testing/src/test_runner.rs b/crates/testing/src/test_runner.rs index 519621e9d1..76ccbc8625 100644 --- a/crates/testing/src/test_runner.rs +++ b/crates/testing/src/test_runner.rs @@ -28,7 +28,7 @@ use hotshot_types::{ simple_certificate::QuorumCertificate, traits::{ election::Membership, - network::ConnectedNetwork, + network::{ConnectedNetwork, Topic}, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, }, HotShotConfig, ValidatorConfig, @@ -392,21 +392,25 @@ where quorum_membership: ::Membership::create_election( known_nodes_with_stake.clone(), known_nodes_with_stake.clone(), + Topic::Global, config.fixed_leader_for_gpuvid, ), da_membership: ::Membership::create_election( known_nodes_with_stake.clone(), config.known_da_nodes.clone(), + Topic::Da, config.fixed_leader_for_gpuvid, ), vid_membership: ::Membership::create_election( known_nodes_with_stake.clone(), known_nodes_with_stake.clone(), + Topic::Global, config.fixed_leader_for_gpuvid, ), view_sync_membership: ::Membership::create_election( known_nodes_with_stake.clone(), known_nodes_with_stake.clone(), + Topic::Global, config.fixed_leader_for_gpuvid, ), }; diff --git a/crates/testing/tests/tests_1/network_task.rs b/crates/testing/tests/tests_1/network_task.rs index 517ca6cf5d..d362e4f991 100644 --- a/crates/testing/tests/tests_1/network_task.rs +++ b/crates/testing/tests/tests_1/network_task.rs @@ -30,6 +30,7 @@ use hotshot_types::{ #[allow(clippy::too_many_lines)] async fn test_network_task() { use futures::StreamExt; + use hotshot_types::traits::network::Topic; async_compatibility_layer::logging::setup_logging(); async_compatibility_layer::logging::setup_backtrace(); @@ -50,6 +51,7 @@ async fn test_network_task() { let membership = ::Membership::create_election( known_nodes_with_stake.clone(), known_nodes_with_stake, + Topic::Global, config.fixed_leader_for_gpuvid, ); let network_state: NetworkEventTaskState, _> = @@ -101,6 +103,7 @@ async fn test_network_task() { #[cfg_attr(async_executor_impl = "async-std", async_std::test)] async fn test_network_storage_fail() { use futures::StreamExt; + use hotshot_types::traits::network::Topic; async_compatibility_layer::logging::setup_logging(); async_compatibility_layer::logging::setup_backtrace(); @@ -122,6 +125,7 @@ async fn test_network_storage_fail() { let membership = ::Membership::create_election( known_nodes_with_stake.clone(), known_nodes_with_stake, + Topic::Global, config.fixed_leader_for_gpuvid, ); let network_state: NetworkEventTaskState, _> = diff --git a/crates/testing/tests/tests_3/memory_network.rs b/crates/testing/tests/tests_3/memory_network.rs index b3dff52722..591f7a5147 100644 --- a/crates/testing/tests/tests_3/memory_network.rs +++ b/crates/testing/tests/tests_3/memory_network.rs @@ -1,5 +1,5 @@ #![allow(clippy::panic)] -use std::{collections::BTreeSet, sync::Arc}; +use std::sync::Arc; use async_compatibility_layer::logging::setup_logging; use hotshot::{ @@ -11,17 +11,17 @@ use hotshot::{ types::SignatureKey, }; use hotshot_example_types::{ + auction_results_provider_types::TestAuctionResultsProvider, block_types::{TestBlockHeader, TestBlockPayload, TestTransaction}, state_types::{TestInstanceState, TestValidatedState}, storage_types::TestStorage, - auction_results_provider_types::TestAuctionResultsProvider, }; use hotshot_types::{ data::ViewNumber, message::{DataMessage, Message, MessageKind, VersionedMessage}, signature_key::{BLSPubKey, BuilderKey}, traits::{ - network::{BroadcastDelay, ConnectedNetwork, TestableNetworkingImplementation}, + network::{BroadcastDelay, ConnectedNetwork, TestableNetworkingImplementation, Topic}, node_implementation::{ConsensusTime, NodeType}, }, }; @@ -48,7 +48,10 @@ pub struct Test; impl NodeType for Test { type Base = StaticVersion<0, 1>; type Upgrade = StaticVersion<0, 2>; - const UPGRADE_HASH: [u8; 32] = [1, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,]; + const UPGRADE_HASH: [u8; 32] = [ + 1, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, + 0, 0, + ]; type Time = ViewNumber; type BlockHeader = TestBlockHeader; type BlockPayload = TestBlockPayload; @@ -150,10 +153,10 @@ async fn memory_network_direct_queue() { trace!(?group); let pub_key_1 = pubkey(); - let network1 = MemoryNetwork::new(pub_key_1, &group.clone(), Option::None); + let network1 = MemoryNetwork::new(&pub_key_1, &group.clone(), &[Topic::Global], Option::None); let pub_key_2 = pubkey(); - let network2 = MemoryNetwork::new(pub_key_2, &group, Option::None); + let network2 = MemoryNetwork::new(&pub_key_2, &group, &[Topic::Global], Option::None); let first_messages: Vec> = gen_messages(5, 100, pub_key_1); @@ -201,14 +204,12 @@ async fn memory_network_direct_queue() { #[cfg_attr(async_executor_impl = "async-std", async_std::test)] #[instrument] async fn memory_network_broadcast_queue() { - setup_logging(); // Make and connect the networking instances let group: Arc::SignatureKey>> = MasterMap::new(); - trace!(?group); let pub_key_1 = pubkey(); - let network1 = MemoryNetwork::new(pub_key_1, &group.clone(), Option::None); + let network1 = MemoryNetwork::new(&pub_key_1, &group.clone(), &[Topic::Global], Option::None); let pub_key_2 = pubkey(); - let network2 = MemoryNetwork::new(pub_key_2, &group, Option::None); + let network2 = MemoryNetwork::new(&pub_key_2, &group, &[Topic::Da], Option::None); let first_messages: Vec> = gen_messages(5, 100, pub_key_1); @@ -217,11 +218,7 @@ async fn memory_network_broadcast_queue() { for sent_message in first_messages { let serialized_message = VersionedMessage::serialize(&sent_message, &None).unwrap(); network1 - .broadcast_message( - serialized_message.clone(), - vec![pub_key_2].into_iter().collect::>(), - BroadcastDelay::None, - ) + .broadcast_message(serialized_message.clone(), Topic::Da, BroadcastDelay::None) .await .expect("Failed to message node"); let mut recv_messages = network2 @@ -243,7 +240,7 @@ async fn memory_network_broadcast_queue() { network2 .broadcast_message( serialized_message.clone(), - vec![pub_key_1].into_iter().collect::>(), + Topic::Global, BroadcastDelay::None, ) .await @@ -269,13 +266,12 @@ async fn memory_network_test_in_flight_message_count() { let group: Arc::SignatureKey>> = MasterMap::new(); trace!(?group); let pub_key_1 = pubkey(); - let network1 = MemoryNetwork::new(pub_key_1, &group.clone(), Option::None); + let network1 = MemoryNetwork::new(&pub_key_1, &group.clone(), &[Topic::Global], Option::None); let pub_key_2 = pubkey(); - let network2 = MemoryNetwork::new(pub_key_2, &group, Option::None); + let network2 = MemoryNetwork::new(&pub_key_2, &group, &[Topic::Global], Option::None); // Create some dummy messages let messages: Vec> = gen_messages(5, 100, pub_key_1); - let broadcast_recipients = BTreeSet::from([pub_key_1, pub_key_2]); assert_eq!( TestableNetworkingImplementation::::in_flight_message_count(&network1), @@ -302,7 +298,7 @@ async fn memory_network_test_in_flight_message_count() { network2 .broadcast_message( serialized_message.clone(), - broadcast_recipients.clone(), + Topic::Global, BroadcastDelay::None, ) .await diff --git a/crates/types/src/traits/election.rs b/crates/types/src/traits/election.rs index c192e94680..9a7012bb00 100644 --- a/crates/types/src/traits/election.rs +++ b/crates/types/src/traits/election.rs @@ -7,7 +7,7 @@ use std::{collections::BTreeSet, fmt::Debug, hash::Hash, num::NonZeroU64}; use snafu::Snafu; -use super::node_implementation::NodeType; +use super::{network::Topic, node_implementation::NodeType}; use crate::{traits::signature_key::SignatureKey, PeerConfig}; /// Error for election problems @@ -30,6 +30,7 @@ pub trait Membership: fn create_election( all_nodes: Vec>, committee_members: Vec>, + committee_topic: Topic, fixed_leader_for_gpuvid: usize, ) -> Self; @@ -50,6 +51,9 @@ pub trait Membership: /// Get whole (staked + non-staked) committee for view `view_number`. fn whole_committee(&self, view_number: TYPES::Time) -> BTreeSet; + /// Get the network topic for the committee + fn committee_topic(&self) -> Topic; + /// Check if a key has stake fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool; diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index 7fecd87c9c..e602c5598f 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -18,7 +18,7 @@ use tokio::time::error::Elapsed as TimeoutError; compile_error! {"Either config option \"async-std\" or \"tokio\" must be enabled for this crate."} use std::{ collections::{BTreeSet, HashMap}, - fmt::Debug, + fmt::{Debug, Display}, hash::Hash, pin::Pin, sync::Arc, @@ -274,7 +274,7 @@ pub trait ConnectedNetwork: Clone + Send + Sync + 'st async fn broadcast_message( &self, message: Vec, - recipients: BTreeSet, + topic: Topic, broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError>; @@ -674,3 +674,22 @@ impl NetworkReliability for ChaosNetwork { Uniform::new_inclusive(self.repeat_low, self.repeat_high).sample(&mut rand::thread_rng()) } } + +/// Used when broadcasting messages +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum Topic { + /// The `Global` topic goes out to all nodes + Global, + /// The `Da` topic goes out to only the DA committee + Da, +} + +/// Libp2p topics require a string, so we need to convert our enum to a string +impl Display for Topic { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Topic::Global => write!(f, "global"), + Topic::Da => write!(f, "DA"), + } + } +} From 472d623f538f80d1bb47615ab34cc196b55c5a29 Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 26 Jul 2024 10:57:28 -0400 Subject: [PATCH 2/2] remove topic map --- Cargo.lock | 12 ++--- .../src/traits/networking/libp2p_network.rs | 47 +++++-------------- 2 files changed, 17 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 27ff561046..317d29cd5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1452,9 +1452,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.10" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f6b81fb3c84f5563d509c59b5a48d935f689e993afa90fe39047f05adef9142" +checksum = "35723e6a11662c2afb578bcf0b88bf6ea8e21282a953428f240574fcc3a2b5b3" dependencies = [ "clap_builder", "clap_derive", @@ -1462,9 +1462,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.10" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca6706fd5224857d9ac5eb9355f6683563cc0541c7cd9d014043b57cbec78ac" +checksum = "49eb96cbfa7cfa35017b7cd548c75b14c3118c98b423041d70562665e07fb0fa" dependencies = [ "anstream", "anstyle", @@ -1474,9 +1474,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.8" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bac35c6dafb060fd4d275d9a4ffae97917c13a6327903a8be2153cd964f7085" +checksum = "5d029b67f89d30bbb547c89fd5161293c0aec155fc691d7924b64550662db93e" dependencies = [ "heck 0.5.0", "proc-macro2", diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 3ab5a48e1a..6af0d9ba81 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -166,10 +166,8 @@ struct Libp2pNetworkInner { is_bootstrapped: Arc, /// The Libp2p metrics we're managing metrics: Libp2pMetricsValue, - /// topic map - /// hash(hashset) -> topic - /// btreemap ordered so is hashable - topic_map: RwLock, String>>, + /// The list of topics we're subscribed to + subscribed_topics: HashSet, /// the latest view number (for node lookup purposes) /// NOTE: supposed to represent a ViewNumber but we /// haven't made that atomic yet and we prefer lock-free @@ -298,7 +296,6 @@ impl TestableNetworkingImplementation .unwrap() }; let bootstrap_addrs_ref = Arc::clone(&bootstrap_addrs); - let keys = all_keys.clone(); let da = da_keys.clone(); let reliability_config_dup = reliability_config.clone(); Box::pin(async move { @@ -309,10 +306,8 @@ impl TestableNetworkingImplementation pubkey.clone(), bootstrap_addrs_ref, usize::try_from(node_id).unwrap(), - keys, #[cfg(feature = "hotshot-testing")] reliability_config_dup, - da.clone(), da.contains(&pubkey), ) .await @@ -453,12 +448,8 @@ impl Libp2pNetwork { pub_key.clone(), Arc::new(RwLock::new(bootstrap_nodes)), usize::try_from(config.node_index)?, - // NOTE: this introduces an invariant that the keys are assigned using this indexed - // function - all_keys, #[cfg(feature = "hotshot-testing")] None, - da_keys.clone(), da_keys.contains(pub_key), ) .await?) @@ -499,10 +490,7 @@ impl Libp2pNetwork { pk: K, bootstrap_addrs: BootstrapAddrs, id: usize, - // HACK - quorum_public_keys: BTreeSet, #[cfg(feature = "hotshot-testing")] reliability_config: Option>, - da_public_keys: BTreeSet, is_da: bool, ) -> Result, NetworkError> { // Error if there were no bootstrap nodes specified @@ -528,12 +516,11 @@ impl Libp2pNetwork { let mut pubkey_pid_map = BiHashMap::new(); pubkey_pid_map.insert(pk.clone(), network_handle.peer_id()); - let mut topic_map = BiHashMap::new(); - - topic_map.insert(quorum_public_keys, QC_TOPIC.to_string()); - topic_map.insert(da_public_keys, "DA".to_string()); - - let topic_map = RwLock::new(topic_map); + // Subscribe to the relevant topics + let mut subscribed_topics = HashSet::from_iter(vec![QC_TOPIC.to_string()]); + if is_da { + subscribed_topics.insert("DA".to_string()); + } // unbounded channels may not be the best choice (spammed?) // if bounded figure out a way to log dropped msgs @@ -557,7 +544,7 @@ impl Libp2pNetwork { dht_timeout: Duration::from_secs(120), is_bootstrapped: Arc::new(AtomicBool::new(false)), metrics, - topic_map, + subscribed_topics, node_lookup_send, // Start the latest view from 0. "Latest" refers to "most recent view we are polling for // proposals on". We need this because to have consensus info injected we need a working @@ -932,22 +919,10 @@ impl ConnectedNetwork for Libp2pNetwork { return Err(NetworkError::NotReady); }; - // Get the topic subscribers + // If we are subscribed to the topic, let topic = topic.to_string(); - let topic_map = self.inner.topic_map.read().await; - - // If the topic existed, - if let Some(topic_subscribers) = topic_map.get_by_right(&topic) { - // And we are subscribed, - if topic_subscribers.contains(&self.inner.pk) { - // Short-circuit-send the message to ourselves - self.inner.sender.send(message.clone()).await.map_err(|_| { - self.inner.metrics.num_failed_messages.add(1); - NetworkError::ShutDown - })?; - } - } else { - // Edge case for all nodes being subscribed to the topic + if self.inner.subscribed_topics.contains(&topic) { + // Short-circuit-send the message to ourselves self.inner.sender.send(message.clone()).await.map_err(|_| { self.inner.metrics.num_failed_messages.add(1); NetworkError::ShutDown