Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic refactor #3483

Merged
merged 2 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions crates/examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use futures::StreamExt;
use hotshot::{
traits::{
implementations::{
derive_libp2p_peer_id, CdnMetricsValue, CombinedNetworks, Libp2pMetricsValue,
Libp2pNetwork, PushCdnNetwork, Topic, WrappedSignatureKey,
derive_libp2p_peer_id, CdnMetricsValue, CdnTopic, CombinedNetworks, Libp2pMetricsValue,
Libp2pNetwork, PushCdnNetwork, WrappedSignatureKey,
},
BlockPayload, NodeImplementation,
},
Expand Down Expand Up @@ -52,7 +52,7 @@ use hotshot_types::{
traits::{
block_contents::{BlockHeader, TestableBlock},
election::Membership,
network::ConnectedNetwork,
network::{ConnectedNetwork, Topic},
node_implementation::{ConsensusTime, NodeType},
states::TestableState,
},
Expand Down Expand Up @@ -376,6 +376,7 @@ pub trait RunDa<
let quorum_membership = <TYPES as NodeType>::Membership::create_election(
known_nodes_with_stake.clone(),
known_nodes_with_stake.clone(),
Topic::Global,
config.config.fixed_leader_for_gpuvid,
);

Expand All @@ -384,6 +385,7 @@ pub trait RunDa<
let da_membership = <TYPES as NodeType>::Membership::create_election(
known_nodes_with_stake.clone(),
config.config.known_da_nodes.clone(),
Topic::Da,
config.config.fixed_leader_for_gpuvid,
);

Expand Down Expand Up @@ -625,9 +627,9 @@ where
};

// See if we should be DA, subscribe to the DA topic if so
let mut topics = vec![Topic::Global];
let mut topics = vec![CdnTopic::Global];
if config.config.my_own_validator_config.is_da {
topics.push(Topic::Da);
topics.push(CdnTopic::Da);
}

// Create the network and await the initial connection
Expand Down
2 changes: 1 addition & 1 deletion crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
api
.network.broadcast_message(
serialized_message,
da_membership.whole_committee(view_number),
da_membership.committee_topic(),
BroadcastDelay::None,
),
api
Expand Down
2 changes: 1 addition & 1 deletion crates/hotshot/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub mod implementations {
},
memory_network::{MasterMap, MemoryNetwork},
push_cdn_network::{
CdnMetricsValue, KeyPair, ProductionDef, PushCdnNetwork, TestingDef, Topic,
CdnMetricsValue, KeyPair, ProductionDef, PushCdnNetwork, TestingDef, Topic as CdnTopic,
WrappedSignatureKey,
},
};
Expand Down
13 changes: 13 additions & 0 deletions crates/hotshot/src/traits/election/static_committee.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{marker::PhantomData, num::NonZeroU64};

use ethereum_types::U256;
use hotshot_types::traits::network::Topic;
// use ark_bls12_381::Parameters as Param381;
use hotshot_types::traits::signature_key::StakeTableEntryType;
use hotshot_types::{
Expand All @@ -26,6 +27,9 @@ pub struct GeneralStaticCommittee<T, PUBKEY: SignatureKey> {
fixed_leader_for_gpuvid: usize,
/// Node type phantom
_type_phantom: PhantomData<T>,

/// The network topic of the committee
committee_topic: Topic,
}

/// static committee using a vrf kp
Expand All @@ -39,13 +43,15 @@ impl<T, PUBKEY: SignatureKey> GeneralStaticCommittee<T, PUBKEY> {
nodes_with_stake: Vec<PUBKEY::StakeTableEntry>,
nodes_without_stake: Vec<PUBKEY>,
fixed_leader_for_gpuvid: usize,
committee_topic: Topic,
) -> Self {
Self {
all_nodes_with_stake: nodes_with_stake.clone(),
committee_nodes_with_stake: nodes_with_stake,
committee_nodes_without_stake: nodes_without_stake,
fixed_leader_for_gpuvid,
_type_phantom: PhantomData,
committee_topic,
}
}
}
Expand All @@ -60,6 +66,11 @@ where
self.committee_nodes_with_stake.clone()
}

