Skip to content

Commit

Permalink
feat: close rpc connections when not in use - without close on dial r…
Browse files Browse the repository at this point in the history
…equests (#6649)

Description
---
Added the ability to close RPC connections for a given peer:
- The RPC server can request sessions to be dropped.
- RPC session counts on the server are atomically managed with RPC
server sessions.
- When a client peer connection (final clone) drops, as a safety
precaution, all client RPC connections will be signalled to drop,
resulting in the server RPC sessions closing.
- Improved management of wallet connectivity to base nodes.
- **Edit:** Introduced an optional culling of the oldest client RPC
connection(s) into an RPC server for a given peer (_default: true_).
This will allow all new RPC connections to succeed. If older RPC
sessions are dropped by the server while the client is in use, the
client can re-request an RPC connection, but many times the clients are
not aware of the RPC sessions held on the server.

Motivation and Context
---
Wallets keep on hitting their maximum allowed RPC sessions in the
server.

How Has This Been Tested?
---
- Added new unit tests.
- System-level testing.

What process can a PR reviewer use to test or verify this change?
---
Code review

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->

---------

Co-authored-by: Stan Bondi <sdbondi@users.noreply.github.com>
  • Loading branch information
hansieodendaal and sdbondi authored Oct 28, 2024
1 parent c9213b5 commit 20e70fa
Show file tree
Hide file tree
Showing 28 changed files with 850 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ mod test {
}
println!("{}: {:?}", i, entry);
}
assert_eq!(ordered_entries.len(), 2);
assert!(ordered_entries.len() <= 2);
}

#[tokio::test]
Expand Down
1 change: 1 addition & 0 deletions applications/minotari_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ where B: BlockchainBackend + 'static
let rpc_server = RpcServer::builder()
.with_maximum_simultaneous_sessions(config.rpc_max_simultaneous_sessions)
.with_maximum_sessions_per_client(config.rpc_max_sessions_per_peer)
.with_cull_oldest_peer_rpc_connection_on_full(config.cull_oldest_peer_rpc_connection_on_full)
.finish();

// Add your RPC services here ‍🏴‍☠️️☮️🌊
Expand Down
1 change: 1 addition & 0 deletions base_layer/contacts/tests/contacts_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_self_liveness_check_interval: None,
cull_oldest_peer_rpc_connection_on_full: true,
};
let peer_message_subscription_factory = Arc::new(subscription_factory);
let shutdown = Shutdown::new();
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/consensus/consensus_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ impl ConsensusConstants {

let consensus_constants = vec![con_1, con_2];
#[cfg(any(test, debug_assertions))]
assert_hybrid_pow_constants(&consensus_constants, &[120], &[50], &[50]);
assert_hybrid_pow_constants(&consensus_constants, &[120, 120], &[50, 50], &[50, 50]);
consensus_constants
}

