From 270fb1e576ae2199c8b1803ec91a9a749ded6779 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 30 Aug 2022 19:50:22 +0300 Subject: [PATCH] Change validation & collation protocol names to include genesis hash & fork id (#5876) --- .../approval-distribution/src/tests.rs | 4 +- .../bitfield-distribution/src/tests.rs | 10 +- node/network/bridge/src/lib.rs | 3 +- node/network/bridge/src/metrics.rs | 4 +- node/network/bridge/src/network.rs | 32 +- node/network/bridge/src/rx/mod.rs | 94 +++- node/network/bridge/src/rx/tests.rs | 91 +++- node/network/bridge/src/tx/mod.rs | 52 +- node/network/bridge/src/tx/tests.rs | 40 +- .../network/bridge/src/validator_discovery.rs | 32 +- .../src/collator_side/tests.rs | 3 +- .../src/validator_side/tests.rs | 9 +- node/network/protocol/src/lib.rs | 2 - node/network/protocol/src/peer_set.rs | 457 +++++++++++++++--- .../network/statement-distribution/src/lib.rs | 2 +- .../statement-distribution/src/tests.rs | 39 +- node/service/src/lib.rs | 13 +- node/service/src/overseer.rs | 10 +- .../src/messages/network_bridge_event.rs | 3 +- 19 files changed, 731 insertions(+), 169 deletions(-) diff --git a/node/network/approval-distribution/src/tests.rs b/node/network/approval-distribution/src/tests.rs index bac808603593..b3d44bfe8c1e 100644 --- a/node/network/approval-distribution/src/tests.rs +++ b/node/network/approval-distribution/src/tests.rs @@ -17,7 +17,7 @@ use super::*; use assert_matches::assert_matches; use futures::{executor, future, Future}; -use polkadot_node_network_protocol::{our_view, view, ObservedRole}; +use polkadot_node_network_protocol::{our_view, peer_set::ValidationVersion, view, ObservedRole}; use polkadot_node_primitives::approval::{ AssignmentCertKind, VRFOutput, VRFProof, RELAY_VRF_MODULO_CONTEXT, }; @@ -174,7 +174,7 @@ async fn setup_peer_with_view( ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer_id.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), None, )), ) diff --git a/node/network/bitfield-distribution/src/tests.rs b/node/network/bitfield-distribution/src/tests.rs index 6509db3ba660..f3894d61c5f9 100644 --- a/node/network/bitfield-distribution/src/tests.rs +++ b/node/network/bitfield-distribution/src/tests.rs @@ -20,7 +20,8 @@ use bitvec::bitvec; use futures::executor; use maplit::hashmap; use polkadot_node_network_protocol::{ - grid_topology::SessionBoundGridTopologyStorage, our_view, view, ObservedRole, + grid_topology::SessionBoundGridTopologyStorage, our_view, peer_set::ValidationVersion, view, + ObservedRole, }; use polkadot_node_subsystem::{ jaeger, @@ -568,7 +569,12 @@ fn changing_view() { &mut ctx, &mut state, &Default::default(), - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer_b.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None + ), &mut rng, )); diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index d599ae25d9c1..77d106d25f7b 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -28,7 +28,8 @@ use parking_lot::Mutex; use sp_consensus::SyncOracle; use polkadot_node_network_protocol::{ - peer_set::PeerSet, PeerId, ProtocolVersion, UnifiedReputationChange as Rep, View, + peer_set::{PeerSet, ProtocolVersion}, + PeerId, UnifiedReputationChange as Rep, View, }; /// Peer set info for network initialization. diff --git a/node/network/bridge/src/metrics.rs b/node/network/bridge/src/metrics.rs index 0224c4960ac9..52b3629a66ad 100644 --- a/node/network/bridge/src/metrics.rs +++ b/node/network/bridge/src/metrics.rs @@ -23,7 +23,7 @@ pub struct Metrics(pub(crate) Option); fn peer_set_label(peer_set: PeerSet, version: ProtocolVersion) -> &'static str { // Higher level code is meant to protect against this ever happening. - peer_set.get_protocol_name_static(version).unwrap_or("") + peer_set.get_protocol_label(version).unwrap_or("") } #[allow(missing_docs)] @@ -98,7 +98,7 @@ impl Metrics { self.0.as_ref().map(|metrics| { metrics .desired_peer_count - .with_label_values(&[peer_set.get_default_protocol_name()]) + .with_label_values(&[peer_set.get_label()]) .set(size as u64) }); } diff --git a/node/network/bridge/src/network.rs b/node/network/bridge/src/network.rs index e80094588e33..da240262cd2c 100644 --- a/node/network/bridge/src/network.rs +++ b/node/network/bridge/src/network.rs @@ -31,9 +31,9 @@ use sc_network_common::{ }; use polkadot_node_network_protocol::{ - peer_set::PeerSet, + peer_set::{PeerSet, PeerSetProtocolNames, ProtocolVersion}, request_response::{OutgoingRequest, Recipient, ReqProtocolNames, Requests}, - PeerId, ProtocolVersion, UnifiedReputationChange as Rep, + PeerId, UnifiedReputationChange as Rep, }; use polkadot_primitives::v2::{AuthorityDiscoveryId, Block, Hash}; @@ -52,6 +52,7 @@ pub(crate) fn send_message( mut peers: Vec, peer_set: PeerSet, version: ProtocolVersion, + protocol_names: &PeerSetProtocolNames, message: M, metrics: &super::Metrics, ) where @@ -67,11 +68,13 @@ pub(crate) fn send_message( // list. The message payload can be quite large. If the underlying // network used `Bytes` this would not be necessary. let last_peer = peers.pop(); + // optimization: generate the protocol name once. + let protocol_name = protocol_names.get_name(peer_set, version); peers.into_iter().for_each(|peer| { - net.write_notification(peer, peer_set, message.clone()); + net.write_notification(peer, protocol_name.clone(), message.clone()); }); if let Some(peer) = last_peer { - net.write_notification(peer, peer_set, message); + net.write_notification(peer, protocol_name, message); } } @@ -108,11 +111,11 @@ pub trait Network: Clone + Send + 'static { /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar. fn report_peer(&self, who: PeerId, cost_benefit: Rep); - /// Disconnect a given peer from the peer set specified without harming reputation. - fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet); + /// Disconnect a given peer from the protocol specified without harming reputation. + fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>); - /// Write a notification to a peer on the given peer-set's protocol. - fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec); + /// Write a notification to a peer on the given protocol. + fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec); } #[async_trait] @@ -137,17 +140,12 @@ impl Network for Arc> { NetworkService::report_peer(&**self, who, cost_benefit.into_base_rep()); } - fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) { - NetworkService::disconnect_peer(&**self, who, peer_set.into_default_protocol_name()); + fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>) { + NetworkService::disconnect_peer(&**self, who, protocol); } - fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec) { - NetworkService::write_notification( - &**self, - who, - peer_set.into_default_protocol_name(), - message, - ); + fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec) { + NetworkService::write_notification(&**self, who, protocol, message); } async fn start_request( diff --git a/node/network/bridge/src/rx/mod.rs b/node/network/bridge/src/rx/mod.rs index f135b006f114..b93024b43dfb 100644 --- a/node/network/bridge/src/rx/mod.rs +++ b/node/network/bridge/src/rx/mod.rs @@ -27,9 +27,11 @@ use sp_consensus::SyncOracle; use polkadot_node_network_protocol::{ self as net_protocol, - peer_set::{PeerSet, PerPeerSet}, - v1 as protocol_v1, ObservedRole, OurView, PeerId, ProtocolVersion, - UnifiedReputationChange as Rep, View, + peer_set::{ + CollationVersion, PeerSet, PeerSetProtocolNames, PerPeerSet, ProtocolVersion, + ValidationVersion, + }, + v1 as protocol_v1, ObservedRole, OurView, PeerId, UnifiedReputationChange as Rep, View, }; use polkadot_node_subsystem::{ @@ -80,6 +82,7 @@ pub struct NetworkBridgeRx { sync_oracle: Box, shared: Shared, metrics: Metrics, + peerset_protocol_names: PeerSetProtocolNames, } impl NetworkBridgeRx { @@ -92,9 +95,17 @@ impl NetworkBridgeRx { authority_discovery_service: AD, sync_oracle: Box, metrics: Metrics, + peerset_protocol_names: PeerSetProtocolNames, ) -> Self { let shared = Shared::default(); - Self { network_service, authority_discovery_service, sync_oracle, shared, metrics } + Self { + network_service, + authority_discovery_service, + sync_oracle, + shared, + metrics, + peerset_protocol_names, + } } } @@ -147,6 +158,7 @@ async fn handle_network_messages( mut authority_discovery_service: AD, metrics: Metrics, shared: Shared, + peerset_protocol_names: PeerSetProtocolNames, ) -> Result<(), Error> where AD: validator_discovery::AuthorityDiscovery + Send, @@ -166,13 +178,14 @@ where }) => { let role = ObservedRole::from(role); let (peer_set, version) = { - let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) { - None => continue, - Some(p) => p, - }; + let (peer_set, version) = + match peerset_protocol_names.try_get_protocol(&protocol) { + None => continue, + Some(p) => p, + }; if let Some(fallback) = negotiated_fallback { - match PeerSet::try_from_protocol_name(&fallback) { + match peerset_protocol_names.try_get_protocol(&fallback) { None => { gum::debug!( target: LOG_TARGET, @@ -210,7 +223,7 @@ where target: LOG_TARGET, action = "PeerConnected", peer_set = ?peer_set, - version, + version = %version, peer = ?peer, role = ?role ); @@ -245,7 +258,7 @@ where NetworkBridgeEvent::PeerConnected( peer.clone(), role, - 1, + version, maybe_authority, ), NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()), @@ -259,6 +272,7 @@ where vec![peer], PeerSet::Validation, version, + &peerset_protocol_names, WireMessage::::ViewUpdate(local_view), &metrics, ); @@ -269,7 +283,7 @@ where NetworkBridgeEvent::PeerConnected( peer.clone(), role, - 1, + version, maybe_authority, ), NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()), @@ -283,6 +297,7 @@ where vec![peer], PeerSet::Collation, version, + &peerset_protocol_names, WireMessage::::ViewUpdate(local_view), &metrics, ); @@ -290,7 +305,7 @@ where } }, Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => { - let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) { + let (peer_set, version) = match peerset_protocol_names.try_get_protocol(&protocol) { None => continue, Some(peer_set) => peer_set, }; @@ -317,7 +332,7 @@ where w }; - if was_connected && version == peer_set.get_default_version() { + if was_connected && version == peer_set.get_main_version() { match peer_set { PeerSet::Validation => dispatch_validation_event_to_all( @@ -355,7 +370,8 @@ where .filter_map(|(protocol, msg_bytes)| { // version doesn't matter because we always receive on the 'correct' // protocol name, not the negotiated fallback. - let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?; + let (peer_set, _version) = + peerset_protocol_names.try_get_protocol(protocol)?; if peer_set == PeerSet::Validation { if expected_versions[PeerSet::Validation].is_none() { return Some(Err(UNCONNECTED_PEERSET_COST)) @@ -384,7 +400,8 @@ where .filter_map(|(protocol, msg_bytes)| { // version doesn't matter because we always receive on the 'correct' // protocol name, not the negotiated fallback. - let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?; + let (peer_set, _version) = + peerset_protocol_names.try_get_protocol(protocol)?; if peer_set == PeerSet::Collation { if expected_versions[PeerSet::Collation].is_none() { @@ -422,7 +439,9 @@ where if !v_messages.is_empty() { let (events, reports) = - if expected_versions[PeerSet::Validation] == Some(1) { + if expected_versions[PeerSet::Validation] == + Some(ValidationVersion::V1.into()) + { handle_v1_peer_messages::( remote.clone(), PeerSet::Validation, @@ -453,7 +472,9 @@ where if !c_messages.is_empty() { let (events, reports) = - if expected_versions[PeerSet::Collation] == Some(1) { + if expected_versions[PeerSet::Collation] == + Some(CollationVersion::V1.into()) + { handle_v1_peer_messages::( remote.clone(), PeerSet::Collation, @@ -494,6 +515,7 @@ async fn run_incoming_orchestra_signals( shared: Shared, sync_oracle: Box, metrics: Metrics, + peerset_protocol_names: PeerSetProtocolNames, ) -> Result<(), Error> where N: Network, @@ -574,6 +596,7 @@ where &shared, finalized_number, &metrics, + &peerset_protocol_names, ); } } @@ -619,6 +642,7 @@ where metrics, sync_oracle, shared, + peerset_protocol_names, } = bridge; let (task, network_event_handler) = handle_network_messages( @@ -628,6 +652,7 @@ where authority_discovery_service.clone(), metrics.clone(), shared.clone(), + peerset_protocol_names.clone(), ) .remote_handle(); @@ -641,6 +666,7 @@ where shared, sync_oracle, metrics, + peerset_protocol_names, ); futures::pin_mut!(orchestra_signal_handler); @@ -667,6 +693,7 @@ fn update_our_view( shared: &Shared, finalized_number: BlockNumber, metrics: &Metrics, + peerset_protocol_names: &PeerSetProtocolNames, ) where Net: Network, { @@ -700,11 +727,18 @@ fn update_our_view( send_validation_message_v1( net, validation_peers, + peerset_protocol_names, WireMessage::ViewUpdate(new_view.clone()), metrics, ); - send_collation_message_v1(net, collation_peers, WireMessage::ViewUpdate(new_view), metrics); + send_collation_message_v1( + net, + collation_peers, + peerset_protocol_names, + WireMessage::ViewUpdate(new_view), + metrics, + ); let our_view = OurView::new( live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)), @@ -778,19 +812,37 @@ fn handle_v1_peer_messages>( fn send_validation_message_v1( net: &mut impl Network, peers: Vec, + peerset_protocol_names: &PeerSetProtocolNames, message: WireMessage, metrics: &Metrics, ) { - send_message(net, peers, PeerSet::Validation, 1, message, metrics); + send_message( + net, + peers, + PeerSet::Validation, + ValidationVersion::V1.into(), + peerset_protocol_names, + message, + metrics, + ); } fn send_collation_message_v1( net: &mut impl Network, peers: Vec, + peerset_protocol_names: &PeerSetProtocolNames, message: WireMessage, metrics: &Metrics, ) { - send_message(net, peers, PeerSet::Collation, 1, message, metrics) + send_message( + net, + peers, + PeerSet::Collation, + CollationVersion::V1.into(), + peerset_protocol_names, + message, + metrics, + ); } async fn dispatch_validation_event_to_all( diff --git a/node/network/bridge/src/rx/tests.rs b/node/network/bridge/src/rx/tests.rs index ad1d8392d30c..8773f0ee1a46 100644 --- a/node/network/bridge/src/rx/tests.rs +++ b/node/network/bridge/src/rx/tests.rs @@ -31,6 +31,7 @@ use std::{ use sc_network::{Event as NetworkEvent, IfDisconnected}; use polkadot_node_network_protocol::{ + peer_set::PeerSetProtocolNames, request_response::{outgoing::Requests, ReqProtocolNames}, view, ObservedRole, Versioned, }; @@ -46,7 +47,7 @@ use polkadot_node_subsystem_test_helpers::{ SingleItemSink, SingleItemStream, TestSubsystemContextHandle, }; use polkadot_node_subsystem_util::metered; -use polkadot_primitives::v2::AuthorityDiscoveryId; +use polkadot_primitives::v2::{AuthorityDiscoveryId, Hash}; use sc_network::Multiaddr; use sp_keyring::Sr25519Keyring; @@ -68,6 +69,7 @@ pub enum NetworkAction { struct TestNetwork { net_events: Arc>>>, action_tx: Arc>>, + protocol_names: Arc, } #[derive(Clone, Debug)] @@ -78,9 +80,12 @@ struct TestAuthorityDiscovery; struct TestNetworkHandle { action_rx: metered::UnboundedMeteredReceiver, net_tx: SingleItemSink, + protocol_names: PeerSetProtocolNames, } -fn new_test_network() -> (TestNetwork, TestNetworkHandle, TestAuthorityDiscovery) { +fn new_test_network( + protocol_names: PeerSetProtocolNames, +) -> (TestNetwork, TestNetworkHandle, TestAuthorityDiscovery) { let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink(); let (action_tx, action_rx) = metered::unbounded(); @@ -88,8 +93,9 @@ fn new_test_network() -> (TestNetwork, TestNetworkHandle, TestAuthorityDiscovery TestNetwork { net_events: Arc::new(Mutex::new(Some(net_rx))), action_tx: Arc::new(Mutex::new(action_tx)), + protocol_names: Arc::new(protocol_names.clone()), }, - TestNetworkHandle { action_rx, net_tx }, + TestNetworkHandle { action_rx, net_tx, protocol_names }, TestAuthorityDiscovery, ) } @@ -130,14 +136,20 @@ impl Network for TestNetwork { .unwrap(); } - fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) { + fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>) { + let (peer_set, version) = self.protocol_names.try_get_protocol(&protocol).unwrap(); + assert_eq!(version, peer_set.get_main_version()); + self.action_tx .lock() .unbounded_send(NetworkAction::DisconnectPeer(who, peer_set)) .unwrap(); } - fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec) { + fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec) { + let (peer_set, version) = self.protocol_names.try_get_protocol(&protocol).unwrap(); + assert_eq!(version, peer_set.get_main_version()); + self.action_tx .lock() .unbounded_send(NetworkAction::WriteNotification(who, peer_set, message)) @@ -181,7 +193,7 @@ impl TestNetworkHandle { async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) { self.send_network_event(NetworkEvent::NotificationStreamOpened { remote: peer, - protocol: peer_set.into_default_protocol_name(), + protocol: self.protocol_names.get_main_name(peer_set), negotiated_fallback: None, role: role.into(), }) @@ -191,7 +203,7 @@ impl TestNetworkHandle { async fn disconnect_peer(&mut self, peer: PeerId, peer_set: PeerSet) { self.send_network_event(NetworkEvent::NotificationStreamClosed { remote: peer, - protocol: peer_set.into_default_protocol_name(), + protocol: self.protocol_names.get_main_name(peer_set), }) .await; } @@ -199,7 +211,7 @@ impl TestNetworkHandle { async fn peer_message(&mut self, peer: PeerId, peer_set: PeerSet, message: Vec) { self.send_network_event(NetworkEvent::NotificationsReceived { remote: peer, - messages: vec![(peer_set.into_default_protocol_name(), message.into())], + messages: vec![(self.protocol_names.get_main_name(peer_set), message.into())], }) .await; } @@ -285,8 +297,12 @@ fn test_harness>( sync_oracle: Box, test: impl FnOnce(TestHarness) -> T, ) { + let genesis_hash = Hash::repeat_byte(0xff); + let fork_id = None; + let peerset_protocol_names = PeerSetProtocolNames::new(genesis_hash, fork_id); + let pool = sp_core::testing::TaskExecutor::new(); - let (mut network, network_handle, discovery) = new_test_network(); + let (mut network, network_handle, discovery) = new_test_network(peerset_protocol_names.clone()); let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); let network_stream = network.event_stream(); @@ -297,6 +313,7 @@ fn test_harness>( metrics: Metrics(None), sync_oracle, shared: Shared::default(), + peerset_protocol_names, }; let network_bridge = run_network_in(bridge, context, network_stream) @@ -656,7 +673,12 @@ fn peer_view_updates_sent_via_overseer() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None, + ), &mut virtual_overseer, ) .await; @@ -699,7 +721,12 @@ fn peer_messages_sent_via_overseer() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None, + ), &mut virtual_overseer, ) .await; @@ -769,7 +796,12 @@ fn peer_disconnect_from_just_one_peerset() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None, + ), &mut virtual_overseer, ) .await; @@ -783,7 +815,12 @@ fn peer_disconnect_from_just_one_peerset() { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None, + ), &mut virtual_overseer, ) .await; @@ -852,7 +889,12 @@ fn relays_collation_protocol_messages() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer_a.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None, + ), &mut virtual_overseer, ) .await; @@ -866,7 +908,12 @@ fn relays_collation_protocol_messages() { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer_b.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None, + ), &mut virtual_overseer, ) .await; @@ -945,7 +992,12 @@ fn different_views_on_different_peer_sets() { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None, + ), &mut virtual_overseer, ) .await; @@ -959,7 +1011,12 @@ fn different_views_on_different_peer_sets() { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None, + ), &mut virtual_overseer, ) .await; diff --git a/node/network/bridge/src/tx/mod.rs b/node/network/bridge/src/tx/mod.rs index ac34cf315cec..47f095fdf273 100644 --- a/node/network/bridge/src/tx/mod.rs +++ b/node/network/bridge/src/tx/mod.rs @@ -18,7 +18,9 @@ use super::*; use polkadot_node_network_protocol::{ - peer_set::PeerSet, request_response::ReqProtocolNames, v1 as protocol_v1, PeerId, Versioned, + peer_set::{CollationVersion, PeerSet, PeerSetProtocolNames, ValidationVersion}, + request_response::ReqProtocolNames, + v1 as protocol_v1, PeerId, Versioned, }; use polkadot_node_subsystem::{ @@ -53,6 +55,7 @@ pub struct NetworkBridgeTx { authority_discovery_service: AD, metrics: Metrics, req_protocol_names: ReqProtocolNames, + peerset_protocol_names: PeerSetProtocolNames, } impl NetworkBridgeTx { @@ -65,8 +68,15 @@ impl NetworkBridgeTx { authority_discovery_service: AD, metrics: Metrics, req_protocol_names: ReqProtocolNames, + peerset_protocol_names: PeerSetProtocolNames, ) -> Self { - Self { network_service, authority_discovery_service, metrics, req_protocol_names } + Self { + network_service, + authority_discovery_service, + metrics, + req_protocol_names, + peerset_protocol_names, + } } } @@ -91,12 +101,14 @@ async fn handle_subsystem_messages( mut authority_discovery_service: AD, metrics: Metrics, req_protocol_names: ReqProtocolNames, + peerset_protocol_names: PeerSetProtocolNames, ) -> Result<(), Error> where N: Network, AD: validator_discovery::AuthorityDiscovery + Clone, { - let mut validator_discovery = validator_discovery::Service::::new(); + let mut validator_discovery = + validator_discovery::Service::::new(peerset_protocol_names.clone()); loop { match ctx.recv().fuse().await? { @@ -112,6 +124,7 @@ where msg, &metrics, &req_protocol_names, + &peerset_protocol_names, ) .await; }, @@ -128,6 +141,7 @@ async fn handle_incoming_subsystem_communication( msg: NetworkBridgeTxMessage, metrics: &Metrics, req_protocol_names: &ReqProtocolNames, + peerset_protocol_names: &PeerSetProtocolNames, ) -> (N, AD) where N: Network, @@ -150,7 +164,9 @@ where peer_set = ?peer_set, ); - network_service.disconnect_peer(peer, peer_set); + // [`NetworkService`] keeps track of the protocols by their main name. + let protocol = peerset_protocol_names.get_main_name(peer_set); + network_service.disconnect_peer(peer, protocol); }, NetworkBridgeTxMessage::SendValidationMessage(peers, msg) => { gum::trace!( @@ -163,6 +179,7 @@ where Versioned::V1(msg) => send_validation_message_v1( &mut network_service, peers, + peerset_protocol_names, WireMessage::ProtocolMessage(msg), &metrics, ), @@ -180,6 +197,7 @@ where Versioned::V1(msg) => send_validation_message_v1( &mut network_service, peers, + peerset_protocol_names, WireMessage::ProtocolMessage(msg), &metrics, ), @@ -197,6 +215,7 @@ where Versioned::V1(msg) => send_collation_message_v1( &mut network_service, peers, + peerset_protocol_names, WireMessage::ProtocolMessage(msg), &metrics, ), @@ -214,6 +233,7 @@ where Versioned::V1(msg) => send_collation_message_v1( &mut network_service, peers, + peerset_protocol_names, WireMessage::ProtocolMessage(msg), &metrics, ), @@ -296,6 +316,7 @@ where authority_discovery_service, metrics, req_protocol_names, + peerset_protocol_names, } = bridge; handle_subsystem_messages( @@ -304,6 +325,7 @@ where authority_discovery_service, metrics, req_protocol_names, + peerset_protocol_names, ) .await?; @@ -313,17 +335,35 @@ where fn send_validation_message_v1( net: &mut impl Network, peers: Vec, + protocol_names: &PeerSetProtocolNames, message: WireMessage, metrics: &Metrics, ) { - send_message(net, peers, PeerSet::Validation, 1, message, metrics); + send_message( + net, + peers, + PeerSet::Validation, + ValidationVersion::V1.into(), + protocol_names, + message, + metrics, + ); } fn send_collation_message_v1( net: &mut impl Network, peers: Vec, + protocol_names: &PeerSetProtocolNames, message: WireMessage, metrics: &Metrics, ) { - send_message(net, peers, PeerSet::Collation, 1, message, metrics) + send_message( + net, + peers, + PeerSet::Collation, + CollationVersion::V1.into(), + protocol_names, + message, + metrics, + ); } diff --git a/node/network/bridge/src/tx/tests.rs b/node/network/bridge/src/tx/tests.rs index d5b6d3ca67ab..b70c51105904 100644 --- a/node/network/bridge/src/tx/tests.rs +++ b/node/network/bridge/src/tx/tests.rs @@ -25,6 +25,7 @@ use std::{borrow::Cow, collections::HashSet}; use sc_network::{Event as NetworkEvent, IfDisconnected}; use polkadot_node_network_protocol::{ + peer_set::PeerSetProtocolNames, request_response::{outgoing::Requests, ReqProtocolNames}, ObservedRole, Versioned, }; @@ -55,6 +56,7 @@ pub enum NetworkAction { struct TestNetwork { net_events: Arc>>>, action_tx: Arc>>, + peerset_protocol_names: Arc, } #[derive(Clone, Debug)] @@ -65,9 +67,12 @@ struct TestAuthorityDiscovery; struct TestNetworkHandle { action_rx: metered::UnboundedMeteredReceiver, net_tx: metered::MeteredSender, + peerset_protocol_names: PeerSetProtocolNames, } -fn new_test_network() -> (TestNetwork, TestNetworkHandle, TestAuthorityDiscovery) { +fn new_test_network( + peerset_protocol_names: PeerSetProtocolNames, +) -> (TestNetwork, TestNetworkHandle, TestAuthorityDiscovery) { let (net_tx, net_rx) = metered::channel(10); let (action_tx, action_rx) = metered::unbounded(); @@ -75,8 +80,9 @@ fn new_test_network() -> (TestNetwork, TestNetworkHandle, TestAuthorityDiscovery TestNetwork { net_events: Arc::new(Mutex::new(Some(net_rx))), action_tx: Arc::new(Mutex::new(action_tx)), + peerset_protocol_names: Arc::new(peerset_protocol_names.clone()), }, - TestNetworkHandle { action_rx, net_tx }, + TestNetworkHandle { action_rx, net_tx, peerset_protocol_names }, TestAuthorityDiscovery, ) } @@ -117,14 +123,20 @@ impl Network for TestNetwork { .unwrap(); } - fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) { + fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>) { + let (peer_set, version) = self.peerset_protocol_names.try_get_protocol(&protocol).unwrap(); + assert_eq!(version, peer_set.get_main_version()); + self.action_tx .lock() .unbounded_send(NetworkAction::DisconnectPeer(who, peer_set)) .unwrap(); } - fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec) { + fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec) { + let (peer_set, version) = self.peerset_protocol_names.try_get_protocol(&protocol).unwrap(); + assert_eq!(version, peer_set.get_main_version()); + self.action_tx .lock() .unbounded_send(NetworkAction::WriteNotification(who, peer_set, message)) @@ -158,7 +170,7 @@ impl TestNetworkHandle { async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) { self.send_network_event(NetworkEvent::NotificationStreamOpened { remote: peer, - protocol: peer_set.into_default_protocol_name(), + protocol: self.peerset_protocol_names.get_main_name(peer_set), negotiated_fallback: None, role: role.into(), }) @@ -178,16 +190,24 @@ struct TestHarness { } fn test_harness>(test: impl FnOnce(TestHarness) -> T) { + let genesis_hash = Hash::repeat_byte(0xff); + let fork_id = None; + let req_protocol_names = ReqProtocolNames::new(genesis_hash, fork_id); + let peerset_protocol_names = PeerSetProtocolNames::new(genesis_hash, fork_id); + let pool = sp_core::testing::TaskExecutor::new(); - let (network, network_handle, discovery) = new_test_network(); + let (network, network_handle, discovery) = new_test_network(peerset_protocol_names.clone()); let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); - let genesis_hash = Hash::repeat_byte(0xff); - let protocol_names = ReqProtocolNames::new(genesis_hash, None); - - let bridge_out = NetworkBridgeTx::new(network, discovery, Metrics(None), protocol_names); + let bridge_out = NetworkBridgeTx::new( + network, + discovery, + Metrics(None), + req_protocol_names, + peerset_protocol_names, + ); let network_bridge_out_fut = run_network_out(bridge_out, context) .map_err(|e| panic!("bridge-out subsystem execution failed {:?}", e)) diff --git a/node/network/bridge/src/validator_discovery.rs b/node/network/bridge/src/validator_discovery.rs index 9c90200aa06a..068bfdd9bbe7 100644 --- a/node/network/bridge/src/validator_discovery.rs +++ b/node/network/bridge/src/validator_discovery.rs @@ -27,7 +27,7 @@ use sc_network::multiaddr::{self, Multiaddr}; pub use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery; use polkadot_node_network_protocol::{ - peer_set::{PeerSet, PerPeerSet}, + peer_set::{PeerSet, PeerSetProtocolNames, PerPeerSet}, PeerId, }; use polkadot_primitives::v2::AuthorityDiscoveryId; @@ -36,6 +36,7 @@ const LOG_TARGET: &str = "parachain::validator-discovery"; pub(super) struct Service { state: PerPeerSet, + peerset_protocol_names: PeerSetProtocolNames, // PhantomData used to make the struct generic instead of having generic methods _phantom: PhantomData<(N, AD)>, } @@ -46,8 +47,8 @@ struct StatePerPeerSet { } impl Service { - pub fn new() -> Self { - Self { state: Default::default(), _phantom: PhantomData } + pub fn new(peerset_protocol_names: PeerSetProtocolNames) -> Self { + Self { state: Default::default(), peerset_protocol_names, _phantom: PhantomData } } /// Connect to already resolved addresses. @@ -76,20 +77,26 @@ impl Service { // ask the network to connect to these nodes and not disconnect // from them until removed from the set // - // for peer-set management, the default should be used regardless of + // for peer-set management, the main protocol name should be used regardless of // the negotiated version. if let Err(e) = network_service - .set_reserved_peers(peer_set.into_default_protocol_name(), newly_requested) + .set_reserved_peers( + self.peerset_protocol_names.get_main_name(peer_set), + newly_requested, + ) .await { gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); } // the addresses are known to be valid // - // for peer-set management, the default should be used regardless of + // for peer-set management, the main protocol name should be used regardless of // the negotiated version. let _ = network_service - .remove_from_peers_set(peer_set.into_default_protocol_name(), peers_to_remove) + .remove_from_peers_set( + self.peerset_protocol_names.get_main_name(peer_set), + peers_to_remove, + ) .await; network_service @@ -166,6 +173,7 @@ mod tests { request_response::{outgoing::Requests, ReqProtocolNames}, PeerId, }; + use polkadot_primitives::v2::Hash; use sc_network::{Event as NetworkEvent, IfDisconnected}; use sp_keyring::Sr25519Keyring; use std::{ @@ -174,7 +182,11 @@ mod tests { }; fn new_service() -> Service { - Service::new() + let genesis_hash = Hash::repeat_byte(0xff); + let fork_id = None; + let protocol_names = PeerSetProtocolNames::new(genesis_hash, fork_id); + + Service::new(protocol_names) } fn new_network() -> (TestNetwork, TestAuthorityDiscovery) { @@ -248,11 +260,11 @@ mod tests { panic!() } - fn disconnect_peer(&self, _: PeerId, _: PeerSet) { + fn disconnect_peer(&self, _: PeerId, _: Cow<'static, str>) { panic!() } - fn write_notification(&self, _: PeerId, _: PeerSet, _: Vec) { + fn write_notification(&self, _: PeerId, _: Cow<'static, str>, _: Vec) { panic!() } } diff --git a/node/network/collator-protocol/src/collator_side/tests.rs b/node/network/collator-protocol/src/collator_side/tests.rs index f81e738e16a4..2d2f2cf043de 100644 --- a/node/network/collator-protocol/src/collator_side/tests.rs +++ b/node/network/collator-protocol/src/collator_side/tests.rs @@ -31,6 +31,7 @@ use sp_runtime::traits::AppVerify; use polkadot_node_network_protocol::{ our_view, + peer_set::CollationVersion, request_response::{IncomingRequest, ReqProtocolNames}, view, }; @@ -399,7 +400,7 @@ async fn connect_peer( CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer.clone(), polkadot_node_network_protocol::ObservedRole::Authority, - 1, + CollationVersion::V1.into(), authority_id.map(|v| HashSet::from([v])), )), ) diff --git a/node/network/collator-protocol/src/validator_side/tests.rs b/node/network/collator-protocol/src/validator_side/tests.rs index cfb8b967bb34..15740e5d5efa 100644 --- a/node/network/collator-protocol/src/validator_side/tests.rs +++ b/node/network/collator-protocol/src/validator_side/tests.rs @@ -24,6 +24,7 @@ use std::{iter, sync::Arc, time::Duration}; use polkadot_node_network_protocol::{ our_view, + peer_set::CollationVersion, request_response::{Requests, ResponseSender}, ObservedRole, }; @@ -306,7 +307,7 @@ async fn connect_and_declare_collator( CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer.clone(), ObservedRole::Full, - 1, + CollationVersion::V1.into(), None, )), ) @@ -458,7 +459,7 @@ fn collator_authentication_verification_works() { CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer_b, ObservedRole::Full, - 1, + CollationVersion::V1.into(), None, )), ) @@ -946,7 +947,7 @@ fn disconnect_if_no_declare() { CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, - 1, + CollationVersion::V1.into(), None, )), ) @@ -984,7 +985,7 @@ fn disconnect_if_wrong_declare() { CollatorProtocolMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, - 1, + CollationVersion::V1.into(), None, )), ) diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 78727ae67e83..169d916ce6f9 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -43,8 +43,6 @@ pub mod authority_discovery; /// Grid topology support module pub mod grid_topology; -/// A version of the protocol. -pub type ProtocolVersion = u32; /// The minimum amount of peers to send gossip messages to. pub const MIN_GOSSIP_PEERS: usize = 25; diff --git a/node/network/protocol/src/peer_set.rs b/node/network/protocol/src/peer_set.rs index 400b36e3d4c5..aecc11e2554b 100644 --- a/node/network/protocol/src/peer_set.rs +++ b/node/network/protocol/src/peer_set.rs @@ -16,23 +16,25 @@ //! All peersets and protocols used for parachains. -use super::ProtocolVersion; +use derive_more::Display; +use polkadot_primitives::v2::Hash; use sc_network::config::{NonDefaultSetConfig, SetConfig}; use std::{ borrow::Cow, + collections::{hash_map::Entry, HashMap}, ops::{Index, IndexMut}, }; use strum::{EnumIter, IntoEnumIterator}; -// Only supported protocol versions should be defined here. -const VALIDATION_PROTOCOL_V1: &str = "/polkadot/validation/1"; -const COLLATION_PROTOCOL_V1: &str = "/polkadot/collation/1"; +/// The legacy protocol names. Only supported on version = 1. +const LEGACY_VALIDATION_PROTOCOL_V1: &str = "/polkadot/validation/1"; +const LEGACY_COLLATION_PROTOCOL_V1: &str = "/polkadot/collation/1"; -/// The default validation protocol version. -pub const DEFAULT_VALIDATION_PROTOCOL_VERSION: ProtocolVersion = 1; +/// The legacy protocol version. Is always 1 for both validation & collation. +const LEGACY_PROTOCOL_VERSION_V1: u32 = 1; -/// The default collation protocol version. -pub const DEFAULT_COLLATION_PROTOCOL_VERSION: ProtocolVersion = 1; +/// Max notification size is currently constant. +const MAX_NOTIFICATION_SIZE: u64 = 100 * 1024; /// The peer-sets and thus the protocols which are used for the network. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)] @@ -60,17 +62,21 @@ impl PeerSet { /// /// Those should be used in the network configuration to register the protocols with the /// network service. - pub fn get_info(self, is_authority: IsAuthority) -> NonDefaultSetConfig { - let version = self.get_default_version(); - let protocol = self - .into_protocol_name(version) - .expect("default version always has protocol name; qed"); - let max_notification_size = 100 * 1024; + pub fn get_info( + self, + is_authority: IsAuthority, + peerset_protocol_names: &PeerSetProtocolNames, + ) -> NonDefaultSetConfig { + // Networking layer relies on `get_main_name()` being the main name of the protocol + // for peersets and connection management. + let protocol = peerset_protocol_names.get_main_name(self); + let fallback_names = PeerSetProtocolNames::get_fallback_names(self); + let max_notification_size = self.get_max_notification_size(is_authority); match self { PeerSet::Validation => NonDefaultSetConfig { notifications_protocol: protocol, - fallback_names: Vec::new(), + fallback_names, max_notification_size, set_config: sc_network::config::SetConfig { // we allow full nodes to connect to validators for gossip @@ -85,7 +91,7 @@ impl PeerSet { }, PeerSet::Collation => NonDefaultSetConfig { notifications_protocol: protocol, - fallback_names: Vec::new(), + fallback_names, max_notification_size, set_config: SetConfig { // Non-authority nodes don't need to accept incoming connections on this peer set: @@ -102,51 +108,47 @@ impl PeerSet { } } - /// Get the default protocol version for this peer set. - pub const fn get_default_version(self) -> ProtocolVersion { + /// Get the main protocol version for this peer set. + /// + /// Networking layer relies on `get_main_version()` being the version + /// of the main protocol name reported by [`PeerSetProtocolNames::get_main_name()`]. + pub fn get_main_version(self) -> ProtocolVersion { match self { - PeerSet::Validation => DEFAULT_VALIDATION_PROTOCOL_VERSION, - PeerSet::Collation => DEFAULT_COLLATION_PROTOCOL_VERSION, + PeerSet::Validation => ValidationVersion::V1.into(), + PeerSet::Collation => CollationVersion::V1.into(), } } - /// Get the default protocol name as a static str. - pub const fn get_default_protocol_name(self) -> &'static str { - match self { - PeerSet::Validation => VALIDATION_PROTOCOL_V1, - PeerSet::Collation => COLLATION_PROTOCOL_V1, - } + /// Get the max notification size for this peer set. + pub fn get_max_notification_size(self, _: IsAuthority) -> u64 { + MAX_NOTIFICATION_SIZE } - /// Get the protocol name associated with each peer set - /// and the given version, if any, as static str. - pub const fn get_protocol_name_static(self, version: ProtocolVersion) -> Option<&'static str> { - match (self, version) { - (PeerSet::Validation, 1) => Some(VALIDATION_PROTOCOL_V1), - (PeerSet::Collation, 1) => Some(COLLATION_PROTOCOL_V1), - _ => None, + /// Get the peer set label for metrics reporting. + pub fn get_label(self) -> &'static str { + match self { + PeerSet::Validation => "validation", + PeerSet::Collation => "collation", } } - /// Get the protocol name associated with each peer set as understood by Substrate. - pub fn into_default_protocol_name(self) -> Cow<'static, str> { - self.get_default_protocol_name().into() - } - - /// Convert a peer set and the given version into a protocol name, if any, - /// as understood by Substrate. - pub fn into_protocol_name(self, version: ProtocolVersion) -> Option> { - self.get_protocol_name_static(version).map(|n| n.into()) - } - - /// Try parsing a protocol name into a peer set and protocol version. - /// - /// This only succeeds on supported versions. - pub fn try_from_protocol_name(name: &Cow<'static, str>) -> Option<(PeerSet, ProtocolVersion)> { - match name { - n if n == VALIDATION_PROTOCOL_V1 => Some((PeerSet::Validation, 1)), - n if n == COLLATION_PROTOCOL_V1 => Some((PeerSet::Collation, 1)), - _ => None, + /// Get the protocol label for metrics reporting. + pub fn get_protocol_label(self, version: ProtocolVersion) -> Option<&'static str> { + // Unfortunately, labels must be static strings, so we must manually cover them + // for all protocol versions here. + match self { + PeerSet::Validation => + if version == ValidationVersion::V1.into() { + Some("validation/1") + } else { + None + }, + PeerSet::Collation => + if version == CollationVersion::V1.into() { + Some("collation/1") + } else { + None + }, } } } @@ -181,6 +183,353 @@ impl IndexMut for PerPeerSet { /// /// Should be used during network configuration (added to [`NetworkConfiguration::extra_sets`]) /// or shortly after startup to register the protocols with the network service. -pub fn peer_sets_info(is_authority: IsAuthority) -> Vec { - PeerSet::iter().map(|s| s.get_info(is_authority)).collect() +pub fn peer_sets_info( + is_authority: IsAuthority, + peerset_protocol_names: &PeerSetProtocolNames, +) -> Vec { + PeerSet::iter() + .map(|s| s.get_info(is_authority, &peerset_protocol_names)) + .collect() +} + +/// A generic version of the protocol. This struct must not be created directly. +#[derive(Debug, Clone, Copy, Display, PartialEq, Eq, Hash)] +pub struct ProtocolVersion(u32); + +impl From for u32 { + fn from(version: ProtocolVersion) -> u32 { + version.0 + } +} + +/// Supported validation protocol versions. Only versions defined here must be used in the codebase. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)] +pub enum ValidationVersion { + /// The first version. + V1 = 1, +} + +/// Supported collation protocol versions. Only versions defined here must be used in the codebase. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)] +pub enum CollationVersion { + /// The first version. + V1 = 1, +} + +impl From for ProtocolVersion { + fn from(version: ValidationVersion) -> ProtocolVersion { + ProtocolVersion(version as u32) + } +} + +impl From for ProtocolVersion { + fn from(version: CollationVersion) -> ProtocolVersion { + ProtocolVersion(version as u32) + } +} + +/// On the wire protocol name to [`PeerSet`] mapping. +#[derive(Clone)] +pub struct PeerSetProtocolNames { + protocols: HashMap, (PeerSet, ProtocolVersion)>, + names: HashMap<(PeerSet, ProtocolVersion), Cow<'static, str>>, +} + +impl PeerSetProtocolNames { + /// Construct [`PeerSetProtocols`] using `genesis_hash` and `fork_id`. + pub fn new(genesis_hash: Hash, fork_id: Option<&str>) -> Self { + let mut protocols = HashMap::new(); + let mut names = HashMap::new(); + for protocol in PeerSet::iter() { + match protocol { + PeerSet::Validation => + for version in ValidationVersion::iter() { + Self::register_main_protocol( + &mut protocols, + &mut names, + protocol, + version.into(), + &genesis_hash, + fork_id, + ); + }, + PeerSet::Collation => + for version in CollationVersion::iter() { + Self::register_main_protocol( + &mut protocols, + &mut names, + protocol, + version.into(), + &genesis_hash, + fork_id, + ); + }, + } + Self::register_legacy_protocol(&mut protocols, protocol); + } + Self { protocols, names } + } + + /// Helper function to register main protocol. + fn register_main_protocol( + protocols: &mut HashMap, (PeerSet, ProtocolVersion)>, + names: &mut HashMap<(PeerSet, ProtocolVersion), Cow<'static, str>>, + protocol: PeerSet, + version: ProtocolVersion, + genesis_hash: &Hash, + fork_id: Option<&str>, + ) { + let protocol_name = Self::generate_name(genesis_hash, fork_id, protocol, version); + names.insert((protocol, version), protocol_name.clone()); + Self::insert_protocol_or_panic(protocols, protocol_name, protocol, version); + } + + /// Helper function to register legacy protocol. + fn register_legacy_protocol( + protocols: &mut HashMap, (PeerSet, ProtocolVersion)>, + protocol: PeerSet, + ) { + Self::insert_protocol_or_panic( + protocols, + Self::get_legacy_name(protocol), + protocol, + ProtocolVersion(LEGACY_PROTOCOL_VERSION_V1), + ) + } + + /// Helper function to make sure no protocols have the same name. + fn insert_protocol_or_panic( + protocols: &mut HashMap, (PeerSet, ProtocolVersion)>, + name: Cow<'static, str>, + protocol: PeerSet, + version: ProtocolVersion, + ) { + match protocols.entry(name) { + Entry::Vacant(entry) => { + entry.insert((protocol, version)); + }, + Entry::Occupied(entry) => { + panic!( + "Protocol {:?} (version {}) has the same on-the-wire name as protocol {:?} (version {}): `{}`.", + protocol, + version, + entry.get().0, + entry.get().1, + entry.key(), + ); + }, + } + } + + /// Lookup the protocol using its on the wire name. + pub fn try_get_protocol(&self, name: &Cow<'static, str>) -> Option<(PeerSet, ProtocolVersion)> { + self.protocols.get(name).map(ToOwned::to_owned) + } + + /// Get the main protocol name. It's used by the networking for keeping track + /// of peersets and connections. + pub fn get_main_name(&self, protocol: PeerSet) -> Cow<'static, str> { + self.get_name(protocol, protocol.get_main_version()) + } + + /// Get the protocol name for specific version. + pub fn get_name(&self, protocol: PeerSet, version: ProtocolVersion) -> Cow<'static, str> { + self.names + .get(&(protocol, version)) + .expect("Protocols & versions are specified via enums defined above, and they are all registered in `new()`; qed") + .clone() + } + + /// The protocol name of this protocol based on `genesis_hash` and `fork_id`. + fn generate_name( + genesis_hash: &Hash, + fork_id: Option<&str>, + protocol: PeerSet, + version: ProtocolVersion, + ) -> Cow<'static, str> { + let prefix = if let Some(fork_id) = fork_id { + format!("/{}/{}", hex::encode(genesis_hash), fork_id) + } else { + format!("/{}", hex::encode(genesis_hash)) + }; + + let short_name = match protocol { + PeerSet::Validation => "validation", + PeerSet::Collation => "collation", + }; + + format!("{}/{}/{}", prefix, short_name, version).into() + } + + /// Get the legacy protocol name, only `LEGACY_PROTOCOL_VERSION` = 1 is supported. + fn get_legacy_name(protocol: PeerSet) -> Cow<'static, str> { + match protocol { + PeerSet::Validation => LEGACY_VALIDATION_PROTOCOL_V1, + PeerSet::Collation => LEGACY_COLLATION_PROTOCOL_V1, + } + .into() + } + + /// Get the protocol fallback names. Currently only holds the legacy name + /// for `LEGACY_PROTOCOL_VERSION` = 1. + fn get_fallback_names(protocol: PeerSet) -> Vec> { + std::iter::once(Self::get_legacy_name(protocol)).collect() + } +} + +#[cfg(test)] +mod tests { + use super::{ + CollationVersion, Hash, PeerSet, PeerSetProtocolNames, ProtocolVersion, ValidationVersion, + }; + use strum::IntoEnumIterator; + + struct TestVersion(u32); + + impl From for ProtocolVersion { + fn from(version: TestVersion) -> ProtocolVersion { + ProtocolVersion(version.0) + } + } + + #[test] + fn protocol_names_are_correctly_generated() { + let genesis_hash = Hash::from([ + 122, 200, 116, 29, 232, 183, 20, 109, 138, 86, 23, 253, 70, 41, 20, 85, 127, 230, 60, + 38, 90, 127, 28, 16, 231, 218, 227, 40, 88, 238, 187, 128, + ]); + let name = PeerSetProtocolNames::generate_name( + &genesis_hash, + None, + PeerSet::Validation, + TestVersion(3).into(), + ); + let expected = + "/7ac8741de8b7146d8a5617fd462914557fe63c265a7f1c10e7dae32858eebb80/validation/3"; + assert_eq!(name, expected); + + let name = PeerSetProtocolNames::generate_name( + &genesis_hash, + None, + PeerSet::Collation, + TestVersion(5).into(), + ); + let expected = + "/7ac8741de8b7146d8a5617fd462914557fe63c265a7f1c10e7dae32858eebb80/collation/5"; + assert_eq!(name, expected); + + let fork_id = Some("test-fork"); + let name = PeerSetProtocolNames::generate_name( + &genesis_hash, + fork_id, + PeerSet::Validation, + TestVersion(7).into(), + ); + let expected = + "/7ac8741de8b7146d8a5617fd462914557fe63c265a7f1c10e7dae32858eebb80/test-fork/validation/7"; + assert_eq!(name, expected); + + let name = PeerSetProtocolNames::generate_name( + &genesis_hash, + fork_id, + PeerSet::Collation, + TestVersion(11).into(), + ); + let expected = + "/7ac8741de8b7146d8a5617fd462914557fe63c265a7f1c10e7dae32858eebb80/test-fork/collation/11"; + assert_eq!(name, expected); + } + + #[test] + fn all_protocol_names_are_known() { + let genesis_hash = Hash::from([ + 122, 200, 116, 29, 232, 183, 20, 109, 138, 86, 23, 253, 70, 41, 20, 85, 127, 230, 60, + 38, 90, 127, 28, 16, 231, 218, 227, 40, 88, 238, 187, 128, + ]); + let protocol_names = PeerSetProtocolNames::new(genesis_hash, None); + + let validation_main = + "/7ac8741de8b7146d8a5617fd462914557fe63c265a7f1c10e7dae32858eebb80/validation/1"; + assert_eq!( + protocol_names.try_get_protocol(&validation_main.into()), + Some((PeerSet::Validation, TestVersion(1).into())), + ); + + let validation_legacy = "/polkadot/validation/1"; + assert_eq!( + protocol_names.try_get_protocol(&validation_legacy.into()), + Some((PeerSet::Validation, TestVersion(1).into())), + ); + + let collation_main = + "/7ac8741de8b7146d8a5617fd462914557fe63c265a7f1c10e7dae32858eebb80/collation/1"; + assert_eq!( + protocol_names.try_get_protocol(&collation_main.into()), + Some((PeerSet::Collation, TestVersion(1).into())), + ); + + let collation_legacy = "/polkadot/collation/1"; + assert_eq!( + protocol_names.try_get_protocol(&collation_legacy.into()), + Some((PeerSet::Collation, TestVersion(1).into())), + ); + } + + #[test] + fn all_protocol_versions_are_registered() { + let genesis_hash = Hash::from([ + 122, 200, 116, 29, 232, 183, 20, 109, 138, 86, 23, 253, 70, 41, 20, 85, 127, 230, 60, + 38, 90, 127, 28, 16, 231, 218, 227, 40, 88, 238, 187, 128, + ]); + let protocol_names = PeerSetProtocolNames::new(genesis_hash, None); + + for protocol in PeerSet::iter() { + match protocol { + PeerSet::Validation => + for version in ValidationVersion::iter() { + assert_eq!( + protocol_names.get_name(protocol, version.into()), + PeerSetProtocolNames::generate_name( + &genesis_hash, + None, + protocol, + version.into(), + ), + ); + }, + PeerSet::Collation => + for version in CollationVersion::iter() { + assert_eq!( + protocol_names.get_name(protocol, version.into()), + PeerSetProtocolNames::generate_name( + &genesis_hash, + None, + protocol, + version.into(), + ), + ); + }, + } + } + } + + #[test] + fn all_protocol_versions_have_labels() { + for protocol in PeerSet::iter() { + match protocol { + PeerSet::Validation => + for version in ValidationVersion::iter() { + protocol + .get_protocol_label(version.into()) + .expect("All validation protocol versions must have a label."); + }, + PeerSet::Collation => + for version in CollationVersion::iter() { + protocol + .get_protocol_label(version.into()) + .expect("All collation protocol versions must have a label."); + }, + } + } + } } diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 38d4022c633b..274582420f5d 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -996,7 +996,7 @@ fn is_statement_large(statement: &SignedFullStatement) -> (bool, Option) // Half max size seems to be a good threshold to start not using notifications: let threshold = - PeerSet::Validation.get_info(IsAuthority::Yes).max_notification_size as usize / 2; + PeerSet::Validation.get_max_notification_size(IsAuthority::Yes) as usize / 2; (size >= threshold, Some(size)) }, diff --git a/node/network/statement-distribution/src/tests.rs b/node/network/statement-distribution/src/tests.rs index efc9ca896bb1..3304ad86fcd5 100644 --- a/node/network/statement-distribution/src/tests.rs +++ b/node/network/statement-distribution/src/tests.rs @@ -20,6 +20,7 @@ use futures::executor::{self, block_on}; use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; use polkadot_node_network_protocol::{ + peer_set::ValidationVersion, request_response::{ v1::{StatementFetchingRequest, StatementFetchingResponse}, IncomingRequest, Recipient, ReqProtocolNames, Requests, @@ -779,7 +780,12 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() { handle .send(FromOrchestra::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer_a.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None, + ), ), }) .await; @@ -787,7 +793,12 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() { handle .send(FromOrchestra::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, 1, None), + NetworkBridgeEvent::PeerConnected( + peer_b.clone(), + ObservedRole::Full, + ValidationVersion::V1.into(), + None, + ), ), }) .await; @@ -977,7 +988,7 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( NetworkBridgeEvent::PeerConnected( peer_a.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), Some(HashSet::from([Sr25519Keyring::Alice.public().into()])), ), ), @@ -990,7 +1001,7 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), Some(HashSet::from([Sr25519Keyring::Bob.public().into()])), ), ), @@ -1002,7 +1013,7 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( NetworkBridgeEvent::PeerConnected( peer_c.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), Some(HashSet::from([Sr25519Keyring::Charlie.public().into()])), ), ), @@ -1014,7 +1025,7 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( NetworkBridgeEvent::PeerConnected( peer_bad.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), None, ), ), @@ -1492,7 +1503,7 @@ fn share_prioritizes_backing_group() { NetworkBridgeEvent::PeerConnected( peer, ObservedRole::Full, - 1, + ValidationVersion::V1.into(), Some(HashSet::from([pair.public().into()])), ), ), @@ -1515,7 +1526,7 @@ fn share_prioritizes_backing_group() { NetworkBridgeEvent::PeerConnected( peer_a.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), Some(HashSet::from([Sr25519Keyring::Alice.public().into()])), ), ), @@ -1527,7 +1538,7 @@ fn share_prioritizes_backing_group() { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), Some(HashSet::from([Sr25519Keyring::Bob.public().into()])), ), ), @@ -1539,7 +1550,7 @@ fn share_prioritizes_backing_group() { NetworkBridgeEvent::PeerConnected( peer_c.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), Some(HashSet::from([Sr25519Keyring::Charlie.public().into()])), ), ), @@ -1551,7 +1562,7 @@ fn share_prioritizes_backing_group() { NetworkBridgeEvent::PeerConnected( peer_bad.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), None, ), ), @@ -1563,7 +1574,7 @@ fn share_prioritizes_backing_group() { NetworkBridgeEvent::PeerConnected( peer_other_group.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), Some(HashSet::from([Sr25519Keyring::Dave.public().into()])), ), ), @@ -1786,7 +1797,7 @@ fn peer_cant_flood_with_large_statements() { NetworkBridgeEvent::PeerConnected( peer_a.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), Some(HashSet::from([Sr25519Keyring::Alice.public().into()])), ), ), @@ -1993,7 +2004,7 @@ fn handle_multiple_seconded_statements() { NetworkBridgeEvent::PeerConnected( peer.clone(), ObservedRole::Full, - 1, + ValidationVersion::V1.into(), None, ), ), diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 206809aeefe8..1487e7201ebf 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -46,7 +46,9 @@ use { self as chain_selection_subsystem, Config as ChainSelectionConfig, }, polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig, - polkadot_node_network_protocol::request_response::ReqProtocolNames, + polkadot_node_network_protocol::{ + peer_set::PeerSetProtocolNames, request_response::ReqProtocolNames, + }, polkadot_overseer::BlockInfo, sc_client_api::{BlockBackend, ExecutorProvider}, sp_core::traits::SpawnNamed, @@ -852,10 +854,16 @@ where .push(beefy_gadget::beefy_peers_set_config(beefy_protocol_name.clone())); } + let peerset_protocol_names = + PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id()); + { use polkadot_network_bridge::{peer_sets_info, IsAuthority}; let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No }; - config.network.extra_sets.extend(peer_sets_info(is_authority)); + config + .network + .extra_sets + .extend(peer_sets_info(is_authority, &peerset_protocol_names)); } let req_protocol_names = ReqProtocolNames::new(&genesis_hash, config.chain_spec.fork_id()); @@ -1063,6 +1071,7 @@ where pvf_checker_enabled, overseer_message_channel_capacity_override, req_protocol_names, + peerset_protocol_names, }, ) .map_err(|e| { diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index 622b815944ae..a8ce3e5eaaf0 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -24,8 +24,9 @@ use polkadot_node_core_av_store::Config as AvailabilityConfig; use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig; use polkadot_node_core_chain_selection::Config as ChainSelectionConfig; use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig; -use polkadot_node_network_protocol::request_response::{ - v1 as request_v1, IncomingRequestReceiver, ReqProtocolNames, +use polkadot_node_network_protocol::{ + peer_set::PeerSetProtocolNames, + request_response::{v1 as request_v1, IncomingRequestReceiver, ReqProtocolNames}, }; #[cfg(any(feature = "malus", test))] pub use polkadot_overseer::{ @@ -122,6 +123,8 @@ where pub overseer_message_channel_capacity_override: Option, /// Request-response protocol names source. pub req_protocol_names: ReqProtocolNames, + /// [`PeerSet`] protocol names to protocols mapping. + pub peerset_protocol_names: PeerSetProtocolNames, } /// Obtain a prepared `OverseerBuilder`, that is initialized @@ -151,6 +154,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>( pvf_checker_enabled, overseer_message_channel_capacity_override, req_protocol_names, + peerset_protocol_names, }: OverseerGenArgs<'a, Spawner, RuntimeClient>, ) -> Result< InitializedOverseerBuilder< @@ -206,12 +210,14 @@ where authority_discovery_service.clone(), network_bridge_metrics.clone(), req_protocol_names, + peerset_protocol_names.clone(), )) .network_bridge_rx(NetworkBridgeRxSubsystem::new( network_service.clone(), authority_discovery_service.clone(), Box::new(network_service.clone()), network_bridge_metrics, + peerset_protocol_names, )) .availability_distribution(AvailabilityDistributionSubsystem::new( keystore.clone(), diff --git a/node/subsystem-types/src/messages/network_bridge_event.rs b/node/subsystem-types/src/messages/network_bridge_event.rs index 4cf2bed6ef8a..cd0bb9894b6b 100644 --- a/node/subsystem-types/src/messages/network_bridge_event.rs +++ b/node/subsystem-types/src/messages/network_bridge_event.rs @@ -22,7 +22,8 @@ use std::{ pub use sc_network::{PeerId, ReputationChange}; use polkadot_node_network_protocol::{ - grid_topology::SessionGridTopology, ObservedRole, OurView, ProtocolVersion, View, WrongVariant, + grid_topology::SessionGridTopology, peer_set::ProtocolVersion, ObservedRole, OurView, View, + WrongVariant, }; use polkadot_primitives::v2::{AuthorityDiscoveryId, SessionIndex, ValidatorIndex};