From dc0842336b121be599c2e6e08dcf330c47bade37 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Sun, 22 Aug 2021 07:58:25 +0400 Subject: [PATCH] fix: edge-case fixes for wallet peer switching in console wallet - set peer using the watch to allow the connectivity service to immediately be aware of the new peer - aborted the dial early if necessary, should the user set a different peer - slightly reduce busy-ness of the wallet monitor by monitoring for less comms connectivity events - monitor for wallet connectivity peer status changes to improve the responsiveness of the status ui update. --- .../src/ui/state/app_state.rs | 5 ++ .../src/ui/state/wallet_event_monitor.rs | 11 +++-- .../wallet/src/base_node_service/monitor.rs | 2 +- .../wallet/src/connectivity_service/handle.rs | 19 ++++---- .../src/connectivity_service/initializer.rs | 2 +- .../src/connectivity_service/service.rs | 47 ++++++++++++++----- .../wallet/src/connectivity_service/test.rs | 6 +-- .../wallet/src/connectivity_service/watch.rs | 2 +- common/src/configuration/global.rs | 18 ++++--- common/src/configuration/utils.rs | 3 +- comms/src/protocol/rpc/server/mod.rs | 17 ++++--- 11 files changed, 83 insertions(+), 49 deletions(-) diff --git a/applications/tari_console_wallet/src/ui/state/app_state.rs b/applications/tari_console_wallet/src/ui/state/app_state.rs index 5918928ced..6d9293ef47 100644 --- a/applications/tari_console_wallet/src/ui/state/app_state.rs +++ b/applications/tari_console_wallet/src/ui/state/app_state.rs @@ -58,6 +58,7 @@ use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::hex::Hex}; use tari_shutdown::ShutdownSignal; use tari_wallet::{ base_node_service::{handle::BaseNodeEventReceiver, service::BaseNodeState}, + connectivity_service::WalletConnectivityHandle, contacts_service::storage::database::Contact, output_manager_service::{handle::OutputManagerEventReceiver, service::Balance, TxId, TxoValidationType}, transaction_service::{ @@ -652,6 +653,10 @@ impl AppStateInner { self.wallet.comms.connectivity().get_event_subscription().fuse() } + pub fn get_wallet_connectivity(&self) -> WalletConnectivityHandle { + self.wallet.wallet_connectivity.clone() + } + pub fn get_base_node_event_stream(&self) -> Fuse { self.wallet.base_node_service.clone().get_event_stream_fused() } diff --git a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs index b656372fcc..2e20999667 100644 --- a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -54,6 +54,8 @@ impl WalletEventMonitor { .get_output_manager_service_event_stream(); let mut connectivity_events = self.app_state_inner.read().await.get_connectivity_event_stream(); + let wallet_connectivity = self.app_state_inner.read().await.get_wallet_connectivity(); + let mut connectivity_status = wallet_connectivity.get_connectivity_status_watch().fuse(); let mut base_node_events = self.app_state_inner.read().await.get_base_node_event_stream(); @@ -105,6 +107,10 @@ impl WalletEventMonitor { Err(_) => debug!(target: LOG_TARGET, "Lagging read on Transaction Service event broadcast channel"), } }, + status = connectivity_status.select_next_some() => { + trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet connectivity status {:?}", status); + self.trigger_peer_state_refresh().await; + }, result = connectivity_events.select_next_some() => { match result { Ok(msg) => { @@ -112,10 +118,7 @@ impl WalletEventMonitor { match &*msg { ConnectivityEvent::PeerDisconnected(_) | ConnectivityEvent::ManagedPeerDisconnected(_) | - ConnectivityEvent::PeerConnected(_) | - ConnectivityEvent::PeerBanned(_) | - ConnectivityEvent::PeerOffline(_) | - ConnectivityEvent::PeerConnectionWillClose(_, _) => { + ConnectivityEvent::PeerConnected(_) => { self.trigger_peer_state_refresh().await; }, // Only the above variants trigger state refresh diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index a1005bb6c7..86f17eb227 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -97,7 +97,7 @@ impl BaseNodeMonitor { } async fn update_connectivity_status(&self) -> NodeId { - let mut watcher = self.wallet_connectivity.get_connectivity_status_watcher(); + let mut watcher = self.wallet_connectivity.get_connectivity_status_watch(); loop { use OnlineStatus::*; match watcher.recv().await.unwrap_or(Offline) { diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index a43fea5333..ac218edc5e 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -21,7 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use super::service::OnlineStatus; -use crate::connectivity_service::error::WalletConnectivityError; +use crate::connectivity_service::{error::WalletConnectivityError, watch::Watch}; use futures::{ channel::{mpsc, oneshot}, SinkExt, @@ -36,33 +36,30 @@ use tokio::sync::watch; pub enum WalletConnectivityRequest { ObtainBaseNodeWalletRpcClient(oneshot::Sender>), ObtainBaseNodeSyncRpcClient(oneshot::Sender>), - SetBaseNode(Box), } #[derive(Clone)] pub struct WalletConnectivityHandle { sender: mpsc::Sender, - base_node_watch_rx: watch::Receiver>, + base_node_watch: Watch>, online_status_rx: watch::Receiver, } impl WalletConnectivityHandle { pub(super) fn new( sender: mpsc::Sender, - base_node_watch_rx: watch::Receiver>, + base_node_watch: Watch>, online_status_rx: watch::Receiver, ) -> Self { Self { sender, - base_node_watch_rx, + base_node_watch, online_status_rx, } } pub async fn set_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> { - self.sender - .send(WalletConnectivityRequest::SetBaseNode(Box::new(base_node_peer))) - .await?; + self.base_node_watch.broadcast(Some(base_node_peer)); Ok(()) } @@ -109,15 +106,15 @@ impl WalletConnectivityHandle { self.online_status_rx.recv().await.unwrap_or(OnlineStatus::Offline) } - pub fn get_connectivity_status_watcher(&self) -> watch::Receiver { + pub fn get_connectivity_status_watch(&self) -> watch::Receiver { self.online_status_rx.clone() } pub fn get_current_base_node_peer(&self) -> Option { - self.base_node_watch_rx.borrow().clone() + self.base_node_watch.borrow().clone() } pub fn get_current_base_node_id(&self) -> Option { - self.base_node_watch_rx.borrow().as_ref().map(|p| p.node_id.clone()) + self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) } } diff --git a/base_layer/wallet/src/connectivity_service/initializer.rs b/base_layer/wallet/src/connectivity_service/initializer.rs index e39206630f..d0c2b94126 100644 --- a/base_layer/wallet/src/connectivity_service/initializer.rs +++ b/base_layer/wallet/src/connectivity_service/initializer.rs @@ -51,7 +51,7 @@ impl ServiceInitializer for WalletConnectivityInitializer { let online_status_watch = Watch::new(OnlineStatus::Offline); context.register_handle(WalletConnectivityHandle::new( sender, - base_node_watch.get_receiver(), + base_node_watch.clone(), online_status_watch.get_receiver(), )); diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index 373dd069a1..c0cf474b96 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -27,6 +27,8 @@ use crate::{ use core::mem; use futures::{ channel::{mpsc, oneshot}, + future, + future::Either, stream::Fuse, StreamExt, }; @@ -35,6 +37,7 @@ use tari_comms::{ connectivity::ConnectivityRequester, peer_manager::{NodeId, Peer}, protocol::rpc::{RpcClientLease, RpcClientPool}, + PeerConnection, }; use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; use tokio::time; @@ -110,10 +113,6 @@ impl WalletConnectivityService { ObtainBaseNodeSyncRpcClient(reply) => { self.handle_pool_request(reply.into()).await; }, - - SetBaseNode(peer) => { - self.set_base_node_peer(*peer); - }, } } @@ -202,11 +201,15 @@ impl WalletConnectivityService { self.base_node_watch.broadcast(Some(peer)); } + fn current_base_node(&self) -> Option { + self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) + } + async fn setup_base_node_connection(&mut self) { self.pools = None; loop { - let node_id = match self.base_node_watch.borrow().as_ref() { - Some(p) => p.node_id.clone(), + let node_id = match self.current_base_node() { + Some(n) => n, None => return, }; debug!( @@ -215,7 +218,7 @@ impl WalletConnectivityService { ); self.set_online_status(OnlineStatus::Connecting); match self.try_setup_rpc_pool(node_id.clone()).await { - Ok(_) => { + Ok(true) => { self.set_online_status(OnlineStatus::Online); debug!( target: LOG_TARGET, @@ -223,8 +226,16 @@ impl WalletConnectivityService { ); break; }, + Ok(false) => { + // Retry with updated peer + continue; + }, Err(e) => { - self.set_online_status(OnlineStatus::Offline); + if self.current_base_node() != Some(node_id) { + self.set_online_status(OnlineStatus::Connecting); + } else { + self.set_online_status(OnlineStatus::Offline); + } error!(target: LOG_TARGET, "{}", e); time::delay_for(self.config.base_node_monitor_refresh_interval).await; continue; @@ -237,9 +248,12 @@ impl WalletConnectivityService { let _ = self.online_status_watch.broadcast(status); } - async fn try_setup_rpc_pool(&mut self, peer: NodeId) -> Result<(), WalletConnectivityError> { + async fn try_setup_rpc_pool(&mut self, peer: NodeId) -> Result { self.connectivity.add_managed_peers(vec![peer.clone()]).await?; - let conn = self.connectivity.dial_peer(peer).await?; + let conn = match self.try_dial_peer(peer).await? { + Some(peer) => peer, + None => return Ok(false), + }; debug!( target: LOG_TARGET, "Successfully established peer connection to base node {}", @@ -257,7 +271,18 @@ impl WalletConnectivityService { "Successfully established RPC connection {}", conn.peer_node_id() ); - Ok(()) + Ok(true) + } + + async fn try_dial_peer(&mut self, peer: NodeId) -> Result, WalletConnectivityError> { + let recv_fut = self.base_node_watch.recv(); + futures::pin_mut!(recv_fut); + let dial_fut = self.connectivity.dial_peer(peer); + futures::pin_mut!(dial_fut); + match future::select(recv_fut, dial_fut).await { + Either::Left(_) => Ok(None), + Either::Right((conn, _)) => Ok(Some(conn?)), + } } async fn notify_pending_requests(&mut self) -> Result<(), WalletConnectivityError> { diff --git a/base_layer/wallet/src/connectivity_service/test.rs b/base_layer/wallet/src/connectivity_service/test.rs index a2092453e9..7c24ef5b46 100644 --- a/base_layer/wallet/src/connectivity_service/test.rs +++ b/base_layer/wallet/src/connectivity_service/test.rs @@ -50,7 +50,7 @@ async fn setup() -> ( let (tx, rx) = mpsc::channel(1); let base_node_watch = Watch::new(None); let online_status_watch = Watch::new(OnlineStatus::Offline); - let handle = WalletConnectivityHandle::new(tx, base_node_watch.get_receiver(), online_status_watch.get_receiver()); + let handle = WalletConnectivityHandle::new(tx, base_node_watch.clone(), online_status_watch.get_receiver()); let (connectivity, mock) = create_connectivity_mock(); let mock_state = mock.spawn(); // let peer_manager = create_peer_manager(tempdir().unwrap()); @@ -138,7 +138,7 @@ async fn it_changes_to_a_new_base_node() { mock_state.await_call_count(2).await; mock_state.expect_dial_peer(base_node_peer1.node_id()).await; - assert_eq!(mock_state.count_calls_containing("AddManagedPeer").await, 1); + assert_eq!(mock_state.count_calls_containing("AddManagedPeer").await, 2); let _ = mock_state.take_calls().await; let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap(); @@ -149,7 +149,7 @@ async fn it_changes_to_a_new_base_node() { mock_state.await_call_count(2).await; mock_state.expect_dial_peer(base_node_peer2.node_id()).await; - assert_eq!(mock_state.count_calls_containing("AddManagedPeer").await, 1); + assert_eq!(mock_state.count_calls_containing("AddManagedPeer").await, 2); let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap(); assert!(rpc_client.is_connected()); diff --git a/base_layer/wallet/src/connectivity_service/watch.rs b/base_layer/wallet/src/connectivity_service/watch.rs index 056e8bfe8d..1f1e868d47 100644 --- a/base_layer/wallet/src/connectivity_service/watch.rs +++ b/base_layer/wallet/src/connectivity_service/watch.rs @@ -37,7 +37,7 @@ impl Watch { self.receiver_mut().recv().await } - pub fn borrow(&mut self) -> watch::Ref<'_, T> { + pub fn borrow(&self) -> watch::Ref<'_, T> { self.receiver().borrow() } diff --git a/common/src/configuration/global.rs b/common/src/configuration/global.rs index 7d1b8c7f98..88ff5586be 100644 --- a/common/src/configuration/global.rs +++ b/common/src/configuration/global.rs @@ -518,18 +518,16 @@ fn convert_node_config( let console_wallet_notify_file = optional(cfg.get_str(key))?.map(PathBuf::from); let key = "wallet.base_node_service_refresh_interval"; - let wallet_base_node_service_refresh_interval = match cfg.get_int(key) { - Ok(seconds) => seconds as u64, - Err(ConfigError::NotFound(_)) => 10, - Err(e) => return Err(ConfigurationError::new(&key, &e.to_string())), - }; + let wallet_base_node_service_refresh_interval = cfg + .get_int(key) + .map(|seconds| seconds as u64) + .map_err(|e| ConfigurationError::new(&key, &e.to_string()))?; let key = "wallet.base_node_service_request_max_age"; - let wallet_base_node_service_request_max_age = match cfg.get_int(key) { - Ok(seconds) => seconds as u64, - Err(ConfigError::NotFound(_)) => 60, - Err(e) => return Err(ConfigurationError::new(&key, &e.to_string())), - }; + let wallet_base_node_service_request_max_age = cfg + .get_int(key) + .map(|seconds| seconds as u64) + .map_err(|e| ConfigurationError::new(&key, &e.to_string()))?; let key = "common.liveness_max_sessions"; let liveness_max_sessions = cfg diff --git a/common/src/configuration/utils.rs b/common/src/configuration/utils.rs index edc65e5967..1f291cf0ce 100644 --- a/common/src/configuration/utils.rs +++ b/common/src/configuration/utils.rs @@ -98,7 +98,8 @@ pub fn default_config(bootstrap: &ConfigBootstrap) -> Config { ) .unwrap(); cfg.set_default("wallet.base_node_query_timeout", 60).unwrap(); - // 60 sec * 60 minutes * 12 hours. + cfg.set_default("wallet.base_node_service_refresh_interval", 5).unwrap(); + cfg.set_default("wallet.base_node_service_request_max_age", 60).unwrap(); cfg.set_default("wallet.scan_for_utxo_interval", 60 * 60 * 12).unwrap(); cfg.set_default("wallet.transaction_broadcast_monitoring_timeout", 60) .unwrap(); diff --git a/comms/src/protocol/rpc/server/mod.rs b/comms/src/protocol/rpc/server/mod.rs index 5d775a5314..7615b1497e 100644 --- a/comms/src/protocol/rpc/server/mod.rs +++ b/comms/src/protocol/rpc/server/mod.rs @@ -464,14 +464,14 @@ where return Ok(()); } - debug!( - target: LOG_TARGET, - "[Peer=`{}`] Got request {}", self.node_id, decoded_msg - ); - let msg_flags = RpcMessageFlags::from_bits_truncate(decoded_msg.flags as u8); if msg_flags.contains(RpcMessageFlags::ACK) { - debug!(target: LOG_TARGET, "[Peer=`{}`] ACK.", self.node_id); + debug!( + target: LOG_TARGET, + "[Peer=`{}` {}] sending ACK response.", + self.node_id, + self.protocol_name() + ); let ack = proto::rpc::RpcResponse { request_id, status: RpcStatus::ok().as_code(), @@ -482,6 +482,11 @@ where return Ok(()); } + debug!( + target: LOG_TARGET, + "[Peer=`{}`] Got request {}", self.node_id, decoded_msg + ); + let req = Request::with_context( self.create_request_context(request_id), method,