Skip to content

Commit

Permalink
fix: bug in wallet base node peer switching (#3217)
Browse files Browse the repository at this point in the history
<!--- Provide a general summary of your changes in the Title above -->

## Description
<!--- Describe your changes in detail -->
- Connectivity retries connecting indefinitely, however previously if
  this continuously fails and the user updates their peer, the new peer will not be
  read until the previous peer was connected to. This PR fixes this.
- Add protocl information to comms RPC logs
- Cleanup peer state, making the wallet connectivity service the source
  of truth for the base node peer
- Adds an `Ack` flag to RPC protocol, this is not currently used but
  could be implemented in the client side in future if required without
  breaking the network (server supports it, we may choose in future to implement a client keep alive should it be needed,
 current server side impl easy and zero cost).

## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here. -->
Critical bug fix 

## How Has This Been Tested?
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->
Switching the console wallet base node peer a number of times

## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes that apply. -->
* [x] I'm merging against the `development` branch.
* [x] I have squashed my commits into a single commit.
  • Loading branch information
aviator-app[bot] authored Aug 19, 2021
2 parents 4fd0327 + a4958b3 commit 878c317
Show file tree
Hide file tree
Showing 16 changed files with 174 additions and 107 deletions.
2 changes: 1 addition & 1 deletion applications/daily_tests/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const yargs = () => require("yargs")(hideBin(process.argv));
function sendWebhookNotification(channel, message, webhookUrlOverride = null) {
const hook = webhookUrlOverride || getWebhookUrlFromEnv();
if (!hook) {
throw new Error("WEBHOOK_URL not specified");
return;
}
const data = JSON.stringify({ channel, text: message });
const args = ` -i -X POST -H 'Content-Type: application/json' -d '${data}' ${hook}`;
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
async fn get_tip_info(&self, _request: Request<()>) -> Result<Response<TipInfoResponse>, RpcStatus> {
let state_machine = self.state_machine();
let status_watch = state_machine.get_status_info_watch();
let is_synced = match (*status_watch.borrow()).state_info {
let is_synced = match status_watch.borrow().state_info {
StateInfo::Listening(li) => li.is_synced(),
_ => false,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ impl Listening {
);

if !self.is_synced {
debug!(target: LOG_TARGET, "Initial sync achieved");
self.is_synced = true;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(self.is_synced)));
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true)));
}
continue;
}
Expand Down Expand Up @@ -222,7 +223,7 @@ impl Listening {

impl From<Waiting> for Listening {
fn from(_: Waiting) -> Self {
Default::default()
Self { is_synced: false }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ impl MockBaseNodeService {
updated: None,
latency: None,
online,
base_node_peer: self.state.base_node_peer.clone(),
}
}

Expand All @@ -106,12 +105,11 @@ impl MockBaseNodeService {
updated: None,
latency: None,
online: OnlineStatus::Online,
base_node_peer: None,
}
}

fn set_base_node_peer(&mut self, peer: Peer) {
self.state.base_node_peer = Some(peer);
self.base_node_peer = Some(peer);
}

/// This handler is called when requests arrive from the various streams
Expand All @@ -125,7 +123,7 @@ impl MockBaseNodeService {
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.state.base_node_peer.clone();
let peer = self.base_node_peer.clone();
Ok(BaseNodeServiceResponse::BaseNodePeer(peer.map(Box::new)))
},
BaseNodeServiceRequest::GetChainMetadata => Ok(BaseNodeServiceResponse::ChainMetadata(
Expand Down
32 changes: 15 additions & 17 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,8 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
loop {
use OnlineStatus::*;
match watcher.recv().await.unwrap_or(Offline) {
Online => match self.wallet_connectivity.get_current_base_node() {
Some(peer) => {
return peer;
},
Online => match self.wallet_connectivity.get_current_base_node_id() {
Some(node_id) => return node_id,
_ => continue,
},
Connecting => {
Expand All @@ -126,15 +124,8 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
.await
.ok_or(BaseNodeMonitorError::NodeShuttingDown)?;
let latency = client.get_last_request_latency().await?;
debug!(
target: LOG_TARGET,
"Base node {} latency: {} ms",
peer_node_id,
latency.unwrap_or_default().as_millis()
);

let tip_info = client.get_tip_info().await?;
let is_synced = tip_info.is_synced;

let chain_metadata = tip_info
.metadata
Expand All @@ -143,15 +134,24 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse)
})?;

let is_synced = tip_info.is_synced;
debug!(
target: LOG_TARGET,
"Base node {} Tip: {} ({}) Latency: {} ms",
peer_node_id,
chain_metadata.height_of_longest_chain(),
if is_synced { "Synced" } else { "Syncing..." },
latency.unwrap_or_default().as_millis()
);

self.db.set_chain_metadata(chain_metadata.clone()).await?;

self.map_state(move |state| BaseNodeState {
self.map_state(move |_| BaseNodeState {
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency,
online: OnlineStatus::Online,
base_node_peer: state.base_node_peer.clone(),
})
.await;

Expand All @@ -164,25 +164,23 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
}

async fn set_connecting(&self) {
self.map_state(|state| BaseNodeState {
self.map_state(|_| BaseNodeState {
chain_metadata: None,
is_synced: None,
updated: Some(Utc::now().naive_utc()),
latency: None,
online: OnlineStatus::Connecting,
base_node_peer: state.base_node_peer.clone(),
})
.await;
}

async fn set_offline(&self) {
self.map_state(|state| BaseNodeState {
self.map_state(|_| BaseNodeState {
chain_metadata: None,
is_synced: None,
updated: Some(Utc::now().naive_utc()),
latency: None,
online: OnlineStatus::Offline,
base_node_peer: state.base_node_peer.clone(),
})
.await;
}
Expand Down
18 changes: 3 additions & 15 deletions base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct BaseNodeState {
pub updated: Option<NaiveDateTime>,
pub latency: Option<Duration>,
pub online: OnlineStatus,
pub base_node_peer: Option<Peer>,
// pub base_node_peer: Option<Peer>,
}

impl Default for BaseNodeState {
Expand All @@ -61,7 +61,6 @@ impl Default for BaseNodeState {
updated: None,
latency: None,
online: OnlineStatus::Connecting,
base_node_peer: None,
}
}
}
Expand Down Expand Up @@ -158,18 +157,7 @@ where T: WalletBackend + 'static
}

async fn set_base_node_peer(&mut self, peer: Peer) -> Result<(), BaseNodeServiceError> {
let new_state = BaseNodeState {
base_node_peer: Some(peer.clone()),
..Default::default()
};

{
let mut lock = self.state.write().await;
*lock = new_state.clone();
}
self.wallet_connectivity.set_base_node(peer.node_id.clone()).await?;

self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state));
self.wallet_connectivity.set_base_node(peer.clone()).await?;
self.publish_event(BaseNodeEvent::BaseNodePeerSet(Box::new(peer)));
Ok(())
}
Expand All @@ -189,7 +177,7 @@ where T: WalletBackend + 'static
Ok(BaseNodeServiceResponse::BaseNodePeerSet)
},
BaseNodeServiceRequest::GetBaseNodePeer => {
let peer = self.get_state().await.base_node_peer.map(Box::new);
let peer = self.wallet_connectivity.get_current_base_node_peer().map(Box::new);
Ok(BaseNodeServiceResponse::BaseNodePeer(peer))
},
BaseNodeServiceRequest::GetChainMetadata => match self.get_state().await.chain_metadata.clone() {
Expand Down
21 changes: 14 additions & 7 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,30 @@ use futures::{
channel::{mpsc, oneshot},
SinkExt,
};
use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcClientLease};
use tari_comms::{
peer_manager::{NodeId, Peer},
protocol::rpc::RpcClientLease,
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::sync::watch;

pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
ObtainBaseNodeSyncRpcClient(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
SetBaseNode(NodeId),
SetBaseNode(Box<Peer>),
}

#[derive(Clone)]
pub struct WalletConnectivityHandle {
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch_rx: watch::Receiver<Option<NodeId>>,
base_node_watch_rx: watch::Receiver<Option<Peer>>,
online_status_rx: watch::Receiver<OnlineStatus>,
}

impl WalletConnectivityHandle {
pub(super) fn new(
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch_rx: watch::Receiver<Option<NodeId>>,
base_node_watch_rx: watch::Receiver<Option<Peer>>,
online_status_rx: watch::Receiver<OnlineStatus>,
) -> Self {
Self {
Expand All @@ -56,9 +59,9 @@ impl WalletConnectivityHandle {
}
}

pub async fn set_base_node(&mut self, base_node_peer: NodeId) -> Result<(), WalletConnectivityError> {
pub async fn set_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> {
self.sender
.send(WalletConnectivityRequest::SetBaseNode(base_node_peer))
.send(WalletConnectivityRequest::SetBaseNode(Box::new(base_node_peer)))
.await?;
Ok(())
}
Expand Down Expand Up @@ -110,7 +113,11 @@ impl WalletConnectivityHandle {
self.online_status_rx.clone()
}

pub fn get_current_base_node(&self) -> Option<NodeId> {
pub fn get_current_base_node_peer(&self) -> Option<Peer> {
self.base_node_watch_rx.borrow().clone()
}

pub fn get_current_base_node_id(&self) -> Option<NodeId> {
self.base_node_watch_rx.borrow().as_ref().map(|p| p.node_id.clone())
}
}
28 changes: 16 additions & 12 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use futures::{
use log::*;
use tari_comms::{
connectivity::ConnectivityRequester,
peer_manager::NodeId,
peer_manager::{NodeId, Peer},
protocol::rpc::{RpcClientLease, RpcClientPool},
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
Expand All @@ -53,7 +53,7 @@ pub struct WalletConnectivityService {
config: BaseNodeServiceConfig,
request_stream: Fuse<mpsc::Receiver<WalletConnectivityRequest>>,
connectivity: ConnectivityRequester,
base_node_watch: Watch<Option<NodeId>>,
base_node_watch: Watch<Option<Peer>>,
pools: Option<ClientPoolContainer>,
online_status_watch: Watch<OnlineStatus>,
pending_requests: Vec<ReplyOneshot>,
Expand All @@ -68,7 +68,7 @@ impl WalletConnectivityService {
pub(super) fn new(
config: BaseNodeServiceConfig,
request_stream: mpsc::Receiver<WalletConnectivityRequest>,
base_node_watch: Watch<Option<NodeId>>,
base_node_watch: Watch<Option<Peer>>,
online_status_watch: Watch<OnlineStatus>,
connectivity: ConnectivityRequester,
) -> Self {
Expand All @@ -91,10 +91,10 @@ impl WalletConnectivityService {
req = self.request_stream.select_next_some() => {
self.handle_request(req).await;
},
peer = base_node_watch_rx.select_next_some() => {
if let Some(peer) = peer {
maybe_peer = base_node_watch_rx.select_next_some() => {
if maybe_peer.is_some() {
// This will block the rest until the connection is established. This is what we want.
self.setup_base_node_connection(peer).await;
self.setup_base_node_connection().await;
}
}
}
Expand All @@ -112,7 +112,7 @@ impl WalletConnectivityService {
},

SetBaseNode(peer) => {
self.set_base_node_peer(peer);
self.set_base_node_peer(*peer);
},
}
}
Expand Down Expand Up @@ -197,25 +197,29 @@ impl WalletConnectivityService {
self.set_base_node_peer(peer);
}

fn set_base_node_peer(&mut self, peer: NodeId) {
fn set_base_node_peer(&mut self, peer: Peer) {
self.pools = None;
self.base_node_watch.broadcast(Some(peer));
}

async fn setup_base_node_connection(&mut self, peer: NodeId) {
async fn setup_base_node_connection(&mut self) {
self.pools = None;
loop {
let node_id = match self.base_node_watch.borrow().as_ref() {
Some(p) => p.node_id.clone(),
None => return,
};
debug!(
target: LOG_TARGET,
"Attempting to connect to base node peer {}...", peer
"Attempting to connect to base node peer {}...", node_id
);
self.set_online_status(OnlineStatus::Connecting);
match self.try_setup_rpc_pool(peer.clone()).await {
match self.try_setup_rpc_pool(node_id.clone()).await {
Ok(_) => {
self.set_online_status(OnlineStatus::Online);
debug!(
target: LOG_TARGET,
"Wallet is ONLINE and connected to base node {}", peer
"Wallet is ONLINE and connected to base node {}", node_id
);
break;
},
Expand Down
Loading

0 comments on commit 878c317

Please sign in to comment.