Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Use tokio runtime handle instead of TaskExecutor abstraction
Browse files Browse the repository at this point in the history
Before this pr we had the `TaskExecutor` abstraction which theoretically
allowed that any futures executor could have been used. However, this
was never tested and is currently not really required. Anyone running a
node currently only used tokio and nothing else (because this was hard
coded in CLI). So, this pr removes the `TaskExecutor` abstraction and
relies directly on the tokio runtime handle.

Besides this changes, this pr also makes sure that the http and ws rpc
server use the same tokio runtime. This fixes a panic that occurred when
you drop the rpc servers inside an async function (tokio doesn't like
that a tokio runtime is dropped in the async context of another tokio
runtime).

As we don't use any custom runtime in the http rpc server anymore, this
pr also removes the `rpc-http-threads` cli argument. If external parties
complain that there aren't enough threads for the rpc server, we could
bring support for increasing the thread count of the tokio runtime.
  • Loading branch information
bkchr committed Sep 9, 2021
1 parent cd21e62 commit 01b5a3f
Show file tree
Hide file tree
Showing 27 changed files with 174 additions and 295 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 65 additions & 10 deletions bin/node/cli/tests/running_the_node_and_interrupt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,24 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

#![cfg(unix)]

use assert_cmd::cargo::cargo_bin;
use std::{convert::TryInto, process::Command, thread, time::Duration};
use nix::{
sys::signal::{
kill,
Signal::{self, SIGINT, SIGTERM},
},
unistd::Pid,
};
use sc_service::Deref;
use std::{convert::TryInto, ops::DerefMut, process::{Child, Command}, thread, time::Duration};
use tempfile::tempdir;

pub mod common;