Expand Down
1 change: 1 addition & 0 deletions base_layer/core/tests/helpers/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ async fn setup_base_node_services(
let rpc_server = RpcServer::builder()
.with_maximum_simultaneous_sessions(p2p_config.rpc_max_simultaneous_sessions)
.with_maximum_sessions_per_client(p2p_config.rpc_max_sessions_per_peer)
.with_cull_oldest_peer_rpc_connection_on_full(p2p_config.cull_oldest_peer_rpc_connection_on_full)
.finish();
let rpc_server = rpc_server.add_service(base_node::create_base_node_sync_rpc_service(
blockchain_db.clone().into(),
Expand Down
5 changes: 5 additions & 0 deletions base_layer/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ pub struct P2pConfig {
/// The maximum allowed RPC sessions per peer.
/// Default: 10
pub rpc_max_sessions_per_peer: usize,
/// If true, and the maximum per peer RPC sessions is reached, the RPC server will close an old session and replace
/// it with a new session. If false, the RPC server will reject the new session and preserve the older session.
/// (default value = true).
pub cull_oldest_peer_rpc_connection_on_full: bool,
}

impl Default for P2pConfig {
Expand All @@ -163,6 +167,7 @@ impl Default for P2pConfig {
auxiliary_tcp_listener_address: None,
rpc_max_simultaneous_sessions: 100,
rpc_max_sessions_per_peer: 10,
cull_oldest_peer_rpc_connection_on_full: true,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ where
latency: None,
})
.await;
if let Some(node_id) = self.wallet_connectivity.get_current_base_node_peer_node_id() {
self.wallet_connectivity.disconnect_base_node(node_id).await;
}
continue;
},
Err(e @ BaseNodeMonitorError::InvalidBaseNodeResponse(_)) |
Expand Down
8 changes: 8 additions & 0 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
ObtainBaseNodeSyncRpcClient(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
DisconnectBaseNode(NodeId),
}

#[derive(Clone)]
Expand Down Expand Up @@ -118,6 +119,13 @@ impl WalletConnectivityInterface for WalletConnectivityHandle {
reply_rx.await.ok()
}

async fn disconnect_base_node(&mut self, node_id: NodeId) {
let _unused = self
.sender
.send(WalletConnectivityRequest::DisconnectBaseNode(node_id))
.await;
}

fn get_connectivity_status(&mut self) -> OnlineStatus {
*self.online_status_rx.borrow()
}
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/src/connectivity_service/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static {
/// BaseNodeSyncRpcClient RPC session.
async fn obtain_base_node_sync_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeSyncRpcClient>>;

async fn disconnect_base_node(&mut self, node_id: NodeId);

fn get_connectivity_status(&mut self) -> OnlineStatus;

fn get_connectivity_status_watch(&self) -> watch::Receiver<OnlineStatus>;
Expand Down
4 changes: 4 additions & 0 deletions base_layer/wallet/src/connectivity_service/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ impl WalletConnectivityInterface for WalletConnectivityMock {
borrow.as_ref().cloned()
}

async fn disconnect_base_node(&mut self, _node_id: NodeId) {
self.send_shutdown();
}

fn get_connectivity_status(&mut self) -> OnlineStatus {
*self.online_status_watch.borrow()
}
Expand Down
90 changes: 49 additions & 41 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::{

const LOG_TARGET: &str = "wallet::connectivity";
pub(crate) const CONNECTIVITY_WAIT: u64 = 5;
pub(crate) const COOL_OFF_PERIOD: u64 = 60;

/// Connection status of the Base Node
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -161,14 +160,21 @@ impl WalletConnectivityService {
}

async fn handle_request(&mut self, request: WalletConnectivityRequest) {
use WalletConnectivityRequest::{ObtainBaseNodeSyncRpcClient, ObtainBaseNodeWalletRpcClient};
use WalletConnectivityRequest::{
DisconnectBaseNode,
ObtainBaseNodeSyncRpcClient,
ObtainBaseNodeWalletRpcClient,
};
match request {
ObtainBaseNodeWalletRpcClient(reply) => {
self.handle_pool_request(reply.into()).await;
},
ObtainBaseNodeSyncRpcClient(reply) => {
self.handle_pool_request(reply.into()).await;
},
DisconnectBaseNode(node_id) => {
self.disconnect_base_node(node_id).await;
},
}
}

Expand All @@ -195,18 +201,17 @@ impl WalletConnectivityService {
match self.pools.get(&node_id) {
Some(pools) => match pools.base_node_wallet_rpc_client.get().await {
Ok(client) => {
debug!(target: LOG_TARGET, "Obtained pool RPC 'wallet' connection to base node '{}'", node_id);
let _result = reply.send(client);
},
Err(e) => {
warn!(
target: LOG_TARGET,
"Base node '{}' wallet RPC pool connection failed ({}). Reconnecting...",
"Base node '{}' pool RPC 'wallet' connection failed ({}). Reconnecting...",
node_id,
e
);
if let Some(node_id) = self.current_base_node() {
self.disconnect_base_node(node_id).await;
};
self.disconnect_base_node(node_id).await;
self.pending_requests.push(reply.into());
},
},
Expand Down Expand Up @@ -237,18 +242,17 @@ impl WalletConnectivityService {
match self.pools.get(&node_id) {
Some(pools) => match pools.base_node_sync_rpc_client.get().await {
Ok(client) => {
debug!(target: LOG_TARGET, "Obtained pool RPC 'sync' connection to base node '{}'", node_id);
let _result = reply.send(client);
},
Err(e) => {
warn!(
target: LOG_TARGET,
"Base node '{}' sync RPC pool connection failed ({}). Reconnecting...",
"Base node '{}' pool RPC 'sync' connection failed ({}). Reconnecting...",
node_id,
e
);
if let Some(node_id) = self.current_base_node() {
self.disconnect_base_node(node_id).await;
};
self.disconnect_base_node(node_id).await;
self.pending_requests.push(reply.into());
},
},
Expand Down Expand Up @@ -282,6 +286,8 @@ impl WalletConnectivityService {
Err(e) => error!(target: LOG_TARGET, "Failed to disconnect base node: {}", e),
}
self.pools.remove(&node_id);
// We want to ensure any active RPC clients are dropped when this connection (a clone) is dropped
connection.set_force_disconnect_rpc_clients_when_clone_drops();
};
}

Expand All @@ -292,16 +298,17 @@ impl WalletConnectivityService {
return;
};
loop {
let node_id = if let Some(time) = peer_manager.time_since_last_connection_attempt() {
if time < Duration::from_secs(COOL_OFF_PERIOD) {
if peer_manager.get_current_peer().node_id == peer_manager.get_next_peer().node_id {
// If we only have one peer in the list, wait a bit before retrying
time::sleep(Duration::from_secs(CONNECTIVITY_WAIT)).await;
}
peer_manager.get_current_peer().node_id
} else {
peer_manager.get_current_peer().node_id
let node_id = if let Some(_time) = peer_manager.time_since_last_connection_attempt() {
if peer_manager.get_current_peer().node_id == peer_manager.get_next_peer().node_id {
// If we only have one peer in the list, wait a bit before retrying
debug!(target: LOG_TARGET,
"Retrying after {}s ...",
Duration::from_secs(CONNECTIVITY_WAIT).as_secs()
);
time::sleep(Duration::from_secs(CONNECTIVITY_WAIT)).await;
}
// If 'peer_manager.get_next_peer()' is called, 'current_peer' is advanced to the next peer
peer_manager.get_current_peer().node_id
} else {
peer_manager.get_current_peer().node_id
};
Expand All @@ -325,36 +332,29 @@ impl WalletConnectivityService {
break;
}
self.base_node_watch.send(Some(peer_manager.clone()));
if let Err(e) = self.notify_pending_requests().await {
warn!(target: LOG_TARGET, "Error notifying pending RPC requests: {}", e);
if let Ok(true) = self.notify_pending_requests().await {
self.set_online_status(OnlineStatus::Online);
debug!(
target: LOG_TARGET,
"Wallet is ONLINE and connected to base node '{}'", node_id
);
}
self.set_online_status(OnlineStatus::Online);
debug!(
target: LOG_TARGET,
"Wallet is ONLINE and connected to base node '{}'", node_id
);
break;
},
Ok(false) => {
debug!(
target: LOG_TARGET,
"The peer has changed while connecting. Attempting to connect to new base node."
);
self.disconnect_base_node(node_id).await;
},
Err(WalletConnectivityError::ConnectivityError(ConnectivityError::DialCancelled)) => {
debug!(
target: LOG_TARGET,
"Dial was cancelled. Retrying after {}s ...",
Duration::from_secs(CONNECTIVITY_WAIT).as_secs()
);
time::sleep(Duration::from_secs(CONNECTIVITY_WAIT)).await;
debug!(target: LOG_TARGET, "Dial was cancelled.");
self.disconnect_base_node(node_id).await;
},
Err(e) => {
warn!(target: LOG_TARGET, "{}", e);
if self.current_base_node().as_ref() == Some(&node_id) {
self.disconnect_base_node(node_id).await;
time::sleep(Duration::from_secs(CONNECTIVITY_WAIT)).await;
}
self.disconnect_base_node(node_id).await;
},
}
if self.peer_list_change_detected(&peer_manager) {
Expand Down Expand Up @@ -401,15 +401,15 @@ impl WalletConnectivityService {
};
debug!(
target: LOG_TARGET,
"Successfully established peer connection to base node '{}'",
"Established peer connection to base node '{}'",
conn.peer_node_id()
);
self.pools.insert(peer_node_id.clone(), ClientPoolContainer {
base_node_sync_rpc_client: conn.create_rpc_client_pool(1, Default::default()),
base_node_wallet_rpc_client: conn
.create_rpc_client_pool(self.config.base_node_rpc_pool_size, Default::default()),
});
debug!(target: LOG_TARGET, "Successfully established RPC connection to base node '{}'", peer_node_id);
trace!(target: LOG_TARGET, "Created RPC pools for '{}'", peer_node_id);
Ok(true)
}

Expand All @@ -426,16 +426,24 @@ impl WalletConnectivityService {
}
}

async fn notify_pending_requests(&mut self) -> Result<(), WalletConnectivityError> {
async fn notify_pending_requests(&mut self) -> Result<bool, WalletConnectivityError> {
let current_pending = mem::take(&mut self.pending_requests);
let mut count = 0;
let current_pending_len = current_pending.len();
for reply in current_pending {
if reply.is_canceled() {
continue;
}

count += 1;
trace!(target: LOG_TARGET, "Handle {} of {} pending RPC pool requests", count, current_pending_len);
self.handle_pool_request(reply).await;
}
Ok(())
if self.pending_requests.is_empty() {
Ok(true)
} else {
warn!(target: LOG_TARGET, "{} of {} pending RPC pool requests not handled", count, current_pending_len);
Ok(false)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ where
Ok(())
}

async fn connect_to_peer(&mut self, peer: NodeId) -> Result<PeerConnection, UtxoScannerError> {
async fn new_connection_to_peer(&mut self, peer: NodeId) -> Result<PeerConnection, UtxoScannerError> {
debug!(
target: LOG_TARGET,
"Attempting UTXO sync with seed peer {} ({})", self.peer_index, peer,
Expand Down Expand Up @@ -333,7 +333,7 @@ where
&mut self,
peer: &NodeId,
) -> Result<RpcClientLease<BaseNodeWalletRpcClient>, UtxoScannerError> {
let mut connection = self.connect_to_peer(peer.clone()).await?;
let mut connection = self.new_connection_to_peer(peer.clone()).await?;
let client = connection
.connect_rpc_using_builder(BaseNodeWalletRpcClient::builder().with_deadline(Duration::from_secs(60)))
.await?;
Expand Down
17 changes: 14 additions & 3 deletions base_layer/wallet/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,12 @@ where
peer_manager.add_peer(current_peer.clone()).await?;
}
}
connectivity
.add_peer_to_allow_list(current_peer.node_id.clone())
.await?;
let mut peer_list = vec![current_peer];
if let Some(pos) = backup_peers.iter().position(|p| p.public_key == public_key) {
backup_peers.remove(pos);
}
peer_list.append(&mut backup_peers);
self.update_allow_list(&peer_list).await?;
self.wallet_connectivity
.set_base_node(BaseNodePeerManager::new(0, peer_list)?);
} else {
Expand Down Expand Up @@ -451,13 +449,26 @@ where
backup_peers.remove(pos);
}
peer_list.append(&mut backup_peers);
self.update_allow_list(&peer_list).await?;
self.wallet_connectivity
.set_base_node(BaseNodePeerManager::new(0, peer_list)?);
}

Ok(())
}

async fn update_allow_list(&mut self, peer_list: &[Peer]) -> Result<(), WalletError> {
let mut connectivity = self.comms.connectivity();
let current_allow_list = connectivity.get_allow_list().await?;
for peer in &current_allow_list {
connectivity.remove_peer_from_allow_list(peer.clone()).await?;
}
for peer in peer_list {
connectivity.add_peer_to_allow_list(peer.node_id.clone()).await?;
}
Ok(())
}

pub async fn get_base_node_peer(&mut self) -> Option<Peer> {
self.wallet_connectivity.get_current_base_node_peer()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1927,6 +1927,7 @@ async fn test_txo_validation() {
#[tokio::test]
#[allow(clippy::too_many_lines)]
async fn test_txo_revalidation() {
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`
let (connection, _tempdir) = get_temp_sqlite_database_connection();
let backend = OutputManagerSqliteDatabase::new(connection.clone());

Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5452,6 +5452,7 @@ pub unsafe extern "C" fn comms_config_create(
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_self_liveness_check_interval: None,
cull_oldest_peer_rpc_connection_on_full: true,
};

Box::into_raw(Box::new(config))
Expand Down
Loading

0 comments on commit 20e70fa

Please sign in to comment.