Skip to content

Commit

Permalink
revert topic refactor to fix merge (#3503)
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron authored Jul 29, 2024
1 parent 8ff7df8 commit bebcf73
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 164 deletions.
12 changes: 5 additions & 7 deletions crates/examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use futures::StreamExt;
use hotshot::{
traits::{
implementations::{
derive_libp2p_peer_id, CdnMetricsValue, CdnTopic, CombinedNetworks, Libp2pMetricsValue,
Libp2pNetwork, PushCdnNetwork, WrappedSignatureKey,
derive_libp2p_peer_id, CdnMetricsValue, CombinedNetworks, Libp2pMetricsValue,
Libp2pNetwork, PushCdnNetwork, Topic, WrappedSignatureKey,
},
BlockPayload, NodeImplementation,
},
Expand Down Expand Up @@ -52,7 +52,7 @@ use hotshot_types::{
traits::{
block_contents::{BlockHeader, TestableBlock},
election::Membership,
network::{ConnectedNetwork, Topic},
network::ConnectedNetwork,
node_implementation::{ConsensusTime, NodeType},
states::TestableState,
},
Expand Down Expand Up @@ -376,7 +376,6 @@ pub trait RunDa<
let quorum_membership = <TYPES as NodeType>::Membership::create_election(
known_nodes_with_stake.clone(),
known_nodes_with_stake.clone(),
Topic::Global,
config.config.fixed_leader_for_gpuvid,
);

Expand All @@ -385,7 +384,6 @@ pub trait RunDa<
let da_membership = <TYPES as NodeType>::Membership::create_election(
known_nodes_with_stake.clone(),
config.config.known_da_nodes.clone(),
Topic::Da,
config.config.fixed_leader_for_gpuvid,
);

Expand Down Expand Up @@ -627,9 +625,9 @@ where
};

// See if we should be DA, subscribe to the DA topic if so
let mut topics = vec![CdnTopic::Global];
let mut topics = vec![Topic::Global];
if config.config.my_own_validator_config.is_da {
topics.push(CdnTopic::Da);
topics.push(Topic::Da);
}

// Create the network and await the initial connection
Expand Down
2 changes: 1 addition & 1 deletion crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
api
.network.broadcast_message(
serialized_message,
da_membership.committee_topic(),
da_membership.whole_committee(view_number),
BroadcastDelay::None,
),
api
Expand Down
2 changes: 1 addition & 1 deletion crates/hotshot/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub mod implementations {
},
memory_network::{MasterMap, MemoryNetwork},
push_cdn_network::{
CdnMetricsValue, KeyPair, ProductionDef, PushCdnNetwork, TestingDef, Topic as CdnTopic,
CdnMetricsValue, KeyPair, ProductionDef, PushCdnNetwork, TestingDef, Topic,
WrappedSignatureKey,
},
};
Expand Down
13 changes: 0 additions & 13 deletions crates/hotshot/src/traits/election/static_committee.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{marker::PhantomData, num::NonZeroU64};

use ethereum_types::U256;
use hotshot_types::traits::network::Topic;
// use ark_bls12_381::Parameters as Param381;
use hotshot_types::traits::signature_key::StakeTableEntryType;
use hotshot_types::{
Expand All @@ -27,9 +26,6 @@ pub struct GeneralStaticCommittee<T, PUBKEY: SignatureKey> {
fixed_leader_for_gpuvid: usize,
/// Node type phantom
_type_phantom: PhantomData<T>,

/// The network topic of the committee
committee_topic: Topic,
}

/// static committee using a vrf kp
Expand All @@ -43,15 +39,13 @@ impl<T, PUBKEY: SignatureKey> GeneralStaticCommittee<T, PUBKEY> {
nodes_with_stake: Vec<PUBKEY::StakeTableEntry>,
nodes_without_stake: Vec<PUBKEY>,
fixed_leader_for_gpuvid: usize,
committee_topic: Topic,
) -> Self {
Self {
all_nodes_with_stake: nodes_with_stake.clone(),
committee_nodes_with_stake: nodes_with_stake,
committee_nodes_without_stake: nodes_without_stake,
fixed_leader_for_gpuvid,
_type_phantom: PhantomData,
committee_topic,
}
}
}
Expand All @@ -66,11 +60,6 @@ where
self.committee_nodes_with_stake.clone()
}

