diff --git a/Cargo.lock b/Cargo.lock index 529fbfc17315f..e255d11689eee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8077,6 +8077,7 @@ dependencies = [ "log 0.4.14", "serde_json", "substrate-prometheus-endpoint", + "tokio", ] [[package]] diff --git a/bin/node/cli/tests/running_the_node_and_interrupt.rs b/bin/node/cli/tests/running_the_node_and_interrupt.rs index 7a945a30a4166..af19f9edd1b06 100644 --- a/bin/node/cli/tests/running_the_node_and_interrupt.rs +++ b/bin/node/cli/tests/running_the_node_and_interrupt.rs @@ -16,23 +16,24 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +#![cfg(unix)] + use assert_cmd::cargo::cargo_bin; -use std::{convert::TryInto, process::Command, thread, time::Duration}; +use nix::{ + sys::signal::{ + kill, + Signal::{self, SIGINT, SIGTERM}, + }, + unistd::Pid, +}; +use sc_service::Deref; +use std::{convert::TryInto, ops::DerefMut, process::{Child, Command}, thread, time::Duration}; use tempfile::tempdir; pub mod common; #[test] -#[cfg(unix)] fn running_the_node_works_and_can_be_interrupted() { - use nix::{ - sys::signal::{ - kill, - Signal::{self, SIGINT, SIGTERM}, - }, - unistd::Pid, - }; - fn run_command_and_kill(signal: Signal) { let base_path = tempdir().expect("could not create a temp dir"); let mut cmd = Command::new(cargo_bin("substrate")) @@ -55,3 +56,57 @@ fn running_the_node_works_and_can_be_interrupted() { run_command_and_kill(SIGINT); run_command_and_kill(SIGTERM); } + +struct KillChildOnDrop(Child); + +impl Drop for KillChildOnDrop { + fn drop(&mut self) { + let _ = self.0.kill(); + } +} + +impl Deref for KillChildOnDrop { + type Target = Child; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for KillChildOnDrop { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[test] +fn running_two_nodes_with_the_same_ws_port_should_work() { + fn start_node() -> Child { + Command::new(cargo_bin("substrate")) + .args(&["--dev", "--tmp", "--ws-port=45789"]) + .spawn() + .unwrap() + } + + let mut first_node = KillChildOnDrop(start_node()); + let mut second_node = KillChildOnDrop(start_node()); + + thread::sleep(Duration::from_secs(30)); + + assert!(first_node.try_wait().unwrap().is_none(), "The first node should still be running"); + assert!(second_node.try_wait().unwrap().is_none(), "The second node should still be running"); + + kill(Pid::from_raw(first_node.id().try_into().unwrap()), SIGINT).unwrap(); + kill(Pid::from_raw(second_node.id().try_into().unwrap()), SIGINT).unwrap(); + + assert_eq!( + common::wait_for(&mut first_node, 30).map(|x| x.success()), + Some(true), + "The first node must exit gracefully", + ); + assert_eq!( + common::wait_for(&mut second_node, 30).map(|x| x.success()), + Some(true), + "The second node must exit gracefully", + ); +} diff --git a/bin/node/test-runner-example/src/lib.rs b/bin/node/test-runner-example/src/lib.rs index 6164372ab4f2f..e7fe1ee002423 100644 --- a/bin/node/test-runner-example/src/lib.rs +++ b/bin/node/test-runner-example/src/lib.rs @@ -88,18 +88,17 @@ mod tests { use node_cli::chain_spec::development_config; use sp_keyring::sr25519::Keyring::Alice; use sp_runtime::{traits::IdentifyAccount, MultiSigner}; - use test_runner::{build_runtime, client_parts, task_executor, ConfigOrChainSpec, Node}; + use test_runner::{build_runtime, client_parts, ConfigOrChainSpec, Node}; #[test] fn test_runner() { let tokio_runtime = build_runtime().unwrap(); - let task_executor = task_executor(tokio_runtime.handle().clone()); - let (rpc, task_manager, client, pool, command_sink, backend) = client_parts::< - NodeTemplateChainInfo, - >( - ConfigOrChainSpec::ChainSpec(Box::new(development_config()), task_executor), - ) - .unwrap(); + let (rpc, task_manager, client, pool, command_sink, backend) = + client_parts::(ConfigOrChainSpec::ChainSpec( + Box::new(development_config()), + tokio_runtime.handle().clone(), + )) + .unwrap(); let node = Node::::new( rpc, task_manager, diff --git a/client/cli/src/commands/run_cmd.rs b/client/cli/src/commands/run_cmd.rs index fcc486297b21a..98f2090c6f446 100644 --- a/client/cli/src/commands/run_cmd.rs +++ b/client/cli/src/commands/run_cmd.rs @@ -127,10 +127,6 @@ pub struct RunCmd { #[structopt(long = "ws-max-connections", value_name = "COUNT")] pub ws_max_connections: Option, - /// Size of the RPC HTTP server thread pool. - #[structopt(long = "rpc-http-threads", value_name = "COUNT")] - pub rpc_http_threads: Option, - /// Specify browser Origins allowed to access the HTTP & WS RPC servers. /// /// A comma-separated list of origins (protocol://domain or special `null` @@ -381,10 +377,6 @@ impl CliConfiguration for RunCmd { Ok(self.ws_max_connections) } - fn rpc_http_threads(&self) -> Result> { - Ok(self.rpc_http_threads) - } - fn rpc_cors(&self, is_dev: bool) -> Result>> { Ok(self .rpc_cors diff --git a/client/cli/src/config.rs b/client/cli/src/config.rs index bfc7c6eb7bacc..a2add25df726d 100644 --- a/client/cli/src/config.rs +++ b/client/cli/src/config.rs @@ -29,7 +29,7 @@ use sc_service::{ config::{ BasePath, Configuration, DatabaseSource, KeystoreConfig, NetworkConfiguration, NodeKeyConfig, OffchainWorkerConfig, PrometheusConfig, PruningMode, Role, RpcMethods, - TaskExecutor, TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod, + TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod, }, ChainSpec, KeepBlocks, TracingReceiver, TransactionStorageMode, }; @@ -343,13 +343,6 @@ pub trait CliConfiguration: Sized { Ok(None) } - /// Get the RPC HTTP thread pool size (`None` for a default 4-thread pool config). - /// - /// By default this is `None`. - fn rpc_http_threads(&self) -> Result> { - Ok(None) - } - /// Get the RPC cors (`None` if disabled) /// /// By default this is `Some(Vec::new())`. @@ -460,7 +453,7 @@ pub trait CliConfiguration: Sized { fn create_configuration( &self, cli: &C, - task_executor: TaskExecutor, + tokio_handle: tokio::runtime::Handle, ) -> Result { let is_dev = self.is_dev()?; let chain_id = self.chain_id(is_dev)?; @@ -485,7 +478,7 @@ pub trait CliConfiguration: Sized { Ok(Configuration { impl_name: C::impl_name(), impl_version: C::impl_version(), - task_executor, + tokio_handle, transaction_pool: self.transaction_pool()?, network: self.network_config( &chain_spec, @@ -513,7 +506,6 @@ pub trait CliConfiguration: Sized { rpc_ipc: self.rpc_ipc()?, rpc_methods: self.rpc_methods()?, rpc_ws_max_connections: self.rpc_ws_max_connections()?, - rpc_http_threads: self.rpc_http_threads()?, rpc_cors: self.rpc_cors(is_dev)?, rpc_max_payload: self.rpc_max_payload()?, prometheus_config: self.prometheus_config(DCV::prometheus_listen_port())?, diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index b560594f77c8b..bb1bff94145f7 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -35,8 +35,8 @@ pub use config::*; pub use error::*; pub use params::*; pub use runner::*; +use sc_service::Configuration; pub use sc_service::{ChainSpec, Role}; -use sc_service::{Configuration, TaskExecutor}; pub use sc_tracing::logging::LoggerBuilder; pub use sp_version::RuntimeVersion; use std::io::Write; @@ -216,9 +216,9 @@ pub trait SubstrateCli: Sized { fn create_configuration, DVC: DefaultConfigurationValues>( &self, command: &T, - task_executor: TaskExecutor, + tokio_handle: tokio::runtime::Handle, ) -> error::Result { - command.create_configuration(self, task_executor) + command.create_configuration(self, tokio_handle) } /// Create a runner for the command provided in argument. This will create a Configuration and diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 2ec200d9285b1..6f03e02a12d05 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -20,7 +20,7 @@ use crate::{error::Error as CliError, CliConfiguration, Result, SubstrateCli}; use chrono::prelude::*; use futures::{future, future::FutureExt, pin_mut, select, Future}; use log::info; -use sc_service::{Configuration, Error as ServiceError, TaskManager, TaskType}; +use sc_service::{Configuration, Error as ServiceError, TaskManager}; use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL}; use std::marker::PhantomData; @@ -116,15 +116,8 @@ impl Runner { let tokio_runtime = build_runtime()?; let runtime_handle = tokio_runtime.handle().clone(); - let task_executor = move |fut, task_type| match task_type { - TaskType::Async => runtime_handle.spawn(fut).map(drop), - TaskType::Blocking => runtime_handle - .spawn_blocking(move || futures::executor::block_on(fut)) - .map(drop), - }; - Ok(Runner { - config: command.create_configuration(cli, task_executor.into())?, + config: command.create_configuration(cli, runtime_handle)?, tokio_runtime, phantom: PhantomData, }) diff --git a/client/rpc-servers/Cargo.toml b/client/rpc-servers/Cargo.toml index ebb8c620193f2..0322b9f9c29fd 100644 --- a/client/rpc-servers/Cargo.toml +++ b/client/rpc-servers/Cargo.toml @@ -19,6 +19,7 @@ pubsub = { package = "jsonrpc-pubsub", version = "18.0.0" } log = "0.4.8" prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.9.0"} serde_json = "1.0.41" +tokio = "1.10" [target.'cfg(not(target_os = "unknown"))'.dependencies] http = { package = "jsonrpc-http-server", version = "18.0.0" } diff --git a/client/rpc-servers/src/lib.rs b/client/rpc-servers/src/lib.rs index 6e09a0ea36ac0..a66c6423ab985 100644 --- a/client/rpc-servers/src/lib.rs +++ b/client/rpc-servers/src/lib.rs @@ -36,9 +36,6 @@ pub const RPC_MAX_PAYLOAD_DEFAULT: usize = 15 * MEGABYTE; /// Default maximum number of connections for WS RPC servers. const WS_MAX_CONNECTIONS: usize = 100; -/// Default thread pool size for RPC HTTP servers. -const HTTP_THREADS: usize = 4; - /// The RPC IoHandler containing all requested APIs. pub type RpcHandler = pubsub::PubSubHandler; @@ -137,17 +134,18 @@ mod inner { /// **Note**: Only available if `not(target_os = "unknown")`. pub fn start_http( addr: &std::net::SocketAddr, - thread_pool_size: Option, cors: Option<&Vec>, io: RpcHandler, maybe_max_payload_mb: Option, + tokio_handle: tokio::runtime::Handle, ) -> io::Result { let max_request_body_size = maybe_max_payload_mb .map(|mb| mb.saturating_mul(MEGABYTE)) .unwrap_or(RPC_MAX_PAYLOAD_DEFAULT); http::ServerBuilder::new(io) - .threads(thread_pool_size.unwrap_or(HTTP_THREADS)) + .threads(1) + .event_loop_executor(tokio_handle) .health_api(("/health", "system_health")) .allowed_hosts(hosts_filtering(cors.is_some())) .rest_api(if cors.is_some() { http::RestApi::Secure } else { http::RestApi::Unsecure }) @@ -186,6 +184,7 @@ mod inner { io: RpcHandler, maybe_max_payload_mb: Option, server_metrics: ServerMetrics, + tokio_handle: tokio::runtime::Handle, ) -> io::Result { let rpc_max_payload = maybe_max_payload_mb .map(|mb| mb.saturating_mul(MEGABYTE)) @@ -193,6 +192,7 @@ mod inner { ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| { context.sender().into() }) + .event_loop_executor(tokio_handle) .max_payload(rpc_max_payload) .max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS)) .allowed_origins(map_cors(cors)) diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index b79e95fbb0912..c79eff13ac96c 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -79,6 +79,7 @@ parity-util-mem = { version = "0.10.0", default-features = false, features = [ "primitive-types", ] } async-trait = "0.1.50" +tokio = { version = "1.10", features = ["time", "rt-multi-thread"] } [target.'cfg(not(target_os = "unknown"))'.dependencies] tempfile = "3.1.0" @@ -87,5 +88,4 @@ directories = "3.0.2" [dev-dependencies] substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" } substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime/" } -tokio = { version = "1.10", features = ["time"] } async-std = { version = "1.6.5", default-features = false } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index f0c037aee232f..b45ff8f3c4631 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -287,7 +287,7 @@ where let task_manager = { let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); - TaskManager::new(config.task_executor.clone(), registry)? + TaskManager::new(config.tokio_handle.clone(), registry)? }; let chain_spec = &config.chain_spec; @@ -373,7 +373,7 @@ where let keystore_container = KeystoreContainer::new(&config.keystore)?; let task_manager = { let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); - TaskManager::new(config.task_executor.clone(), registry)? + TaskManager::new(config.tokio_handle.clone(), registry)? }; let db_storage = { diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 4223a1812204e..c18efb396d7dd 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -36,12 +36,9 @@ pub use sc_telemetry::TelemetryEndpoints; pub use sc_transaction_pool::Options as TransactionPoolOptions; use sp_core::crypto::SecretString; use std::{ - future::Future, io, net::SocketAddr, path::{Path, PathBuf}, - pin::Pin, - sync::Arc, }; #[cfg(not(target_os = "unknown"))] use tempfile::TempDir; @@ -55,8 +52,8 @@ pub struct Configuration { pub impl_version: String, /// Node role. pub role: Role, - /// How to spawn background tasks. Mandatory, otherwise creating a `Service` will error. - pub task_executor: TaskExecutor, + /// Handle to the tokio runtime. Will be used to spawn futures by the task manager. + pub tokio_handle: tokio::runtime::Handle, /// Extrinsic pool configuration. pub transaction_pool: TransactionPoolOptions, /// Network configuration. @@ -95,8 +92,6 @@ pub struct Configuration { pub rpc_ipc: Option, /// Maximum number of connections for WebSockets RPC server. `None` if default. pub rpc_ws_max_connections: Option, - /// Size of the RPC HTTP server thread pool. `None` if default. - pub rpc_http_threads: Option, /// CORS settings for HTTP & WS servers. `None` if all origins are allowed. pub rpc_cors: Option>, /// RPC methods to expose (by default only a safe subset or all of them). @@ -310,62 +305,3 @@ impl std::convert::From for BasePath { BasePath::new(path) } } - -// NOTE: here for code readability. -pub(crate) type SomeFuture = Pin + Send>>; -pub(crate) type JoinFuture = Pin + Send>>; - -/// Callable object that execute tasks. -/// -/// This struct can be created easily using `Into`. -/// -/// # Examples -/// -/// ## Using tokio -/// -/// ``` -/// # use sc_service::TaskExecutor; -/// use futures::future::FutureExt; -/// use tokio::runtime::Runtime; -/// -/// let runtime = Runtime::new().unwrap(); -/// let handle = runtime.handle().clone(); -/// let task_executor: TaskExecutor = (move |future, _task_type| { -/// handle.spawn(future).map(|_| ()) -/// }).into(); -/// ``` -/// -/// ## Using async-std -/// -/// ``` -/// # use sc_service::TaskExecutor; -/// let task_executor: TaskExecutor = (|future, _task_type| { -/// // NOTE: async-std's JoinHandle is not a Result so we don't need to map the result -/// async_std::task::spawn(future) -/// }).into(); -/// ``` -#[derive(Clone)] -pub struct TaskExecutor(Arc JoinFuture + Send + Sync>); - -impl std::fmt::Debug for TaskExecutor { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "TaskExecutor") - } -} - -impl std::convert::From for TaskExecutor -where - F: Fn(SomeFuture, TaskType) -> FUT + Send + Sync + 'static, - FUT: Future + Send + 'static, -{ - fn from(func: F) -> Self { - Self(Arc::new(move |fut, tt| Box::pin(func(fut, tt)))) - } -} - -impl TaskExecutor { - /// Spawns a new asynchronous task. - pub fn spawn(&self, future: SomeFuture, task_type: TaskType) -> JoinFuture { - self.0(future, task_type) - } -} diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index ede6f01a4539e..dd77022fc29fd 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -59,8 +59,8 @@ pub use self::{ error::Error, }; pub use config::{ - BasePath, Configuration, DatabaseSource, KeepBlocks, PruningMode, Role, RpcMethods, - TaskExecutor, TaskType, TransactionStorageMode, + BasePath, Configuration, DatabaseSource, KeepBlocks, PruningMode, Role, RpcMethods, TaskType, + TransactionStorageMode, }; pub use sc_chain_spec::{ ChainSpec, ChainType, Extension as ChainSpecExtension, GenericChainSpec, NoExtension, @@ -408,7 +408,6 @@ fn start_rpc_servers< maybe_start_server(config.rpc_http, |address| { sc_rpc_server::start_http( address, - config.rpc_http_threads, config.rpc_cors.as_ref(), gen_handler( deny_unsafe(&address, &config.rpc_methods), @@ -419,6 +418,7 @@ fn start_rpc_servers< ), )?, config.rpc_max_payload, + config.tokio_handle.clone(), ) .map_err(Error::from) })? @@ -438,6 +438,7 @@ fn start_rpc_servers< )?, config.rpc_max_payload, server_metrics.clone(), + config.tokio_handle.clone(), ) .map_err(Error::from) })? diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 7842acdf0455a..7113ce4e52036 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -19,7 +19,7 @@ //! Substrate service tasks management module. use crate::{ - config::{JoinFuture, TaskExecutor, TaskType}, + config::TaskType, Error, }; use exit_future::Signal; @@ -35,6 +35,7 @@ use prometheus_endpoint::{ }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use std::{panic, pin::Pin, result::Result}; +use tokio::{runtime::Handle, task::JoinHandle}; use tracing_futures::Instrument; mod prometheus_future; @@ -45,9 +46,9 @@ mod tests; #[derive(Clone)] pub struct SpawnTaskHandle { on_exit: exit_future::Exit, - executor: TaskExecutor, + tokio_handle: Handle, metrics: Option, - task_notifier: TracingUnboundedSender, + task_notifier: TracingUnboundedSender>, } impl SpawnTaskHandle { @@ -128,17 +129,22 @@ impl SpawnTaskHandle { } }; - let join_handle = self.executor.spawn(future.in_current_span().boxed(), task_type); + let join_handle = match task_type { + TaskType::Async => self.tokio_handle.spawn(future.in_current_span()), + TaskType::Blocking => { + let handle = self.tokio_handle.clone(); + self.tokio_handle.spawn_blocking(move || { + handle.block_on(future.in_current_span()); + }) + }, + }; let mut task_notifier = self.task_notifier.clone(); - self.executor.spawn( - Box::pin(async move { - if let Err(err) = task_notifier.send(join_handle).await { - error!("Could not send spawned task handle to queue: {}", err); - } - }), - TaskType::Async, - ); + self.tokio_handle.spawn(async move { + if let Err(err) = task_notifier.send(join_handle).await { + error!("Could not send spawned task handle to queue: {}", err); + } + }); } } @@ -222,8 +228,8 @@ pub struct TaskManager { on_exit: exit_future::Exit, /// A signal that makes the exit future above resolve, fired on service drop. signal: Option, - /// How to spawn background tasks. - executor: TaskExecutor, + /// Tokio runtime handle that is used to spawn futures. + tokio_handle: Handle, /// Prometheus metric where to report the polling times. metrics: Option, /// Send a signal when a spawned essential task has concluded. The next time @@ -234,9 +240,9 @@ pub struct TaskManager { /// Things to keep alive until the task manager is dropped. keep_alive: Box, /// A sender to a stream of background tasks. This is used for the completion future. - task_notifier: TracingUnboundedSender, + task_notifier: TracingUnboundedSender>, /// This future will complete when all the tasks are joined and the stream is closed. - completion_future: JoinFuture, + completion_future: JoinHandle<()>, /// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent /// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential /// task fails. @@ -247,7 +253,7 @@ impl TaskManager { /// If a Prometheus registry is passed, it will be used to report statistics about the /// service tasks. pub fn new( - executor: TaskExecutor, + tokio_handle: Handle, prometheus_registry: Option<&Registry>, ) -> Result { let (signal, on_exit) = exit_future::signal(); @@ -261,13 +267,15 @@ impl TaskManager { // NOTE: for_each_concurrent will await on all the JoinHandle futures at the same time. It // is possible to limit this but it's actually better for the memory foot print to await // them all to not accumulate anything on that stream. - let completion_future = executor - .spawn(Box::pin(background_tasks.for_each_concurrent(None, |x| x)), TaskType::Async); + let completion_future = + tokio_handle.spawn(background_tasks.for_each_concurrent(None, |x| async move { + let _ = x.await; + })); Ok(Self { on_exit, signal: Some(signal), - executor, + tokio_handle, metrics, essential_failed_tx, essential_failed_rx, @@ -282,7 +290,7 @@ impl TaskManager { pub fn spawn_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { on_exit: self.on_exit.clone(), - executor: self.executor.clone(), + tokio_handle: self.tokio_handle.clone(), metrics: self.metrics.clone(), task_notifier: self.task_notifier.clone(), } @@ -310,14 +318,9 @@ impl TaskManager { Box::pin(async move { join_all(children_shutdowns).await; - completion_future.await; - - // The keep_alive stuff is holding references to some RPC handles etc. These - // RPC handles spawn their own tokio stuff and that doesn't like to be closed in an - // async context. So, we move the deletion to some other thread. - std::thread::spawn(move || { - let _ = keep_alive; - }); + let _ = completion_future.await; + + let _ = keep_alive; }) } diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 5b6cd7acdd4ab..291d71ebaf03b 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{config::TaskExecutor, task_manager::TaskManager}; +use crate::task_manager::TaskManager; use futures::{future::FutureExt, pin_mut, select}; use parking_lot::Mutex; use std::{any::Any, sync::Arc, time::Duration}; @@ -84,17 +84,16 @@ async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any) } } -fn new_task_manager(task_executor: TaskExecutor) -> TaskManager { - TaskManager::new(task_executor, None).unwrap() +fn new_task_manager(tokio_handle: tokio::runtime::Handle) -> TaskManager { + TaskManager::new(tokio_handle, None).unwrap() } #[test] fn ensure_tasks_are_awaited_on_shutdown() { let runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); - let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); - let task_manager = new_task_manager(task_executor); + let task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); @@ -111,9 +110,8 @@ fn ensure_tasks_are_awaited_on_shutdown() { fn ensure_keep_alive_during_shutdown() { let runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); - let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); - let mut task_manager = new_task_manager(task_executor); + let mut task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); task_manager.keep_alive(drop_tester.new_ref()); @@ -130,9 +128,8 @@ fn ensure_keep_alive_during_shutdown() { fn ensure_blocking_futures_are_awaited_on_shutdown() { let runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); - let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); - let task_manager = new_task_manager(task_executor); + let task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); spawn_handle.spawn( @@ -155,9 +152,8 @@ fn ensure_blocking_futures_are_awaited_on_shutdown() { fn ensure_no_task_can_be_spawn_after_terminate() { let runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); - let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); - let mut task_manager = new_task_manager(task_executor); + let mut task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); @@ -176,9 +172,8 @@ fn ensure_no_task_can_be_spawn_after_terminate() { fn ensure_task_manager_future_ends_when_task_manager_terminated() { let runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); - let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); - let mut task_manager = new_task_manager(task_executor); + let mut task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); @@ -197,9 +192,8 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() { let runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); - let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); - let mut task_manager = new_task_manager(task_executor); + let mut task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let spawn_essential_handle = task_manager.spawn_essential_handle(); let drop_tester = DropTester::new(); @@ -222,12 +216,11 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() { fn ensure_children_tasks_ends_when_task_manager_terminated() { let runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); - let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); - let mut task_manager = new_task_manager(task_executor.clone()); - let child_1 = new_task_manager(task_executor.clone()); + let mut task_manager = new_task_manager(handle.clone()); + let child_1 = new_task_manager(handle.clone()); let spawn_handle_child_1 = child_1.spawn_handle(); - let child_2 = new_task_manager(task_executor.clone()); + let child_2 = new_task_manager(handle.clone()); let spawn_handle_child_2 = child_2.spawn_handle(); task_manager.add_child(child_1); task_manager.add_child(child_2); @@ -251,13 +244,12 @@ fn ensure_children_tasks_ends_when_task_manager_terminated() { fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() { let runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); - let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); - let mut task_manager = new_task_manager(task_executor.clone()); - let child_1 = new_task_manager(task_executor.clone()); + let mut task_manager = new_task_manager(handle.clone()); + let child_1 = new_task_manager(handle.clone()); let spawn_handle_child_1 = child_1.spawn_handle(); let spawn_essential_handle_child_1 = child_1.spawn_essential_handle(); - let child_2 = new_task_manager(task_executor.clone()); + let child_2 = new_task_manager(handle.clone()); let spawn_handle_child_2 = child_2.spawn_handle(); task_manager.add_child(child_1); task_manager.add_child(child_2); @@ -284,12 +276,11 @@ fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { let runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); - let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); - let mut task_manager = new_task_manager(task_executor.clone()); - let child_1 = new_task_manager(task_executor.clone()); + let mut task_manager = new_task_manager(handle.clone()); + let child_1 = new_task_manager(handle.clone()); let spawn_handle_child_1 = child_1.spawn_handle(); - let child_2 = new_task_manager(task_executor.clone()); + let child_2 = new_task_manager(handle.clone()); let spawn_handle_child_2 = child_2.spawn_handle(); task_manager.add_child(child_1); task_manager.add_child(child_2); diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 61313b4488cb4..cda0ef0bf7308 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -18,7 +18,7 @@ //! Service integration test utils. -use futures::{task::Poll, Future, FutureExt, TryFutureExt as _}; +use futures::{task::Poll, Future, TryFutureExt as _}; use log::{debug, info}; use parking_lot::Mutex; use sc_client_api::{Backend, CallExecutor}; @@ -30,7 +30,7 @@ use sc_service::{ client::Client, config::{BasePath, DatabaseSource, KeystoreConfig}, ChainSpecExtension, Configuration, Error, GenericChainSpec, KeepBlocks, Role, RuntimeGenesis, - SpawnTaskHandle, TaskExecutor, TaskManager, TransactionStorageMode, + SpawnTaskHandle, TaskManager, TransactionStorageMode, }; use sc_transaction_pool_api::TransactionPool; use sp_blockchain::HeaderBackend; @@ -200,7 +200,7 @@ fn node_config< index: usize, spec: &GenericChainSpec, role: Role, - task_executor: TaskExecutor, + tokio_handle: tokio::runtime::Handle, key_seed: Option, base_port: u16, root: &TempDir, @@ -229,7 +229,7 @@ fn node_config< impl_name: String::from("network-test-impl"), impl_version: String::from("0.1"), role, - task_executor, + tokio_handle, transaction_pool: Default::default(), network: network_config, keystore_remote: Default::default(), @@ -248,7 +248,6 @@ fn node_config< rpc_ipc: None, rpc_ws: None, rpc_ws_max_connections: None, - rpc_http_threads: None, rpc_cors: None, rpc_methods: Default::default(), rpc_max_payload: None, @@ -308,21 +307,13 @@ where authorities: impl Iterator Result<(F, U), Error>)>, ) { let handle = self.runtime.handle().clone(); - let task_executor: TaskExecutor = { - let executor = handle.clone(); - (move |fut: Pin + Send>>, _| { - executor.spawn(fut.unit_error()); - async {} - }) - .into() - }; for (key, authority) in authorities { let node_config = node_config( self.nodes, &self.chain_spec, Role::Authority, - task_executor.clone(), + handle.clone(), Some(key), self.base_port, &temp, @@ -343,7 +334,7 @@ where self.nodes, &self.chain_spec, Role::Full, - task_executor.clone(), + handle.clone(), None, self.base_port, &temp, @@ -363,7 +354,7 @@ where self.nodes, &self.chain_spec, Role::Light, - task_executor.clone(), + handle.clone(), None, self.base_port, &temp, diff --git a/test-utils/derive/src/lib.rs b/test-utils/derive/src/lib.rs index 2205b259e3e6c..3f14f67477fad 100644 --- a/test-utils/derive/src/lib.rs +++ b/test-utils/derive/src/lib.rs @@ -36,18 +36,9 @@ fn parse_knobs( let attrs = &input.attrs; let vis = input.vis; - if sig.inputs.len() != 1 { - let msg = "the test function accepts only one argument of type sc_service::TaskExecutor"; - return Err(syn::Error::new_spanned(&sig, msg)) + if !sig.inputs.is_empty() { + return Err(syn::Error::new_spanned(&sig, "No arguments expected for tests.")) } - let (task_executor_name, task_executor_type) = match sig.inputs.pop().map(|x| x.into_value()) { - Some(syn::FnArg::Typed(x)) => (x.pat, x.ty), - _ => { - let msg = - "the test function accepts only one argument of type sc_service::TaskExecutor"; - return Err(syn::Error::new_spanned(&sig, msg)) - }, - }; let crate_name = match crate_name("substrate-test-utils") { Ok(FoundCrate::Itself) => syn::Ident::new("substrate_test_utils", Span::call_site().into()), @@ -65,12 +56,6 @@ fn parse_knobs( #header #(#attrs)* #vis #sig { - use #crate_name::futures::future::FutureExt; - - let #task_executor_name: #task_executor_type = (|fut, _| { - #crate_name::tokio::spawn(fut).map(drop) - }) - .into(); if #crate_name::tokio::time::timeout( std::time::Duration::from_secs( std::env::var("SUBSTRATE_TEST_TIMEOUT") diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 27f13e2a7b30f..b68994926533a 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -19,8 +19,7 @@ #[doc(hidden)] pub use futures; -/// Marks async function to be executed by an async runtime and provide a `TaskExecutor`, -/// suitable to test environment. +/// Marks async function to be executed by an async runtime suitable to test environment. /// /// # Requirements /// @@ -30,10 +29,8 @@ pub use futures; /// /// ``` /// #[substrate_test_utils::test] -/// async fn basic_test(task_executor: TaskExecutor) { +/// async fn basic_test() { /// assert!(true); -/// // create your node in here and use task_executor -/// // then don't forget to gracefully shutdown your node before exit /// } /// ``` pub use substrate_test_utils_derive::test; diff --git a/test-utils/test-crate/src/main.rs b/test-utils/test-crate/src/main.rs index 2f04568591afe..554adcb884064 100644 --- a/test-utils/test-crate/src/main.rs +++ b/test-utils/test-crate/src/main.rs @@ -18,7 +18,7 @@ #[cfg(test)] #[test_utils::test] -async fn basic_test(_: sc_service::TaskExecutor) { +async fn basic_test() { assert!(true); } diff --git a/test-utils/test-runner/src/client.rs b/test-utils/test-runner/src/client.rs index 6622c1f919428..58c4cf6503a93 100644 --- a/test-utils/test-runner/src/client.rs +++ b/test-utils/test-runner/src/client.rs @@ -29,7 +29,7 @@ use sc_client_api::backend::Backend; use sc_executor::NativeElseWasmExecutor; use sc_service::{ build_network, new_full_parts, spawn_tasks, BuildNetworkParams, ChainSpec, Configuration, - SpawnTasksParams, TFullBackend, TFullClient, TaskExecutor, TaskManager, + SpawnTasksParams, TFullBackend, TFullClient, TaskManager, }; use sc_transaction_pool::BasicPool; use sc_transaction_pool_api::TransactionPool; @@ -74,7 +74,7 @@ pub enum ConfigOrChainSpec { /// Configuration object Config(Configuration), /// Chain spec object - ChainSpec(Box, TaskExecutor), + ChainSpec(Box, tokio::runtime::Handle), } /// Creates all the client parts you need for [`Node`](crate::node::Node) pub fn client_parts( @@ -103,8 +103,8 @@ where use sp_consensus_babe::AuthorityId; let config = match config_or_chain_spec { ConfigOrChainSpec::Config(config) => config, - ConfigOrChainSpec::ChainSpec(chain_spec, task_executor) => - default_config(task_executor, chain_spec), + ConfigOrChainSpec::ChainSpec(chain_spec, tokio_handle) => + default_config(tokio_handle, chain_spec), }; let executor = NativeElseWasmExecutor::::new( diff --git a/test-utils/test-runner/src/utils.rs b/test-utils/test-runner/src/utils.rs index 3caba633dcfa9..8e8c84e6b4f8a 100644 --- a/test-utils/test-runner/src/utils.rs +++ b/test-utils/test-runner/src/utils.rs @@ -16,7 +16,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use futures::FutureExt; use sc_client_api::execution_extensions::ExecutionStrategies; use sc_executor::WasmExecutionMethod; use sc_informant::OutputFormat; @@ -26,7 +25,7 @@ use sc_network::{ }; use sc_service::{ config::KeystoreConfig, BasePath, ChainSpec, Configuration, DatabaseSource, KeepBlocks, - TaskExecutor, TaskType, TransactionStorageMode, + TransactionStorageMode, }; use sp_keyring::sr25519::Keyring::Alice; use tokio::runtime::Handle; @@ -43,10 +42,7 @@ pub fn base_path() -> BasePath { } /// Produces a default configuration object, suitable for use with most set ups. -pub fn default_config( - task_executor: TaskExecutor, - mut chain_spec: Box, -) -> Configuration { +pub fn default_config(tokio_handle: Handle, mut chain_spec: Box) -> Configuration { let base_path = base_path(); let root_path = base_path.path().to_path_buf().join("chains").join(chain_spec.id()); @@ -75,7 +71,7 @@ pub fn default_config( impl_name: "test-node".to_string(), impl_version: "0.1".to_string(), role: Role::Authority, - task_executor: task_executor.into(), + tokio_handle, transaction_pool: Default::default(), network: network_config, keystore: KeystoreConfig::Path { path: root_path.join("key"), password: None }, @@ -95,7 +91,6 @@ pub fn default_config( rpc_ws: None, rpc_ipc: None, rpc_ws_max_connections: None, - rpc_http_threads: None, rpc_cors: None, rpc_methods: Default::default(), rpc_max_payload: None, @@ -120,14 +115,3 @@ pub fn default_config( transaction_storage: TransactionStorageMode::BlockBody, } } - -/// Produce a task executor given a handle to a tokio runtime -pub fn task_executor(handle: Handle) -> TaskExecutor { - let task_executor = move |fut, task_type| match task_type { - TaskType::Async => handle.spawn(fut).map(drop), - TaskType::Blocking => - handle.spawn_blocking(move || futures::executor::block_on(fut)).map(drop), - }; - - task_executor.into() -} diff --git a/test-utils/tests/basic.rs b/test-utils/tests/basic.rs index b94f85ccba574..527ca3e365edb 100644 --- a/test-utils/tests/basic.rs +++ b/test-utils/tests/basic.rs @@ -16,28 +16,19 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use sc_service::{TaskExecutor, TaskType}; - #[substrate_test_utils::test] -async fn basic_test(_: TaskExecutor) { +async fn basic_test() { assert!(true); } #[substrate_test_utils::test] #[should_panic(expected = "boo!")] -async fn panicking_test(_: TaskExecutor) { +async fn panicking_test() { panic!("boo!"); } #[substrate_test_utils::test(flavor = "multi_thread", worker_threads = 1)] -async fn basic_test_with_args(_: TaskExecutor) { - assert!(true); -} - -#[substrate_test_utils::test] -async fn rename_argument(ex: TaskExecutor) { - let ex2 = ex.clone(); - ex2.spawn(Box::pin(async { () }), TaskType::Blocking); +async fn basic_test_with_args() { assert!(true); } @@ -47,7 +38,7 @@ async fn rename_argument(ex: TaskExecutor) { #[substrate_test_utils::test] #[should_panic(expected = "test took too long")] #[ignore] -async fn timeout(_: TaskExecutor) { +async fn timeout() { tokio::time::sleep(std::time::Duration::from_secs( std::env::var("SUBSTRATE_TEST_TIMEOUT") .expect("env var SUBSTRATE_TEST_TIMEOUT has been provided by the user") diff --git a/test-utils/tests/ui.rs b/test-utils/tests/ui.rs index 13602f25572d7..119162fdc21b8 100644 --- a/test-utils/tests/ui.rs +++ b/test-utils/tests/ui.rs @@ -19,6 +19,5 @@ #[test] fn substrate_test_utils_derive_trybuild() { let t = trybuild::TestCases::new(); - t.compile_fail("tests/ui/missing-func-parameter.rs"); t.compile_fail("tests/ui/too-many-func-parameters.rs"); } diff --git a/test-utils/tests/ui/missing-func-parameter.rs b/test-utils/tests/ui/missing-func-parameter.rs deleted file mode 100644 index e08d8ae13100a..0000000000000 --- a/test-utils/tests/ui/missing-func-parameter.rs +++ /dev/null @@ -1,24 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -#[substrate_test_utils::test] -async fn missing_func_parameter() { - assert!(true); -} - -fn main() {} diff --git a/test-utils/tests/ui/missing-func-parameter.stderr b/test-utils/tests/ui/missing-func-parameter.stderr deleted file mode 100644 index fbe0bc69918e8..0000000000000 --- a/test-utils/tests/ui/missing-func-parameter.stderr +++ /dev/null @@ -1,5 +0,0 @@ -error: the test function accepts only one argument of type sc_service::TaskExecutor - --> $DIR/missing-func-parameter.rs:20:1 - | -20 | async fn missing_func_parameter() { - | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/test-utils/tests/ui/too-many-func-parameters.rs b/test-utils/tests/ui/too-many-func-parameters.rs index 3b742fac7a603..b1789b9d3ee7e 100644 --- a/test-utils/tests/ui/too-many-func-parameters.rs +++ b/test-utils/tests/ui/too-many-func-parameters.rs @@ -16,11 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -#[allow(unused_imports)] -use sc_service::TaskExecutor; - #[substrate_test_utils::test] -async fn too_many_func_parameters(task_executor_1: TaskExecutor, task_executor_2: TaskExecutor) { +async fn too_many_func_parameters(_: u32) { assert!(true); } diff --git a/test-utils/tests/ui/too-many-func-parameters.stderr b/test-utils/tests/ui/too-many-func-parameters.stderr index e30bb4ed8ee85..1b1630022e4f7 100644 --- a/test-utils/tests/ui/too-many-func-parameters.stderr +++ b/test-utils/tests/ui/too-many-func-parameters.stderr @@ -1,5 +1,5 @@ -error: the test function accepts only one argument of type sc_service::TaskExecutor - --> $DIR/too-many-func-parameters.rs:23:1 +error: No arguments expected for tests. + --> $DIR/too-many-func-parameters.rs:20:1 | -23 | async fn too_many_func_parameters(task_executor_1: TaskExecutor, task_executor_2: TaskExecutor) { - | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +20 | async fn too_many_func_parameters(_: u32) { + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^