diff --git a/.github/workflows/base_node_binaries.json b/.github/workflows/base_node_binaries.json index b860c66064..94adce9f8e 100644 --- a/.github/workflows/base_node_binaries.json +++ b/.github/workflows/base_node_binaries.json @@ -28,7 +28,7 @@ "cross": false, "target_cpu": "x86-64", "target_bins": "--bin minotari_node --bin minotari_console_wallet --bin minotari_merge_mining_proxy --bin minotari_miner", - "features": "safe" + "features": "libtor, safe" }, { "name": "macos-arm64", @@ -38,7 +38,7 @@ "cross": false, "target_cpu": "generic", "target_bins": "--bin minotari_node --bin minotari_console_wallet --bin minotari_merge_mining_proxy --bin minotari_miner", - "features": "safe", + "features": "libtor, safe", "build_enabled": false }, { diff --git a/.github/workflows/base_node_binaries.yml b/.github/workflows/base_node_binaries.yml index 99ed67771d..263f74c40c 100644 --- a/.github/workflows/base_node_binaries.yml +++ b/.github/workflows/base_node_binaries.yml @@ -204,14 +204,14 @@ jobs: if [ "${{ matrix.builds.cross }}" != "true" ]; then cargo build --release \ --target ${{ matrix.builds.target }} \ - --features ${{ matrix.builds.features }} \ + --features "${{ matrix.builds.features }}" \ ${{ matrix.builds.target_bins }} \ ${{ matrix.builds.flags }} --locked else cargo install cross cross build --release \ --target ${{ matrix.builds.target }} \ - --features ${{ matrix.builds.features }} \ + --features "${{ matrix.builds.features }}" \ ${{ matrix.builds.target_bins }} \ ${{ matrix.builds.flags }} --locked fi diff --git a/Cargo.lock b/Cargo.lock index 20c1fe776b..f018c4d46b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3216,6 +3216,7 @@ dependencies = [ "hyper", "jsonrpc", "log", + "minotari_app_grpc", "minotari_app_utilities", "minotari_node_grpc_client", "minotari_wallet_grpc_client", diff --git a/applications/minotari_app_grpc/src/authentication/basic_auth.rs b/applications/minotari_app_grpc/src/authentication/basic_auth.rs index 43cdc9fb1a..1891d92c1a 100644 --- a/applications/minotari_app_grpc/src/authentication/basic_auth.rs +++ b/applications/minotari_app_grpc/src/authentication/basic_auth.rs @@ -363,19 +363,33 @@ mod tests { // #[test] fn it_compares_user_names_in_constant_time() { + // Enable flag `do_performance_testing` to run performance tests; for regular CI runs, this flag should be + // `false` otherwise the test will fail. + // Notes: + // - The `assert!(!do_performance_testing);` at the end of the test will cause a panic on CI if the flag is + // enabled, if it is enabled it will allow results to be printed when running in release mode. + // - For CI (flag disabled), we are only interested if the functional test pass, thus 1 iteration completed + // successfully. + let do_performance_testing = false; + #[allow(clippy::cast_possible_truncation)] fn round_to_6_decimals(num: f64) -> f64 { ((num * 100000.0) as u128) as f64 / 100000.0 } - const ITERATIONS: usize = 100; + const ITERATIONS: usize = 250; let mut variances = Vec::with_capacity(ITERATIONS); let mut short = Vec::with_capacity(ITERATIONS); let mut long = Vec::with_capacity(ITERATIONS); let mut actual = Vec::with_capacity(ITERATIONS); + // This value should be chosen to comply with: + // - Small enough to ensure a single iteration does not take too long. + // - Large enough to enable proper time measurement; executing the function that many times should be + // measurable, thus > micro seconds in this case. const COUNTS: usize = 2500; let username_actual = "admin"; let hashed_password = create_salted_hashed_password(b"secret").unwrap(); + let mut test_runs = 0; for i in 1..=ITERATIONS { let credentials = BasicAuthCredentials::new(username_actual.to_string(), hashed_password.to_string().into()).unwrap(); @@ -425,6 +439,11 @@ mod tests { long.push(time_taken_2); actual.push(time_taken_3); + test_runs += 1; + if !do_performance_testing { + break; + } + // The use of sleep between iterations helps ensure that the tests are run under different conditions, // simulating real-world scenarios. if i < ITERATIONS { @@ -438,12 +457,15 @@ mod tests { let avg_long = round_to_6_decimals(long.iter().sum::() as f64 / long.len() as f64 / COUNTS as f64); let avg_actual = round_to_6_decimals(actual.iter().sum::() as f64 / actual.len() as f64 / COUNTS as f64); + println!("Test runs: {}", test_runs); println!("Minimum variance: {} %", min_variance); println!("Average variance: {} %", avg_variance); println!("Average short username time: {} microseconds", avg_short); println!("Average long username time: {} microseconds", avg_long); println!("Average actual username time: {} microseconds", avg_actual); - assert!(*min_variance < 10.0); + + // This is to make sure we do not run performance tests on CI. + assert!(!do_performance_testing); } // This unit test asserts that the minimum variance is less than 10% (chosen to be robust for running the unit @@ -491,19 +513,34 @@ mod tests { // #[test] fn it_compares_credentials_in_constant_time() { + // Enable flag `do_performance_testing` to run performance tests; for regular CI runs, this flag should be + // `false` otherwise the test will fail. + // Notes: + // - The `assert!(!do_performance_testing);` at the end of the test will cause a panic on CI if the flag is + // enabled, if it is enabled it will allow results to be printed when running in release mode. + // - For CI (flag disabled), we are only interested if the functional test pass, thus 1 iteration completed + // successfully. + // - Running this specific test in debug mode is ~100x slower when compared to release mode. + let do_performance_testing = false; + #[allow(clippy::cast_possible_truncation)] fn round_to_6_decimals(num: f64) -> f64 { ((num * 100000.0) as u128) as f64 / 100000.0 } - const ITERATIONS: usize = 10; + const ITERATIONS: usize = 250; let mut variances = Vec::with_capacity(ITERATIONS); let mut short = Vec::with_capacity(ITERATIONS); let mut long = Vec::with_capacity(ITERATIONS); let mut actual = Vec::with_capacity(ITERATIONS); - const COUNTS: usize = 20; + // This value should be chosen to comply with: + // - Small enough to ensure a single iteration does not take too long. + // - Large enough to enable proper time measurement; executing the function that many times should be + // measurable, thus > milli seconds in this case. + const COUNTS: usize = 10; let username_actual = "admin"; let hashed_password = create_salted_hashed_password(b"secret").unwrap(); + let mut test_runs = 0; for i in 1..=ITERATIONS { let credentials = BasicAuthCredentials::new(username_actual.to_string(), hashed_password.to_string().into()).unwrap(); @@ -551,6 +588,11 @@ mod tests { long.push(time_taken_2); actual.push(time_taken_3); + test_runs += 1; + if !do_performance_testing { + break; + } + // The use of sleep between iterations helps ensure that the tests are run under different conditions, // simulating real-world scenarios. if i < ITERATIONS { @@ -564,12 +606,15 @@ mod tests { let avg_long = round_to_6_decimals(long.iter().sum::() as f64 / long.len() as f64 / COUNTS as f64); let avg_actual = round_to_6_decimals(actual.iter().sum::() as f64 / actual.len() as f64 / COUNTS as f64); + println!("Test runs: {}", test_runs); println!("Minimum variance: {} %", min_variance); println!("Average variance: {} %", avg_variance); println!("Average short username time: {} microseconds", avg_short); println!("Average long username time: {} microseconds", avg_long); println!("Average actual username time: {} microseconds", avg_actual); - assert!(*min_variance < 10.0); + + // This is to make sure we do not run performance tests on CI. + assert!(!do_performance_testing); } } diff --git a/applications/minotari_merge_mining_proxy/Cargo.toml b/applications/minotari_merge_mining_proxy/Cargo.toml index 67097ef679..5aebf91c49 100644 --- a/applications/minotari_merge_mining_proxy/Cargo.toml +++ b/applications/minotari_merge_mining_proxy/Cargo.toml @@ -16,8 +16,9 @@ tari_comms = { path = "../../comms/core" } tari_core = { path = "../../base_layer/core", default-features = false, features = ["transactions"] } minotari_app_utilities = { path = "../minotari_app_utilities" } tari_utilities = { version = "0.6" } -minotari_node_grpc_client = {path="../../clients/rust/base_node_grpc_client" } -minotari_wallet_grpc_client = {path="../../clients/rust/wallet_grpc_client" } +minotari_node_grpc_client = { path = "../../clients/rust/base_node_grpc_client" } +minotari_wallet_grpc_client = { path = "../../clients/rust/wallet_grpc_client" } +minotari_app_grpc = { path = "../minotari_app_grpc" } anyhow = "1.0.53" crossterm = { version = "0.25.0" } diff --git a/applications/minotari_merge_mining_proxy/src/block_template_protocol.rs b/applications/minotari_merge_mining_proxy/src/block_template_protocol.rs index 38fbe6640c..1cb70122f7 100644 --- a/applications/minotari_merge_mining_proxy/src/block_template_protocol.rs +++ b/applications/minotari_merge_mining_proxy/src/block_template_protocol.rs @@ -25,9 +25,13 @@ use std::{cmp, sync::Arc}; use log::*; -use minotari_node_grpc_client::{grpc, BaseNodeGrpcClient}; -use minotari_wallet_grpc_client::WalletGrpcClient; +use minotari_app_grpc::{ + authentication::ClientAuthenticationInterceptor, + tari_rpc::{base_node_client::BaseNodeClient, wallet_client::WalletClient}, +}; +use minotari_node_grpc_client::grpc; use tari_core::proof_of_work::{monero_rx, monero_rx::FixedByteArray, Difficulty}; +use tonic::{codegen::InterceptedService, transport::Channel}; use crate::{ block_template_data::{BlockTemplateData, BlockTemplateDataBuilder}, @@ -41,14 +45,14 @@ const LOG_TARGET: &str = "minotari_mm_proxy::proxy::block_template_protocol"; /// Structure holding grpc connections. pub struct BlockTemplateProtocol<'a> { config: Arc, - base_node_client: &'a mut BaseNodeGrpcClient, - wallet_client: &'a mut WalletGrpcClient, + base_node_client: &'a mut BaseNodeClient>, + wallet_client: &'a mut WalletClient>, } impl<'a> BlockTemplateProtocol<'a> { pub fn new( - base_node_client: &'a mut BaseNodeGrpcClient, - wallet_client: &'a mut WalletGrpcClient, + base_node_client: &'a mut BaseNodeClient>, + wallet_client: &'a mut WalletClient>, config: Arc, ) -> Self { Self { diff --git a/applications/minotari_merge_mining_proxy/src/config.rs b/applications/minotari_merge_mining_proxy/src/config.rs index 2eddef5de1..0684db75de 100644 --- a/applications/minotari_merge_mining_proxy/src/config.rs +++ b/applications/minotari_merge_mining_proxy/src/config.rs @@ -43,6 +43,8 @@ pub struct MergeMiningProxyConfig { pub monerod_use_auth: bool, /// The Minotari base node's GRPC address pub base_node_grpc_address: Option, + /// GRPC authentication for base node + pub base_node_grpc_authentication: GrpcAuthentication, /// The Minotari wallet's GRPC address pub console_wallet_grpc_address: Option, /// GRPC authentication for console wallet @@ -80,6 +82,7 @@ impl Default for MergeMiningProxyConfig { monerod_password: String::new(), monerod_use_auth: false, base_node_grpc_address: None, + base_node_grpc_authentication: GrpcAuthentication::default(), console_wallet_grpc_address: None, console_wallet_grpc_authentication: GrpcAuthentication::default(), listener_address: "/ip4/127.0.0.1/tcp/18081".parse().unwrap(), diff --git a/applications/minotari_merge_mining_proxy/src/proxy.rs b/applications/minotari_merge_mining_proxy/src/proxy.rs index 66e58b6aac..17b4319c9a 100644 --- a/applications/minotari_merge_mining_proxy/src/proxy.rs +++ b/applications/minotari_merge_mining_proxy/src/proxy.rs @@ -39,8 +39,8 @@ use bytes::Bytes; use hyper::{header::HeaderValue, service::Service, Body, Method, Request, Response, StatusCode, Uri}; use json::json; use jsonrpc::error::StandardError; -use minotari_node_grpc_client::{grpc, BaseNodeGrpcClient}; -use minotari_wallet_grpc_client::WalletGrpcClient; +use minotari_node_grpc_client::{grpc, grpc::base_node_client::BaseNodeClient}; +use minotari_wallet_grpc_client::{grpc::wallet_client::WalletClient, ClientAuthenticationInterceptor}; use reqwest::{ResponseBuilderExt, Url}; use serde_json as json; use tari_core::proof_of_work::{ @@ -50,6 +50,7 @@ use tari_core::proof_of_work::{ randomx_factory::RandomXFactory, }; use tari_utilities::hex::Hex; +use tonic::{codegen::InterceptedService, transport::Channel}; use tracing::{debug, error, info, instrument, trace, warn}; use crate::{ @@ -75,8 +76,8 @@ impl MergeMiningProxyService { pub fn new( config: MergeMiningProxyConfig, http_client: reqwest::Client, - base_node_client: BaseNodeGrpcClient, - wallet_client: WalletGrpcClient, + base_node_client: BaseNodeClient>, + wallet_client: WalletClient>, block_templates: BlockTemplateRepository, randomx_factory: RandomXFactory, ) -> Self { @@ -157,8 +158,8 @@ struct InnerService { config: Arc, block_templates: BlockTemplateRepository, http_client: reqwest::Client, - base_node_client: BaseNodeGrpcClient, - wallet_client: WalletGrpcClient, + base_node_client: BaseNodeClient>, + wallet_client: WalletClient>, initial_sync_achieved: Arc, current_monerod_server: Arc>>, last_assigned_monerod_server: Arc>>, diff --git a/applications/minotari_merge_mining_proxy/src/run_merge_miner.rs b/applications/minotari_merge_mining_proxy/src/run_merge_miner.rs index d5d433fede..420f9b78d8 100644 --- a/applications/minotari_merge_mining_proxy/src/run_merge_miner.rs +++ b/applications/minotari_merge_mining_proxy/src/run_merge_miner.rs @@ -20,13 +20,13 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::convert::Infallible; +use std::{convert::Infallible, str::FromStr}; use futures::future; use hyper::{service::make_service_fn, Server}; use log::*; -use minotari_node_grpc_client::BaseNodeGrpcClient; -use minotari_wallet_grpc_client::WalletGrpcClient; +use minotari_node_grpc_client::grpc::base_node_client::BaseNodeClient; +use minotari_wallet_grpc_client::{grpc::wallet_client::WalletClient, ClientAuthenticationInterceptor}; use tari_common::{ configuration::bootstrap::{grpc_default_port, ApplicationType}, load_configuration, @@ -35,6 +35,10 @@ use tari_common::{ use tari_comms::utils::multiaddr::multiaddr_to_socketaddr; use tari_core::proof_of_work::randomx_factory::RandomXFactory; use tokio::time::Duration; +use tonic::{ + codegen::InterceptedService, + transport::{Channel, Endpoint}, +}; use crate::{ block_template_data::BlockTemplateRepository, @@ -59,25 +63,9 @@ pub async fn start_merge_miner(cli: Cli) -> Result<(), anyhow::Error> { .build() .map_err(MmProxyError::ReqwestError)?; - let base_node = multiaddr_to_socketaddr( - config - .base_node_grpc_address - .as_ref() - .expect("No base node address provided"), - )?; - info!(target: LOG_TARGET, "Connecting to base node at {}", base_node); - println!("Connecting to base node at {}", base_node); - let base_node_client = BaseNodeGrpcClient::connect(format!("http://{}", base_node)).await?; - let wallet_addr = multiaddr_to_socketaddr( - config - .console_wallet_grpc_address - .as_ref() - .expect("No waller address provided"), - )?; - info!(target: LOG_TARGET, "Connecting to wallet at {}", wallet_addr); - let wallet_addr = format!("http://{}", wallet_addr); - let wallet_client = - WalletGrpcClient::connect_with_auth(&wallet_addr, &config.console_wallet_grpc_authentication).await?; + let base_node_client = connect_base_node(&config).await?; + let wallet_client = connect_wallet(&config).await?; + let listen_addr = multiaddr_to_socketaddr(&config.listener_address)?; let randomx_factory = RandomXFactory::new(config.max_randomx_vms); let randomx_service = MergeMiningProxyService::new( @@ -110,6 +98,50 @@ pub async fn start_merge_miner(cli: Cli) -> Result<(), anyhow::Error> { } } +async fn connect_wallet( + config: &MergeMiningProxyConfig, +) -> Result>, MmProxyError> { + let wallet_addr = format!( + "http://{}", + multiaddr_to_socketaddr( + &config + .console_wallet_grpc_address + .clone() + .expect("Wallet grpc address not found") + )? + ); + info!(target: LOG_TARGET, "👛 Connecting to wallet at {}", wallet_addr); + let channel = Endpoint::from_str(&wallet_addr)?.connect().await?; + let wallet_conn = WalletClient::with_interceptor( + channel, + ClientAuthenticationInterceptor::create(&config.console_wallet_grpc_authentication)?, + ); + + Ok(wallet_conn) +} + +async fn connect_base_node( + config: &MergeMiningProxyConfig, +) -> Result>, MmProxyError> { + let base_node_addr = format!( + "http://{}", + multiaddr_to_socketaddr( + &config + .base_node_grpc_address + .clone() + .expect("Base node grpc address not found") + )? + ); + info!(target: LOG_TARGET, "👛 Connecting to base node at {}", base_node_addr); + let channel = Endpoint::from_str(&base_node_addr)?.connect().await?; + let node_conn = BaseNodeClient::with_interceptor( + channel, + ClientAuthenticationInterceptor::create(&config.base_node_grpc_authentication)?, + ); + + Ok(node_conn) +} + fn setup_grpc_config(config: &mut MergeMiningProxyConfig) { if config.base_node_grpc_address.is_none() { config.base_node_grpc_address = Some( diff --git a/applications/minotari_miner/src/config.rs b/applications/minotari_miner/src/config.rs index ca9101ddde..aaef7be8b4 100644 --- a/applications/minotari_miner/src/config.rs +++ b/applications/minotari_miner/src/config.rs @@ -49,6 +49,8 @@ use tari_comms::multiaddr::Multiaddr; pub struct MinerConfig { /// GRPC address of base node pub base_node_grpc_address: Option, + /// GRPC authentication for base node + pub base_node_grpc_authentication: GrpcAuthentication, /// GRPC address of console wallet pub wallet_grpc_address: Option, /// GRPC authentication for console wallet @@ -97,6 +99,7 @@ impl Default for MinerConfig { fn default() -> Self { Self { base_node_grpc_address: None, + base_node_grpc_authentication: GrpcAuthentication::default(), wallet_grpc_address: None, wallet_grpc_authentication: GrpcAuthentication::default(), num_mining_threads: num_cpus::get(), diff --git a/applications/minotari_miner/src/run_miner.rs b/applications/minotari_miner/src/run_miner.rs index 815ccfe757..4c51695c56 100644 --- a/applications/minotari_miner/src/run_miner.rs +++ b/applications/minotari_miner/src/run_miner.rs @@ -57,6 +57,7 @@ pub const LOG_TARGET: &str = "minotari::miner::main"; pub const LOG_TARGET_FILE: &str = "minotari::logging::miner::main"; type WalletGrpcClient = WalletClient>; +type BaseNodeGrpcClient = BaseNodeClient>; #[allow(clippy::too_many_lines)] pub async fn start_miner(cli: Cli) -> Result<(), ExitError> { @@ -167,18 +168,18 @@ pub async fn start_miner(cli: Cli) -> Result<(), ExitError> { } } -async fn connect(config: &MinerConfig) -> Result<(BaseNodeClient, WalletGrpcClient), MinerError> { - let base_node_addr = format!( - "http://{}", - multiaddr_to_socketaddr( - &config - .base_node_grpc_address - .clone() - .expect("no base node grpc address found"), - )? - ); - info!(target: LOG_TARGET, "🔗 Connecting to base node at {}", base_node_addr); - let node_conn = BaseNodeClient::connect(base_node_addr).await?; +async fn connect(config: &MinerConfig) -> Result<(BaseNodeGrpcClient, WalletGrpcClient), MinerError> { + let node_conn = match connect_base_node(config).await { + Ok(client) => client, + Err(e) => { + error!(target: LOG_TARGET, "Could not connect to base node"); + error!( + target: LOG_TARGET, + "Is its grpc running? try running it with `--enable-grpc` or enable it in config" + ); + return Err(e); + }, + }; let wallet_conn = match connect_wallet(config).await { Ok(client) => client, @@ -215,8 +216,28 @@ async fn connect_wallet(config: &MinerConfig) -> Result Result { + let base_node_addr = format!( + "http://{}", + multiaddr_to_socketaddr( + &config + .base_node_grpc_address + .clone() + .expect("Base node grpc address not found") + )? + ); + info!(target: LOG_TARGET, "👛 Connecting to base node at {}", base_node_addr); + let channel = Endpoint::from_str(&base_node_addr)?.connect().await?; + let node_conn = BaseNodeClient::with_interceptor( + channel, + ClientAuthenticationInterceptor::create(&config.base_node_grpc_authentication)?, + ); + + Ok(node_conn) +} + async fn mining_cycle( - node_conn: &mut BaseNodeClient, + node_conn: &mut BaseNodeGrpcClient, wallet_conn: &mut WalletGrpcClient, config: &MinerConfig, cli: &Cli, @@ -336,7 +357,7 @@ pub async fn display_report(report: &MiningReport, num_mining_threads: usize) { /// If config async fn validate_tip( - node_conn: &mut BaseNodeClient, + node_conn: &mut BaseNodeGrpcClient, height: u64, mine_until_height: Option, ) -> Result<(), MinerError> { diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index a09ab97725..c2d0ce7753 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ b/applications/minotari_node/src/bootstrap.rs @@ -131,6 +131,7 @@ where B: BlockchainBackend + 'static self.rules.clone(), base_node_config.messaging_request_timeout, self.randomx_factory.clone(), + base_node_config.state_machine.clone(), )) .add_initializer(MempoolServiceInitializer::new( self.mempool.clone(), diff --git a/applications/minotari_node/src/cli.rs b/applications/minotari_node/src/cli.rs index 041f7bc0d5..924974c2aa 100644 --- a/applications/minotari_node/src/cli.rs +++ b/applications/minotari_node/src/cli.rs @@ -45,6 +45,8 @@ pub struct Cli { pub watch: Option, #[clap(long, alias = "profile")] pub profile_with_tokio_console: bool, + #[clap(long, env = "MINOTARI_NODE_ENABLE_GRPC", alias = "enable-grpc")] + pub grpc_enabled: bool, } impl ConfigOverrideProvider for Cli { diff --git a/applications/minotari_node/src/commands/command/hash_grpc_password.rs b/applications/minotari_node/src/commands/command/hash_grpc_password.rs new file mode 100644 index 0000000000..5571f7c0d0 --- /dev/null +++ b/applications/minotari_node/src/commands/command/hash_grpc_password.rs @@ -0,0 +1,69 @@ +// Copyright 2023, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use anyhow::{anyhow, Error}; +use async_trait::async_trait; +use clap::Parser; +use minotari_app_grpc::authentication::salted_password::create_salted_hashed_password; + +use super::{CommandContext, HandleCommand}; + +/// Hashes the GRPC authentication password from the config and returns an argon2 hash +#[derive(Debug, Parser)] +pub struct Args {} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, _: Args) -> Result<(), Error> { + self.hash_grpc_password().await + } +} + +impl CommandContext { + pub async fn hash_grpc_password(&mut self) -> Result<(), Error> { + match self + .config + .base_node + .grpc_authentication + .username_password() + .ok_or_else(|| anyhow!("GRPC basic auth is not configured")) + { + Ok((username, password)) => { + match create_salted_hashed_password(password.reveal()).map_err(|e| anyhow!(e.to_string())) { + Ok(hashed_password) => { + println!("Your hashed password is:"); + println!("{}", *hashed_password); + println!(); + println!( + "Use HTTP basic auth with username '{}' and the hashed password to make GRPC requests", + username + ); + }, + Err(e) => eprintln!("HashGrpcPassword error! {}", e), + } + }, + Err(e) => eprintln!("HashGrpcPassword error! {}", e), + } + + Ok(()) + } +} diff --git a/applications/minotari_node/src/commands/command/mod.rs b/applications/minotari_node/src/commands/command/mod.rs index 4a02c670d0..e4efb921aa 100644 --- a/applications/minotari_node/src/commands/command/mod.rs +++ b/applications/minotari_node/src/commands/command/mod.rs @@ -35,6 +35,7 @@ mod get_mempool_stats; mod get_network_stats; mod get_peer; mod get_state_info; +mod hash_grpc_password; mod header_stats; mod list_banned_peers; mod list_connections; @@ -136,6 +137,7 @@ pub enum Command { Quit(quit::Args), Exit(quit::Args), Watch(watch_command::Args), + HashGrpcPassword(hash_grpc_password::Args), } impl Command { @@ -228,6 +230,7 @@ impl CommandContext { Command::Status(_) | Command::Watch(_) | Command::ListValidatorNodes(_) | + Command::HashGrpcPassword(_) | Command::Quit(_) | Command::Exit(_) => 30, // These commands involve intense blockchain db operations and needs a lot of time to complete @@ -293,6 +296,7 @@ impl HandleCommand for CommandContext { Command::Quit(args) | Command::Exit(args) => self.handle_command(args).await, Command::Watch(args) => self.handle_command(args).await, Command::ListValidatorNodes(args) => self.handle_command(args).await, + Command::HashGrpcPassword(args) => self.handle_command(args).await, } } } diff --git a/applications/minotari_node/src/config.rs b/applications/minotari_node/src/config.rs index 29f508e35c..bd642ae7e9 100644 --- a/applications/minotari_node/src/config.rs +++ b/applications/minotari_node/src/config.rs @@ -34,6 +34,7 @@ use tari_common::{ DefaultConfigLoader, SubConfigPath, }; +use tari_common_types::grpc_authentication::GrpcAuthentication; use tari_comms::multiaddr::Multiaddr; use tari_core::{ base_node::BaseNodeStateMachineConfig, @@ -89,6 +90,8 @@ pub struct BaseNodeConfig { pub grpc_address: Option, /// GRPC server config - which methods are active and which not pub grpc_server_deny_methods: Vec, + /// GRPC authentication mode + pub grpc_authentication: GrpcAuthentication, /// A path to the file that stores the base node identity and secret key pub identity_file: PathBuf, /// Spin up and use a built-in Tor instance. This only works on macos/linux - requires that the wallet was built @@ -155,6 +158,7 @@ impl Default for BaseNodeConfig { GrpcMethod::Identify, GrpcMethod::GetNetworkStatus, ], + grpc_authentication: GrpcAuthentication::default(), identity_file: PathBuf::from("config/base_node_id.json"), use_libtor: false, tor_identity_file: PathBuf::from("config/base_node_tor_id.json"), diff --git a/applications/minotari_node/src/lib.rs b/applications/minotari_node/src/lib.rs index f6f3c2d0aa..bc4ebf542f 100644 --- a/applications/minotari_node/src/lib.rs +++ b/applications/minotari_node/src/lib.rs @@ -42,11 +42,13 @@ use std::{process, sync::Arc}; use commands::{cli_loop::CliLoop, command::CommandContext}; use futures::FutureExt; use log::*; +use minotari_app_grpc::authentication::ServerAuthenticationInterceptor; use minotari_app_utilities::{common_cli_args::CommonCliArgs, network_check::is_network_choice_valid}; use tari_common::{ configuration::bootstrap::{grpc_default_port, ApplicationType}, exit_codes::{ExitCode, ExitError}, }; +use tari_common_types::grpc_authentication::GrpcAuthentication; use tari_comms::{multiaddr::Multiaddr, utils::multiaddr::multiaddr_to_socketaddr, NodeIdentity}; use tari_shutdown::{Shutdown, ShutdownSignal}; use tokio::task; @@ -85,6 +87,7 @@ pub async fn run_base_node( non_interactive_mode: true, watch: None, profile_with_tokio_console: false, + grpc_enabled: false, }; run_base_node_with_cli(node_identity, config, cli, shutdown).await @@ -137,7 +140,8 @@ pub async fn run_base_node_with_cli( &ctx, config.base_node.grpc_server_deny_methods.clone(), ); - task::spawn(run_grpc(grpc, grpc_address, shutdown.to_signal())); + let auth = config.base_node.grpc_authentication.clone(); + task::spawn(run_grpc(grpc, grpc_address, auth, shutdown.to_signal())); } // Run, node, run! @@ -171,13 +175,17 @@ pub async fn run_base_node_with_cli( async fn run_grpc( grpc: grpc::base_node_grpc_server::BaseNodeGrpcServer, grpc_address: Multiaddr, + auth_config: GrpcAuthentication, interrupt_signal: ShutdownSignal, ) -> Result<(), anyhow::Error> { info!(target: LOG_TARGET, "Starting GRPC on {}", grpc_address); let grpc_address = multiaddr_to_socketaddr(&grpc_address)?; + let auth = ServerAuthenticationInterceptor::new(auth_config); + let service = minotari_app_grpc::tari_rpc::base_node_server::BaseNodeServer::with_interceptor(grpc, auth); + Server::builder() - .add_service(minotari_app_grpc::tari_rpc::base_node_server::BaseNodeServer::new(grpc)) + .add_service(service) .serve_with_shutdown(grpc_address, interrupt_signal.map(|_| ())) .await .map_err(|err| { diff --git a/base_layer/core/Cargo.toml b/base_layer/core/Cargo.toml index 4beb7bb763..105e51fc42 100644 --- a/base_layer/core/Cargo.toml +++ b/base_layer/core/Cargo.toml @@ -16,6 +16,7 @@ mempool_proto = [] base_node = ["tari_mmr", "transactions", "mempool_proto", "base_node_proto", "monero", "randomx-rs"] base_node_proto = [] benches = ["base_node"] +metrics = ["tari_metrics"] [dependencies] tari_common = { path = "../../common" } @@ -24,7 +25,7 @@ tari_comms = { path = "../../comms/core" } tari_comms_dht = { path = "../../comms/dht" } tari_comms_rpc_macros = { path = "../../comms/rpc_macros" } tari_crypto = { version = "0.19", features = ["borsh"] } -tari_metrics = { path = "../../infrastructure/metrics" } +tari_metrics = { path = "../../infrastructure/metrics", optional = true } tari_mmr = { path = "../../base_layer/mmr", optional = true, features = ["native_bitmap"] } tari_p2p = { path = "../../base_layer/p2p" } tari_script = { path = "../../infrastructure/tari_script" } diff --git a/base_layer/core/src/base_node/comms_interface/error.rs b/base_layer/core/src/base_node/comms_interface/error.rs index 35bc42ce3b..555b37ed62 100644 --- a/base_layer/core/src/base_node/comms_interface/error.rs +++ b/base_layer/core/src/base_node/comms_interface/error.rs @@ -64,8 +64,6 @@ pub enum CommsInterfaceError { InternalError(String), #[error("API responded with an error: {0}")] ApiError(String), - #[error("Header not found at {0}")] - BlockHeaderNotFound(u64), #[error("Block error: {0}")] BlockError(#[from] BlockError), #[error("Invalid request for {request}: {details}")] diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index de0af66acb..c00c78023a 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -20,10 +20,11 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#[cfg(feature = "metrics")] +use std::convert::{TryFrom, TryInto}; use std::{ cmp::max, collections::HashSet, - convert::{TryFrom, TryInto}, sync::Arc, time::{Duration, Instant}, }; @@ -35,17 +36,16 @@ use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId}; use tari_utilities::hex::Hex; use tokio::sync::RwLock; +#[cfg(feature = "metrics")] +use crate::base_node::metrics; use crate::{ - base_node::{ - comms_interface::{ - error::CommsInterfaceError, - local_interface::BlockEventSender, - FetchMempoolTransactionsResponse, - NodeCommsRequest, - NodeCommsResponse, - OutboundNodeCommsInterface, - }, - metrics, + base_node::comms_interface::{ + error::CommsInterfaceError, + local_interface::BlockEventSender, + FetchMempoolTransactionsResponse, + NodeCommsRequest, + NodeCommsResponse, + OutboundNodeCommsInterface, }, blocks::{Block, BlockBuilder, BlockHeader, BlockHeaderValidationError, ChainBlock, NewBlock, NewBlockTemplate}, chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, ChainStorageError}, @@ -619,6 +619,7 @@ where B: BlockchainBackend + 'static .build(); return Ok(block); } + #[cfg(feature = "metrics")] metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64); let block = self.request_full_block_from_peer(source_peer, block_hash).await?; return Ok(block); @@ -628,6 +629,7 @@ where B: BlockchainBackend + 'static let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?; let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect(); + #[cfg(feature = "metrics")] metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64); let mut builder = BlockBuilder::new(header.version) @@ -673,6 +675,7 @@ where B: BlockchainBackend + 'static not_found.len() ); + #[cfg(feature = "metrics")] metrics::compact_block_full_misses(header.height).inc(); let block = self.request_full_block_from_peer(source_peer, block_hash).await?; return Ok(block); @@ -710,6 +713,7 @@ where B: BlockchainBackend + 'static e, ); + #[cfg(feature = "metrics")] metrics::compact_block_mmr_mismatch(header.height).inc(); let block = self.request_full_block_from_peer(source_peer, block_hash).await?; return Ok(block); @@ -834,8 +838,11 @@ where B: BlockchainBackend + 'static }, Err(e @ ChainStorageError::ValidationError { .. }) => { - let block_hash = block.hash(); - metrics::rejected_blocks(block.header.height, &block_hash).inc(); + #[cfg(feature = "metrics")] + { + let block_hash = block.hash(); + metrics::rejected_blocks(block.header.height, &block_hash).inc(); + } warn!( target: LOG_TARGET, "Peer {} sent an invalid block: {}", @@ -856,14 +863,20 @@ where B: BlockchainBackend + 'static } }, // SECURITY: This indicates an issue in the transaction validator. - None => metrics::rejected_local_blocks(block.header.height, &block_hash).inc(), + None => { + #[cfg(feature = "metrics")] + metrics::rejected_local_blocks(block.header.height, &block_hash).inc(); + debug!(target: LOG_TARGET, "There may have been an issue in the transaction validator"); + }, } self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer }); Err(e.into()) }, Err(e) => { + #[cfg(feature = "metrics")] metrics::rejected_blocks(block.header.height, &block.hash()).inc(); + self.publish_block_event(BlockEvent::AddBlockErrored { block }); Err(e.into()) }, @@ -936,6 +949,7 @@ where B: BlockchainBackend + 'static async fn update_block_result_metrics(&self, block_add_result: &BlockAddResult) -> Result<(), CommsInterfaceError> { fn update_target_difficulty(block: &ChainBlock) { + #[cfg(feature = "metrics")] match block.header().pow_algo() { PowAlgorithm::Sha3x => { metrics::target_difficulty_sha() @@ -950,25 +964,33 @@ where B: BlockchainBackend + 'static match block_add_result { BlockAddResult::Ok(ref block) => { - #[allow(clippy::cast_possible_wrap)] - metrics::tip_height().set(block.height() as i64); update_target_difficulty(block); - let utxo_set_size = self.blockchain_db.utxo_count().await?; - metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX)); + + #[cfg(feature = "metrics")] + { + #[allow(clippy::cast_possible_wrap)] + metrics::tip_height().set(block.height() as i64); + let utxo_set_size = self.blockchain_db.utxo_count().await?; + metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX)); + } }, + #[allow(unused_variables)] // `removed` variable is used if metrics are compiled BlockAddResult::ChainReorg { added, removed } => { + #[cfg(feature = "metrics")] if let Some(fork_height) = added.last().map(|b| b.height()) { #[allow(clippy::cast_possible_wrap)] metrics::tip_height().set(fork_height as i64); metrics::reorg(fork_height, added.len(), removed.len()).inc(); + + let utxo_set_size = self.blockchain_db.utxo_count().await?; + metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX)); } for block in added { update_target_difficulty(block); } - let utxo_set_size = self.blockchain_db.utxo_count().await?; - metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX)); }, BlockAddResult::OrphanBlock => { + #[cfg(feature = "metrics")] metrics::orphaned_blocks().inc(); }, _ => {}, diff --git a/base_layer/core/src/base_node/mod.rs b/base_layer/core/src/base_node/mod.rs index feae403b1f..c7a37b6524 100644 --- a/base_layer/core/src/base_node/mod.rs +++ b/base_layer/core/src/base_node/mod.rs @@ -39,7 +39,7 @@ pub mod chain_metadata_service; pub mod comms_interface; #[cfg(feature = "base_node")] pub use comms_interface::LocalNodeCommsInterface; -#[cfg(feature = "base_node")] +#[cfg(feature = "metrics")] mod metrics; #[cfg(feature = "base_node")] diff --git a/base_layer/core/src/base_node/service/error.rs b/base_layer/core/src/base_node/service/error.rs index 43684c5b07..497813cec6 100644 --- a/base_layer/core/src/base_node/service/error.rs +++ b/base_layer/core/src/base_node/service/error.rs @@ -20,14 +20,12 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::time::Duration; - use tari_comms_dht::outbound::DhtOutboundError; use thiserror::Error; use crate::{ base_node::{comms_interface::CommsInterfaceError, service::initializer::ExtractBlockError}, - common::BanReason, + common::{BanPeriod, BanReason}, }; #[derive(Debug, Error)] @@ -48,58 +46,43 @@ impl BaseNodeServiceError { pub fn get_ban_reason(&self) -> Option { match self { BaseNodeServiceError::CommsInterfaceError(comms) => match comms { - CommsInterfaceError::UnexpectedApiResponse => Some(BanReason { - reason: "Unexpected API response".to_string(), - ban_duration: Duration::from_secs(60), - }), - CommsInterfaceError::RequestTimedOut => Some(BanReason { - reason: "Request timed out".to_string(), - ban_duration: Duration::from_secs(60), - }), - CommsInterfaceError::InvalidPeerResponse(e) => Some(BanReason { - reason: format!("Invalid peer response: {}", e), - ban_duration: Duration::from_secs(60), - }), - CommsInterfaceError::InvalidBlockHeader(e) => Some(BanReason { - reason: format!("Invalid block header: {}", e), - ban_duration: Duration::from_secs(60), + err @ CommsInterfaceError::UnexpectedApiResponse | err @ CommsInterfaceError::RequestTimedOut => { + Some(BanReason { + reason: err.to_string(), + ban_duration: BanPeriod::Short, + }) + }, + err @ CommsInterfaceError::InvalidPeerResponse(_) | + err @ CommsInterfaceError::InvalidBlockHeader(_) | + err @ CommsInterfaceError::TransactionError(_) | + err @ CommsInterfaceError::InvalidFullBlock { .. } | + err @ CommsInterfaceError::InvalidRequest { .. } => Some(BanReason { + reason: err.to_string(), + ban_duration: BanPeriod::Long, }), - CommsInterfaceError::TransactionError(e) => Some(BanReason { - reason: format!("Invalid transaction: {}", e), - ban_duration: Duration::from_secs(60), - }), - CommsInterfaceError::InvalidRequest { request, details } => Some(BanReason { - reason: format!("Invalid request: {} ({})", request, details), - ban_duration: Duration::from_secs(60), + CommsInterfaceError::MempoolError(e) => e.get_ban_reason(), + CommsInterfaceError::TransportChannelError(e) => Some(BanReason { + reason: e.to_string(), + ban_duration: BanPeriod::Short, }), + CommsInterfaceError::ChainStorageError(e) => e.get_ban_reason(), + CommsInterfaceError::MergeMineError(e) => e.get_ban_reason(), CommsInterfaceError::NoBootstrapNodesConfigured | - CommsInterfaceError::TransportChannelError(_) | - CommsInterfaceError::ChainStorageError(_) | CommsInterfaceError::OutboundMessageError(_) | - CommsInterfaceError::MempoolError(_) | CommsInterfaceError::BroadcastFailed | CommsInterfaceError::InternalChannelError(_) | CommsInterfaceError::DifficultyAdjustmentManagerError(_) | CommsInterfaceError::InternalError(_) | CommsInterfaceError::ApiError(_) | - CommsInterfaceError::BlockHeaderNotFound(_) | CommsInterfaceError::BlockError(_) | - CommsInterfaceError::InvalidFullBlock { .. } | - CommsInterfaceError::MergeMineError(_) | CommsInterfaceError::DifficultyError(_) => None, }, BaseNodeServiceError::DhtOutboundError(_) => None, - BaseNodeServiceError::InvalidRequest(e) => Some(BanReason { - reason: format!("Invalid request: {}", e), - ban_duration: Duration::from_secs(60), - }), - BaseNodeServiceError::InvalidResponse(e) => Some(BanReason { - reason: format!("Invalid response: {}", e), - ban_duration: Duration::from_secs(60), - }), - BaseNodeServiceError::InvalidBlockMessage(e) => Some(BanReason { - reason: format!("Invalid block message: {}", e), - ban_duration: Duration::from_secs(60), + err @ BaseNodeServiceError::InvalidRequest(_) | + err @ BaseNodeServiceError::InvalidResponse(_) | + err @ BaseNodeServiceError::InvalidBlockMessage(_) => Some(BanReason { + reason: err.to_string(), + ban_duration: BanPeriod::Long, }), } } diff --git a/base_layer/core/src/base_node/service/initializer.rs b/base_layer/core/src/base_node/service/initializer.rs index d906491a53..b1d42d0848 100644 --- a/base_layer/core/src/base_node/service/initializer.rs +++ b/base_layer/core/src/base_node/service/initializer.rs @@ -46,6 +46,7 @@ use crate::{ base_node::{ comms_interface::{InboundNodeCommsHandlers, LocalNodeCommsInterface, OutboundNodeCommsInterface}, service::service::{BaseNodeService, BaseNodeStreams}, + BaseNodeStateMachineConfig, StateMachineHandle, }, blocks::NewBlock, @@ -68,6 +69,7 @@ pub struct BaseNodeServiceInitializer { consensus_manager: ConsensusManager, service_request_timeout: Duration, randomx_factory: RandomXFactory, + base_node_config: BaseNodeStateMachineConfig, } impl BaseNodeServiceInitializer @@ -81,6 +83,7 @@ where T: BlockchainBackend consensus_manager: ConsensusManager, service_request_timeout: Duration, randomx_factory: RandomXFactory, + base_node_config: BaseNodeStateMachineConfig, ) -> Self { Self { inbound_message_subscription_factory, @@ -89,6 +92,7 @@ where T: BlockchainBackend consensus_manager, service_request_timeout, randomx_factory, + base_node_config, } } @@ -180,6 +184,7 @@ where T: BlockchainBackend + 'static let mempool = self.mempool.clone(); let consensus_manager = self.consensus_manager.clone(); let randomx_factory = self.randomx_factory.clone(); + let config = self.base_node_config.clone(); context.spawn_when_ready(move |handles| async move { let dht = handles.expect_handle::(); @@ -213,6 +218,7 @@ where T: BlockchainBackend + 'static service_request_timeout, state_machine, connectivity, + config, ) .start(streams); futures::pin_mut!(service); diff --git a/base_layer/core/src/base_node/service/service.rs b/base_layer/core/src/base_node/service/service.rs index fc0184001d..5d6c6f78f4 100644 --- a/base_layer/core/src/base_node/service/service.rs +++ b/base_layer/core/src/base_node/service/service.rs @@ -55,10 +55,12 @@ use crate::{ comms_interface::{CommsInterfaceError, InboundNodeCommsHandlers, NodeCommsRequest, NodeCommsResponse}, service::{error::BaseNodeServiceError, initializer::ExtractBlockError}, state_machine_service::states::StateInfo, + BaseNodeStateMachineConfig, StateMachineHandle, }, blocks::{Block, NewBlock}, chain_storage::{BlockchainBackend, ChainStorageError}, + common::BanPeriod, proto as shared_protos, proto::base_node as proto, }; @@ -97,6 +99,7 @@ pub(super) struct BaseNodeService { service_request_timeout: Duration, state_machine_handle: StateMachineHandle, connectivity: ConnectivityRequester, + base_node_config: BaseNodeStateMachineConfig, } impl BaseNodeService @@ -108,6 +111,7 @@ where B: BlockchainBackend + 'static service_request_timeout: Duration, state_machine_handle: StateMachineHandle, connectivity: ConnectivityRequester, + base_node_config: BaseNodeStateMachineConfig, ) -> Self { let (timeout_sender, timeout_receiver) = mpsc::channel(100); Self { @@ -119,6 +123,7 @@ where B: BlockchainBackend + 'static service_request_timeout, state_machine_handle, connectivity, + base_node_config, } } @@ -256,7 +261,8 @@ where B: BlockchainBackend + 'static let outbound_message_service = self.outbound_message_service.clone(); let state_machine_handle = self.state_machine_handle.clone(); let mut connectivity = self.connectivity.clone(); - + let short_ban = self.base_node_config.blockchain_sync_config.short_ban_period; + let long_ban = self.base_node_config.blockchain_sync_config.ban_period; task::spawn(async move { let result = handle_incoming_request( inbound_nch, @@ -267,12 +273,12 @@ where B: BlockchainBackend + 'static .await; if let Err(e) = result { if let Some(ban_reason) = e.get_ban_reason() { + let duration = match ban_reason.ban_duration { + BanPeriod::Short => short_ban, + BanPeriod::Long => long_ban, + }; let _drop = connectivity - .ban_peer_until( - domain_msg.source_peer.node_id.clone(), - ban_reason.ban_duration(), - ban_reason.reason().to_string(), - ) + .ban_peer_until(domain_msg.source_peer.node_id.clone(), duration, ban_reason.reason) .await .map_err(|e| error!(target: LOG_TARGET, "Failed to ban peer: {:?}", e)); } @@ -287,18 +293,21 @@ where B: BlockchainBackend + 'static ) { let waiting_requests = self.waiting_requests.clone(); let mut connectivity_requester = self.connectivity.clone(); + + let short_ban = self.base_node_config.blockchain_sync_config.short_ban_period; + let long_ban = self.base_node_config.blockchain_sync_config.ban_period; task::spawn(async move { let source_peer = domain_msg.source_peer.clone(); let result = handle_incoming_response(waiting_requests, domain_msg).await; if let Err(e) = result { if let Some(ban_reason) = e.get_ban_reason() { + let duration = match ban_reason.ban_duration { + BanPeriod::Short => short_ban, + BanPeriod::Long => long_ban, + }; let _drop = connectivity_requester - .ban_peer_until( - source_peer.node_id, - ban_reason.ban_duration(), - ban_reason.reason().to_string(), - ) + .ban_peer_until(source_peer.node_id, duration, ban_reason.reason) .await .map_err(|e| error!(target: LOG_TARGET, "Failed to ban peer: {:?}", e)); } @@ -334,6 +343,10 @@ where B: BlockchainBackend + 'static return; } let inbound_nch = self.inbound_nch.clone(); + let mut connectivity_requester = self.connectivity.clone(); + let source_peer = new_block.source_peer.clone(); + let short_ban = self.base_node_config.blockchain_sync_config.short_ban_period; + let long_ban = self.base_node_config.blockchain_sync_config.ban_period; task::spawn(async move { let result = handle_incoming_block(inbound_nch, new_block).await; @@ -344,7 +357,19 @@ where B: BlockchainBackend + 'static ))) => { // Special case, dont log this again as an error }, - Err(e) => error!(target: LOG_TARGET, "Failed to handle incoming block message: {}", e), + Err(e) => { + if let Some(ban_reason) = e.get_ban_reason() { + let duration = match ban_reason.ban_duration { + BanPeriod::Short => short_ban, + BanPeriod::Long => long_ban, + }; + let _drop = connectivity_requester + .ban_peer_until(source_peer.node_id, duration, ban_reason.reason) + .await + .map_err(|e| error!(target: LOG_TARGET, "Failed to ban peer: {:?}", e)); + } + error!(target: LOG_TARGET, "Failed to handle incoming block message: {}", e) + }, } }); } diff --git a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs index f40188ce7e..50ce67b73f 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs @@ -24,10 +24,11 @@ use std::time::Instant; use log::*; +#[cfg(feature = "metrics")] +use crate::base_node::metrics; use crate::{ base_node::{ comms_interface::BlockEvent, - metrics, state_machine_service::states::{BlockSyncInfo, HorizonStateSync, StateEvent, StateInfo, StatusInfo}, sync::{BlockSynchronizer, SyncPeer}, BaseNodeStateMachine, @@ -63,6 +64,7 @@ impl BlockSync { let local_nci = shared.local_node_interface.clone(); let randomx_vm_cnt = shared.get_randomx_vm_cnt(); let randomx_vm_flags = shared.get_randomx_vm_flags(); + #[cfg(feature = "metrics")] let tip_height_metric = metrics::tip_height(); synchronizer.on_starting(move |sync_peer| { let _result = status_event_sender.send(StatusInfo { @@ -81,6 +83,7 @@ impl BlockSync { BlockAddResult::Ok(block), )); + #[cfg(feature = "metrics")] tip_height_metric.set(local_height as i64); let _result = status_event_sender.send(StatusInfo { bootstrapped, diff --git a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs index 677c2448ff..f3c9a382e0 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs @@ -26,10 +26,11 @@ use log::*; use tari_common_types::chain_metadata::ChainMetadata; use tari_comms::peer_manager::NodeId; +#[cfg(feature = "metrics")] +use crate::base_node::metrics; use crate::{ base_node::{ comms_interface::BlockEvent, - metrics, state_machine_service::states::{BlockSyncInfo, StateEvent, StateInfo, StatusInfo}, sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer}, BaseNodeStateMachine, @@ -146,6 +147,7 @@ impl HeaderSyncState { let local_nci = shared.local_node_interface.clone(); synchronizer.on_rewind(move |removed| { + #[cfg(feature = "metrics")] if let Some(fork_height) = removed.last().map(|b| b.height().saturating_sub(1)) { metrics::tip_height().set(fork_height as i64); metrics::reorg(fork_height, 0, removed.len()).inc(); diff --git a/base_layer/core/src/base_node/sync/ban.rs b/base_layer/core/src/base_node/sync/ban.rs index 9f3da126fd..a2ef918cb6 100644 --- a/base_layer/core/src/base_node/sync/ban.rs +++ b/base_layer/core/src/base_node/sync/ban.rs @@ -20,10 +20,12 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use std::time::Duration; + use log::*; use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId}; -use crate::{base_node::BlockchainSyncConfig, common::BanReason}; +use crate::base_node::BlockchainSyncConfig; const LOG_TARGET: &str = "c::bn::sync"; @@ -39,27 +41,25 @@ impl PeerBanManager { Self { config, connectivity } } - pub async fn ban_peer_if_required(&mut self, node_id: &NodeId, ban_reason: &Option) { - if let Some(ban) = ban_reason { - if self.config.forced_sync_peers.contains(node_id) { - debug!( - target: LOG_TARGET, - "Not banning peer that is on the allow list for sync. Ban reason = {}", ban.reason() - ); - return; - } - debug!(target: LOG_TARGET, "Sync peer {} removed from the sync peer list because {}", node_id, ban.reason()); + pub async fn ban_peer_if_required(&mut self, node_id: &NodeId, ban_reason: String, ban_duration: Duration) { + if self.config.forced_sync_peers.contains(node_id) { + debug!( + target: LOG_TARGET, + "Not banning peer that is on the allow list for sync. Ban reason = {}", ban_reason + ); + return; + } + debug!(target: LOG_TARGET, "Sync peer {} removed from the sync peer list because {}", node_id, ban_reason); - match self - .connectivity - .ban_peer_until(node_id.clone(), ban.ban_duration, ban.reason().to_string()) - .await - { - Ok(_) => { - warn!(target: LOG_TARGET, "Banned sync peer {} for {:?} because {}", node_id, ban.ban_duration, ban.reason()) - }, - Err(err) => error!(target: LOG_TARGET, "Failed to ban sync peer {}: {}", node_id, err), - } + match self + .connectivity + .ban_peer_until(node_id.clone(), ban_duration, ban_reason.clone()) + .await + { + Ok(_) => { + warn!(target: LOG_TARGET, "Banned sync peer {} for {:?} because {}", node_id, ban_duration, ban_reason) + }, + Err(err) => error!(target: LOG_TARGET, "Failed to ban sync peer {}: {}", node_id, err), } } } diff --git a/base_layer/core/src/base_node/sync/block_sync/error.rs b/base_layer/core/src/base_node/sync/block_sync/error.rs index 8f44e8a3e4..97b21aa937 100644 --- a/base_layer/core/src/base_node/sync/block_sync/error.rs +++ b/base_layer/core/src/base_node/sync/block_sync/error.rs @@ -29,7 +29,11 @@ use tari_comms::{ protocol::rpc::{RpcError, RpcStatus, RpcStatusCode}, }; -use crate::{chain_storage::ChainStorageError, common::BanReason, validation::ValidationError}; +use crate::{ + chain_storage::ChainStorageError, + common::{BanPeriod, BanReason}, + validation::ValidationError, +}; #[derive(Debug, thiserror::Error)] pub enum BlockSyncError { @@ -101,25 +105,24 @@ impl BlockSyncError { } impl BlockSyncError { - pub fn get_ban_reason(&self, short_ban: Duration, long_ban: Duration) -> Option { + pub fn get_ban_reason(&self) -> Option { match self { // no ban BlockSyncError::AsyncTaskFailed(_) | - BlockSyncError::ChainStorageError(_) | BlockSyncError::ConnectivityError(_) | BlockSyncError::NoMoreSyncPeers(_) | BlockSyncError::AllSyncPeersExceedLatency | BlockSyncError::FailedToConstructChainBlock | BlockSyncError::PeerNotFound | BlockSyncError::SyncRoundFailed => None, - + BlockSyncError::ChainStorageError(e) => e.get_ban_reason(), // short ban err @ BlockSyncError::MaxLatencyExceeded { .. } | err @ BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(_) | err @ BlockSyncError::RpcError(_) | err @ BlockSyncError::RpcRequestError(_) => Some(BanReason { reason: format!("{}", err), - ban_duration: short_ban, + ban_duration: BanPeriod::Short, }), // long ban @@ -128,10 +131,10 @@ impl BlockSyncError { err @ BlockSyncError::InvalidBlockBody(_) | err @ BlockSyncError::FixedHashSizeError(_) => Some(BanReason { reason: format!("{}", err), - ban_duration: long_ban, + ban_duration: BanPeriod::Long, }), - BlockSyncError::ValidationError(err) => ValidationError::get_ban_reason(err, Some(long_ban)), + BlockSyncError::ValidationError(err) => ValidationError::get_ban_reason(err), } } } diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index a77664ef74..c06325f2af 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -40,7 +40,7 @@ use crate::{ }, blocks::{Block, ChainBlock}, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, - common::rolling_avg::RollingAverageTime, + common::{rolling_avg::RollingAverageTime, BanPeriod}, proto::base_node::SyncBlocksRequest, transactions::aggregated_body::AggregateBody, validation::{BlockBodyValidator, ValidationError}, @@ -187,12 +187,15 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> { Ok(_) => return Ok(()), Err(err) => { warn!(target: LOG_TARGET, "{}", err); - let ban_reason = - BlockSyncError::get_ban_reason(&err, self.config.short_ban_period, self.config.ban_period); + let ban_reason = BlockSyncError::get_ban_reason(&err); if let Some(reason) = ban_reason { + let duration = match reason.ban_duration { + BanPeriod::Short => self.config.short_ban_period, + BanPeriod::Long => self.config.ban_period, + }; warn!(target: LOG_TARGET, "{}", err); self.peer_ban_manager - .ban_peer_if_required(&node_id, &Some(reason.clone())) + .ban_peer_if_required(&node_id, reason.reason, duration) .await; } if let BlockSyncError::MaxLatencyExceeded { .. } = err { diff --git a/base_layer/core/src/base_node/sync/header_sync/error.rs b/base_layer/core/src/base_node/sync/header_sync/error.rs index e286609049..9432b99327 100644 --- a/base_layer/core/src/base_node/sync/header_sync/error.rs +++ b/base_layer/core/src/base_node/sync/header_sync/error.rs @@ -29,7 +29,12 @@ use tari_comms::{ protocol::rpc::{RpcError, RpcStatus}, }; -use crate::{blocks::BlockError, chain_storage::ChainStorageError, common::BanReason, validation::ValidationError}; +use crate::{ + blocks::BlockError, + chain_storage::ChainStorageError, + common::{BanPeriod, BanReason}, + validation::ValidationError, +}; #[derive(Debug, thiserror::Error)] pub enum BlockHeaderSyncError { @@ -95,7 +100,7 @@ pub enum BlockHeaderSyncError { } impl BlockHeaderSyncError { - pub fn get_ban_reason(&self, short_ban: Duration, long_ban: Duration) -> Option { + pub fn get_ban_reason(&self) -> Option { match self { // no ban BlockHeaderSyncError::NoMoreSyncPeers(_) | @@ -104,15 +109,15 @@ impl BlockHeaderSyncError { BlockHeaderSyncError::AllSyncPeersExceedLatency | BlockHeaderSyncError::ConnectivityError(_) | BlockHeaderSyncError::NotInSync | - BlockHeaderSyncError::PeerNotFound | - BlockHeaderSyncError::ChainStorageError(_) => None, + BlockHeaderSyncError::PeerNotFound => None, + BlockHeaderSyncError::ChainStorageError(e) => e.get_ban_reason(), // short ban err @ BlockHeaderSyncError::MaxLatencyExceeded { .. } | err @ BlockHeaderSyncError::RpcError { .. } | err @ BlockHeaderSyncError::RpcRequestError { .. } => Some(BanReason { reason: format!("{}", err), - ban_duration: short_ban, + ban_duration: BanPeriod::Short, }), // long ban @@ -127,10 +132,10 @@ impl BlockHeaderSyncError { err @ BlockHeaderSyncError::PeerSentInaccurateChainMetadata { .. } | err @ BlockHeaderSyncError::PeerSentTooManyHeaders(_) => Some(BanReason { reason: format!("{}", err), - ban_duration: long_ban, + ban_duration: BanPeriod::Long, }), - BlockHeaderSyncError::ValidationFailed(err) => ValidationError::get_ban_reason(err, Some(long_ban)), + BlockHeaderSyncError::ValidationFailed(err) => ValidationError::get_ban_reason(err), } } } diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index bce5ada1a3..9df904a4f7 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -49,7 +49,7 @@ use crate::{ }, blocks::{BlockHeader, ChainBlock, ChainHeader}, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError}, - common::rolling_avg::RollingAverageTime, + common::{rolling_avg::RollingAverageTime, BanPeriod}, consensus::ConsensusManager, proof_of_work::randomx_factory::RandomXFactory, proto::{ @@ -156,15 +156,15 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { match self.connect_and_attempt_sync(&node_id, max_latency).await { Ok((peer, sync_result)) => return Ok((peer, sync_result)), Err(err) => { - let ban_reason = BlockHeaderSyncError::get_ban_reason( - &err, - self.config.short_ban_period, - self.config.ban_period, - ); + let ban_reason = BlockHeaderSyncError::get_ban_reason(&err); if let Some(reason) = ban_reason { warn!(target: LOG_TARGET, "{}", err); + let duration = match reason.ban_duration { + BanPeriod::Short => self.config.short_ban_period, + BanPeriod::Long => self.config.ban_period, + }; self.peer_ban_manager - .ban_peer_if_required(&node_id, &Some(reason.clone())) + .ban_peer_if_required(&node_id, reason.reason, duration) .await; } if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err { diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/error.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/error.rs index 4c07210020..4f1a40ff89 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/error.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/error.rs @@ -35,7 +35,7 @@ use tokio::task; use crate::{ chain_storage::ChainStorageError, - common::BanReason, + common::{BanPeriod, BanReason}, transactions::transaction_components::TransactionError, validation::ValidationError, }; @@ -112,10 +112,10 @@ impl From for HorizonSyncError { } impl HorizonSyncError { - pub fn get_ban_reason(&self, short_ban: Duration, long_ban: Duration) -> Option { + pub fn get_ban_reason(&self) -> Option { match self { // no ban - HorizonSyncError::ChainStorageError(_) | + HorizonSyncError::ChainStorageError(e) => e.get_ban_reason(), HorizonSyncError::NoSyncPeers | HorizonSyncError::FailedSyncAllPeers | HorizonSyncError::AllSyncPeersExceedLatency | @@ -129,7 +129,7 @@ impl HorizonSyncError { err @ HorizonSyncError::RpcError { .. } | err @ HorizonSyncError::RpcStatus { .. } => Some(BanReason { reason: format!("{}", err), - ban_duration: short_ban, + ban_duration: BanPeriod::Short, }), // long ban @@ -144,10 +144,10 @@ impl HorizonSyncError { err @ HorizonSyncError::FixedHashSizeError(_) | err @ HorizonSyncError::TransactionError(_) => Some(BanReason { reason: format!("{}", err), - ban_duration: long_ban, + ban_duration: BanPeriod::Long, }), - HorizonSyncError::ValidationError(err) => ValidationError::get_ban_reason(err, Some(long_ban)), + HorizonSyncError::ValidationError(err) => ValidationError::get_ban_reason(err), } } } diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs index 0725309461..489b5aa23b 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs @@ -48,7 +48,7 @@ use crate::{ }, blocks::{BlockHeader, ChainHeader, UpdateBlockAccumulatedData}, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree}, - common::rolling_avg::RollingAverageTime, + common::{rolling_avg::RollingAverageTime, BanPeriod}, consensus::ConsensusManager, proto::base_node::{SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse}, transactions::transaction_components::{ @@ -180,13 +180,16 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { Ok(_) => return Ok(()), // Try another peer Err(err) => { - let ban_reason = - HorizonSyncError::get_ban_reason(&err, self.config.short_ban_period, self.config.ban_period); + let ban_reason = HorizonSyncError::get_ban_reason(&err); if let Some(reason) = ban_reason { + let duration = match reason.ban_duration { + BanPeriod::Short => self.config.short_ban_period, + BanPeriod::Long => self.config.ban_period, + }; warn!(target: LOG_TARGET, "{}", err); self.peer_ban_manager - .ban_peer_if_required(&node_id, &Some(reason.clone())) + .ban_peer_if_required(&node_id, reason.reason, duration) .await; } if let HorizonSyncError::MaxLatencyExceeded { .. } = err { diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs index 173d029ad1..51b94cb200 100644 --- a/base_layer/core/src/base_node/sync/rpc/service.rs +++ b/base_layer/core/src/base_node/sync/rpc/service.rs @@ -40,10 +40,11 @@ use tokio::{ }; use tracing::{instrument, span, Instrument, Level}; +#[cfg(feature = "metrics")] +use crate::base_node::metrics; use crate::{ base_node::{ comms_interface::{BlockEvent, BlockEvent::BlockSyncRewind}, - metrics, sync::{ header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS, rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService}, @@ -99,6 +100,7 @@ impl BaseNodeSyncRpcService { let token = Arc::new(peer); lock.push(Arc::downgrade(&token)); + #[cfg(feature = "metrics")] metrics::active_sync_peers().set(lock.len() as i64); Ok(token) } @@ -256,6 +258,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ } } + #[cfg(feature = "metrics")] metrics::active_sync_peers().dec(); debug!( target: LOG_TARGET, @@ -355,6 +358,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ } } + #[cfg(feature = "metrics")] metrics::active_sync_peers().dec(); debug!( target: LOG_TARGET, @@ -572,6 +576,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ } } + #[cfg(feature = "metrics")] metrics::active_sync_peers().dec(); debug!( target: LOG_TARGET, diff --git a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs index 24945676f8..8b03e476a4 100644 --- a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs +++ b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs @@ -31,8 +31,9 @@ use tari_comms::{ use tari_utilities::hex::Hex; use tokio::{sync::mpsc, task}; +#[cfg(feature = "metrics")] +use crate::base_node::metrics; use crate::{ - base_node::metrics, blocks::BlockHeader, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, proto::base_node::{SyncUtxosRequest, SyncUtxosResponse}, @@ -106,6 +107,7 @@ where B: BlockchainBackend + 'static target: LOG_TARGET, "UTXO stream completed for peer '{}'", self.peer_node_id ); + #[cfg(feature = "metrics")] metrics::active_sync_peers().dec(); }); diff --git a/base_layer/core/src/chain_storage/error.rs b/base_layer/core/src/chain_storage/error.rs index 75e19a12fc..47f5217697 100644 --- a/base_layer/core/src/chain_storage/error.rs +++ b/base_layer/core/src/chain_storage/error.rs @@ -30,6 +30,7 @@ use tokio::task; use crate::{ blocks::BlockError, chain_storage::MmrTree, + common::{BanPeriod, BanReason}, proof_of_work::PowError, transactions::transaction_components::TransactionError, validation::ValidationError, @@ -130,8 +131,6 @@ pub enum ChainStorageError { TransactionError(#[from] TransactionError), #[error("Could not convert data:{0}")] ConversionError(String), - #[error("Unable to spend UTXO because it has dependant UTXOS: {details}")] - UnspendableDueToDependentUtxos { details: String }, #[error("FixedHashSize Error: {0}")] FixedHashSizeError(#[from] FixedHashSizeError), #[error("Composite key length was exceeded (THIS SHOULD NEVER HAPPEN)")] @@ -150,6 +149,50 @@ impl ChainStorageError { pub fn is_key_exist_error(&self) -> bool { matches!(self, ChainStorageError::KeyExists { .. }) } + + pub fn get_ban_reason(&self) -> Option { + match self { + ChainStorageError::ProofOfWorkError { source: e } => e.get_ban_reason(), + ChainStorageError::ValidationError { source: e } => e.get_ban_reason(), + err @ ChainStorageError::UnspendableInput | + err @ ChainStorageError::MerkleMountainRangeError { .. } | + err @ ChainStorageError::MismatchedMmrRoot(_) | + err @ ChainStorageError::TransactionError(_) | + err @ ChainStorageError::SMTError(_) => Some(BanReason { + reason: err.to_string(), + ban_duration: BanPeriod::Long, + }), + _err @ ChainStorageError::AccessError(_) | + _err @ ChainStorageError::CorruptedDatabase(_) | + _err @ ChainStorageError::UnexpectedResult(_) | + _err @ ChainStorageError::InvalidOperation(_) | + _err @ ChainStorageError::UnspendError | + _err @ ChainStorageError::DataInconsistencyDetected { .. } | + _err @ ChainStorageError::CriticalError(_) | + _err @ ChainStorageError::InsertError { .. } | + _err @ ChainStorageError::InvalidQuery(_) | + _err @ ChainStorageError::InvalidArguments { .. } | + _err @ ChainStorageError::ValueNotFound { .. } | + _err @ ChainStorageError::MerkleProofError { .. } | + _err @ ChainStorageError::InvalidBlock(_) | + _err @ ChainStorageError::BlockingTaskSpawnError(_) | + _err @ ChainStorageError::LmdbError { .. } | + _err @ ChainStorageError::CannotAcquireFileLock | + _err @ ChainStorageError::IoError(_) | + _err @ ChainStorageError::CannotCalculateNonTipMmr(_) | + _err @ ChainStorageError::KeyExists { .. } | + _err @ ChainStorageError::DbResizeRequired | + _err @ ChainStorageError::DbTransactionTooLarge(_) | + _err @ ChainStorageError::DatabaseResyncRequired(_) | + _err @ ChainStorageError::BlockError(_) | + _err @ ChainStorageError::AddBlockOperationLocked | + _err @ ChainStorageError::ConversionError(_) | + _err @ ChainStorageError::FixedHashSizeError(_) | + _err @ ChainStorageError::CompositeKeyLengthExceeded | + _err @ ChainStorageError::FromKeyBytesFailed(_) | + _err @ ChainStorageError::OutOfRange => None, + } + } } impl From for ChainStorageError { diff --git a/base_layer/core/src/common/mod.rs b/base_layer/core/src/common/mod.rs index eb0c27ee3a..6b3ffcdc63 100644 --- a/base_layer/core/src/common/mod.rs +++ b/base_layer/core/src/common/mod.rs @@ -20,9 +20,6 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#[cfg(feature = "base_node")] -use std::time::Duration; - use blake2::Blake2b; use digest::consts::U64; use tari_crypto::hash_domain; @@ -43,16 +40,20 @@ hash_domain!(ConfidentialOutputHashDomain, "com.tari.dan.confidential_output", 1 pub type ConfidentialOutputHasher = DomainSeparatedConsensusHasher>; /// The reason for a peer being banned -#[cfg(feature = "base_node")] #[derive(Clone, Debug)] pub struct BanReason { /// The reason for the ban pub reason: String, /// The duration of the ban - pub ban_duration: Duration, + pub ban_duration: BanPeriod, +} + +#[derive(Clone, Copy, Debug)] +pub enum BanPeriod { + Short, + Long, } -#[cfg(feature = "base_node")] impl BanReason { /// Create a new ban reason pub fn reason(&self) -> &str { @@ -60,7 +61,7 @@ impl BanReason { } /// The duration of the ban - pub fn ban_duration(&self) -> Duration { + pub fn ban_duration(&self) -> BanPeriod { self.ban_duration } } diff --git a/base_layer/core/src/mempool/error.rs b/base_layer/core/src/mempool/error.rs index bd4539ca42..d468338e0f 100644 --- a/base_layer/core/src/mempool/error.rs +++ b/base_layer/core/src/mempool/error.rs @@ -24,7 +24,11 @@ use tari_service_framework::reply_channel::TransportChannelError; use thiserror::Error; use tokio::task::JoinError; -use crate::{mempool::unconfirmed_pool::UnconfirmedPoolError, transactions::transaction_components::TransactionError}; +use crate::{ + common::{BanPeriod, BanReason}, + mempool::unconfirmed_pool::UnconfirmedPoolError, + transactions::transaction_components::TransactionError, +}; #[derive(Debug, Error)] pub enum MempoolError { @@ -45,3 +49,22 @@ pub enum MempoolError { #[error("Mempool indexes out of sync: transaction exists in txs_by_signature but not in tx_by_key")] IndexOutOfSync, } +impl MempoolError { + pub fn get_ban_reason(&self) -> Option { + match self { + _err @ MempoolError::UnconfirmedPoolError(e) => e.get_ban_reason(), + err @ MempoolError::TransactionError(_) | err @ MempoolError::TransactionNoKernels => Some(BanReason { + reason: err.to_string(), + ban_duration: BanPeriod::Long, + }), + err @ MempoolError::TransportChannelError(_) => Some(BanReason { + reason: err.to_string(), + ban_duration: BanPeriod::Short, + }), + _err @ MempoolError::RwLockPoisonError | + _err @ MempoolError::BlockingTaskError(_) | + _err @ MempoolError::InternalError(_) | + _err @ MempoolError::IndexOutOfSync => None, + } + } +} diff --git a/base_layer/core/src/mempool/mod.rs b/base_layer/core/src/mempool/mod.rs index b83317012b..39a602d3fe 100644 --- a/base_layer/core/src/mempool/mod.rs +++ b/base_layer/core/src/mempool/mod.rs @@ -42,7 +42,7 @@ mod rpc; pub use rpc::create_mempool_rpc_service; #[cfg(feature = "base_node")] pub use rpc::{MempoolRpcClient, MempoolRpcServer, MempoolRpcService, MempoolService}; -#[cfg(feature = "base_node")] +#[cfg(feature = "metrics")] mod metrics; #[cfg(feature = "base_node")] mod shrink_hashmap; diff --git a/base_layer/core/src/mempool/service/inbound_handlers.rs b/base_layer/core/src/mempool/service/inbound_handlers.rs index e7fe81fe9c..674a63454d 100644 --- a/base_layer/core/src/mempool/service/inbound_handlers.rs +++ b/base_layer/core/src/mempool/service/inbound_handlers.rs @@ -26,11 +26,12 @@ use log::*; use tari_comms::peer_manager::NodeId; use tari_utilities::hex::Hex; +#[cfg(feature = "metrics")] +use crate::mempool::metrics; use crate::{ base_node::comms_interface::{BlockEvent, BlockEvent::AddBlockErrored}, chain_storage::BlockAddResult, mempool::{ - metrics, service::{MempoolRequest, MempoolResponse, MempoolServiceError, OutboundMempoolServiceInterface}, Mempool, TxStorageResponse, @@ -135,6 +136,7 @@ impl MempoolInboundHandlers { } match self.mempool.insert(tx.clone()).await { Ok(tx_storage) => { + #[cfg(feature = "metrics")] if tx_storage.is_stored() { metrics::inbound_transactions(source_peer.as_ref()).inc(); } else { @@ -164,6 +166,7 @@ impl MempoolInboundHandlers { #[allow(clippy::cast_possible_wrap)] async fn update_pool_size_metrics(&self) { + #[cfg(feature = "metrics")] if let Ok(stats) = self.mempool.stats().await { metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64); metrics::reorg_pool_size().set(stats.reorg_txs as i64); diff --git a/base_layer/core/src/mempool/sync_protocol/mod.rs b/base_layer/core/src/mempool/sync_protocol/mod.rs index 811b98de2e..e1bbcfea02 100644 --- a/base_layer/core/src/mempool/sync_protocol/mod.rs +++ b/base_layer/core/src/mempool/sync_protocol/mod.rs @@ -96,10 +96,12 @@ use tokio::{ time, }; +#[cfg(feature = "metrics")] +use crate::mempool::metrics; use crate::{ base_node::comms_interface::{BlockEvent, BlockEventReceiver}, chain_storage::BlockAddResult, - mempool::{metrics, proto, Mempool, MempoolServiceConfig}, + mempool::{proto, Mempool, MempoolServiceConfig}, proto as shared_proto, transactions::transaction_components::Transaction, }; @@ -544,6 +546,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin #[allow(clippy::cast_possible_truncation)] #[allow(clippy::cast_possible_wrap)] + #[cfg(feature = "metrics")] { let stats = self.mempool.stats().await?; metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64); @@ -580,6 +583,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin let stored_result = self.mempool.insert(txn).await?; if stored_result.is_stored() { + #[cfg(feature = "metrics")] metrics::inbound_transactions(Some(&self.peer_node_id)).inc(); debug!( target: LOG_TARGET, @@ -588,6 +592,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin self.peer_node_id.short_str() ); } else { + #[cfg(feature = "metrics")] metrics::rejected_inbound_transactions(Some(&self.peer_node_id)).inc(); debug!( target: LOG_TARGET, diff --git a/base_layer/core/src/mempool/unconfirmed_pool/error.rs b/base_layer/core/src/mempool/unconfirmed_pool/error.rs index 15322e35c4..56df0a95de 100644 --- a/base_layer/core/src/mempool/unconfirmed_pool/error.rs +++ b/base_layer/core/src/mempool/unconfirmed_pool/error.rs @@ -22,7 +22,10 @@ use thiserror::Error; -use crate::transactions::transaction_components::TransactionError; +use crate::{ + common::{BanPeriod, BanReason}, + transactions::transaction_components::TransactionError, +}; #[derive(Debug, Error)] pub enum UnconfirmedPoolError { @@ -35,3 +38,18 @@ pub enum UnconfirmedPoolError { #[error("Transaction error: `{0}`")] TransactionError(#[from] TransactionError), } +impl UnconfirmedPoolError { + pub fn get_ban_reason(&self) -> Option { + match self { + UnconfirmedPoolError::StorageOutofSync | UnconfirmedPoolError::InternalError(_) => None, + err @ UnconfirmedPoolError::TransactionNoKernels => Some(BanReason { + reason: err.to_string(), + ban_duration: BanPeriod::Long, + }), + err @ UnconfirmedPoolError::TransactionError(_) => Some(BanReason { + reason: format!("Invalid transaction: {}", err), + ban_duration: BanPeriod::Long, + }), + } + } +} diff --git a/base_layer/core/src/proof_of_work/error.rs b/base_layer/core/src/proof_of_work/error.rs index 8ddfa7d80b..1d8e8237d5 100644 --- a/base_layer/core/src/proof_of_work/error.rs +++ b/base_layer/core/src/proof_of_work/error.rs @@ -24,7 +24,10 @@ use thiserror::Error; #[cfg(feature = "base_node")] use crate::proof_of_work::monero_rx::MergeMineError; -use crate::proof_of_work::Difficulty; +use crate::{ + common::{BanPeriod, BanReason}, + proof_of_work::Difficulty, +}; /// Errors that can occur when validating a proof of work #[derive(Debug, Error)] @@ -44,6 +47,23 @@ pub enum PowError { MergeMineError(#[from] MergeMineError), } +impl PowError { + pub fn get_ban_reason(&self) -> Option { + match self { + err @ PowError::InvalidProofOfWork | + err @ PowError::AchievedDifficultyBelowMin | + err @ PowError::Sha3HeaderNonEmptyPowBytes | + err @ PowError::AchievedDifficultyTooLow { .. } | + err @ PowError::InvalidTargetDifficulty { .. } => Some(BanReason { + reason: err.to_string(), + ban_duration: BanPeriod::Long, + }), + #[cfg(feature = "base_node")] + PowError::MergeMineError(e) => e.get_ban_reason(), + } + } +} + /// Errors that can occur when adjusting the difficulty #[derive(Debug, Error, Clone, PartialEq, Eq)] pub enum DifficultyAdjustmentError { diff --git a/base_layer/core/src/proof_of_work/monero_rx/error.rs b/base_layer/core/src/proof_of_work/monero_rx/error.rs index cbbb384801..ab7d7958e5 100644 --- a/base_layer/core/src/proof_of_work/monero_rx/error.rs +++ b/base_layer/core/src/proof_of_work/monero_rx/error.rs @@ -22,7 +22,10 @@ use tari_utilities::hex::HexError; -use crate::proof_of_work::{randomx_factory::RandomXVMFactoryError, DifficultyError}; +use crate::{ + common::{BanPeriod, BanReason}, + proof_of_work::{randomx_factory::RandomXVMFactoryError, DifficultyError}, +}; /// Errors that can occur when merging Monero PoW data with Tari PoW data #[derive(Debug, thiserror::Error)] @@ -47,6 +50,25 @@ pub enum MergeMineError { DifficultyError(#[from] DifficultyError), } +impl MergeMineError { + pub fn get_ban_reason(&self) -> Option { + match self { + err @ MergeMineError::SerializedPowDataDoesNotMatch(_) | + err @ MergeMineError::SerializeError(_) | + err @ MergeMineError::DeserializeError(_) | + err @ MergeMineError::HashingError(_) | + err @ MergeMineError::ValidationError(_) | + err @ MergeMineError::InvalidMerkleRoot | + err @ MergeMineError::DifficultyError(_) | + err @ MergeMineError::HexError(_) => Some(BanReason { + reason: err.to_string(), + ban_duration: BanPeriod::Long, + }), + MergeMineError::RandomXVMFactoryError(_) => None, + } + } +} + impl From for MergeMineError { fn from(err: HexError) -> Self { MergeMineError::HexError(err.to_string()) diff --git a/base_layer/core/src/validation/error.rs b/base_layer/core/src/validation/error.rs index 798f5c937f..3daa3a08ab 100644 --- a/base_layer/core/src/validation/error.rs +++ b/base_layer/core/src/validation/error.rs @@ -20,15 +20,13 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::time::Duration; - use tari_common_types::types::HashOutput; use thiserror::Error; use crate::{ blocks::{BlockHeaderValidationError, BlockValidationError}, chain_storage::ChainStorageError, - common::BanReason, + common::{BanPeriod, BanReason}, covenants::CovenantError, proof_of_work::{monero_rx::MergeMineError, DifficultyError, PowError}, transactions::{ @@ -137,8 +135,9 @@ impl From for ValidationError { } impl ValidationError { - pub fn get_ban_reason(&self, long_ban_duration: Option) -> Option { + pub fn get_ban_reason(&self) -> Option { match self { + ValidationError::ProofOfWorkError(e) => e.get_ban_reason(), err @ ValidationError::SerializationError(_) | err @ ValidationError::BlockHeaderError(_) | err @ ValidationError::BlockError(_) | @@ -152,12 +151,10 @@ impl ValidationError { err @ ValidationError::ContainsTxO | err @ ValidationError::ContainsDuplicateUtxoCommitment | err @ ValidationError::ChainBalanceValidationFailed(_) | - err @ ValidationError::ProofOfWorkError(_) | err @ ValidationError::ValidatingGenesis | err @ ValidationError::UnsortedOrDuplicateInput | err @ ValidationError::UnsortedOrDuplicateOutput | err @ ValidationError::UnsortedOrDuplicateKernel | - err @ ValidationError::MergeMineError(_) | err @ ValidationError::MaxTransactionWeightExceeded | err @ ValidationError::IncorrectHeight { .. } | err @ ValidationError::IncorrectPreviousHash { .. } | @@ -176,9 +173,10 @@ impl ValidationError { err @ ValidationError::DifficultyError(_) | err @ ValidationError::CoinbaseExceedsMaxLimit | err @ ValidationError::CovenantTooLarge { .. } => Some(BanReason { - reason: format!("{}", err), - ban_duration: long_ban_duration.unwrap_or_else(|| Duration::from_secs(2 * 60 * 60)), + reason: err.to_string(), + ban_duration: BanPeriod::Long, }), + ValidationError::MergeMineError(e) => e.get_ban_reason(), ValidationError::FatalStorageError(_) | ValidationError::IncorrectNumberOfTimestampsProvided { .. } => None, } } diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs index c17f45d376..98702db9d8 100644 --- a/base_layer/core/tests/helpers/nodes.rs +++ b/base_layer/core/tests/helpers/nodes.rs @@ -427,6 +427,7 @@ async fn setup_base_node_services( consensus_manager, Duration::from_secs(60), randomx_factory, + Default::default(), )) .add_initializer(MempoolServiceInitializer::new(mempool.clone(), subscription_factory)) .add_initializer(mock_state_machine.get_initializer()) diff --git a/common/config/presets/c_base_node.toml b/common/config/presets/c_base_node.toml index d6d8db46c9..9c8a307076 100644 --- a/common/config/presets/c_base_node.toml +++ b/common/config/presets/c_base_node.toml @@ -33,6 +33,9 @@ identity_file = "config/base_node_id_nextnet.json" # Set to false to disable the base node GRPC server (default = true) #grpc_enabled = true +# gRPC authentication method (default = "none") +#grpc_authentication = { username = "admin", password = "xxxx" } + # Uncomment all gRPC server methods that should be denied default (only active when `grpc_enabled = true`) grpc_server_deny_methods = [ "get_version", diff --git a/common/config/presets/f_merge_mining_proxy.toml b/common/config/presets/f_merge_mining_proxy.toml index ce0c8bf709..8744f28b90 100644 --- a/common/config/presets/f_merge_mining_proxy.toml +++ b/common/config/presets/f_merge_mining_proxy.toml @@ -39,11 +39,14 @@ monerod_url = [# stagenet # The Minotari base node's GRPC address. (default = "/ip4/127.0.0.1/tcp/18142") #base_node_grpc_address = "/ip4/127.0.0.1/tcp/18142" +# GRPC authentication for the base node (default = "none") +#base_node_grpc_authentication = { username = "miner", password = "$argon..." } + # The Minotari wallet's GRPC address. (default = "/ip4/127.0.0.1/tcp/18143") #console_wallet_grpc_address = "/ip4/127.0.0.1/tcp/18143" # GRPC authentication for the Minotari wallet (default = "none") -#wallet_grpc_authentication = { username: "miner", password: "$argon..." } +#wallet_grpc_authentication = { username = "miner", password = "$argon..." } # Address of the minotari_merge_mining_proxy application. (default = "/ip4/127.0.0.1/tcp/18081") #listener_address = "/ip4/127.0.0.1/tcp/18081" diff --git a/common/config/presets/g_miner.toml b/common/config/presets/g_miner.toml index 717a9da1d7..b821e95521 100644 --- a/common/config/presets/g_miner.toml +++ b/common/config/presets/g_miner.toml @@ -9,11 +9,13 @@ # GRPC address of base node (default = "/ip4/127.0.0.1/tcp/18142") #base_node_grpc_address = "/ip4/127.0.0.1/tcp/18142" +# GRPC authentication for the base node (default = "none") +#base_node_grpc_authentication = { username = "miner", password = "$argon..." } # GRPC address of console wallet (default = "/ip4/127.0.0.1/tcp/18143") #wallet_grpc_address = "/ip4/127.0.0.1/tcp/18143" # GRPC authentication for the console wallet (default = "none") -#wallet_grpc_authentication = { username: "miner", password: "$argon..." } +#wallet_grpc_authentication = { username = "miner", password = "$argon..." } # Number of mining threads (default: number of logical CPU cores) #num_mining_threads = 8 diff --git a/comms/core/Cargo.toml b/comms/core/Cargo.toml index 6b5879184c..82c486e5bd 100644 --- a/comms/core/Cargo.toml +++ b/comms/core/Cargo.toml @@ -11,7 +11,7 @@ edition = "2018" [dependencies] tari_crypto = { version = "0.19" } -tari_metrics = { path = "../../infrastructure/metrics" } +tari_metrics = { path = "../../infrastructure/metrics", optional = true } tari_storage = { path = "../../infrastructure/storage" } tari_shutdown = { path = "../../infrastructure/shutdown" } tari_utilities = { version = "0.6" } @@ -63,5 +63,5 @@ tari_common = { path = "../../common", features = ["build"] } [features] c_integration = [] -metrics = [] +metrics = ["tari_metrics"] rpc = ["tower/make", "tower/util"] diff --git a/comms/core/src/connection_manager/dialer.rs b/comms/core/src/connection_manager/dialer.rs index 823b386a7f..b5bc59565c 100644 --- a/comms/core/src/connection_manager/dialer.rs +++ b/comms/core/src/connection_manager/dialer.rs @@ -42,6 +42,8 @@ use tokio_stream::StreamExt; use tracing::{self, span, Instrument, Level}; use super::{direction::ConnectionDirection, error::ConnectionManagerError, peer_connection::PeerConnection}; +#[cfg(feature = "metrics")] +use crate::connection_manager::metrics; use crate::{ backoff::Backoff, connection_manager::{ @@ -49,7 +51,6 @@ use crate::{ common::ValidatedPeerIdentityExchange, dial_state::DialState, manager::{ConnectionManagerConfig, ConnectionManagerEvent}, - metrics, peer_connection, }, multiaddr::Multiaddr, @@ -222,6 +223,7 @@ where dial_result: Result<(PeerConnection, ValidatedPeerIdentityExchange), ConnectionManagerError>, ) { let node_id = dial_state.peer().node_id.clone(); + #[cfg(feature = "metrics")] metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).inc(); match dial_result { @@ -276,6 +278,7 @@ where .map_err(|e| error!(target: LOG_TARGET, "Could not send reply to dial request: {:?}", e)); }); + #[cfg(feature = "metrics")] metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).dec(); self.cancel_dial(&node_id); diff --git a/comms/core/src/connection_manager/listener.rs b/comms/core/src/connection_manager/listener.rs index 2266fd3027..c777742ea0 100644 --- a/comms/core/src/connection_manager/listener.rs +++ b/comms/core/src/connection_manager/listener.rs @@ -50,11 +50,12 @@ use super::{ ConnectionManagerConfig, ConnectionManagerEvent, }; +#[cfg(feature = "metrics")] +use crate::connection_manager::metrics; use crate::{ bounded_executor::BoundedExecutor, connection_manager::{ liveness::LivenessSession, - metrics, wire_mode::{WireMode, LIVENESS_WIRE_MODE}, }, multiaddr::Multiaddr, @@ -240,6 +241,7 @@ where let span = span!(Level::TRACE, "connection_mann::listener::inbound_task",); let inbound_fut = async move { + #[cfg(feature = "metrics")] metrics::pending_connections(None, ConnectionDirection::Inbound).inc(); match Self::read_wire_format(&mut socket, config.time_to_first_byte).await { Ok(WireMode::Comms(byte)) if byte == config.network_info.network_byte => { @@ -325,6 +327,7 @@ where }, } + #[cfg(feature = "metrics")] metrics::pending_connections(None, ConnectionDirection::Inbound).dec(); } .instrument(span); diff --git a/comms/core/src/connection_manager/manager.rs b/comms/core/src/connection_manager/manager.rs index 71521708aa..42b64d4338 100644 --- a/comms/core/src/connection_manager/manager.rs +++ b/comms/core/src/connection_manager/manager.rs @@ -41,9 +41,13 @@ use super::{ peer_connection::PeerConnection, requester::ConnectionManagerRequest, }; +#[cfg(feature = "metrics")] +use crate::connection_manager::metrics; +#[cfg(feature = "metrics")] +use crate::connection_manager::ConnectionDirection; use crate::{ backoff::Backoff, - connection_manager::{metrics, ConnectionDirection, ConnectionId}, + connection_manager::ConnectionId, multiplexing::Substream, noise::NoiseConfig, peer_manager::{NodeId, NodeIdentity, PeerManagerError}, @@ -421,6 +425,7 @@ where node_id.short_str(), proto_str ); + #[cfg(feature = "metrics")] metrics::inbound_substream_counter(&node_id, &protocol).inc(); let notify_fut = self .protocols @@ -452,14 +457,17 @@ where .send(DialerRequest::NotifyNewInboundConnection(conn.clone())) .await; } + #[cfg(feature = "metrics")] metrics::successful_connections(conn.peer_node_id(), conn.direction()).inc(); self.publish_event(PeerConnected(conn)); }, PeerConnectFailed(peer, err) => { + #[cfg(feature = "metrics")] metrics::failed_connections(&peer, ConnectionDirection::Outbound).inc(); self.publish_event(PeerConnectFailed(peer, err)); }, PeerInboundConnectFailed(err) => { + #[cfg(feature = "metrics")] metrics::failed_connections(&Default::default(), ConnectionDirection::Inbound).inc(); self.publish_event(PeerInboundConnectFailed(err)); }, diff --git a/comms/core/src/connection_manager/mod.rs b/comms/core/src/connection_manager/mod.rs index fe3c1fe0ed..2b3469a3d3 100644 --- a/comms/core/src/connection_manager/mod.rs +++ b/comms/core/src/connection_manager/mod.rs @@ -30,6 +30,7 @@ mod dial_state; mod dialer; mod listener; +#[cfg(feature = "metrics")] mod metrics; mod common; diff --git a/comms/core/src/protocol/messaging/inbound.rs b/comms/core/src/protocol/messaging/inbound.rs index 99ed90d218..d0a1eeaa31 100644 --- a/comms/core/src/protocol/messaging/inbound.rs +++ b/comms/core/src/protocol/messaging/inbound.rs @@ -29,7 +29,9 @@ use tokio::{ sync::{broadcast, mpsc}, }; -use super::{metrics, MessagingEvent, MessagingProtocol}; +#[cfg(feature = "metrics")] +use super::metrics; +use super::{MessagingEvent, MessagingProtocol}; use crate::{message::InboundMessage, peer_manager::NodeId}; const LOG_TARGET: &str = "comms::protocol::messaging::inbound"; @@ -60,6 +62,7 @@ impl InboundMessaging { pub async fn run(self, socket: S) where S: AsyncRead + AsyncWrite + Unpin { let peer = &self.peer; + #[cfg(feature = "metrics")] metrics::num_sessions().inc(); debug!( target: LOG_TARGET, @@ -71,11 +74,11 @@ impl InboundMessaging { tokio::pin!(stream); - let inbound_count = metrics::inbound_message_count(&self.peer); while let Some(result) = stream.next().await { match result { Ok(raw_msg) => { - inbound_count.inc(); + #[cfg(feature = "metrics")] + metrics::inbound_message_count(&self.peer).inc(); let msg_len = raw_msg.len(); let inbound_msg = InboundMessage::new(peer.clone(), raw_msg.freeze()); debug!( @@ -107,6 +110,7 @@ impl InboundMessaging { }, // LengthDelimitedCodec emits a InvalidData io error when the message length exceeds the maximum allowed Err(err) if err.kind() == io::ErrorKind::InvalidData => { + #[cfg(feature = "metrics")] metrics::error_count(peer).inc(); debug!( target: LOG_TARGET, @@ -121,6 +125,7 @@ impl InboundMessaging { break; }, Err(err) => { + #[cfg(feature = "metrics")] metrics::error_count(peer).inc(); error!( target: LOG_TARGET, @@ -136,6 +141,7 @@ impl InboundMessaging { let _ignore = self .messaging_events_tx .send(MessagingEvent::InboundProtocolExited(peer.clone())); + #[cfg(feature = "metrics")] metrics::num_sessions().dec(); debug!( target: LOG_TARGET, diff --git a/comms/core/src/protocol/messaging/mod.rs b/comms/core/src/protocol/messaging/mod.rs index 9b45008474..1d50bb5a01 100644 --- a/comms/core/src/protocol/messaging/mod.rs +++ b/comms/core/src/protocol/messaging/mod.rs @@ -34,6 +34,7 @@ pub use extension::MessagingProtocolExtension; mod error; mod forward; mod inbound; +#[cfg(feature = "metrics")] mod metrics; mod outbound; mod protocol; diff --git a/comms/core/src/protocol/messaging/outbound.rs b/comms/core/src/protocol/messaging/outbound.rs index 83d144ad4a..b560b7867e 100644 --- a/comms/core/src/protocol/messaging/outbound.rs +++ b/comms/core/src/protocol/messaging/outbound.rs @@ -26,7 +26,9 @@ use futures::{future, SinkExt, StreamExt}; use tokio::{pin, sync::mpsc}; use tracing::{debug, error, span, Instrument, Level}; -use super::{error::MessagingProtocolError, metrics, MessagingEvent, MessagingProtocol, SendFailReason}; +#[cfg(feature = "metrics")] +use super::metrics; +use super::{error::MessagingProtocolError, MessagingEvent, MessagingProtocol, SendFailReason}; use crate::{ connection_manager::{NegotiatedSubstream, PeerConnection}, connectivity::{ConnectivityError, ConnectivityRequester}, @@ -78,6 +80,7 @@ impl OutboundMessaging { "comms::messaging::outbound", node_id = self.peer_node_id.to_string().as_str() ); + #[cfg(feature = "metrics")] metrics::num_sessions().inc(); async move { debug!( @@ -101,6 +104,7 @@ impl OutboundMessaging { }, Err(MessagingProtocolError::ConnectionClosed(err)) => { // Not sure about the metrics, but feels safer to keep on registering the error in metrics for now + #[cfg(feature = "metrics")] metrics::error_count(&peer_node_id).inc(); debug!( target: LOG_TARGET, @@ -111,6 +115,7 @@ impl OutboundMessaging { ); }, Err(err) => { + #[cfg(feature = "metrics")] metrics::error_count(&peer_node_id).inc(); error!( target: LOG_TARGET, @@ -119,6 +124,7 @@ impl OutboundMessaging { }, } + #[cfg(feature = "metrics")] metrics::num_sessions().dec(); let _ignore = messaging_events_tx .send(MessagingEvent::OutboundProtocolExited(peer_node_id)) @@ -261,8 +267,10 @@ impl OutboundMessaging { v.map(|v| (v, rx)) }); + #[cfg(feature = "metrics")] let outbound_count = metrics::outbound_message_count(&peer_node_id); let stream = outbound_stream.map(|mut out_msg| { + #[cfg(feature = "metrics")] outbound_count.inc(); debug!( target: LOG_TARGET, diff --git a/comms/core/src/protocol/rpc/client/mod.rs b/comms/core/src/protocol/rpc/client/mod.rs index 76844304c3..292c397930 100644 --- a/comms/core/src/protocol/rpc/client/mod.rs +++ b/comms/core/src/protocol/rpc/client/mod.rs @@ -25,6 +25,7 @@ pub mod pool; #[cfg(test)] mod tests; +#[cfg(feature = "metrics")] mod metrics; use std::{ @@ -463,9 +464,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId if let Some(r) = self.ready_tx.take() { let _result = r.send(Ok(())); } + #[cfg(feature = "metrics")] metrics::handshake_counter(&self.node_id, &self.protocol_id).inc(); }, Err(err) => { + #[cfg(feature = "metrics")] metrics::handshake_errors(&self.node_id, &self.protocol_id).inc(); if let Some(r) = self.ready_tx.take() { let _result = r.send(Err(err.into())); @@ -475,6 +478,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId }, } + #[cfg(feature = "metrics")] metrics::num_sessions(&self.node_id, &self.protocol_id).inc(); loop { tokio::select! { @@ -487,6 +491,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId match req { Some(req) => { if let Err(err) = self.handle_request(req).await { + #[cfg(feature = "metrics")] metrics::client_errors(&self.node_id, &self.protocol_id).inc(); error!(target: LOG_TARGET, "(stream={}) Unexpected error: {}. Worker is terminating.", self.stream_id(), err); break; @@ -500,6 +505,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId } } } + #[cfg(feature = "metrics")] metrics::num_sessions(&self.node_id, &self.protocol_id).dec(); if let Err(err) = self.framed.close().await { @@ -561,6 +567,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId self.stream_id(), start.elapsed() ); + #[cfg(feature = "metrics")] metrics::client_timeouts(&self.node_id, &self.protocol_id).inc(); let _result = reply.send(Err(RpcStatus::timed_out("Response timed out"))); return Ok(()); @@ -606,6 +613,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId request: BaseRequest, reply: oneshot::Sender, RpcStatus>>>, ) -> Result<(), RpcError> { + #[cfg(feature = "metrics")] metrics::outbound_request_bytes(&self.node_id, &self.protocol_id).observe(request.get_ref().len() as f64); let request_id = self.next_request_id(); @@ -639,11 +647,15 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId return Ok(()); } + #[cfg(feature = "metrics")] let latency = metrics::request_response_latency(&self.node_id, &self.protocol_id); + #[cfg(feature = "metrics")] let mut metrics_timer = Some(latency.start_timer()); + let timer = Instant::now(); if let Err(err) = self.send_request(req).await { warn!(target: LOG_TARGET, "{}", err); + #[cfg(feature = "metrics")] metrics::client_errors(&self.node_id, &self.protocol_id).inc(); let _result = response_tx.send(Err(err.into())).await; return Ok(()); @@ -697,6 +709,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId method, ); + #[cfg(feature = "metrics")] if let Some(t) = metrics_timer.take() { t.observe_duration(); } @@ -707,6 +720,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId target: LOG_TARGET, "Request {} (method={}) timed out", request_id, method, ); + #[cfg(feature = "metrics")] metrics::client_timeouts(&self.node_id, &self.protocol_id).inc(); if response_tx.is_closed() { self.premature_close(request_id, method).await?; @@ -819,6 +833,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId protocol_name, reader.bytes_read() ); + #[cfg(feature = "metrics")] metrics::inbound_response_bytes(&self.node_id, &self.protocol_id) .observe(reader.bytes_read() as f64); let time_to_first_msg = reader.time_to_first_msg(); diff --git a/comms/core/src/protocol/rpc/message.rs b/comms/core/src/protocol/rpc/message.rs index 4722b1e065..ed377fe50f 100644 --- a/comms/core/src/protocol/rpc/message.rs +++ b/comms/core/src/protocol/rpc/message.rs @@ -128,6 +128,7 @@ impl BaseRequest { self.message } + #[allow(dead_code)] pub fn get_ref(&self) -> &T { &self.message } diff --git a/comms/core/src/protocol/rpc/server/metrics.rs b/comms/core/src/protocol/rpc/server/metrics.rs index 9592d78118..fb885fc561 100644 --- a/comms/core/src/protocol/rpc/server/metrics.rs +++ b/comms/core/src/protocol/rpc/server/metrics.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use once_cell::sync::Lazy; +#[cfg(feature = "metrics")] use tari_metrics::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec}; use crate::{ diff --git a/comms/core/src/protocol/rpc/server/mod.rs b/comms/core/src/protocol/rpc/server/mod.rs index d44fbe0632..3891ca02e9 100644 --- a/comms/core/src/protocol/rpc/server/mod.rs +++ b/comms/core/src/protocol/rpc/server/mod.rs @@ -30,6 +30,7 @@ mod handle; pub use handle::RpcServerHandle; use handle::RpcServerRequest; +#[cfg(feature = "metrics")] mod metrics; pub mod mock; @@ -354,6 +355,7 @@ where Ok(_) => {}, Err(err @ RpcServerError::HandshakeError(_)) => { debug!(target: LOG_TARGET, "Handshake error: {}", err); + #[cfg(feature = "metrics")] metrics::handshake_error_counter(&node_id, ¬ification.protocol).inc(); }, Err(err) => { @@ -464,10 +466,13 @@ where let handle = self .executor .try_spawn(async move { + #[cfg(feature = "metrics")] let num_sessions = metrics::num_sessions(&node_id, &service.protocol); + #[cfg(feature = "metrics")] num_sessions.inc(); service.start().await; info!(target: LOG_TARGET, "END OF SESSION for {} ", node_id,); + #[cfg(feature = "metrics")] num_sessions.dec(); node_id @@ -526,6 +531,7 @@ where "({}) Rpc server started.", self.logging_context_string, ); if let Err(err) = self.run().await { + #[cfg(feature = "metrics")] metrics::error_counter(&self.node_id, &self.protocol, &err).inc(); let level = match &err { RpcServerError::Io(e) => err_to_log_level(e), @@ -543,12 +549,14 @@ where } async fn run(&mut self) -> Result<(), RpcServerError> { - let request_bytes = metrics::inbound_requests_bytes(&self.node_id, &self.protocol); while let Some(result) = self.framed.next().await { match result { Ok(frame) => { + #[cfg(feature = "metrics")] + metrics::inbound_requests_bytes(&self.node_id, &self.protocol).observe(frame.len() as f64); + let start = Instant::now(); - request_bytes.observe(frame.len() as f64); + if let Err(err) = self.handle_request(frame.freeze()).await { if let Err(err) = self.framed.close().await { let level = err.io().map(err_to_log_level).unwrap_or(log::Level::Error); @@ -622,6 +630,7 @@ where flags: RpcMessageFlags::FIN.bits().into(), payload: status.to_details_bytes(), }; + #[cfg(feature = "metrics")] metrics::status_error_counter(&self.node_id, &self.protocol, status.as_status_code()).inc(); self.framed.send(bad_request.to_encoded_bytes().into()).await?; return Ok(()); @@ -685,6 +694,7 @@ where deadline, ); + #[cfg(feature = "metrics")] metrics::error_counter( &self.node_id, &self.protocol, @@ -711,6 +721,7 @@ where payload: err.to_details_bytes(), }; + #[cfg(feature = "metrics")] metrics::status_error_counter(&self.node_id, &self.protocol, err.as_status_code()).inc(); self.framed.send(resp.to_encoded_bytes().into()).await?; }, @@ -729,15 +740,17 @@ where deadline: Duration, body: Response, ) -> Result<(), RpcServerError> { - let response_bytes = metrics::outbound_response_bytes(&self.node_id, &self.protocol); trace!(target: LOG_TARGET, "Service call succeeded"); + #[cfg(feature = "metrics")] let node_id = self.node_id.clone(); + #[cfg(feature = "metrics")] let protocol = self.protocol.clone(); let mut stream = body .into_message() .map(|result| into_response(request_id, result)) .flat_map(move |message| { + #[cfg(feature = "metrics")] if !message.status.is_ok() { metrics::status_error_counter(&node_id, &protocol, message.status).inc(); } @@ -771,7 +784,8 @@ where msg = next_item => { match msg { Some(msg) => { - response_bytes.observe(msg.len() as f64); + #[cfg(feature = "metrics")] + metrics::outbound_response_bytes(&self.node_id, &self.protocol).observe(msg.len() as f64); debug!( target: LOG_TARGET, "({}) Sending body len = {}", @@ -796,6 +810,7 @@ where deadline ); + #[cfg(feature = "metrics")] metrics::error_counter( &self.node_id, &self.protocol,