Skip to content

Commit

Permalink
feat(base-node): allow status line interval to be configured (#3852)
Browse files Browse the repository at this point in the history
Description
---

- Allow status line interval to be configured e.g `base_node.status_line_interval_secs = 10`
- Show status line once immediately on startup
- chore: prefix and sort a few GlobalConfig variables to indicate which subsystem they pertain  
- set defaults for `validator_node.scan_for_assets` and `validator_node.new_asset_scanning_interval`

Motivation and Context
---
There are some cases where you may want to see the status line more often. 

How Has This Been Tested?
---
Manually
  • Loading branch information
sdbondi authored Feb 22, 2022
1 parent ab52f5e commit 427463d
Show file tree
Hide file tree
Showing 12 changed files with 200 additions and 189 deletions.
6 changes: 3 additions & 3 deletions applications/tari_app_utilities/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn init_configuration(
if let DatabaseType::LMDB(_) = global_config.db_type {
global_config.db_type = DatabaseType::LMDB(global_config.data_dir.join("db"));
}
global_config.peer_db_path = global_config.data_dir.join("peer_db");
global_config.comms_peer_db_path = global_config.data_dir.join("peer_db");
global_config.wallet_peer_db_path = global_config.data_dir.join("wallet_peer_db");
global_config.console_wallet_peer_db_path = global_config.data_dir.join("console_wallet_peer_db");
},
Expand Down Expand Up @@ -78,8 +78,8 @@ fn check_file_paths(config: &mut GlobalConfig, bootstrap: &ConfigBootstrap) {
config.db_type = DatabaseType::LMDB(config.data_dir.join("db"));
}
}
if !config.peer_db_path.is_absolute() {
config.peer_db_path = concatenate_paths_normalized(prepend.clone(), config.peer_db_path.clone());
if !config.comms_peer_db_path.is_absolute() {
config.comms_peer_db_path = concatenate_paths_normalized(prepend.clone(), config.comms_peer_db_path.clone());
}
if !config.base_node_identity_file.is_absolute() {
config.base_node_identity_file =
Expand Down
16 changes: 8 additions & 8 deletions applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ where B: BlockchainBackend + 'static
pub async fn bootstrap(self) -> Result<ServiceHandles, anyhow::Error> {
let config = self.config;

fs::create_dir_all(&config.peer_db_path)?;
fs::create_dir_all(&config.comms_peer_db_path)?;

let buf_size = cmp::max(BASE_NODE_BUFFER_MIN_SIZE, config.buffer_size_base_node);
let (publisher, peer_message_subscriptions) = pubsub_connector(buf_size, config.buffer_rate_limit_base_node);
Expand Down Expand Up @@ -218,7 +218,7 @@ where B: BlockchainBackend + 'static
let dht = handles.expect_handle::<Dht>();
let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
let builder = RpcServer::builder();
let builder = match config.rpc_max_simultaneous_sessions {
let builder = match config.comms_rpc_max_simultaneous_sessions {
Some(limit) => builder.with_maximum_simultaneous_sessions(limit),
None => {
warn!(
Expand Down Expand Up @@ -256,26 +256,26 @@ where B: BlockchainBackend + 'static
node_identity: self.node_identity.clone(),
transport_type: create_transport_type(self.config),
auxilary_tcp_listener_address: self.config.auxilary_tcp_listener_address.clone(),
datastore_path: self.config.peer_db_path.clone(),
datastore_path: self.config.comms_peer_db_path.clone(),
peer_database_name: "peers".to_string(),
max_concurrent_inbound_tasks: 50,
max_concurrent_outbound_tasks: 100,
outbound_buffer_size: 100,
dht: DhtConfig {
database_url: DbConnectionUrl::File(self.config.data_dir.join("dht.db")),
auto_join: true,
allow_test_addresses: self.config.allow_test_addresses,
allow_test_addresses: self.config.comms_allow_test_addresses,
flood_ban_max_msg_count: self.config.flood_ban_max_msg_count,
saf_config: SafConfig {
msg_validity: self.config.saf_expiry_duration,
..Default::default()
},
dedup_cache_capacity: self.config.dedup_cache_capacity,
dedup_cache_capacity: self.config.dht_dedup_cache_capacity,
..Default::default()
},
allow_test_addresses: self.config.allow_test_addresses,
listener_liveness_allowlist_cidrs: self.config.listener_liveness_allowlist_cidrs.clone(),
listener_liveness_max_sessions: self.config.listnener_liveness_max_sessions,
allow_test_addresses: self.config.comms_allow_test_addresses,
listener_liveness_allowlist_cidrs: self.config.comms_listener_liveness_allowlist_cidrs.clone(),
listener_liveness_max_sessions: self.config.comms_listener_liveness_max_sessions,
user_agent: format!("tari/basenode/{}", env!("CARGO_PKG_VERSION")),
// Also add sync peers to the peer seed list. Duplicates are acceptable.
peer_seeds: self
Expand Down
16 changes: 10 additions & 6 deletions applications/tari_base_node/src/commands/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ use tokio::{
use super::status_line::StatusLine;
use crate::{builder::BaseNodeContext, table::Table, utils::format_duration_basic, LOG_TARGET};

pub enum StatusOutput {
pub enum StatusLineOutput {
Log,
Full,
StdOutAndLog,
}

pub struct CommandHandler {
Expand Down Expand Up @@ -117,7 +117,11 @@ impl CommandHandler {
}
}

pub async fn status(&mut self, output: StatusOutput) -> Result<(), Error> {
pub fn global_config(&self) -> Arc<GlobalConfig> {
self.config.clone()
}

pub async fn status(&mut self, output: StatusLineOutput) -> Result<(), Error> {
let mut full_log = false;
if self.last_time_full.elapsed() > Duration::from_secs(120) {
self.last_time_full = Instant::now();
Expand Down Expand Up @@ -182,7 +186,7 @@ impl CommandHandler {
"{}/{}",
num_active_rpc_sessions,
self.config
.rpc_max_simultaneous_sessions
.comms_rpc_max_simultaneous_sessions
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "∞".to_string()),
Expand All @@ -201,11 +205,11 @@ impl CommandHandler {

let target = "base_node::app::status";
match output {
StatusOutput::Full => {
StatusLineOutput::StdOutAndLog => {
println!("{}", status_line);
info!(target: target, "{}", status_line);
},
StatusOutput::Log => info!(target: target, "{}", status_line),
StatusLineOutput::Log => info!(target: target, "{}", status_line),
};
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_base_node/src/commands/performer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tari_utilities::ByteArray;

use super::{
args::{Args, ArgsError, ArgsReason, FromHex},
command_handler::{CommandHandler, StatusOutput},
command_handler::{CommandHandler, StatusLineOutput},
parser::BaseNodeCommand,
};
use crate::LOG_TARGET;
Expand Down Expand Up @@ -65,7 +65,7 @@ impl Performer {
self.print_help(command);
Ok(())
},
Status => self.command_handler.status(StatusOutput::Full).await,
Status => self.command_handler.status(StatusLineOutput::StdOutAndLog).await,
GetStateInfo => self.command_handler.state_info(),
Version => self.command_handler.print_version(),
CheckForUpdates => self.command_handler.check_for_updates().await,
Expand Down
24 changes: 13 additions & 11 deletions applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ use std::{
};

use commands::{
command_handler::{CommandHandler, StatusOutput},
command_handler::{CommandHandler, StatusLineOutput},
parser::Parser,
performer::Performer,
reader::{CommandEvent, CommandReader},
Expand Down Expand Up @@ -163,7 +163,7 @@ fn main_inner() -> Result<(), ExitError> {
// Load or create the Node identity
let node_identity = setup_node_identity(
&config.base_node_identity_file,
&config.public_address,
&config.comms_public_address,
bootstrap.create_id,
PeerFeatures::COMMUNICATION_NODE,
)?;
Expand Down Expand Up @@ -293,7 +293,7 @@ async fn run_node(
target: LOG_TARGET,
"Node has been successfully configured and initialized. Starting CLI loop."
);
task::spawn(cli_loop(command_handler, shutdown));
task::spawn(cli_loop(command_handler, config.clone(), shutdown));
}
if !config.force_sync_peers.is_empty() {
warn!(
Expand Down Expand Up @@ -353,27 +353,28 @@ async fn run_grpc(
Ok(())
}

fn status_interval(start_time: Instant) -> time::Sleep {
fn get_status_interval(start_time: Instant, long_interval: Duration) -> time::Sleep {
let duration = match start_time.elapsed().as_secs() {
0..=120 => Duration::from_secs(5),
_ => Duration::from_secs(30),
_ => long_interval,
};
time::sleep(duration)
}

async fn status_loop(mut command_handler: CommandHandler, shutdown: Shutdown) {
let start_time = Instant::now();
let mut shutdown_signal = shutdown.to_signal();
let status_interval = command_handler.global_config().base_node_status_line_interval;
loop {
let interval = status_interval(start_time);
let interval = get_status_interval(start_time, status_interval);
tokio::select! {
biased;
_ = shutdown_signal.wait() => {
break;
}

_ = interval => {
command_handler.status(StatusOutput::Log).await.ok();
command_handler.status(StatusLineOutput::Log).await.ok();
},
}
}
Expand All @@ -386,12 +387,11 @@ async fn status_loop(mut command_handler: CommandHandler, shutdown: Shutdown) {
///
/// ## Returns
/// Doesn't return anything
async fn cli_loop(command_handler: CommandHandler, mut shutdown: Shutdown) {
async fn cli_loop(command_handler: CommandHandler, config: Arc<GlobalConfig>, mut shutdown: Shutdown) {
let parser = Parser::new();
commands::cli::print_banner(parser.get_commands(), 3);

let mut performer = Performer::new(command_handler);

let cli_config = Config::builder()
.history_ignore_space(true)
.completion_type(CompletionType::List)
Expand All @@ -408,8 +408,10 @@ async fn cli_loop(command_handler: CommandHandler, mut shutdown: Shutdown) {
let mut software_update_notif = performer.get_software_updater().new_update_notifier().clone();
let mut first_signal = false;
// TODO: Add heartbeat here
// Show status immediately on startup
let _ = performer.status(StatusLineOutput::StdOutAndLog).await;
loop {
let interval = status_interval(start_time);
let interval = get_status_interval(start_time, config.base_node_status_line_interval);
tokio::select! {
res = reader.next_command() => {
if let Some(event) = res {
Expand Down Expand Up @@ -454,7 +456,7 @@ async fn cli_loop(command_handler: CommandHandler, mut shutdown: Shutdown) {
}
_ = interval => {
// TODO: Execute `watch` command here + use the result
performer.status(StatusOutput::Full).await.ok();
let _ = performer.status(StatusLineOutput::StdOutAndLog).await;
},
_ = shutdown_signal.wait() => {
break;
Expand Down
8 changes: 4 additions & 4 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ pub async fn init_wallet(
"Databases Initialized. Wallet encrypted? {}.", wallet_encrypted
);

let node_address = match config.public_address.clone() {
let node_address = match config.comms_public_address.clone() {
Some(addr) => addr,
None => match wallet_db.get_node_address().await? {
Some(addr) => addr,
Expand Down Expand Up @@ -387,17 +387,17 @@ pub async fn init_wallet(
dht: DhtConfig {
database_url: DbConnectionUrl::File(config.data_dir.join("dht-console-wallet.db")),
auto_join: true,
allow_test_addresses: config.allow_test_addresses,
allow_test_addresses: config.comms_allow_test_addresses,
flood_ban_max_msg_count: config.flood_ban_max_msg_count,
saf_config: SafConfig {
msg_validity: config.saf_expiry_duration,
..Default::default()
},
dedup_cache_capacity: config.dedup_cache_capacity,
dedup_cache_capacity: config.dht_dedup_cache_capacity,
..Default::default()
},
// This should be false unless testing locally
allow_test_addresses: config.allow_test_addresses,
allow_test_addresses: config.comms_allow_test_addresses,
listener_liveness_allowlist_cidrs: Vec::new(),
listener_liveness_max_sessions: 0,
dns_seeds_name_server: DEFAULT_DNS_NAME_SERVER.parse().unwrap(),
Expand Down
12 changes: 6 additions & 6 deletions applications/tari_validator_node/src/comms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ fn setup_p2p_rpc(
) -> UnspawnedCommsNode {
let dht = handles.expect_handle::<Dht>();
let builder = RpcServer::builder();
let builder = match config.rpc_max_simultaneous_sessions {
let builder = match config.comms_rpc_max_simultaneous_sessions {
Some(limit) => builder.with_maximum_simultaneous_sessions(limit),
None => {
warn!(
Expand All @@ -136,25 +136,25 @@ fn create_comms_config(config: &GlobalConfig, node_identity: Arc<NodeIdentity>)
network: config.network,
node_identity,
transport_type: create_transport_type(config),
datastore_path: config.peer_db_path.clone(),
datastore_path: config.comms_peer_db_path.clone(),
peer_database_name: "peers".to_string(),
max_concurrent_inbound_tasks: 50,
max_concurrent_outbound_tasks: 100,
outbound_buffer_size: 100,
dht: DhtConfig {
database_url: DbConnectionUrl::File(config.data_dir.join("dht.db")),
auto_join: true,
allow_test_addresses: config.allow_test_addresses,
allow_test_addresses: config.comms_allow_test_addresses,
flood_ban_max_msg_count: config.flood_ban_max_msg_count,
saf_config: SafConfig {
msg_validity: config.saf_expiry_duration,
..Default::default()
},
..Default::default()
},
allow_test_addresses: config.allow_test_addresses,
listener_liveness_allowlist_cidrs: config.listener_liveness_allowlist_cidrs.clone(),
listener_liveness_max_sessions: config.listnener_liveness_max_sessions,
allow_test_addresses: config.comms_allow_test_addresses,
listener_liveness_allowlist_cidrs: config.comms_listener_liveness_allowlist_cidrs.clone(),
listener_liveness_max_sessions: config.comms_listener_liveness_max_sessions,
user_agent: format!("tari/dannode/{}", env!("CARGO_PKG_VERSION")),
// Also add sync peers to the peer seed list. Duplicates are acceptable.
peer_seeds: config
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_validator_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ async fn run_node(config: GlobalConfig, create_id: bool) -> Result<(), ExitError
.as_ref()
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "validator_node configuration not found"))?;

fs::create_dir_all(&config.peer_db_path).map_err(|err| ExitError::new(ExitCode::ConfigError, err))?;
fs::create_dir_all(&config.comms_peer_db_path).map_err(|err| ExitError::new(ExitCode::ConfigError, err))?;
let node_identity = setup_node_identity(
&config.base_node_identity_file,
&config.public_address,
&config.comms_public_address,
create_id,
PeerFeatures::NONE,
)?;
Expand Down
2 changes: 0 additions & 2 deletions common/config/presets/base_node.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
grpc_enabled = true
# The socket to expose for the gRPC base node server. This value is ignored if grpc_enabled is false.
grpc_address = "/ip4/127.0.0.1/tcp/18142"

# Set to true to record all reorgs. Recorded reorgs can be viewed using the list-reorgs command.
track_reorgs = true


# Configuration options for testnet dibbler
[base_node.dibbler]
# The type of database backend to use. Currently supported options are "memory" and "lmdb". LMDB is recommnded for
Expand Down
2 changes: 1 addition & 1 deletion common/src/configuration/collectibles_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn default_wallet_grpc_address() -> SocketAddr {
}

impl CollectiblesConfig {
pub fn convert_if_present(cfg: Config) -> Result<Option<CollectiblesConfig>, ConfigurationError> {
pub fn convert_if_present(cfg: &Config) -> Result<Option<CollectiblesConfig>, ConfigurationError> {
let section: Self = match cfg.get("collectibles") {
Ok(s) => s,
Err(_e) => {
Expand Down
Loading

0 comments on commit 427463d

Please sign in to comment.