/// Get the network topic for the committee
fn committee_topic(&self) -> Topic {
self.committee_topic.clone()
}

#[cfg(not(any(
feature = "randomized-leader-election",
feature = "fixed-leader-election"
Expand Down Expand Up @@ -126,7 +115,6 @@ where
fn create_election(
mut all_nodes: Vec<PeerConfig<PUBKEY>>,
committee_members: Vec<PeerConfig<PUBKEY>>,
committee_topic: Topic,
fixed_leader_for_gpuvid: usize,
) -> Self {
let mut committee_nodes_with_stake = Vec::new();
Expand Down Expand Up @@ -163,7 +151,6 @@ where
committee_nodes_without_stake,
fixed_leader_for_gpuvid,
_type_phantom: PhantomData,
committee_topic,
}
}

Expand Down
10 changes: 5 additions & 5 deletions crates/hotshot/src/traits/networking/combined_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use hotshot_types::{
},
data::ViewNumber,
traits::{
network::{BroadcastDelay, ConnectedNetwork, ResponseChannel, Topic},
network::{BroadcastDelay, ConnectedNetwork, ResponseChannel},
node_implementation::NodeType,
},
BoxSyncFuture,
Expand Down Expand Up @@ -372,24 +372,24 @@ impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks
async fn broadcast_message(
&self,
message: Vec<u8>,
topic: Topic,
recipients: BTreeSet<TYPES::SignatureKey>,
broadcast_delay: BroadcastDelay,
) -> Result<(), NetworkError> {
let primary = self.primary().clone();
let secondary = self.secondary().clone();
let primary_message = message.clone();
let secondary_message = message.clone();
let topic_clone = topic.clone();
let primary_recipients = recipients.clone();
self.send_both_networks(
message,
async move {
primary
.broadcast_message(primary_message, topic_clone, BroadcastDelay::None)
.broadcast_message(primary_message, primary_recipients, BroadcastDelay::None)
.await
},
async move {
secondary
.broadcast_message(secondary_message, topic, BroadcastDelay::None)
.broadcast_message(secondary_message, recipients, BroadcastDelay::None)
.await
},
broadcast_delay,
Expand Down
54 changes: 38 additions & 16 deletions crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use hotshot_types::{
traits::{
election::Membership,
metrics::{Counter, Gauge, Metrics, NoMetrics},
network::{self, ConnectedNetwork, NetworkError, ResponseMessage, Topic},
network::{self, ConnectedNetwork, NetworkError, ResponseMessage},
node_implementation::{ConsensusTime, NodeType},
signature_key::SignatureKey,
},
Expand All @@ -60,8 +60,8 @@ use libp2p_networking::{
behaviours::request_response::{Request, Response},
spawn_network_node, MeshParams,
NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg},
NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver,
NetworkNodeType, DEFAULT_REPLICATION_FACTOR,
NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeHandleError,
NetworkNodeReceiver, NetworkNodeType, DEFAULT_REPLICATION_FACTOR,
},
reexport::{Multiaddr, ResponseChannel},
};
Expand Down Expand Up @@ -166,8 +166,10 @@ struct Libp2pNetworkInner<K: SignatureKey + 'static> {
is_bootstrapped: Arc<AtomicBool>,
/// The Libp2p metrics we're managing
metrics: Libp2pMetricsValue,
/// The list of topics we're subscribed to
subscribed_topics: HashSet<String>,
/// topic map
/// hash(hashset) -> topic
/// btreemap ordered so is hashable
topic_map: RwLock<BiHashMap<BTreeSet<K>, String>>,
/// the latest view number (for node lookup purposes)
/// NOTE: supposed to represent a ViewNumber but we
/// haven't made that atomic yet and we prefer lock-free
Expand Down Expand Up @@ -296,6 +298,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
.unwrap()
};
let bootstrap_addrs_ref = Arc::clone(&bootstrap_addrs);
let keys = all_keys.clone();
let da = da_keys.clone();
let reliability_config_dup = reliability_config.clone();
Box::pin(async move {
Expand All @@ -306,8 +309,10 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
pubkey.clone(),
bootstrap_addrs_ref,
usize::try_from(node_id).unwrap(),
keys,
#[cfg(feature = "hotshot-testing")]
reliability_config_dup,
da.clone(),
da.contains(&pubkey),
)
.await
Expand Down Expand Up @@ -448,8 +453,12 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
pub_key.clone(),
Arc::new(RwLock::new(bootstrap_nodes)),
usize::try_from(config.node_index)?,
// NOTE: this introduces an invariant that the keys are assigned using this indexed
// function
all_keys,
#[cfg(feature = "hotshot-testing")]
None,
da_keys.clone(),
da_keys.contains(pub_key),
)
.await?)
Expand Down Expand Up @@ -490,7 +499,10 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
pk: K,
bootstrap_addrs: BootstrapAddrs,
id: usize,
// HACK
quorum_public_keys: BTreeSet<K>,
#[cfg(feature = "hotshot-testing")] reliability_config: Option<Box<dyn NetworkReliability>>,
da_public_keys: BTreeSet<K>,
is_da: bool,
) -> Result<Libp2pNetwork<K>, NetworkError> {
// Error if there were no bootstrap nodes specified
Expand All @@ -516,11 +528,11 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
let mut pubkey_pid_map = BiHashMap::new();
pubkey_pid_map.insert(pk.clone(), network_handle.peer_id());

// Subscribe to the relevant topics
let mut subscribed_topics = HashSet::from_iter(vec![QC_TOPIC.to_string()]);
if is_da {
subscribed_topics.insert("DA".to_string());
}
let mut topic_map = BiHashMap::new();
topic_map.insert(quorum_public_keys, QC_TOPIC.to_string());
topic_map.insert(da_public_keys, "DA".to_string());

let topic_map = RwLock::new(topic_map);

// unbounded channels may not be the best choice (spammed?)
// if bounded figure out a way to log dropped msgs
Expand All @@ -544,7 +556,7 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
dht_timeout: Duration::from_secs(120),
is_bootstrapped: Arc::new(AtomicBool::new(false)),
metrics,
subscribed_topics,
topic_map,
node_lookup_send,
// Start the latest view from 0. "Latest" refers to "most recent view we are polling for
// proposals on". We need this because to have consensus info injected we need a working
Expand Down Expand Up @@ -910,7 +922,7 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for Libp2pNetwork<K> {
async fn broadcast_message(
&self,
message: Vec<u8>,
topic: Topic,
recipients: BTreeSet<K>,
_broadcast_delay: BroadcastDelay,
) -> Result<(), NetworkError> {
// If we're not ready, return an error
Expand All @@ -919,10 +931,20 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for Libp2pNetwork<K> {
return Err(NetworkError::NotReady);
};

// If we are subscribed to the topic,
let topic = topic.to_string();
if self.inner.subscribed_topics.contains(&topic) {
// Short-circuit-send the message to ourselves
let topic_map = self.inner.topic_map.read().await;
let topic = topic_map
.get_by_left(&recipients)
.ok_or_else(|| {
self.inner.metrics.num_failed_messages.add(1);
NetworkError::Libp2p {
source: Box::new(NetworkNodeHandleError::NoSuchTopic),
}
})?
.clone();

// gossip doesn't broadcast from itself, so special case
if recipients.contains(&self.inner.pk) {
// send to self
self.inner.sender.send(message.clone()).await.map_err(|_| {
self.inner.metrics.num_failed_messages.add(1);
NetworkError::ShutDown
Expand Down
Loading

0 comments on commit bebcf73

Please sign in to comment.