Skip to content

Commit

Permalink
Merge branch 'development' into comms-custom-forward-address
Browse files Browse the repository at this point in the history
  • Loading branch information
SWvheerden authored May 25, 2023
2 parents dbfb94c + c704890 commit 94f3e6e
Show file tree
Hide file tree
Showing 21 changed files with 115 additions and 81 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions applications/tari_console_wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ tari_utilities = "0.4.10"
tari_wallet = { path = "../../base_layer/wallet", features = ["bundled_sqlite"] }

# Uncomment for tokio tracing via tokio-console (needs "tracing" featurs)
#console-subscriber = "0.1.3"
console-subscriber = "0.1.8"
#tokio = { version = "1.20", features = ["signal", "tracing"] }
# Uncomment for normal use (non tokio-console tracing)
tokio = { version = "1.23", default-features = false, features = ["signal", "sync"] }
tokio = { version = "1.23", features = ["signal"] }

bitflags = "1.2.1"
chrono = { version = "0.4.19", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_console_wallet/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub struct Cli {
pub grpc_address: Option<String>,
#[clap(subcommand)]
pub command2: Option<CliCommands>,
#[clap(long, alias = "profile")]
pub profile_with_tokio_console: bool,
}

impl ConfigOverrideProvider for Cli {
Expand Down
1 change: 1 addition & 0 deletions applications/tari_console_wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub fn run_wallet(shutdown: &mut Shutdown, runtime: Runtime, config: &mut Applic
grpc_enabled: true,
grpc_address: None,
command2: None,
profile_with_tokio_console: false,
};

run_wallet_with_cli(shutdown, runtime, config, cli)
Expand Down
16 changes: 13 additions & 3 deletions applications/tari_console_wallet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::process;
use std::{panic, process};

use clap::Parser;
use log::*;
Expand All @@ -47,8 +47,13 @@ mod utils;
mod wallet_modes;

fn main() {
// Uncomment to enable tokio tracing via tokio-console
// console_subscriber::init();
// Setup a panic hook which prints the default rust panic message but also exits the process. This makes a panic in
// any thread "crash" the system instead of silently continuing.
let default_hook = panic::take_hook();
panic::set_hook(Box::new(move |info| {
default_hook(info);
process::exit(1);
}));

match main_inner() {
Ok(_) => process::exit(0),
Expand Down Expand Up @@ -80,6 +85,11 @@ fn main_inner() -> Result<(), ExitError> {
include_str!("../log4rs_sample.yml"),
)?;

if cli.profile_with_tokio_console {
// Uncomment to enable tokio tracing via tokio-console
console_subscriber::init();
}

let mut config = ApplicationConfig::load_from(&cfg)?;

setup_grpc_config(&mut config);
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1313,7 +1313,7 @@ struct AppStateConfig {
impl Default for AppStateConfig {
fn default() -> Self {
Self {
cache_update_cooldown: Duration::from_secs(2),
cache_update_cooldown: Duration::from_millis(100),
}
}
}
4 changes: 2 additions & 2 deletions base_layer/wallet/src/base_node_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tari_common::configuration::serializers;
pub struct BaseNodeServiceConfig {
/// The refresh interval
#[serde(with = "serializers::seconds")]
pub base_node_monitor_refresh_interval: Duration,
pub base_node_monitor_max_refresh_interval: Duration,
/// The RPC client pool size
pub base_node_rpc_pool_size: usize,
/// This is the size of the event channel used to communicate base node events to the wallet
Expand All @@ -40,7 +40,7 @@ pub struct BaseNodeServiceConfig {
impl Default for BaseNodeServiceConfig {
fn default() -> Self {
Self {
base_node_monitor_refresh_interval: Duration::from_secs(3),
base_node_monitor_max_refresh_interval: Duration::from_secs(90),
base_node_rpc_pool_size: 10,
event_channel_size: 250,
}
Expand Down
9 changes: 5 additions & 4 deletions base_layer/wallet/src/base_node_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@

use std::{fmt, fmt::Formatter, sync::Arc, time::Duration};

use tari_common_types::chain_metadata::ChainMetadata;
use tari_common_types::{chain_metadata::ChainMetadata, types::BlockHash};
use tari_service_framework::reply_channel::SenderService;
use tari_utilities::hex::Hex;
use tokio::sync::broadcast;
use tower::Service;

Expand All @@ -46,7 +47,7 @@ pub enum BaseNodeServiceResponse {
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum BaseNodeEvent {
BaseNodeStateChanged(BaseNodeState),
NewBlockDetected(u64),
NewBlockDetected(BlockHash, u64),
}

impl fmt::Display for BaseNodeEvent {
Expand All @@ -55,8 +56,8 @@ impl fmt::Display for BaseNodeEvent {
BaseNodeEvent::BaseNodeStateChanged(state) => {
write!(f, "BaseNodeStateChanged: Synced:{:?}", state.is_synced)
},
BaseNodeEvent::NewBlockDetected(s) => {
write!(f, "NewBlockDetected: {}", s)
BaseNodeEvent::NewBlockDetected(hash, height) => {
write!(f, "NewBlockDetected: {} ({})", height, hash.to_hex())
},
}
}
Expand Down
65 changes: 43 additions & 22 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
cmp,
convert::TryFrom,
future::Future,
sync::Arc,
Expand All @@ -30,8 +31,11 @@ use std::{
use chrono::Utc;
use futures::{future, future::Either};
use log::*;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::protocol::rpc::RpcError;
use tari_common_types::{chain_metadata::ChainMetadata, types::BlockHash as BlockHashType};
use tari_comms::{
backoff::{Backoff, ExponentialBackoff},
protocol::rpc::RpcError,
};
use tokio::{sync::RwLock, time};

use crate::{
Expand All @@ -47,7 +51,9 @@ use crate::{
const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor";

pub struct BaseNodeMonitor<TBackend, TWalletConnectivity> {
interval: Duration,
max_interval: Duration,
backoff: ExponentialBackoff,
backoff_attempts: usize,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<TBackend>,
wallet_connectivity: TWalletConnectivity,
Expand All @@ -60,14 +66,16 @@ where
TWalletConnectivity: WalletConnectivityInterface,
{
pub fn new(
interval: Duration,
max_interval: Duration,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<TBackend>,
wallet_connectivity: TWalletConnectivity,
event_publisher: BaseNodeEventSender,
) -> Self {
Self {
interval,
max_interval,
backoff: ExponentialBackoff::default(),
backoff_attempts: 0,
state,
db,
wallet_connectivity,
Expand Down Expand Up @@ -169,14 +177,15 @@ where
let is_synced = tip_info.is_synced;
let height_of_longest_chain = chain_metadata.height_of_longest_chain();

self.update_state(BaseNodeState {
node_id: Some(base_node_id.clone()),
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency: Some(latency),
})
.await;
let new_block = self
.update_state(BaseNodeState {
node_id: Some(base_node_id.clone()),
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency: Some(latency),
})
.await;

debug!(
target: LOG_TARGET,
Expand All @@ -187,9 +196,18 @@ where
latency.as_millis()
);

let delay = time::sleep(self.interval.saturating_sub(latency));
if interrupt(base_node_watch.changed(), delay).await.is_none() {
self.update_state(Default::default()).await;
// If there's a new block, try again immediately,
if new_block {
self.backoff_attempts = 0;
} else {
self.backoff_attempts += 1;
let delay = time::sleep(
cmp::min(self.max_interval, self.backoff.calculate_backoff(self.backoff_attempts))
.saturating_sub(latency),
);
if interrupt(base_node_watch.changed(), delay).await.is_none() {
self.update_state(Default::default()).await;
}
}
}

Expand All @@ -198,24 +216,27 @@ where
Ok(())
}

async fn update_state(&self, new_state: BaseNodeState) {
// returns true if a new block, otherwise false
async fn update_state(&self, new_state: BaseNodeState) -> bool {
let mut lock = self.state.write().await;
let (new_block_detected, height) = match (new_state.chain_metadata.clone(), lock.chain_metadata.clone()) {
let (new_block_detected, height, hash) = match (new_state.chain_metadata.clone(), lock.chain_metadata.clone()) {
(Some(new_metadata), Some(old_metadata)) => (
new_metadata.height_of_longest_chain() != old_metadata.height_of_longest_chain(),
new_metadata.best_block() != old_metadata.best_block(),
new_metadata.height_of_longest_chain(),
*new_metadata.best_block(),
),
(Some(new_metadata), _) => (true, new_metadata.height_of_longest_chain()),
(None, _) => (false, 0),
(Some(new_metadata), _) => (true, new_metadata.height_of_longest_chain(), *new_metadata.best_block()),
(None, _) => (false, 0, BlockHashType::default()),
};

if new_block_detected {
self.publish_event(BaseNodeEvent::NewBlockDetected(height));
self.publish_event(BaseNodeEvent::NewBlockDetected(hash, height));
}

*lock = new_state.clone();

self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state));
new_block_detected
}

fn publish_event(&self, event: BaseNodeEvent) {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ where T: WalletBackend + 'static

fn spawn_monitor(&self) {
let monitor = BaseNodeMonitor::new(
self.config.base_node_monitor_refresh_interval,
self.config.base_node_monitor_max_refresh_interval,
self.state.clone(),
self.db.clone(),
self.wallet_connectivity.clone(),
Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,18 +267,18 @@ impl WalletConnectivityService {
debug!(
target: LOG_TARGET,
"Dial was cancelled. Retrying after {}s ...",
self.config.base_node_monitor_refresh_interval.as_secs()
self.config.base_node_monitor_max_refresh_interval.as_secs()
);
self.set_online_status(OnlineStatus::Offline);
time::sleep(self.config.base_node_monitor_refresh_interval).await;
time::sleep(self.config.base_node_monitor_max_refresh_interval).await;
continue;
},
Err(e) => {
warn!(target: LOG_TARGET, "{}", e);
if self.current_base_node().as_ref() == Some(&node_id) {
self.disconnect_base_node(node_id).await;
self.set_online_status(OnlineStatus::Offline);
time::sleep(self.config.base_node_monitor_refresh_interval).await;
time::sleep(self.config.base_node_monitor_max_refresh_interval).await;
}
continue;
},
Expand Down
26 changes: 12 additions & 14 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,21 +494,19 @@ where

fn handle_base_node_service_event(&mut self, event: Arc<BaseNodeEvent>) {
match (*event).clone() {
BaseNodeEvent::BaseNodeStateChanged(state) => {
let trigger_validation = match (self.last_seen_tip_height, state.chain_metadata.clone()) {
(Some(last_seen_tip_height), Some(cm)) => last_seen_tip_height != cm.height_of_longest_chain(),
(None, _) => true,
_ => false,
};
if trigger_validation {
let _id = self.validate_outputs().map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});
}
self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain());
BaseNodeEvent::BaseNodeStateChanged(_state) => {
trace!(
target: LOG_TARGET,
"Received Base Node State Change but no block changes"
);
},
BaseNodeEvent::NewBlockDetected(_hash, height) => {
self.last_seen_tip_height = Some(height);
let _id = self.validate_outputs().map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});
},
BaseNodeEvent::NewBlockDetected(_) => {},
}
}

Expand Down
29 changes: 12 additions & 17 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,25 +872,20 @@ where
>,
) {
match (*event).clone() {
BaseNodeEvent::BaseNodeStateChanged(state) => {
let trigger_validation = match (self.last_seen_tip_height, state.chain_metadata.clone()) {
(Some(last_seen_tip_height), Some(cm)) => last_seen_tip_height != cm.height_of_longest_chain(),
(None, _) => true,
_ => false,
};
BaseNodeEvent::BaseNodeStateChanged(_state) => {
trace!(target: LOG_TARGET, "Received BaseNodeStateChanged event, but igoring",);
},
BaseNodeEvent::NewBlockDetected(_hash, height) => {
let _operation_id = self
.start_transaction_validation_protocol(transaction_validation_join_handles)
.await
.map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});

if trigger_validation {
let _operation_id = self
.start_transaction_validation_protocol(transaction_validation_join_handles)
.await
.map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});
}
self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain());
self.last_seen_tip_height = Some(height);
},
BaseNodeEvent::NewBlockDetected(_) => {},
}
}

Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/utxo_scanner_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where
event = base_node_service_event_stream.recv() => {
match event {
Ok(e) => {
if let BaseNodeEvent::NewBlockDetected(h) = (*e).clone() {
if let BaseNodeEvent::NewBlockDetected(_hash, h) = (*e).clone() {
debug!(target: LOG_TARGET, "New block event received: {}", h);
if local_shutdown.is_triggered() {
debug!(target: LOG_TARGET, "Starting new round of UTXO scanning");
Expand Down
Loading

0 comments on commit 94f3e6e

Please sign in to comment.