Skip to content

Commit

Permalink
review comments - balance enquiry debouncer
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Sep 21, 2021
1 parent d4f118a commit 95ee4d4
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 63 deletions.
4 changes: 0 additions & 4 deletions applications/tari_console_wallet/src/ui/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use crate::{
},
wallet_modes::PeerConfig,
};
use log::*;
use tari_common::{configuration::Network, GlobalConfig};
use tari_comms::peer_manager::Peer;
use tari_wallet::WalletSqlite;
Expand Down Expand Up @@ -153,9 +152,6 @@ impl<B: Backend> App<B> {
}

pub fn on_tick(&mut self) {
if let Err(e) = Handle::current().block_on(self.app_state.refresh_balance_check()) {
warn!(target: LOG_TARGET, "Error refresh balance check: {}", e);
}
Handle::current().block_on(self.app_state.update_cache());
self.tabs.on_tick(&mut self.app_state);
}
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_console_wallet/src/ui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub fn run(app: App<CrosstermBackend<Stdout>>) -> Result<(), ExitCodes> {
app.app_state.refresh_contacts_state().await?;
trace!(target: LOG_TARGET, "Refreshing connected peers state");
app.app_state.refresh_connected_peers_state().await?;
trace!(target: LOG_TARGET, "Starting balance enquiry debouncer");
app.app_state.start_balance_enquiry_debouncer().await?;
trace!(target: LOG_TARGET, "Starting app state event monitor");
app.app_state.start_event_monitor(app.notifier.clone()).await;
Result::<_, UiError>::Ok(())
Expand Down
78 changes: 31 additions & 47 deletions applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use crate::{
notifier::Notifier,
ui::{
state::{
debouncer::BalanceEnquiryDebouncer,
tasks::{send_one_sided_transaction_task, send_transaction_task},
wallet_event_monitor::WalletEventMonitor,
},
Expand All @@ -74,7 +75,7 @@ use crate::{
utils::db::{CUSTOM_BASE_NODE_ADDRESS_KEY, CUSTOM_BASE_NODE_PUBLIC_KEY_KEY},
wallet_modes::PeerConfig,
};
use std::ops::Sub;
use tari_wallet::output_manager_service::handle::OutputManagerHandle;

const LOG_TARGET: &str = "wallet::console_wallet::app_state";

Expand All @@ -87,6 +88,8 @@ pub struct AppState {
node_config: GlobalConfig,
config: AppStateConfig,
wallet_connectivity: WalletConnectivityHandle,
output_manager_service: OutputManagerHandle,
balance_enquiry_debouncer: BalanceEnquiryDebouncer,
}

impl AppState {
Expand All @@ -99,32 +102,45 @@ impl AppState {
node_config: GlobalConfig,
) -> Self {
let wallet_connectivity = wallet.wallet_connectivity.clone();
let inner = AppStateInner::new(
node_identity,
network,
wallet,
base_node_selected,
base_node_config,
Duration::from_secs(node_config.wallet_balance_enquiry_cooldown_period),
);
let output_manager_service = wallet.output_manager_service.clone();
let inner = AppStateInner::new(node_identity, network, wallet, base_node_selected, base_node_config);
let cached_data = inner.data.clone();

let inner = Arc::new(RwLock::new(inner));
Self {
inner: Arc::new(RwLock::new(inner)),
inner: inner.clone(),
cached_data,
cache_update_cooldown: None,
completed_tx_filter: TransactionFilter::ABANDONED_COINBASES,
node_config,
node_config: node_config.clone(),
config: AppStateConfig::default(),
wallet_connectivity,
output_manager_service: output_manager_service.clone(),
balance_enquiry_debouncer: BalanceEnquiryDebouncer::new(
inner,
Duration::from_secs(node_config.wallet_balance_enquiry_cooldown_period),
output_manager_service,
),
}
}

pub async fn start_event_monitor(&self, notifier: Notifier) {
let event_monitor = WalletEventMonitor::new(self.inner.clone());
let balance_enquiry_debounce_tx = self.balance_enquiry_debouncer.clone().get_sender();
let event_monitor = WalletEventMonitor::new(self.inner.clone(), balance_enquiry_debounce_tx);
tokio::spawn(event_monitor.run(notifier));
}

pub async fn start_balance_enquiry_debouncer(&self) -> Result<(), UiError> {
tokio::spawn(self.balance_enquiry_debouncer.clone().run());
let _ = self
.balance_enquiry_debouncer
.clone()
.get_sender()
.send(())
.map_err(|e| UiError::SendError(e.to_string()));
Ok(())
}

pub async fn refresh_transaction_state(&mut self) -> Result<(), UiError> {
let mut inner = self.inner.write().await;
inner.refresh_full_transaction_state().await?;
Expand All @@ -149,12 +165,6 @@ impl AppState {
Ok(())
}

pub async fn refresh_balance_check(&self) -> Result<(), UiError> {
let mut inner = self.inner.write().await;
inner.refresh_balance_check().await?;
Ok(())
}

pub async fn update_cache(&mut self) {
let update = match self.cache_update_cooldown {
Some(last_update) => last_update.elapsed() > self.config.cache_update_cooldown,
Expand Down Expand Up @@ -455,9 +465,6 @@ pub struct AppStateInner {
updated: bool,
data: AppStateData,
wallet: WalletSqlite,
balance_enquiry_cooldown_start: Instant,
balance_enquiry_cooldown_infringement: bool,
balance_enquiry_cooldown_period: Duration,
}

impl AppStateInner {
Expand All @@ -467,18 +474,13 @@ impl AppStateInner {
wallet: WalletSqlite,
base_node_selected: Peer,
base_node_config: PeerConfig,
balance_enquiry_cooldown_period: Duration,
) -> Self {
let data = AppStateData::new(node_identity, network, base_node_selected, base_node_config);

AppStateInner {
updated: false,
data,
wallet,
balance_enquiry_cooldown_start: Instant::now()
.sub(balance_enquiry_cooldown_period + Duration::from_secs(1)),
balance_enquiry_cooldown_infringement: false,
balance_enquiry_cooldown_period,
}
}

Expand Down Expand Up @@ -547,7 +549,6 @@ impl AppStateInner {
});

self.data.completed_txs = completed_transactions;
self.refresh_balance().await?;
self.updated = true;
Ok(())
}
Expand Down Expand Up @@ -605,7 +606,6 @@ impl AppStateInner {
.partial_cmp(&a.timestamp)
.expect("Should be able to compare timestamps")
});
self.refresh_balance().await?;
self.updated = true;
return Ok(());
}
Expand All @@ -622,7 +622,6 @@ impl AppStateInner {
});
},
}
self.refresh_balance().await?;
self.updated = true;
Ok(())
}
Expand Down Expand Up @@ -664,24 +663,9 @@ impl AppStateInner {
Ok(())
}

