diff --git a/Cargo.lock b/Cargo.lock
index 537de8113375b3..f2351bbe49b0e1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -292,6 +292,7 @@ dependencies = [
  "tokio",
  "toml 0.7.4",
  "tonic 0.10.0",
+ "tracing",
  "tracing-subscriber",
  "walkdir",
 ]
@@ -6254,14 +6255,14 @@ dependencies = [
 
 [[package]]
 name = "enum_dispatch"
-version = "0.3.8"
+version = "0.3.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0eb359f1476bf611266ac1f5355bc14aeca37b299d0ebccc038ee7058891c9cb"
+checksum = "8f33313078bb8d4d05a2733a94ac4c2d8a0df9a2b84424ebf4f33bfc224a890e"
 dependencies = [
  "once_cell",
  "proc-macro2 1.0.64",
  "quote 1.0.29",
- "syn 1.0.105",
+ "syn 2.0.32",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index a4b4684f254c15..7e42ba08730af6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -485,7 +485,7 @@ dirs = "5.0.1"
 ed25519-dalek = { version = "1.0.1", features = ["std", "serde"] }
 ed25519-dalek-bip32 = "0.2.0"
 either = "1.6.1"
-enum_dispatch = "0.3.8"
+enum_dispatch = "0.3.12"
 env_logger = "0.10.0"
 erased-serde = "0.3.13"
 event-listener = "2.5.3"
diff --git a/crates/aptos/Cargo.toml b/crates/aptos/Cargo.toml
index 62d3194255d06a..babaf6cfc5d29b 100644
--- a/crates/aptos/Cargo.toml
+++ b/crates/aptos/Cargo.toml
@@ -86,6 +86,7 @@ thiserror = { workspace = true }
 tokio = { workspace = true }
 toml = { workspace = true }
 tonic = { workspace = true }
+tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
 walkdir = { workspace = true }
diff --git a/crates/aptos/src/node/local_testnet/faucet.rs b/crates/aptos/src/node/local_testnet/faucet.rs
new file mode 100644
index 00000000000000..8194fecf057bed
--- /dev/null
+++ b/crates/aptos/src/node/local_testnet/faucet.rs
@@ -0,0 +1,86 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use super::{health_checker::HealthChecker, traits::ServiceManager, RunLocalTestnet};
+use anyhow::Result;
+use aptos_faucet_core::server::{FunderKeyEnum, RunConfig};
+use async_trait::async_trait;
+use clap::Parser;
+use reqwest::Url;
+use std::path::PathBuf;
+
+/// Args related to running a faucet in the local testnet.
+#[derive(Debug, Parser)]
+pub struct FaucetArgs {
+    /// Do not run a faucet alongside the node.
+    ///
+    /// Running a faucet alongside the node allows you to create and fund accounts
+    /// for testing.
+    #[clap(long)]
+    pub no_faucet: bool,
+
+    /// This does nothing, we already run a faucet by default. We only keep this here
+    /// for backwards compatibility with tests. We will remove this once the commit
+    /// that added --no-faucet makes its way to the testnet branch.
+    #[clap(long, hide = true)]
+    pub with_faucet: bool,
+
+    /// Port to run the faucet on.
+    ///
+    /// When running, you'll be able to use the faucet at `http://127.0.0.1:<port>/mint`,
+    /// e.g. `http://127.0.0.1:8081/mint`.
+    #[clap(long, default_value_t = 8081)]
+    pub faucet_port: u16,
+
+    /// Disable the delegation of faucet minting to a dedicated account.
+    #[clap(long)]
+    pub do_not_delegate: bool,
+}
+
+#[derive(Clone, Debug)]
+pub struct FaucetManager {
+    config: RunConfig,
+    prerequisite_health_checkers: Vec<HealthChecker>,
+}
+
+impl FaucetManager {
+    pub fn new(
+        args: &RunLocalTestnet,
+        prerequisite_health_checkers: Vec<HealthChecker>,
+        test_dir: PathBuf,
+        node_api_url: Url,
+    ) -> Result<Self> {
+        Ok(Self {
+            config: RunConfig::build_for_cli(
+                node_api_url.clone(),
+                args.faucet_args.faucet_port,
+                FunderKeyEnum::KeyFile(test_dir.join("mint.key")),
+                args.faucet_args.do_not_delegate,
+                None,
+            ),
+            prerequisite_health_checkers,
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceManager for FaucetManager {
+    fn get_name(&self) -> String {
+        "Faucet".to_string()
+    }
+
+    fn get_healthchecks(&self) -> Vec<HealthChecker> {
+        vec![HealthChecker::http_checker_from_port(
+            self.config.server_config.listen_port,
+            self.get_name(),
+        )]
+    }
+
+    fn get_prerequisite_health_checkers(&self) -> Vec<&HealthChecker> {
+        self.prerequisite_health_checkers.iter().collect()
+    }
+
+    async fn run_service(self: Box<Self>) -> Result<()> {
+        self.config.run().await
+    }
+}
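For context on the `/mint` endpoint mentioned in the `faucet_port` docs above: once the faucet is up, accounts can be funded over plain HTTP. Below is a rough, illustrative sketch (not part of this change) using `reqwest`; the query parameter names and the default port are assumptions here, so check the faucet's own documentation for the exact contract.

```rust
use anyhow::Result;

// Hypothetical client-side helper: fund an account via the local faucet.
// The `address`/`amount` parameter names are assumptions for illustration,
// not something this diff defines.
async fn fund_account(address: &str, amount: u64) -> Result<()> {
    let url = format!(
        "http://127.0.0.1:8081/mint?address={}&amount={}",
        address, amount
    );
    let response = reqwest::Client::new().post(&url).send().await?;
    anyhow::ensure!(
        response.status().is_success(),
        "faucet returned {}",
        response.status()
    );
    Ok(())
}
```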
diff --git a/crates/aptos/src/node/local_testnet/health_checker.rs b/crates/aptos/src/node/local_testnet/health_checker.rs
index 2ba6438160ba6f..0f385c8932df3a 100644
--- a/crates/aptos/src/node/local_testnet/health_checker.rs
+++ b/crates/aptos/src/node/local_testnet/health_checker.rs
@@ -17,7 +17,7 @@ const WAIT_INTERVAL_MS: u64 = 150;
 #[derive(Clone, Debug, Serialize)]
 pub enum HealthChecker {
     /// Check that an HTTP API is up. The second param is the name of the HTTP service.
-    Http(Url, &'static str),
+    Http(Url, String),
     /// Check that the node API is up. This is just a specific case of Http for extra
     /// guarantees around liveliness.
     NodeApi(Url),
@@ -100,6 +100,14 @@ impl HealthChecker {
             HealthChecker::DataServiceGrpc(url) => url.as_str(),
         }
     }
+
+    /// Given a port, make an instance of HealthChecker::Http targeting 127.0.0.1.
+    pub fn http_checker_from_port(port: u16, name: String) -> Self {
+        Self::Http(
+            Url::parse(&format!("http://127.0.0.1:{}", port,)).unwrap(),
+            name,
+        )
+    }
 }
 
 impl std::fmt::Display for HealthChecker {
@@ -123,15 +131,25 @@ where
     let start = Instant::now();
     let mut started_successfully = false;
+    let mut last_error_message = None;
 
     while start.elapsed() < max_wait {
-        if check_fn().await.is_ok() {
-            started_successfully = true;
-            break;
+        match check_fn().await {
+            Ok(_) => {
+                started_successfully = true;
+                break;
+            },
+            Err(err) => {
+                last_error_message = Some(format!("{:#}", err));
+            },
         }
         tokio::time::sleep(wait_interval).await
     }
 
     if !started_successfully {
+        let error_message = match last_error_message {
+            Some(last_error_message) => format!("{}: {}", error_message, last_error_message),
+            None => error_message,
+        };
         return Err(CliError::UnexpectedError(error_message));
     }
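The change above keeps the most recent error so the eventual timeout message explains why a service never became healthy. The same pattern is easy to lift into a standalone helper; here is a minimal sketch with hypothetical names, assuming only `tokio` and `anyhow`:

```rust
use anyhow::{anyhow, Result};
use std::time::{Duration, Instant};

// Poll `check` until it succeeds or `max_wait` elapses, remembering the last
// error so the timeout message says why the target never became healthy.
async fn poll_until_ok<F, Fut>(check: F, max_wait: Duration, interval: Duration) -> Result<()>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<()>>,
{
    let start = Instant::now();
    let mut last_error = None;
    while start.elapsed() < max_wait {
        match check().await {
            Ok(_) => return Ok(()),
            Err(err) => last_error = Some(format!("{:#}", err)),
        }
        tokio::time::sleep(interval).await;
    }
    Err(anyhow!(
        "did not become healthy in time: {}",
        last_error.unwrap_or_else(|| "no check completed".to_string())
    ))
}
```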
diff --git a/crates/aptos/src/node/local_testnet/mod.rs b/crates/aptos/src/node/local_testnet/mod.rs
index 47edbf1570344c..89b3d9212943b2 100644
--- a/crates/aptos/src/node/local_testnet/mod.rs
+++ b/crates/aptos/src/node/local_testnet/mod.rs
@@ -1,16 +1,21 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
+mod faucet;
 mod health_checker;
 mod logging;
+mod node;
 mod ready_server;
+mod traits;
 mod utils;
 
 use self::{
+    faucet::FaucetArgs,
     health_checker::HealthChecker,
     logging::ThreadNameMakeWriter,
-    ready_server::{run_ready_server, ReadyServerConfig},
-    utils::socket_addr_to_url,
+    node::NodeArgs,
+    ready_server::ReadyServerArgs,
+    traits::{PostHealthyStep, ServiceManager},
 };
 use crate::{
     common::{
@@ -18,26 +23,21 @@ use crate::{
         utils::prompt_yes_with_override,
     },
     config::GlobalConfig,
+    node::local_testnet::{
+        faucet::FaucetManager, node::NodeManager, ready_server::ReadyServerManager,
+    },
 };
 use anyhow::Context;
-use aptos_config::config::{NodeConfig, DEFAULT_GRPC_STREAM_PORT};
-use aptos_faucet_core::server::{FunderKeyEnum, RunConfig as FaucetConfig};
 use aptos_indexer_grpc_server_framework::setup_logging;
-use aptos_logger::debug;
-use aptos_node::{load_node_config, start_test_environment_node};
 use async_trait::async_trait;
 use clap::Parser;
-use futures::{Future, FutureExt};
-use rand::{rngs::StdRng, SeedableRng};
-use reqwest::Url;
 use std::{
     fs::{create_dir_all, remove_dir_all},
     path::PathBuf,
     pin::Pin,
-    thread,
-    time::Duration,
 };
 use tokio::task::JoinHandle;
+use tracing::info;
 use tracing_subscriber::fmt::MakeWriter;
 
 const TESTNET_FOLDER: &str = "testnet";
@@ -49,231 +49,63 @@ const TESTNET_FOLDER: &str = "testnet";
 /// you specify otherwise with --no-faucet and --no-txn-stream respectively.
 #[derive(Parser)]
 pub struct RunLocalTestnet {
-    /// An overridable config template for the test node
-    ///
-    /// If provided, the config will be used, and any needed configuration for the local testnet
-    /// will override the config's values
-    #[clap(long, value_parser)]
-    config_path: Option<PathBuf>,
-
     /// The directory to save all files for the node
     ///
     /// Defaults to .aptos/testnet
     #[clap(long, value_parser)]
     test_dir: Option<PathBuf>,
 
-    /// Path to node configuration file override for local test mode.
-    ///
-    /// If provided, the default node config will be overridden by the config in the given file.
-    /// Cannot be used with --config-path
-    #[clap(long, value_parser, conflicts_with("config_path"))]
-    test_config_override: Option<PathBuf>,
-
-    /// Random seed for key generation in test mode
-    ///
-    /// This allows you to have deterministic keys for testing
-    #[clap(long, value_parser = aptos_node::load_seed)]
-    seed: Option<[u8; 32]>,
-
     /// Clean the state and start with a new chain at genesis
     ///
-    /// This will wipe the aptosdb in `test-dir` to remove any incompatible changes, and start
-    /// the chain fresh. Note, that you will need to publish the module again and distribute funds
-    /// from the faucet accordingly
+    /// This will wipe the aptosdb in `--test-dir` to remove any incompatible changes, and start
+    /// the chain fresh. Note that you will need to publish the module again and distribute funds
+    /// from the faucet accordingly.
     #[clap(long)]
     force_restart: bool,
 
-    /// Port to run the faucet on.
-    ///
-    /// When running, you'll be able to use the faucet at `http://127.0.0.1:<port>/mint` e.g.
-    /// `http//127.0.0.1:8081/mint`
-    #[clap(long, default_value_t = 8081)]
-    faucet_port: u16,
-
-    /// Do not run a faucet alongside the node.
-    ///
-    /// Running a faucet alongside the node allows you to create and fund accounts
-    /// for testing.
-    #[clap(long)]
-    no_faucet: bool,
-
-    /// This does nothing, we already run a faucet by default. We only keep this here
-    /// for backwards compatibility with tests. We will remove this once the commit
-    /// that added --no-faucet makes its way to the testnet branch.
-    #[clap(long, hide = true)]
-    with_faucet: bool,
-
-    /// Disable the delegation of faucet minting to a dedicated account.
-    #[clap(long)]
-    do_not_delegate: bool,
-
-    /// Do not run a transaction stream service alongside the node.
-    ///
-    /// Note: In reality this is not the same as running a Transaction Stream Service,
-    /// it is just using the stream from the node, but in practice this distinction
-    /// shouldn't matter.
-    #[clap(long)]
-    no_txn_stream: bool,
+    #[clap(flatten)]
+    node_args: NodeArgs,
 
-    /// The port at which to expose the grpc transaction stream.
-    #[clap(long, default_value_t = DEFAULT_GRPC_STREAM_PORT)]
-    txn_stream_port: u16,
+    #[clap(flatten)]
+    faucet_args: FaucetArgs,
 
     #[clap(flatten)]
-    ready_server_config: ReadyServerConfig,
+    ready_server_args: ReadyServerArgs,
 
     #[clap(flatten)]
     prompt_options: PromptOptions,
 }
 
-#[derive(Debug)]
-struct AllConfigs {
-    ready_server_config: ReadyServerConfig,
-    node_config: NodeConfig,
-    faucet_config: Option<FaucetConfig>,
-}
-
-impl AllConfigs {
-    pub fn get_node_api_url(&self) -> Url {
-        socket_addr_to_url(&self.node_config.api.address, "http").unwrap()
-    }
-}
-
 impl RunLocalTestnet {
-    /// This function builds all the configs we need to run each of the requested
-    /// services. We separate creating configs and spawning services to keep the
-    /// code clean. This could also allow us to one day have two phases for starting
-    /// a local testnet, in which you can alter the configs on disk between each phase.
-    fn build_configs(&self, test_dir: PathBuf) -> anyhow::Result<AllConfigs> {
-        let rng = self
-            .seed
-            .map(StdRng::from_seed)
-            .unwrap_or_else(StdRng::from_entropy);
-
-        // If there is a config on disk, this function will use that. If not, it will
-        // create a new one, taking the config_path and test_config_override arguments
-        // into account.
-        let mut node_config = load_node_config(
-            &self.config_path,
-            &self.test_config_override,
-            &test_dir,
-            false,
-            false,
-            aptos_cached_packages::head_release_bundle(),
-            rng,
-        )
-        .context("Failed to load / create config for node")?;
-
-        eprintln!();
-
-        // Enable the grpc stream on the node if we will run a txn stream service.
-        let run_txn_stream = !self.no_txn_stream;
-        node_config.indexer_grpc.enabled = run_txn_stream;
-        node_config.indexer_grpc.use_data_service_interface = run_txn_stream;
-        node_config
-            .indexer_grpc
-            .address
-            .set_port(self.txn_stream_port);
-
-        // So long as the indexer relies on storage indexing tables, this must be set
-        // for the indexer GRPC stream on the node to work.
-        node_config.storage.enable_indexer = run_txn_stream;
-
-        let node_api_url = socket_addr_to_url(&node_config.api.address, "http").unwrap();
-
-        let faucet_config = if self.no_faucet {
-            None
-        } else {
-            Some(FaucetConfig::build_for_cli(
-                node_api_url.clone(),
-                self.faucet_port,
-                FunderKeyEnum::KeyFile(test_dir.join("mint.key")),
-                self.do_not_delegate,
-                None,
-            ))
-        };
-
-        Ok(AllConfigs {
-            ready_server_config: self.ready_server_config.clone(),
-            node_config,
-            faucet_config,
-        })
-    }
-
-    // Note: These start_* functions (e.g. start_node) can run checks prior to
-    // returning the future for the service, for example to ensure that a prerequisite
-    // service has started. They cannot however do anything afterwards. For that,
-    // you probably want to define a HealthCheck to register with wait_for_startup.
-
-    /// Spawn the node on a thread and then create a future that just waits for it to
-    /// exit (which should never happen) forever. This is necessary because there is
-    /// no async function we can use to run the node.
-    async fn start_node(
-        &self,
-        test_dir: PathBuf,
-        config: NodeConfig,
-    ) -> CliTypedResult<impl Future<Output = ()>> {
-        let node_thread_handle = thread::spawn(move || {
-            let result = start_test_environment_node(config, test_dir, false);
-            eprintln!("Node stopped unexpectedly {:#?}", result);
-        });
-
-        // This just waits for the node thread forever.
-        let node_future = async move {
-            loop {
-                if node_thread_handle.is_finished() {
-                    return;
-                }
-                tokio::time::sleep(Duration::from_millis(500)).await;
-            }
-        };
-
-        Ok(node_future)
-    }
-
-    /// Run the faucet.
-    async fn start_faucet(
-        &self,
-        config: FaucetConfig,
-        node_api_url: Url,
-    ) -> CliTypedResult<impl Future<Output = ()>> {
-        HealthChecker::NodeApi(node_api_url)
-            .wait(Some("Faucet"))
-            .await?;
-
-        // Start the faucet
-        Ok(config.run().map(|result| {
-            eprintln!("Faucet stopped unexpectedly {:#?}", result);
-        }))
-    }
-
-    /// Run the ready server.
-    async fn start_ready_server(
-        &self,
-        health_checks: Vec<HealthChecker>,
-    ) -> CliTypedResult<impl Future<Output = ()>> {
-        let config = self.ready_server_config.clone();
-        Ok(run_ready_server(health_checks, config).map(|result| {
-            eprintln!("Faucet stopped unexpectedly {:#?}", result);
-        }))
-    }
-
     /// Wait for many services to start up. This prints a message like "X is starting,
     /// please wait..." for each service and then "X is running. Endpoint: <URL>"
     /// when it's ready.
-    async fn wait_for_startup<'a>(&self, health_checks: &Vec<HealthChecker>) -> CliTypedResult<()> {
+    async fn wait_for_startup<'a>(
+        &self,
+        health_checkers: &Vec<HealthChecker>,
+    ) -> CliTypedResult<()> {
         let mut futures: Vec<Pin<Box<dyn futures::Future<Output = CliTypedResult<()>> + Send>>> = Vec::new();
 
-        for health_check in health_checks {
-            eprintln!("{} is starting, please wait...", health_check);
+        for health_checker in health_checkers {
+            // We don't want to print anything for the processors, it'd be too spammy.
+            let silent = match health_checker {
+                HealthChecker::NodeApi(_) => false,
+                HealthChecker::Http(_, name) => name.contains("processor"),
+                HealthChecker::DataServiceGrpc(_) => false,
+            };
+            if !silent {
+                eprintln!("{} is starting, please wait...", health_checker);
+            }
             let fut = async move {
-                health_check.wait(None).await?;
-                eprintln!(
-                    "{} is running. Endpoint: {}",
-                    health_check,
-                    health_check.address_str()
-                );
+                health_checker.wait(None).await?;
+                if !silent {
+                    eprintln!(
+                        "{} is running. Endpoint: {}",
+                        health_checker,
+                        health_checker.address_str()
+                    );
+                }
                 Ok(())
             };
             futures.push(Box::pin(fut));
@@ -291,8 +123,6 @@ impl RunLocalTestnet {
             })?;
         }
 
-        eprintln!("\nAll services are running, you can now use the local testnet!");
-
         Ok(())
     }
 }
@@ -313,7 +143,7 @@ impl CliCommand<()> for RunLocalTestnet {
         };
 
         // If asked, remove the current test directory and start with a new node.
-        if test_dir.exists() && self.force_restart {
+        if self.force_restart && test_dir.exists() {
             prompt_yes_with_override(
                 "Are you sure you want to delete the existing local testnet data?",
                 self.prompt_options,
             )?;
             remove_dir_all(test_dir.as_path()).map_err(|err| {
                 CliError::IO(format!("Failed to delete {}", test_dir.display()), err)
             })?;
+            info!("Deleted test directory at: {:?}", test_dir);
         }
 
         if !test_dir.exists() {
-            debug!("Test directory does not exist, creating it: {:?}", test_dir);
+            info!("Test directory does not exist, creating it: {:?}", test_dir);
             create_dir_all(test_dir.as_path()).map_err(|err| {
                 CliError::IO(format!("Failed to create {}", test_dir.display()), err)
             })?;
-            debug!("Created test directory: {:?}", test_dir);
+            info!("Created test directory: {:?}", test_dir);
         }
 
         // Set up logging for anything that uses tracing. These logs will go to
@@ -338,78 +169,96 @@ impl CliCommand<()> for RunLocalTestnet {
             move || ThreadNameMakeWriter::new(td.clone()).make_writer() as Box<dyn std::io::Write>;
         setup_logging(Some(Box::new(make_writer)));
 
-        let all_configs = self
-            .build_configs(test_dir.clone())
-            .context("Failed to build configs")?;
-
-        let node_api_url = all_configs.get_node_api_url();
-
-        let AllConfigs {
-            ready_server_config,
-            node_config,
-            faucet_config,
-        } = all_configs;
-
-        // Collect all the health checks we want to run.
-        let mut health_checks = Vec::new();
-        health_checks.push(HealthChecker::NodeApi(node_api_url.clone()));
-
-        if let Some(config) = &faucet_config {
-            let url = Url::parse(&format!(
-                "http://{}:{}",
-                config.server_config.listen_address, config.server_config.listen_port
-            ))
-            .unwrap();
-            health_checks.push(HealthChecker::Http(url, "Faucet"));
+        let mut managers: Vec<Box<dyn ServiceManager>> = Vec::new();
+
+        // Build the node manager. We do this unconditionally.
+        let node_manager = NodeManager::new(&self, test_dir.clone())
+            .context("Failed to build node service manager")?;
+        let node_health_checkers = node_manager.get_healthchecks();
+
+        // If configured to do so, build the faucet manager.
+        if !self.faucet_args.no_faucet {
+            let faucet_manager = FaucetManager::new(
+                &self,
+                node_health_checkers.clone(),
+                test_dir.clone(),
+                node_manager.get_node_api_url(),
+            )
+            .context("Failed to build faucet service manager")?;
+            managers.push(Box::new(faucet_manager));
         }
 
-        if !self.no_txn_stream {
-            let url = socket_addr_to_url(&node_config.indexer_grpc.address, "http")?;
-            health_checks.push(HealthChecker::DataServiceGrpc(url));
+        // Now we push the node manager into managers last, just so we have access to it
+        // before this, so we can call things like `node_manager.get_node_api_url()`.
+        managers.push(Box::new(node_manager));
+
+        // Get the health checkers from all the managers. We'll pass these to
+        // `wait_for_startup`.
+        let health_checkers: Vec<HealthChecker> =
+            managers.iter().flat_map(|m| m.get_healthchecks()).collect();
+
+        // The final manager we add is the ready server. This must happen last since
+        // it uses the health checkers from all the other services.
+        managers.push(Box::new(ReadyServerManager::new(
+            &self,
+            health_checkers.clone(),
+        )?));
+
+        // Run any pre-run steps.
+        for manager in &managers {
+            manager.pre_run().await.with_context(|| {
+                format!("Failed to apply pre run steps for {}", manager.get_name())
+            })?;
         }
 
-        // Build tasks for each of the services.
+        eprintln!(
+            "Readiness endpoint: http://0.0.0.0:{}/\n",
+            self.ready_server_args.ready_server_listen_port,
+        );
+
+        // Collect post healthy steps to run after the services start.
+        let post_healthy_steps: Vec<Box<dyn PostHealthyStep>> = managers
+            .iter()
+            .flat_map(|m| m.get_post_healthy_steps())
+            .collect();
+
         let mut tasks: Vec<JoinHandle<()>> = Vec::new();
 
-        // Push a task to run the ready server.
-        tasks.push(tokio::spawn(
-            self.start_ready_server(health_checks.clone())
-                .await
-                .context("Failed to create future to start the ready server")?,
-        ));
+        // Start each of the services.
+        for manager in managers.into_iter() {
+            tasks.push(manager.run());
+        }
 
-        // Run the node API.
-        tasks.push(tokio::spawn(
-            self.start_node(test_dir.clone(), node_config)
+        // Wait for all the services to start up.
+        self.wait_for_startup(&health_checkers).await?;
+
+        eprintln!("\nApplying post startup steps...");
+
+        // Run any post healthy steps.
+        for post_healthy_step in post_healthy_steps {
+            post_healthy_step
+                .run()
                 .await
-                .context("Failed to create future to start the node")?,
-        ));
-
-        // If configured, run the faucet.
-        if let Some(config) = faucet_config {
-            tasks.push(tokio::spawn(
-                self.start_faucet(config, node_api_url.clone())
-                    .await
-                    .context("Failed to create future to start the faucet")?,
-            ));
+                .context("Failed to run post startup step")?;
         }
 
-        eprintln!(
-            "Readiness endpoint: http://0.0.0.0:{}/\n",
-            ready_server_config.ready_server_listen_port
+        eprintln!("\nSetup is complete, you can now use the local testnet!");
+
+        // Wait for all of the tasks. We should never get past this point unless
+        // something goes wrong or the user signals for the process to end.
+        let (result, _, handles) = futures::future::select_all(tasks).await;
+
+        // Something ended unexpectedly, exit with any relevant information.
+        let finished_handles = handles
+            .into_iter()
+            .filter(|handle| handle.is_finished())
+            .map(|handle| handle.id())
+            .collect::<Vec<_>>();
+        let message = format!(
+            "One of the services ({:?}) stopped unexpectedly: {:?}",
+            finished_handles, result,
         );
 
-        // Wait for all the services to start up.
-        self.wait_for_startup(&health_checks).await?;
-
-        // Wait for all of the futures for the tasks. We should never get past this
-        // point unless something goes wrong or the user signals for the process to
-        // end.
-        let result = futures::future::select_all(tasks).await;
-
-        Err(CliError::UnexpectedError(format!(
-            "One of the components stopped unexpectedly: {:?}",
-            result
-        )))
+        Err(CliError::UnexpectedError(message))
     }
 }
diff --git a/crates/aptos/src/node/local_testnet/node.rs b/crates/aptos/src/node/local_testnet/node.rs
new file mode 100644
index 00000000000000..4bebc26d3e746b
--- /dev/null
+++ b/crates/aptos/src/node/local_testnet/node.rs
@@ -0,0 +1,148 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use super::{health_checker::HealthChecker, traits::ServiceManager, RunLocalTestnet};
+use crate::node::local_testnet::utils::socket_addr_to_url;
+use anyhow::{anyhow, Context, Result};
+use aptos_config::config::{NodeConfig, DEFAULT_GRPC_STREAM_PORT};
+use aptos_node::{load_node_config, start_test_environment_node};
+use async_trait::async_trait;
+use clap::Parser;
+use rand::{rngs::StdRng, SeedableRng};
+use reqwest::Url;
+use std::{path::PathBuf, thread, time::Duration};
+
+/// Args specific to running a node (and its components, e.g. the txn stream) in the
+/// local testnet.
+#[derive(Debug, Parser)]
+pub struct NodeArgs {
+    /// An overridable config template for the test node
+    ///
+    /// If provided, the config will be used, and any needed configuration for the local testnet
+    /// will override the config's values
+    #[clap(long, value_parser)]
+    pub config_path: Option<PathBuf>,
+
+    /// Path to node configuration file override for local test mode.
+    ///
+    /// If provided, the default node config will be overridden by the config in the given file.
+    /// Cannot be used with --config-path
+    #[clap(long, value_parser, conflicts_with("config_path"))]
+    pub test_config_override: Option<PathBuf>,
+
+    /// Random seed for key generation in test mode
+    ///
+    /// This allows you to have deterministic keys for testing
+    #[clap(long, value_parser = aptos_node::load_seed)]
+    pub seed: Option<[u8; 32]>,
+
+    /// Do not run a transaction stream service alongside the node.
+    ///
+    /// Note: In reality this is not the same as running a Transaction Stream Service,
+    /// it is just using the stream directly on the node, but in practice this
+    /// distinction shouldn't matter.
+    #[clap(long)]
+    no_txn_stream: bool,
+
+    /// The port at which to expose the grpc transaction stream.
+    #[clap(long, default_value_t = DEFAULT_GRPC_STREAM_PORT)]
+    txn_stream_port: u16,
+}
+
+#[derive(Clone, Debug)]
+pub struct NodeManager {
+    config: NodeConfig,
+    test_dir: PathBuf,
+}
+
+impl NodeManager {
+    pub fn new(args: &RunLocalTestnet, test_dir: PathBuf) -> Result<Self> {
+        let rng = args
+            .node_args
+            .seed
+            .map(StdRng::from_seed)
+            .unwrap_or_else(StdRng::from_entropy);
+
+        // If there is a config on disk, this function will use that. If not, it will
+        // create a new one, taking the config_path and test_config_override arguments
+        // into account.
+        let mut node_config = load_node_config(
+            &args.node_args.config_path,
+            &args.node_args.test_config_override,
+            &test_dir,
+            false,
+            false,
+            aptos_cached_packages::head_release_bundle(),
+            rng,
+        )
+        .context("Failed to load / create config for node")?;
+
+        eprintln!();
+
+        // Enable the grpc stream on the node if we will run a txn stream service.
+        let run_txn_stream = !args.node_args.no_txn_stream;
+        node_config.indexer_grpc.enabled = run_txn_stream;
+        node_config.indexer_grpc.use_data_service_interface = run_txn_stream;
+        node_config
+            .indexer_grpc
+            .address
+            .set_port(args.node_args.txn_stream_port);
+
+        // So long as the indexer relies on storage indexing tables, this must be set
+        // for the indexer GRPC stream on the node to work.
+        node_config.storage.enable_indexer = run_txn_stream;
+
+        Ok(NodeManager {
+            config: node_config,
+            test_dir,
+        })
+    }
+
+    pub fn get_node_api_url(&self) -> Url {
+        socket_addr_to_url(&self.config.api.address, "http").unwrap()
+    }
+}
+
+#[async_trait]
+impl ServiceManager for NodeManager {
+    fn get_name(&self) -> String {
+        "Node API".to_string()
+    }
+
+    /// We return health checkers for both the Node API and the txn stream (if enabled).
+    /// As it is now, it is fine to make downstream services wait for both but if that
+    /// changes we can refactor.
+    fn get_healthchecks(&self) -> Vec<HealthChecker> {
+        let node_api_url = self.get_node_api_url();
+        let mut checkers = vec![HealthChecker::NodeApi(node_api_url)];
+        if self.config.indexer_grpc.enabled {
+            let data_service_url =
+                socket_addr_to_url(&self.config.indexer_grpc.address, "http").unwrap();
+            checkers.push(HealthChecker::DataServiceGrpc(data_service_url));
+        }
+        checkers
+    }
+
+    fn get_prerequisite_health_checkers(&self) -> Vec<&HealthChecker> {
+        // The node doesn't depend on anything, we start it first.
+        vec![]
+    }
+
+    /// Spawn the node on a thread and then create a future that just waits for it to
+    /// exit (which should never happen) forever. This is necessary because there is
+    /// no async function we can use to run the node.
+    async fn run_service(self: Box<Self>) -> Result<()> {
+        let node_thread_handle = thread::spawn(move || {
+            let result = start_test_environment_node(self.config, self.test_dir, false);
+            eprintln!("Node stopped unexpectedly {:#?}", result);
+        });
+
+        // This just waits for the node thread forever.
+        loop {
+            if node_thread_handle.is_finished() {
+                return Err(anyhow!("Node thread finished unexpectedly"));
+            }
+            tokio::time::sleep(Duration::from_millis(500)).await;
+        }
+    }
+}
diff --git a/crates/aptos/src/node/local_testnet/ready_server.rs b/crates/aptos/src/node/local_testnet/ready_server.rs
index 157e0a3cc59873..241c85789fb0bb 100644
--- a/crates/aptos/src/node/local_testnet/ready_server.rs
+++ b/crates/aptos/src/node/local_testnet/ready_server.rs
@@ -1,8 +1,9 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::node::local_testnet::health_checker::HealthChecker;
+use super::{health_checker::HealthChecker, traits::ServiceManager, RunLocalTestnet};
 use anyhow::Result;
+use async_trait::async_trait;
 use clap::Parser;
 use poem::{
     get, handler,
@@ -15,19 +16,58 @@ use poem::{
 use serde::Serialize;
 use std::net::{Ipv4Addr, SocketAddrV4};
 
+/// Args related to running a ready server in the local testnet. The ready server lets
+/// users / clients check if all the services in the local testnet are ready
+/// without having to ping each service individually.
 #[derive(Debug, Clone, Parser)]
-pub struct ReadyServerConfig {
+pub struct ReadyServerArgs {
     /// The port to run the ready server. This exposes an endpoint at `/` that you can
     /// use to check if the entire local testnet is ready.
-    #[clap(long, default_value_t = 8090)]
+    #[clap(long, default_value_t = 8070)]
     pub ready_server_listen_port: u16,
 }
 
+#[derive(Clone, Debug)]
+pub struct ReadyServerManager {
+    config: ReadyServerArgs,
+    health_checkers: Vec<HealthChecker>,
+}
+
+impl ReadyServerManager {
+    pub fn new(args: &RunLocalTestnet, health_checkers: Vec<HealthChecker>) -> Result<Self> {
+        Ok(ReadyServerManager {
+            config: args.ready_server_args.clone(),
+            health_checkers,
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceManager for ReadyServerManager {
+    fn get_name(&self) -> String {
+        "Ready Server".to_string()
+    }
+
+    fn get_healthchecks(&self) -> Vec<HealthChecker> {
+        // We don't health check the service that exposes health checks.
+        vec![]
+    }
+
+    fn get_prerequisite_health_checkers(&self) -> Vec<&HealthChecker> {
+        // This service should start before the other services are ready.
+        vec![]
+    }
+
+    async fn run_service(self: Box<Self>) -> Result<()> {
+        run_ready_server(self.health_checkers, self.config).await
+    }
+}
+
 /// This returns a future that runs a web server that exposes a single unified health
 /// checking port. Clients can use this to check if all the services are ready.
 pub async fn run_ready_server(
     health_checkers: Vec<HealthChecker>,
-    config: ReadyServerConfig,
+    config: ReadyServerArgs,
 ) -> Result<()> {
     let app = Route::new()
         .at("/", get(root))
diff --git a/crates/aptos/src/node/local_testnet/traits.rs b/crates/aptos/src/node/local_testnet/traits.rs
new file mode 100644
index 00000000000000..497012e8e641b0
--- /dev/null
+++ b/crates/aptos/src/node/local_testnet/traits.rs
@@ -0,0 +1,90 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use super::health_checker::HealthChecker;
+use anyhow::{Context, Result};
+use async_trait::async_trait;
+use futures::FutureExt;
+use std::fmt::Debug;
+use tokio::task::JoinHandle;
+use tracing::warn;
+
+#[async_trait]
+pub trait ServiceManager: Debug + Send + Sync + 'static {
+    /// Pretty name that we will show to the user for updates about this service.
+    fn get_name(&self) -> String;
+
+    /// This is called before the service is run. This is a good place to do any
+    /// setup that needs to be done before the service is run.
+    async fn pre_run(&self) -> Result<()> {
+        Ok(())
+    }
+
+    /// All services should expose some way to check if they are healthy. This function
+    /// returns HealthCheckers, a type that serves this purpose, that later services
+    /// can use to make sure prerequisite services have started. These are also used
+    /// by the "ready server", a server that exposes a unified endpoint for checking
+    /// if all services are ready.
+    fn get_healthchecks(&self) -> Vec<HealthChecker>;
+
+    /// Whereas get_healthchecks returns healthchecks that other downstream services can
+    /// use, this should return health checkers for services that this service is
+    /// waiting to start.
+    //
+    // Note: If we were using an object oriented language, we'd just make the
+    // constructor of the superclass require a vec of health checkers. Unfortunately
+    // we can't do that here, hence this runaround where the trait implementer must
+    // individually handle accepting health checkers and exposing them here. Similarly,
+    // if we could make this function private we would, but we can't since right now
+    // all functions in a trait must be pub.
+    fn get_prerequisite_health_checkers(&self) -> Vec<&HealthChecker>;
+
+    /// This is the function we use from the outside to start the service. It makes
+    /// sure all the prerequisite services have started and then spawns a tokio task to
+    /// run the service. The user should never need to override this implementation.
+    fn run(self: Box<Self>) -> JoinHandle<()> {
+        // We make a new function here so that each task waits for its prereqs within
+        // its own run function. This way we can start each service in any order.
+        let name = self.get_name();
+        let future = async move {
+            for health_checker in self.get_prerequisite_health_checkers() {
+                health_checker
+                    .wait(Some(&self.get_name()))
+                    .await
+                    .context("Prerequisite service did not start up successfully")?;
+            }
+            self.run_service()
+                .await
+                .context("Service ended with an error")?;
+            warn!("Service ended unexpectedly without any error");
+            Ok(())
+        };
+        tokio::spawn(future.map(move |result: Result<()>| {
+            warn!("{} stopped unexpectedly {:#?}", name, result);
+        }))
+    }
+
+    /// The ServiceManager may return PostHealthySteps. The tool will run these after
+    /// the service is started. See `PostHealthyStep` for more information.
+    //
+    // You might ask, why not just have a `post_healthy` function? The problem is we
+    // want `run` to take `self` so the implementer doesn't have to worry about making
+    // their config Clone, so after that point the ServiceManager won't exist. Hence
+    // this model.
+    fn get_post_healthy_steps(&self) -> Vec<Box<dyn PostHealthyStep>> {
+        vec![]
+    }
+
+    /// This function is responsible for running the service. It should return an error
+    /// if the service ends unexpectedly. It gets called by `run`.
+    async fn run_service(self: Box<Self>) -> Result<()>;
+}
+
+/// If a service wants to do something after it is healthy, it can define a struct,
+/// implement this trait for it, and return an instance of it.
+//
+// For more information see `get_post_healthy_steps` in `ServiceManager`.
+#[async_trait]
+pub trait PostHealthyStep: Debug + Send + Sync + 'static {
+    async fn run(self: Box<Self>) -> Result<()>;
+}
diff --git a/crates/aptos/src/node/local_testnet/utils.rs b/crates/aptos/src/node/local_testnet/utils.rs
index 86a9f5f9d57220..58e0a7808c24ee 100644
--- a/crates/aptos/src/node/local_testnet/utils.rs
+++ b/crates/aptos/src/node/local_testnet/utils.rs
@@ -1,10 +1,11 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
+use anyhow::Result;
 use reqwest::Url;
 use std::net::SocketAddr;
 
-pub fn socket_addr_to_url(socket_addr: &SocketAddr, scheme: &str) -> anyhow::Result<Url> {
+pub fn socket_addr_to_url(socket_addr: &SocketAddr, scheme: &str) -> Result<Url> {
     let host = match socket_addr {
         SocketAddr::V4(v4) => format!("{}", v4.ip()),
         SocketAddr::V6(v6) => format!("[{}]", v6.ip()),
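To make the `ServiceManager` contract in `traits.rs` concrete, here is an illustrative implementation sketch. It is not part of this change; it assumes it lives in the same module tree as the managers above so that `ServiceManager` and `HealthChecker` are in scope, and the service itself is a stand-in.

```rust
use super::{health_checker::HealthChecker, traits::ServiceManager};
use anyhow::Result;
use async_trait::async_trait;

// A hypothetical manager showing the shape of a ServiceManager implementation.
#[derive(Clone, Debug)]
struct EchoManager {
    prerequisite_health_checkers: Vec<HealthChecker>,
}

#[async_trait]
impl ServiceManager for EchoManager {
    fn get_name(&self) -> String {
        "Echo".to_string()
    }

    fn get_healthchecks(&self) -> Vec<HealthChecker> {
        // Expose a health check that downstream services and the ready server
        // can wait on (the port here is an arbitrary example).
        vec![HealthChecker::http_checker_from_port(9999, self.get_name())]
    }

    fn get_prerequisite_health_checkers(&self) -> Vec<&HealthChecker> {
        // The provided `run` method waits on these before calling run_service.
        self.prerequisite_health_checkers.iter().collect()
    }

    async fn run_service(self: Box<Self>) -> Result<()> {
        // A real manager would serve something here; this stand-in just idles.
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        }
    }
}
```

With something like that in place, the `execute` flow in `mod.rs` could push `Box::new(EchoManager { ... })` into `managers` the same way it does for the faucet, node, and ready server managers.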