Skip to content

Commit

Permalink
feat: add configurable exclude dialer addresses for universe (#6543)
Browse files Browse the repository at this point in the history
Description
---
Added user-configurable communication node addresses that should never
be dialled. In Tari Universe the base node and wallet is configured in a
listen-only TCP mode with a fake public address and when their addresses
are propagated throughout the network, other peers should not try to
dial them.

Motivation and Context
---
The previous implementation was hard-coded and did not work properly.

How Has This Been Tested?
---
Added unit tests.
System-level testing.

What process can a PR reviewer use to test or verify this change?
---
Code review.

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
hansieodendaal authored Sep 10, 2024
1 parent 4763579 commit e113a0e
Show file tree
Hide file tree
Showing 15 changed files with 325 additions and 31 deletions.
1 change: 1 addition & 0 deletions base_layer/contacts/tests/contacts_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
auto_request: true,
..Default::default()
},
excluded_dial_addresses: vec![],
..Default::default()
},
allow_test_addresses: true,
Expand Down
3 changes: 2 additions & 1 deletion base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ async fn configure_comms_and_dht(
.with_listener_liveness_max_sessions(config.listener_liveness_max_sessions)
.with_listener_liveness_allowlist_cidrs(listener_liveness_allowlist_cidrs)
.with_dial_backoff(ConstantBackoff::new(Duration::from_millis(500)))
.with_peer_storage(peer_database, Some(file_lock));
.with_peer_storage(peer_database, Some(file_lock))
.with_excluded_dial_addresses(config.dht.excluded_dial_addresses.clone());

let mut comms = match config.auxiliary_tcp_listener_address {
Some(ref addr) => builder.with_auxiliary_tcp_listener_address(addr.clone()).build()?,
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5327,6 +5327,7 @@ pub unsafe extern "C" fn comms_config_create(
minimum_desired_tcpv4_node_ratio: 0.0,
..Default::default()
},
excluded_dial_addresses: vec![],
..Default::default()
},
allow_test_addresses: true,
Expand Down
2 changes: 2 additions & 0 deletions common/config/presets/c_base_node_c.toml
Original file line number Diff line number Diff line change
Expand Up @@ -316,3 +316,5 @@ database_url = "data/base_node/dht.db"
# In a situation where a node is not well-connected and many nodes are locally marked as offline, we can retry
# peers that were previously tried. Default: 2 hours
#offline_peer_cooldown = 7_200 # 2 * 60 * 60
# Addresses that should never be dialed (default value = [])
#excluded_dial_addresses = ["/ip4/x.x.x.x/tcp/xxxx", "/ip4/x.y.x.y/tcp/xyxy"]
4 changes: 3 additions & 1 deletion common/config/presets/d_console_wallet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ event_channel_size = 3500
# peers can find you.
# _NOTE_: If using the `tor` transport type, public_address will be ignored and an onion address will be
# automatically configured
#public_addresses = ["/ip4/172.2.3.4/tcp/18189",]
#public_addresses = ["/ip4/172.2.3.4/tcp/18188",]

# Optionally bind an additional TCP socket for inbound Tari P2P protocol commms.
# Use cases include:
Expand Down Expand Up @@ -360,3 +360,5 @@ network_discovery.initial_peer_sync_delay = 25
# In a situation where a node is not well-connected and many nodes are locally marked as offline, we can retry
# peers that were previously tried. Default: 2 hours
#offline_peer_cooldown = 7_200 # 2 * 60 * 60
# Addresses that should never be dialed (default value = [])
#excluded_dial_addresses = ["/ip4/x.x.x.x/tcp/xxxx", "/ip4/x.y.x.y/tcp/xyxy"]
5 changes: 5 additions & 0 deletions comms/core/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ impl CommsBuilder {
self
}

pub fn with_excluded_dial_addresses(mut self, excluded_addresses: Vec<Multiaddr>) -> Self {
self.connection_manager_config.excluded_dial_addresses = excluded_addresses;
self
}

/// Restrict liveness sessions to certain address ranges (CIDR format).
pub fn with_listener_liveness_allowlist_cidrs(mut self, cidrs: Vec<cidr::AnyIpCidr>) -> Self {
self.connection_manager_config.liveness_cidr_allowlist = cidrs;
Expand Down
26 changes: 17 additions & 9 deletions comms/core/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ where
fn handle_request(&mut self, pending_dials: &mut DialFuturesUnordered, request: DialerRequest) {
use DialerRequest::{CancelPendingDial, Dial, NotifyNewInboundConnection};
debug!(target: LOG_TARGET, "Connection dialer got request: {:?}", request);

match request {
Dial(peer, reply_tx) => {
self.handle_dial_peer_request(pending_dials, peer, reply_tx);
Expand Down Expand Up @@ -515,7 +516,7 @@ where
tokio::select! {
_ = delay => {
debug!(target: LOG_TARGET, "[Attempt {}] Connecting to peer '{}'", current_state.num_attempts(), current_state.peer().node_id.short_str());
match Self::dial_peer(current_state, &noise_config, &current_transport, config.network_info.network_wire_byte).await {
match Self::dial_peer(current_state, &noise_config, &current_transport, config.network_info.network_wire_byte, config.excluded_dial_addresses.clone()).await {
(state, Ok((socket, addr))) => {
debug!(target: LOG_TARGET, "Dial succeeded for peer '{}' after {} attempt(s)", state.peer().node_id.short_str(), state.num_attempts());
break (state, Ok((socket, addr)));
Expand All @@ -524,6 +525,8 @@ where
(state, Err(ConnectionManagerError::NoiseHandshakeError(e))) => break (state, Err(ConnectionManagerError::NoiseHandshakeError(e))),
// Inflight dial was cancelled
(state, Err(ConnectionManagerError::DialCancelled)) => break (state, Err(ConnectionManagerError::DialCancelled)),
// All public addresses for this peer are excluded
(state, Err(ConnectionManagerError::AllPeerAddressesAreExcluded(e))) => break (state, Err(ConnectionManagerError::AllPeerAddressesAreExcluded(e))),
(state, Err(err)) => {
debug!(target: LOG_TARGET, "Failed to dial peer {} | Attempt {} | Error: {}", state.peer().node_id.short_str(), state.num_attempts(), err);
if state.num_attempts() >= config.max_dial_attempts {
Expand Down Expand Up @@ -554,6 +557,7 @@ where
noise_config: &NoiseConfig,
transport: &TTransport,
network_byte: u8,
excluded_dial_addresses: Vec<Multiaddr>,
) -> (
DialState,
Result<(NoiseSocket<TTransport::Output>, Multiaddr), ConnectionManagerError>,
Expand All @@ -564,10 +568,7 @@ where
.clone()
.into_vec()
.iter()
.filter(|&a| {
a == &"/memory/0".parse::<Multiaddr>().expect("will not fail") || // Used for tests, allowed
a != &ConnectionManagerConfig::default().listener_address // Not allowed to dial the default
})
.filter(|&a| !excluded_dial_addresses.iter().any(|excluded| a == excluded))
.cloned()
.collect::<Vec<_>>();
if addresses.is_empty() {
Expand All @@ -577,10 +578,17 @@ where
"Dial - No more contactable addresses for peer '{}'",
node_id_hex
);
return (
dial_state,
Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)),
);
return if dial_state.peer().addresses.is_empty() {
(
dial_state,
Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)),
)
} else {
(
dial_state,
Err(ConnectionManagerError::AllPeerAddressesAreExcluded(node_id_hex)),
)
};
}
let cancel_signal = dial_state.get_cancel_signal();
for address in addresses {
Expand Down
2 changes: 2 additions & 0 deletions comms/core/src/connection_manager/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub enum ConnectionManagerError {
PeerValidationError(#[from] PeerValidatorError),
#[error("No contactable addresses for peer {0} left")]
NoContactableAddressesForPeer(String),
#[error("All peer addresses are excluded for peer {0}")]
AllPeerAddressesAreExcluded(String),
#[error("Yamux error: {0}")]
YamuxControlError(#[from] YamuxControlError),
}
Expand Down
3 changes: 3 additions & 0 deletions comms/core/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ pub struct ConnectionManagerConfig {
pub auxiliary_tcp_listener_address: Option<Multiaddr>,
/// Peer validation configuration. See [PeerValidatorConfig]
pub peer_validation_config: PeerValidatorConfig,
/// Addresses that should never be dialed
pub excluded_dial_addresses: Vec<Multiaddr>,
}

impl Default for ConnectionManagerConfig {
Expand All @@ -154,6 +156,7 @@ impl Default for ConnectionManagerConfig {
auxiliary_tcp_listener_address: None,
peer_validation_config: PeerValidatorConfig::default(),
noise_handshake_recv_timeout: Duration::from_secs(6),
excluded_dial_addresses: vec![],
}
}
}
Expand Down
135 changes: 135 additions & 0 deletions comms/core/src/connection_manager/tests/listener_dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,138 @@ async fn banned() {

timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap();
}

#[tokio::test]
async fn excluded_yes() {
let (event_tx, _event_rx) = mpsc::channel(10);
let mut shutdown = Shutdown::new();

let node_identity1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config1 = NoiseConfig::new(node_identity1.clone());
let expected_proto = ProtocolId::from_static(b"/tari/test-proto");
let supported_protocols = vec![expected_proto.clone()];
let peer_manager1 = build_peer_manager();
let mut listener = PeerListener::new(
Default::default(),
"/memory/0".parse().unwrap(),
MemoryTransport,
noise_config1.clone(),
event_tx.clone(),
peer_manager1.clone(),
node_identity1.clone(),
shutdown.to_signal(),
);
listener.set_supported_protocols(supported_protocols.clone());

// Get the listener address of the peer
let address = listener.listen().await.unwrap();

let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config2 = NoiseConfig::new(node_identity2.clone());
let (request_tx, request_rx) = mpsc::channel(1);
let peer_manager2 = build_peer_manager();
let connection_manager_config = ConnectionManagerConfig {
excluded_dial_addresses: vec![address.clone()],
..Default::default()
};
let mut dialer = Dialer::new(
connection_manager_config,
node_identity2.clone(),
peer_manager2.clone(),
MemoryTransport,
noise_config2.clone(),
ConstantBackoff::new(Duration::from_millis(100)),
request_rx,
event_tx.clone(),
shutdown.to_signal(),
);
dialer.set_supported_protocols(supported_protocols.clone());

let dialer_fut = tokio::spawn(dialer.run());

let mut peer = node_identity1.to_peer();
peer.addresses = MultiaddressesWithStats::from_addresses_with_source(vec![address], &PeerAddressSource::Config);
peer.set_id_for_test(1);

let (reply_tx, reply_rx) = oneshot::channel();
request_tx
.send(DialerRequest::Dial(Box::new(peer), Some(reply_tx)))
.await
.unwrap();

// Check that the dial failed. We're checking that the dial attempt was never made.
let res = reply_rx.await.unwrap();
assert_eq!(
format!("{:?}", res),
format!("Err(AllPeerAddressesAreExcluded(\"{}\"))", node_identity1.node_id())
);

shutdown.trigger();
timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap();
}

#[tokio::test]
async fn excluded_no() {
let (event_tx, _event_rx) = mpsc::channel(10);
let mut shutdown = Shutdown::new();

let node_identity1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config1 = NoiseConfig::new(node_identity1.clone());
let expected_proto = ProtocolId::from_static(b"/tari/test-proto");
let supported_protocols = vec![expected_proto.clone()];
let peer_manager1 = build_peer_manager();
let mut listener = PeerListener::new(
Default::default(),
"/memory/0".parse().unwrap(),
MemoryTransport,
noise_config1.clone(),
event_tx.clone(),
peer_manager1.clone(),
node_identity1.clone(),
shutdown.to_signal(),
);
listener.set_supported_protocols(supported_protocols.clone());

// Get the listener address of the peer
let address = listener.listen().await.unwrap();

let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config2 = NoiseConfig::new(node_identity2.clone());
let (request_tx, request_rx) = mpsc::channel(1);
let peer_manager2 = build_peer_manager();
let connection_manager_config = ConnectionManagerConfig {
excluded_dial_addresses: vec![],
..Default::default()
};
let mut dialer = Dialer::new(
connection_manager_config,
node_identity2.clone(),
peer_manager2.clone(),
MemoryTransport,
noise_config2.clone(),
ConstantBackoff::new(Duration::from_millis(100)),
request_rx,
event_tx.clone(),
shutdown.to_signal(),
);
dialer.set_supported_protocols(supported_protocols.clone());

let dialer_fut = tokio::spawn(dialer.run());

let mut peer = node_identity1.to_peer();
peer.addresses = MultiaddressesWithStats::from_addresses_with_source(vec![address], &PeerAddressSource::Config);
peer.set_id_for_test(1);

let (reply_tx, reply_rx) = oneshot::channel();
request_tx
.send(DialerRequest::Dial(Box::new(peer), Some(reply_tx)))
.await
.unwrap();

// Check that the dial failed. We're checking that the dial attempt was never made.
let res = reply_rx.await.unwrap();
assert!(res.is_ok());

shutdown.trigger();
timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap();
}
9 changes: 9 additions & 0 deletions comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,15 @@ impl ConnectivityManagerActor {
let (node_id, mut new_status, connection) = match event {
PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None),
PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),
PeerConnectFailed(node_id, ConnectionManagerError::AllPeerAddressesAreExcluded(msg)) => {
debug!(
target: LOG_TARGET,
"Peer '{}' contains only excluded addresses ({})",
node_id,
msg
);
(node_id, ConnectionStatus::Failed, None)
},
PeerConnectFailed(node_id, ConnectionManagerError::NoiseHandshakeError(msg)) => {
if let Some(conn) = self.pool.get_connection(node_id) {
warn!(
Expand Down
11 changes: 11 additions & 0 deletions comms/core/src/peer_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,17 @@ impl PeerManager {
Ok(peer.features)
}

pub async fn get_peer_multi_addresses(
&self,
node_id: &NodeId,
) -> Result<MultiaddressesWithStats, PeerManagerError> {
let peer = self
.find_by_node_id(node_id)
.await?
.ok_or(PeerManagerError::PeerNotFoundError)?;
Ok(peer.addresses)
}

/// This will store metadata inside of the metadata field in the peer provided by the nodeID.
/// It will return None if the value was empty and the old value if the value was updated
pub async fn set_peer_metadata(
Expand Down
Loading

0 comments on commit e113a0e

Please sign in to comment.