#[test]
#[cfg(unix)]
fn running_the_node_works_and_can_be_interrupted() {
use nix::{
sys::signal::{
kill,
Signal::{self, SIGINT, SIGTERM},
},
unistd::Pid,
};

fn run_command_and_kill(signal: Signal) {
let base_path = tempdir().expect("could not create a temp dir");
let mut cmd = Command::new(cargo_bin("substrate"))
Expand All @@ -55,3 +56,57 @@ fn running_the_node_works_and_can_be_interrupted() {
run_command_and_kill(SIGINT);
run_command_and_kill(SIGTERM);
}

struct KillChildOnDrop(Child);

impl Drop for KillChildOnDrop {
fn drop(&mut self) {
let _ = self.0.kill();
}
}

impl Deref for KillChildOnDrop {
type Target = Child;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for KillChildOnDrop {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

#[test]
fn running_two_nodes_with_the_same_ws_port_should_work() {
fn start_node() -> Child {
Command::new(cargo_bin("substrate"))
.args(&["--dev", "--tmp", "--ws-port=45789"])
.spawn()
.unwrap()
}

let mut first_node = KillChildOnDrop(start_node());
let mut second_node = KillChildOnDrop(start_node());

thread::sleep(Duration::from_secs(30));

assert!(first_node.try_wait().unwrap().is_none(), "The first node should still be running");
assert!(second_node.try_wait().unwrap().is_none(), "The second node should still be running");

kill(Pid::from_raw(first_node.id().try_into().unwrap()), SIGINT).unwrap();
kill(Pid::from_raw(second_node.id().try_into().unwrap()), SIGINT).unwrap();

assert_eq!(
common::wait_for(&mut first_node, 30).map(|x| x.success()),
Some(true),
"The first node must exit gracefully",
);
assert_eq!(
common::wait_for(&mut second_node, 30).map(|x| x.success()),
Some(true),
"The second node must exit gracefully",
);
}
15 changes: 7 additions & 8 deletions bin/node/test-runner-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,17 @@ mod tests {
use node_cli::chain_spec::development_config;
use sp_keyring::sr25519::Keyring::Alice;
use sp_runtime::{traits::IdentifyAccount, MultiSigner};
use test_runner::{build_runtime, client_parts, task_executor, ConfigOrChainSpec, Node};
use test_runner::{build_runtime, client_parts, ConfigOrChainSpec, Node};

#[test]
fn test_runner() {
let tokio_runtime = build_runtime().unwrap();
let task_executor = task_executor(tokio_runtime.handle().clone());
let (rpc, task_manager, client, pool, command_sink, backend) = client_parts::<
NodeTemplateChainInfo,
>(
ConfigOrChainSpec::ChainSpec(Box::new(development_config()), task_executor),
)
.unwrap();
let (rpc, task_manager, client, pool, command_sink, backend) =
client_parts::<NodeTemplateChainInfo>(ConfigOrChainSpec::ChainSpec(
Box::new(development_config()),
tokio_runtime.handle().clone(),
))
.unwrap();
let node = Node::<NodeTemplateChainInfo>::new(
rpc,
task_manager,
Expand Down
8 changes: 0 additions & 8 deletions client/cli/src/commands/run_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ pub struct RunCmd {
#[structopt(long = "ws-max-connections", value_name = "COUNT")]
pub ws_max_connections: Option<usize>,

/// Size of the RPC HTTP server thread pool.
#[structopt(long = "rpc-http-threads", value_name = "COUNT")]
pub rpc_http_threads: Option<usize>,

/// Specify browser Origins allowed to access the HTTP & WS RPC servers.
///
/// A comma-separated list of origins (protocol://domain or special `null`
Expand Down Expand Up @@ -381,10 +377,6 @@ impl CliConfiguration for RunCmd {
Ok(self.ws_max_connections)
}

fn rpc_http_threads(&self) -> Result<Option<usize>> {
Ok(self.rpc_http_threads)
}

fn rpc_cors(&self, is_dev: bool) -> Result<Option<Vec<String>>> {
Ok(self
.rpc_cors
Expand Down
14 changes: 3 additions & 11 deletions client/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use sc_service::{
config::{
BasePath, Configuration, DatabaseSource, KeystoreConfig, NetworkConfiguration,
NodeKeyConfig, OffchainWorkerConfig, PrometheusConfig, PruningMode, Role, RpcMethods,
TaskExecutor, TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
},
ChainSpec, KeepBlocks, TracingReceiver, TransactionStorageMode,
};
Expand Down Expand Up @@ -343,13 +343,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
Ok(None)
}

/// Get the RPC HTTP thread pool size (`None` for a default 4-thread pool config).
///
/// By default this is `None`.
fn rpc_http_threads(&self) -> Result<Option<usize>> {
Ok(None)
}

/// Get the RPC cors (`None` if disabled)
///
/// By default this is `Some(Vec::new())`.
Expand Down Expand Up @@ -460,7 +453,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
fn create_configuration<C: SubstrateCli>(
&self,
cli: &C,
task_executor: TaskExecutor,
tokio_handle: tokio::runtime::Handle,
) -> Result<Configuration> {
let is_dev = self.is_dev()?;
let chain_id = self.chain_id(is_dev)?;
Expand All @@ -485,7 +478,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
Ok(Configuration {
impl_name: C::impl_name(),
impl_version: C::impl_version(),
task_executor,
tokio_handle,
transaction_pool: self.transaction_pool()?,
network: self.network_config(
&chain_spec,
Expand Down Expand Up @@ -513,7 +506,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
rpc_ipc: self.rpc_ipc()?,
rpc_methods: self.rpc_methods()?,
rpc_ws_max_connections: self.rpc_ws_max_connections()?,
rpc_http_threads: self.rpc_http_threads()?,
rpc_cors: self.rpc_cors(is_dev)?,
rpc_max_payload: self.rpc_max_payload()?,
prometheus_config: self.prometheus_config(DCV::prometheus_listen_port())?,
Expand Down
6 changes: 3 additions & 3 deletions client/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub use config::*;
pub use error::*;
pub use params::*;
pub use runner::*;
use sc_service::Configuration;
pub use sc_service::{ChainSpec, Role};
use sc_service::{Configuration, TaskExecutor};
pub use sc_tracing::logging::LoggerBuilder;
pub use sp_version::RuntimeVersion;
use std::io::Write;
Expand Down Expand Up @@ -216,9 +216,9 @@ pub trait SubstrateCli: Sized {
fn create_configuration<T: CliConfiguration<DVC>, DVC: DefaultConfigurationValues>(
&self,
command: &T,
task_executor: TaskExecutor,
tokio_handle: tokio::runtime::Handle,
) -> error::Result<Configuration> {
command.create_configuration(self, task_executor)
command.create_configuration(self, tokio_handle)
}

/// Create a runner for the command provided in argument. This will create a Configuration and
Expand Down
11 changes: 2 additions & 9 deletions client/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{error::Error as CliError, CliConfiguration, Result, SubstrateCli};
use chrono::prelude::*;
use futures::{future, future::FutureExt, pin_mut, select, Future};
use log::info;
use sc_service::{Configuration, Error as ServiceError, TaskManager, TaskType};
use sc_service::{Configuration, Error as ServiceError, TaskManager};
use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use std::marker::PhantomData;

Expand Down Expand Up @@ -116,15 +116,8 @@ impl<C: SubstrateCli> Runner<C> {
let tokio_runtime = build_runtime()?;
let runtime_handle = tokio_runtime.handle().clone();

let task_executor = move |fut, task_type| match task_type {
TaskType::Async => runtime_handle.spawn(fut).map(drop),
TaskType::Blocking => runtime_handle
.spawn_blocking(move || futures::executor::block_on(fut))
.map(drop),
};

Ok(Runner {
config: command.create_configuration(cli, task_executor.into())?,
config: command.create_configuration(cli, runtime_handle)?,
tokio_runtime,
phantom: PhantomData,
})
Expand Down
1 change: 1 addition & 0 deletions client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pubsub = { package = "jsonrpc-pubsub", version = "18.0.0" }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.9.0"}
serde_json = "1.0.41"
tokio = "1.10"

[target.'cfg(not(target_os = "unknown"))'.dependencies]
http = { package = "jsonrpc-http-server", version = "18.0.0" }
Expand Down
10 changes: 5 additions & 5 deletions client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ pub const RPC_MAX_PAYLOAD_DEFAULT: usize = 15 * MEGABYTE;
/// Default maximum number of connections for WS RPC servers.
const WS_MAX_CONNECTIONS: usize = 100;

/// Default thread pool size for RPC HTTP servers.
const HTTP_THREADS: usize = 4;

/// The RPC IoHandler containing all requested APIs.
pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>;

Expand Down Expand Up @@ -137,17 +134,18 @@ mod inner {
/// **Note**: Only available if `not(target_os = "unknown")`.
pub fn start_http<M: pubsub::PubSubMetadata + Default + Unpin>(
addr: &std::net::SocketAddr,
thread_pool_size: Option<usize>,
cors: Option<&Vec<String>>,
io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>,
tokio_handle: tokio::runtime::Handle,
) -> io::Result<http::Server> {
let max_request_body_size = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);

http::ServerBuilder::new(io)
.threads(thread_pool_size.unwrap_or(HTTP_THREADS))
.threads(1)
.event_loop_executor(tokio_handle)
.health_api(("/health", "system_health"))
.allowed_hosts(hosts_filtering(cors.is_some()))
.rest_api(if cors.is_some() { http::RestApi::Secure } else { http::RestApi::Unsecure })
Expand Down Expand Up @@ -186,13 +184,15 @@ mod inner {
io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>,
server_metrics: ServerMetrics,
tokio_handle: tokio::runtime::Handle,
) -> io::Result<ws::Server> {
let rpc_max_payload = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| {
context.sender().into()
})
.event_loop_executor(tokio_handle)
.max_payload(rpc_max_payload)
.max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS))
.allowed_origins(map_cors(cors))
Expand Down
2 changes: 1 addition & 1 deletion client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ parity-util-mem = { version = "0.10.0", default-features = false, features = [
"primitive-types",
] }
async-trait = "0.1.50"
tokio = { version = "1.10", features = ["time", "rt-multi-thread"] }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
tempfile = "3.1.0"
Expand All @@ -87,5 +88,4 @@ directories = "3.0.2"
[dev-dependencies]
substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" }
substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime/" }
tokio = { version = "1.10", features = ["time"] }
async-std = { version = "1.6.5", default-features = false }
4 changes: 2 additions & 2 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ where

let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.task_executor.clone(), registry)?
TaskManager::new(config.tokio_handle.clone(), registry)?
};

let chain_spec = &config.chain_spec;
Expand Down Expand Up @@ -373,7 +373,7 @@ where
let keystore_container = KeystoreContainer::new(&config.keystore)?;
let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.task_executor.clone(), registry)?
TaskManager::new(config.tokio_handle.clone(), registry)?
};

let db_storage = {
Expand Down
Loading

0 comments on commit 01b5a3f

Please sign in to comment.