/// Get the network topic for the committee
fn committee_topic(&self) -> Topic {
self.committee_topic.clone()
}

#[cfg(not(any(
feature = "randomized-leader-election",
feature = "fixed-leader-election"
Expand Down Expand Up @@ -115,6 +126,7 @@ where
fn create_election(
mut all_nodes: Vec<PeerConfig<PUBKEY>>,
committee_members: Vec<PeerConfig<PUBKEY>>,
committee_topic: Topic,
fixed_leader_for_gpuvid: usize,
) -> Self {
let mut committee_nodes_with_stake = Vec::new();
Expand Down Expand Up @@ -151,6 +163,7 @@ where
committee_nodes_without_stake,
fixed_leader_for_gpuvid,
_type_phantom: PhantomData,
committee_topic,
}
}

Expand Down
10 changes: 5 additions & 5 deletions crates/hotshot/src/traits/networking/combined_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use hotshot_types::{
},
data::ViewNumber,
traits::{
network::{BroadcastDelay, ConnectedNetwork, ResponseChannel},
network::{BroadcastDelay, ConnectedNetwork, ResponseChannel, Topic},
node_implementation::NodeType,
},
BoxSyncFuture,
Expand Down Expand Up @@ -372,24 +372,24 @@ impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks
async fn broadcast_message(
&self,
message: Vec<u8>,
recipients: BTreeSet<TYPES::SignatureKey>,
topic: Topic,
broadcast_delay: BroadcastDelay,
) -> Result<(), NetworkError> {
let primary = self.primary().clone();
let secondary = self.secondary().clone();
let primary_message = message.clone();
let secondary_message = message.clone();
let primary_recipients = recipients.clone();
let topic_clone = topic.clone();
self.send_both_networks(
message,
async move {
primary
.broadcast_message(primary_message, primary_recipients, BroadcastDelay::None)
.broadcast_message(primary_message, topic_clone, BroadcastDelay::None)
.await
},
async move {
secondary
.broadcast_message(secondary_message, recipients, BroadcastDelay::None)
.broadcast_message(secondary_message, topic, BroadcastDelay::None)
.await
},
broadcast_delay,
Expand Down
54 changes: 16 additions & 38 deletions crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use hotshot_types::{
traits::{
election::Membership,
metrics::{Counter, Gauge, Metrics, NoMetrics},
network::{self, ConnectedNetwork, NetworkError, ResponseMessage},
network::{self, ConnectedNetwork, NetworkError, ResponseMessage, Topic},
node_implementation::{ConsensusTime, NodeType},
signature_key::SignatureKey,
},
Expand All @@ -60,8 +60,8 @@ use libp2p_networking::{
behaviours::request_response::{Request, Response},
spawn_network_node, MeshParams,
NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg},
NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeHandleError,
NetworkNodeReceiver, NetworkNodeType, DEFAULT_REPLICATION_FACTOR,
NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver,
NetworkNodeType, DEFAULT_REPLICATION_FACTOR,
},
reexport::{Multiaddr, ResponseChannel},
};
Expand Down Expand Up @@ -166,10 +166,8 @@ struct Libp2pNetworkInner<K: SignatureKey + 'static> {
is_bootstrapped: Arc<AtomicBool>,
/// The Libp2p metrics we're managing
metrics: Libp2pMetricsValue,
/// topic map
/// hash(hashset) -> topic
/// btreemap ordered so is hashable
topic_map: RwLock<BiHashMap<BTreeSet<K>, String>>,
/// The list of topics we're subscribed to
subscribed_topics: HashSet<String>,
/// the latest view number (for node lookup purposes)
/// NOTE: supposed to represent a ViewNumber but we
/// haven't made that atomic yet and we prefer lock-free
Expand Down Expand Up @@ -298,7 +296,6 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
.unwrap()
};
let bootstrap_addrs_ref = Arc::clone(&bootstrap_addrs);
let keys = all_keys.clone();
let da = da_keys.clone();
let reliability_config_dup = reliability_config.clone();
Box::pin(async move {
Expand All @@ -309,10 +306,8 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
pubkey.clone(),
bootstrap_addrs_ref,
usize::try_from(node_id).unwrap(),
keys,
#[cfg(feature = "hotshot-testing")]
reliability_config_dup,
da.clone(),
da.contains(&pubkey),
)
.await
Expand Down Expand Up @@ -453,12 +448,8 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
pub_key.clone(),
Arc::new(RwLock::new(bootstrap_nodes)),
usize::try_from(config.node_index)?,
// NOTE: this introduces an invariant that the keys are assigned using this indexed
// function
all_keys,
#[cfg(feature = "hotshot-testing")]
None,
da_keys.clone(),
da_keys.contains(pub_key),
)
.await?)
Expand Down Expand Up @@ -499,10 +490,7 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
pk: K,
bootstrap_addrs: BootstrapAddrs,
id: usize,
// HACK
quorum_public_keys: BTreeSet<K>,
#[cfg(feature = "hotshot-testing")] reliability_config: Option<Box<dyn NetworkReliability>>,
da_public_keys: BTreeSet<K>,
is_da: bool,
) -> Result<Libp2pNetwork<K>, NetworkError> {
// Error if there were no bootstrap nodes specified
Expand All @@ -528,11 +516,11 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
let mut pubkey_pid_map = BiHashMap::new();
pubkey_pid_map.insert(pk.clone(), network_handle.peer_id());

let mut topic_map = BiHashMap::new();
topic_map.insert(quorum_public_keys, QC_TOPIC.to_string());
topic_map.insert(da_public_keys, "DA".to_string());

let topic_map = RwLock::new(topic_map);
// Subscribe to the relevant topics
let mut subscribed_topics = HashSet::from_iter(vec![QC_TOPIC.to_string()]);
if is_da {
subscribed_topics.insert("DA".to_string());
}

// unbounded channels may not be the best choice (spammed?)
// if bounded figure out a way to log dropped msgs
Expand All @@ -556,7 +544,7 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
dht_timeout: Duration::from_secs(120),
is_bootstrapped: Arc::new(AtomicBool::new(false)),
metrics,
topic_map,
subscribed_topics,
node_lookup_send,
// Start the latest view from 0. "Latest" refers to "most recent view we are polling for
// proposals on". We need this because to have consensus info injected we need a working
Expand Down Expand Up @@ -922,7 +910,7 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for Libp2pNetwork<K> {
async fn broadcast_message(
&self,
message: Vec<u8>,
recipients: BTreeSet<K>,
topic: Topic,
_broadcast_delay: BroadcastDelay,
) -> Result<(), NetworkError> {
// If we're not ready, return an error
Expand All @@ -931,20 +919,10 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for Libp2pNetwork<K> {
return Err(NetworkError::NotReady);
};

let topic_map = self.inner.topic_map.read().await;
let topic = topic_map
.get_by_left(&recipients)
.ok_or_else(|| {
self.inner.metrics.num_failed_messages.add(1);
NetworkError::Libp2p {
source: Box::new(NetworkNodeHandleError::NoSuchTopic),
}
})?
.clone();

// gossip doesn't broadcast from itself, so special case
if recipients.contains(&self.inner.pk) {
// send to self
// If we are subscribed to the topic,
let topic = topic.to_string();
if self.inner.subscribed_topics.contains(&topic) {
// Short-circuit-send the message to ourselves
self.inner.sender.send(message.clone()).await.map_err(|_| {
self.inner.metrics.num_failed_messages.add(1);
NetworkError::ShutDown
Expand Down
Loading