pub async fn refresh_balance(&mut self) -> Result<(), UiError> {
if self.balance_enquiry_cooldown_start.elapsed() > self.balance_enquiry_cooldown_period {
self.balance_enquiry_cooldown_start = Instant::now();
self.balance_enquiry_cooldown_infringement = false;
let balance = self.wallet.output_manager_service.get_balance().await?;
self.data.balance = balance;
self.updated = true;
} else {
self.balance_enquiry_cooldown_infringement = true;
}

Ok(())
}

pub async fn refresh_balance_check(&mut self) -> Result<(), UiError> {
if self.balance_enquiry_cooldown_infringement {
self.refresh_balance().await?;
};
pub async fn refresh_balance(&mut self, balance: Balance) -> Result<(), UiError> {
self.data.balance = balance;
self.updated = true;

Ok(())
}
Expand Down
139 changes: 139 additions & 0 deletions applications/tari_console_wallet/src/ui/state/debouncer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2020. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::ui::state::AppStateInner;
use log::*;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tari_wallet::output_manager_service::handle::OutputManagerHandle;
use tokio::{
sync::{broadcast, RwLock},
time,
};

const LOG_TARGET: &str = "wallet::console_wallet::debouncer";

#[derive(Clone)]
pub(crate) struct BalanceEnquiryDebouncer {
app_state_inner: Arc<RwLock<AppStateInner>>,
output_manager_service: OutputManagerHandle,
balance_enquiry_cooldown_period: Duration,
tx: broadcast::Sender<()>,
}

impl BalanceEnquiryDebouncer {
pub fn new(
app_state_inner: Arc<RwLock<AppStateInner>>,
balance_enquiry_cooldown_period: Duration,
output_manager_service: OutputManagerHandle,
) -> Self {
// This channel must only be size 1; the debouncer will ensure that the balance is updated timeously
let (tx, _) = broadcast::channel(1);
Self {
app_state_inner,
output_manager_service,
balance_enquiry_cooldown_period,
tx,
}
}

pub async fn run(mut self) {
let balance_enquiry_events = &mut self.tx.subscribe();
let mut shutdown_signal = self.app_state_inner.read().await.get_shutdown_signal();
let delay = time::sleep(self.balance_enquiry_cooldown_period);
tokio::pin!(delay);

debug!(target: LOG_TARGET, "Balance enquiry debouncer starting");
if let Ok(balance) = self.output_manager_service.get_balance().await {
trace!(
target: LOG_TARGET,
"Initial balance: available {}, incoming {}, outgoing {}",
balance.available_balance,
balance.pending_incoming_balance,
balance.pending_outgoing_balance
);
let mut inner = self.app_state_inner.write().await;
if let Err(e) = inner.refresh_balance(balance).await {
warn!(target: LOG_TARGET, "Error refresh app_state: {}", e);
}
}
loop {
tokio::select! {
_ = &mut delay => {
if let Ok(result) = time::timeout(
self.balance_enquiry_cooldown_period,
balance_enquiry_events.recv()
).await {
match result {
Ok(_) => {
let start_time = Instant::now();
match self.output_manager_service.get_balance().await {
Ok(balance) => {
trace!(
target: LOG_TARGET,
"Updating balance ({} ms): available {}, incoming {}, outgoing {}",
start_time.elapsed().as_millis(),
balance.available_balance,
balance.pending_incoming_balance,
balance.pending_outgoing_balance
);
let mut inner = self.app_state_inner.write().await;
if let Err(e) = inner.refresh_balance(balance).await {
warn!(target: LOG_TARGET, "Error refresh app_state: {}", e);
}
}
Err(e) => {
warn!(target: LOG_TARGET, "Could not obtain balance ({})", e);
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
trace!(target: LOG_TARGET, "Balance enquiry debouncer lagged {} update requests", n);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
info!(
target: LOG_TARGET,
"Balance enquiry debouncer shutting down because the channel was closed"
);
break;
}
}
}
},
_ = shutdown_signal.wait() => {
info!(
target: LOG_TARGET,
"Balance enquiry debouncer shutting down because the shutdown signal was received"
);
break;
},
}
}
}

pub fn get_sender(self) -> broadcast::Sender<()> {
self.tx
}
}
1 change: 1 addition & 0 deletions applications/tari_console_wallet/src/ui/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

mod app_state;
mod debouncer;
mod tasks;
mod wallet_event_monitor;

Expand Down
Loading

0 comments on commit 95ee4d4

Please sign in to comment.