Skip to content

Commit

Permalink
perf(comms)!: optimise connection establishment (#3658)
Browse files Browse the repository at this point in the history
Description
---
Optimise connection establishment:
- perform identity protocol before yamux upgrade
- identity protocol uses socket rather than yamux
- trim down bytes sent for identity protocol

Additional: remove tokio-tungstenite dependency from base node (part of warp default-features for websockets)

Motivation and Context
---
By performing the identity protocol before the yamux upgrade, we can send the identity message without incurring the header cost of yamux, and we also avoid having to close a substream after identities are exchanged. It is important that the identity protocol is as trim as possible, so a manual framing implementation is used rather than the tokio length-delimited framing codec.

This is a network-breaking change: nodes upgraded to this version will not be able to communicate with older nodes, and vice versa.

How Has This Been Tested?
---
Existing tests pass; also tested manually between two upgraded nodes.
  • Loading branch information
sdbondi authored Jan 3, 2022
1 parent eee73f7 commit aeeefbb
Show file tree
Hide file tree
Showing 18 changed files with 160 additions and 223 deletions.
61 changes: 0 additions & 61 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion applications/tari_base_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ tracing-subscriber = "0.2.20"

# Metrics
tari_metrics = { path = "../../infrastructure/metrics", optional = true }
warp = { version = "0.3.1", optional = true }
warp = { version = "0.3.1", optional = true, default-features = false }
reqwest = { version = "0.11.4", default-features = false, optional = true }

[features]
Expand Down
4 changes: 2 additions & 2 deletions base_layer/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub use tari_common::configuration::Network;
pub const DEFAULT_DNS_NAME_SERVER: &str = "1.1.1.1:853/cloudflare-dns.com";

/// Major network version. Peers will refuse connections if this value differs
pub const MAJOR_NETWORK_VERSION: u32 = 0;
pub const MAJOR_NETWORK_VERSION: u8 = 0;
/// Minor network version. This should change with each time the network protocol has changed in a backward-compatible
/// way.
pub const MINOR_NETWORK_VERSION: u32 = 0;
pub const MINOR_NETWORK_VERSION: u8 = 0;
14 changes: 13 additions & 1 deletion comms/dht/tests/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,11 @@ async fn dht_propagate_dedup() {

let mut node_A_messaging = node_A.messaging_events.subscribe();
let mut node_B_messaging = node_B.messaging_events.subscribe();
let mut node_B_messaging2 = node_B.messaging_events.subscribe();
let mut node_C_messaging = node_C.messaging_events.subscribe();
let mut node_C_messaging2 = node_C.messaging_events.subscribe();
let mut node_D_messaging = node_D.messaging_events.subscribe();
let mut node_D_messaging2 = node_D.messaging_events.subscribe();

#[derive(Clone, PartialEq, ::prost::Message)]
struct Person {
Expand Down Expand Up @@ -596,6 +599,11 @@ async fn dht_propagate_dedup() {
let node_C_id = node_C.node_identity().node_id().clone();
let node_D_id = node_D.node_identity().node_id().clone();

// Ensure that the message has propagated before disconnecting everyone
let _ = node_B_messaging2.recv().await.unwrap();
let _ = node_C_messaging2.recv().await.unwrap();
let _ = node_D_messaging2.recv().await.unwrap();

node_A.shutdown().await;
node_B.shutdown().await;
node_C.shutdown().await;
Expand All @@ -611,7 +619,11 @@ async fn dht_propagate_dedup() {
let received = filter_received(collect_try_recv!(node_B_messaging, timeout = Duration::from_secs(20)));
let recv_count = count_messages_received(&received, &[&node_A_id, &node_C_id]);
// Expected race condition: If A->B->C before A->C then C->B does not happen
assert!((1..=2).contains(&recv_count));
assert!(
(1..=2).contains(&recv_count),
"expected recv_count to be in [1-2] but was {}",
recv_count
);

let received = filter_received(collect_try_recv!(node_C_messaging, timeout = Duration::from_secs(20)));
let recv_count = count_messages_received(&received, &[&node_A_id, &node_B_id]);
Expand Down
2 changes: 1 addition & 1 deletion comms/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl CommsBuilder {
}

/// Set a network major and minor version as per [RFC-173 Versioning](https://rfc.tari.com/RFC-0173_Versioning.html)
pub fn with_node_version(mut self, major_version: u32, minor_version: u32) -> Self {
pub fn with_node_version(mut self, major_version: u8, minor_version: u8) -> Self {
self.connection_manager_config.network_info.major_version = major_version;
self.connection_manager_config.network_info.minor_version = minor_version;
self
Expand Down
25 changes: 9 additions & 16 deletions comms/src/connection_manager/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@

use std::{convert::TryFrom, net::Ipv6Addr};

use futures::StreamExt;
use log::*;
use tokio::io::{AsyncRead, AsyncWrite};

use super::types::ConnectionDirection;
use crate::{
connection_manager::error::ConnectionManagerError,
multiaddr::{Multiaddr, Protocol},
multiplexing::Yamux,
peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags},
proto::identity::PeerIdentityMsg,
protocol,
Expand All @@ -43,30 +42,24 @@ const LOG_TARGET: &str = "comms::connection_manager::common";
/// The maximum size of the peer's user agent string. If the peer sends a longer string it is truncated.
const MAX_USER_AGENT_LEN: usize = 100;

pub async fn perform_identity_exchange<'p, P: IntoIterator<Item = &'p ProtocolId>>(
muxer: &mut Yamux,
pub async fn perform_identity_exchange<
'p,
P: IntoIterator<Item = &'p ProtocolId>,
TSocket: AsyncRead + AsyncWrite + Unpin,
>(
socket: &mut TSocket,
node_identity: &NodeIdentity,
direction: ConnectionDirection,
our_supported_protocols: P,
network_info: NodeNetworkInfo,
) -> Result<PeerIdentityMsg, ConnectionManagerError> {
let mut control = muxer.get_yamux_control();
let stream = match direction {
ConnectionDirection::Inbound => muxer
.incoming_mut()
.next()
.await
.ok_or(ConnectionManagerError::IncomingListenerStreamClosed)?,
ConnectionDirection::Outbound => control.open_stream().await?,
};

debug!(
target: LOG_TARGET,
"{} substream opened to peer. Performing identity exchange.", direction
"{} socket opened to peer. Performing identity exchange.", direction
);

let peer_identity =
protocol::identity_exchange(node_identity, direction, our_supported_protocols, network_info, stream).await?;
protocol::identity_exchange(node_identity, direction, our_supported_protocols, network_info, socket).await?;

Ok(peer_identity)
}
Expand Down
30 changes: 15 additions & 15 deletions comms/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ where
async fn perform_socket_upgrade_procedure(
peer_manager: Arc<PeerManager>,
node_identity: Arc<NodeIdentity>,
socket: NoiseSocket<TTransport::Output>,
mut socket: NoiseSocket<TTransport::Output>,
dialed_addr: Multiaddr,
authenticated_public_key: CommsPublicKey,
conn_man_notifier: mpsc::Sender<ConnectionManagerEvent>,
Expand All @@ -361,43 +361,36 @@ where
cancel_signal: ShutdownSignal,
) -> Result<PeerConnection, ConnectionManagerError> {
static CONNECTION_DIRECTION: ConnectionDirection = ConnectionDirection::Outbound;
let mut muxer = Yamux::upgrade_connection(socket, CONNECTION_DIRECTION)
.await
.map_err(|err| ConnectionManagerError::YamuxUpgradeFailure(err.to_string()))?;

debug!(
target: LOG_TARGET,
"Starting peer identity exchange for peer with public key '{}'", authenticated_public_key
);
if cancel_signal.is_terminated() {
return Err(ConnectionManagerError::DialCancelled);
}

// Check if we know the peer and if it is banned
let known_peer = common::find_unbanned_peer(&peer_manager, &authenticated_public_key).await?;

let peer_identity = common::perform_identity_exchange(
&mut muxer,
&mut socket,
&node_identity,
CONNECTION_DIRECTION,
&our_supported_protocols,
config.network_info.clone(),
)
.await?;

if cancel_signal.is_terminated() {
muxer.get_yamux_control().close().await?;
return Err(ConnectionManagerError::DialCancelled);
}

let features = PeerFeatures::from_bits_truncate(peer_identity.features);
trace!(
debug!(
target: LOG_TARGET,
"Peer identity exchange succeeded on Outbound connection for peer '{}' (Features = {:?})",
authenticated_public_key,
features
);
trace!(target: LOG_TARGET, "{:?}", peer_identity);

// Check if we know the peer and if it is banned
let known_peer = common::find_unbanned_peer(&peer_manager, &authenticated_public_key).await?;

let (peer_node_id, their_supported_protocols) = common::validate_and_add_peer_from_peer_identity(
&peer_manager,
known_peer,
Expand All @@ -409,7 +402,6 @@ where
.await?;

if cancel_signal.is_terminated() {
muxer.get_yamux_control().close().await?;
return Err(ConnectionManagerError::DialCancelled);
}

Expand All @@ -420,6 +412,14 @@ where
peer_node_id.short_str()
);

let muxer = Yamux::upgrade_connection(socket, CONNECTION_DIRECTION)
.map_err(|err| ConnectionManagerError::YamuxUpgradeFailure(err.to_string()))?;

if cancel_signal.is_terminated() {
muxer.get_yamux_control().close().await?;
return Err(ConnectionManagerError::DialCancelled);
}

peer_connection::create(
muxer,
dialed_addr,
Expand Down
26 changes: 16 additions & 10 deletions comms/src/connection_manager/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
time::{Duration, Instant},
};

use futures::{future, FutureExt};
Expand Down Expand Up @@ -351,7 +351,8 @@ where
"Starting noise protocol upgrade for peer at address '{}'", peer_addr
);

let noise_socket = time::timeout(
let timer = Instant::now();
let mut noise_socket = time::timeout(
Duration::from_secs(30),
noise_config.upgrade_socket(socket, CONNECTION_DIRECTION),
)
Expand All @@ -362,21 +363,23 @@ where
.get_remote_public_key()
.ok_or(ConnectionManagerError::InvalidStaticPublicKey)?;

debug!(
target: LOG_TARGET,
"Noise socket upgrade completed in {:.2?} with public key '{}'",
timer.elapsed(),
authenticated_public_key
);

// Check if we know the peer and if it is banned
let known_peer = common::find_unbanned_peer(&peer_manager, &authenticated_public_key).await?;

let mut muxer = Yamux::upgrade_connection(noise_socket, CONNECTION_DIRECTION)
.await
.map_err(|err| ConnectionManagerError::YamuxUpgradeFailure(err.to_string()))?;

trace!(
debug!(
target: LOG_TARGET,
"Starting peer identity exchange for peer with public key '{}'",
authenticated_public_key
"Starting peer identity exchange for peer with public key '{}'", authenticated_public_key
);

let peer_identity = common::perform_identity_exchange(
&mut muxer,
&mut noise_socket,
&node_identity,
CONNECTION_DIRECTION,
&our_supported_protocols,
Expand Down Expand Up @@ -410,6 +413,9 @@ where
peer_node_id.short_str()
);

let muxer = Yamux::upgrade_connection(noise_socket, CONNECTION_DIRECTION)
.map_err(|err| ConnectionManagerError::YamuxUpgradeFailure(err.to_string()))?;

peer_connection::create(
muxer,
peer_addr,
Expand Down
6 changes: 3 additions & 3 deletions comms/src/connection_manager/tests/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
},
noise::NoiseConfig,
peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags, PeerManagerError},
protocol::{ProtocolEvent, ProtocolId, Protocols, IDENTITY_PROTOCOL},
protocol::{ProtocolEvent, ProtocolId, Protocols},
runtime,
runtime::task,
test_utils::{
Expand Down Expand Up @@ -156,15 +156,15 @@ async fn dial_success() {
let mut conn_out = conn_man1.dial_peer(node_identity2.node_id().clone()).await.unwrap();
assert_eq!(conn_out.peer_node_id(), node_identity2.node_id());
let peer2 = peer_manager1.find_by_node_id(conn_out.peer_node_id()).await.unwrap();
assert_eq!(peer2.supported_protocols, [&IDENTITY_PROTOCOL, &TEST_PROTO]);
assert_eq!(peer2.supported_protocols, [&TEST_PROTO]);
assert_eq!(peer2.user_agent, "node2");

let event = subscription2.recv().await.unwrap();
unpack_enum!(ConnectionManagerEvent::PeerConnected(conn_in) = &*event);
assert_eq!(conn_in.peer_node_id(), node_identity1.node_id());

let peer1 = peer_manager2.find_by_node_id(node_identity1.node_id()).await.unwrap();
assert_eq!(peer1.supported_protocols(), [&IDENTITY_PROTOCOL, &TEST_PROTO]);
assert_eq!(peer1.supported_protocols(), [&TEST_PROTO]);
assert_eq!(peer1.user_agent, "node1");

let err = conn_out
Expand Down
Loading

0 comments on commit aeeefbb

Please sign in to comment.