Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Pass an executor through the Configuration #4688

Merged
merged 3 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bin/node-template/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ pub fn run<I, T, E>(args: I, exit: E, version: VersionInfo) -> error::Result<()>
type Config<T> = Configuration<(), T>;
match parse_and_prepare::<NoCustom, NoCustom, _>(&version, "substrate-node", args) {
ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit,
|exit, _cli_args, _custom_args, config: Config<_>| {
|exit, _cli_args, _custom_args, mut config: Config<_>| {
info!("{}", version.name);
info!(" version {}", config.full_version());
info!(" by {}, 2017, 2018", version.author);
info!("Chain specification: {}", config.chain_spec.name());
info!("Node name: {}", config.name);
info!("Roles: {}", display_role(&config));
let runtime = Runtime::new().map_err(|e| format!("{:?}", e))?;
config.tasks_executor = {
let runtime_handle = runtime.handle().clone();
Some(Box::new(move |fut| { runtime_handle.spawn(fut); }))
};
match config.roles {
ServiceRoles::LIGHT => run_until_exit(
runtime,
Expand Down
6 changes: 5 additions & 1 deletion bin/node/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub fn run<I, T, E>(args: I, exit: E, version: sc_cli::VersionInfo) -> error::Re

match parse_and_prepare::<CustomSubcommands, NoCustom, _>(&version, "substrate-node", args) {
ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit,
|exit, _cli_args, _custom_args, config: Config<_, _>| {
|exit, _cli_args, _custom_args, mut config: Config<_, _>| {
info!("{}", version.name);
info!(" version {}", config.full_version());
info!(" by Parity Technologies, 2017-2019");
Expand All @@ -110,6 +110,10 @@ pub fn run<I, T, E>(args: I, exit: E, version: sc_cli::VersionInfo) -> error::Re
.threaded_scheduler()
.build()
.map_err(|e| format!("{:?}", e))?;
config.tasks_executor = {
let runtime_handle = runtime.handle().clone();
Some(Box::new(move |fut| { runtime_handle.spawn(fut); }))
};
match config.roles {
ServiceRoles::LIGHT => run_until_exit(
runtime,
Expand Down
6 changes: 5 additions & 1 deletion client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,11 @@ ServiceBuilder<
essential_failed_rx,
to_spawn_tx,
to_spawn_rx,
to_poll: Vec::new(),
tasks_executor: if let Some(exec) = config.tasks_executor {
exec
} else {
return Err(Error::TasksExecutorRequired);
},
rpc_handlers,
_rpc: rpc,
_telemetry: telemetry,
Expand Down
6 changes: 4 additions & 2 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ pub use sc_client_db::{kvdb::KeyValueDB, PruningMode};
pub use sc_network::config::{ExtTransport, NetworkConfiguration, Roles};
pub use sc_executor::WasmExecutionMethod;

use std::{path::{PathBuf, Path}, net::SocketAddr, sync::Arc};
use std::{future::Future, path::{PathBuf, Path}, pin::Pin, net::SocketAddr, sync::Arc};
pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions;
use sc_chain_spec::{ChainSpec, RuntimeGenesis, Extension, NoExtension};
use sp_core::crypto::Protected;
use target_info::Target;
use sc_telemetry::TelemetryEndpoints;

/// Service configuration.
#[derive(Clone)]
pub struct Configuration<C, G, E = NoExtension> {
/// Implementation name
pub impl_name: &'static str,
Expand All @@ -39,6 +38,8 @@ pub struct Configuration<C, G, E = NoExtension> {
pub impl_commit: &'static str,
/// Node roles.
pub roles: Roles,
/// How to spawn background tasks. Mandatory, otherwise creating a `Service` will error.
pub tasks_executor: Option<Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>>,
/// Extrinsic pool configuration.
pub transaction_pool: TransactionPoolOptions,
/// Network configuration.
Expand Down Expand Up @@ -160,6 +161,7 @@ impl<C, G, E> Configuration<C, G, E> where
config_dir: config_dir.clone(),
name: Default::default(),
roles: Roles::FULL,
tasks_executor: None,
transaction_pool: Default::default(),
network: Default::default(),
keystore: KeystoreConfig::None,
Expand Down
3 changes: 3 additions & 0 deletions client/service/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub enum Error {
/// Best chain selection strategy is missing.
#[display(fmt="Best chain selection strategy (SelectChain) is not provided.")]
SelectChainRequired,
/// Tasks executor is missing.
#[display(fmt="Tasks executor hasn't been provided.")]
TasksExecutorRequired,
/// Other error.
Other(String),
}
Expand Down
25 changes: 4 additions & 21 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use parking_lot::Mutex;
use sc_client::Client;
use exit_future::Signal;
use futures::{
Future, FutureExt, Stream, StreamExt, TryFutureExt,
Future, FutureExt, Stream, StreamExt,
future::select, channel::mpsc,
compat::*,
sink::SinkExt,
Expand Down Expand Up @@ -95,10 +95,8 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
to_spawn_tx: mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: mpsc::UnboundedReceiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// List of futures to poll from `poll`.
/// If spawning a background task is not possible, we instead push the task into this `Vec`.
/// The elements must then be polled manually.
to_poll: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// How to spawn background tasks.
tasks_executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>,
rpc_handlers: sc_rpc_server::RpcHandler<sc_rpc::Metadata>,
_rpc: Box<dyn std::any::Any + Send + Sync>,
_telemetry: Option<sc_telemetry::Telemetry>,
Expand Down Expand Up @@ -322,22 +320,7 @@ impl<TBl: Unpin, TCl, TSc: Unpin, TNetStatus, TNet, TTxPool, TOc> Future for
}

while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) {
// TODO: Update to tokio 0.2 when libp2p get switched to std futures (#4383)
let executor = tokio_executor::DefaultExecutor::current();
use futures01::future::Executor;
if let Err(err) = executor.execute(task_to_spawn.unit_error().compat()) {
debug!(
target: "service",
"Failed to spawn background task: {:?}; falling back to manual polling",
err
);
this.to_poll.push(Box::pin(err.into_future().compat().map(drop)));
}
}

// Polling all the `to_poll` futures.
while let Some(pos) = this.to_poll.iter_mut().position(|t| Pin::new(t).poll(cx).is_ready()) {
let _ = this.to_poll.remove(pos);
(this.tasks_executor)(task_to_spawn);
}

// The service future never ends.
Expand Down
21 changes: 19 additions & 2 deletions client/service/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
use std::iter;
use std::sync::{Arc, Mutex, MutexGuard};
use std::net::Ipv4Addr;
use std::pin::Pin;
use std::time::Duration;
use log::info;
use futures01::{Future, Stream, Poll};
use futures::{FutureExt as _, TryFutureExt as _};
use tempfile::TempDir;
use tokio::{runtime::Runtime, prelude::FutureExt};
use tokio::timer::Interval;
Expand Down Expand Up @@ -131,6 +133,7 @@ fn node_config<G, E: Clone> (
index: usize,
spec: &ChainSpec<G, E>,
role: Roles,
tasks_executor: Box<dyn Fn(Pin<Box<dyn futures::Future<Output = ()> + Send>>) + Send>,
key_seed: Option<String>,
base_port: u16,
root: &TempDir,
Expand Down Expand Up @@ -172,6 +175,7 @@ fn node_config<G, E: Clone> (
impl_version: "0.1",
impl_commit: "",
roles: role,
tasks_executor: Some(tasks_executor),
transaction_pool: Default::default(),
network: network_config,
keystore: KeystoreConfig::Path {
Expand Down Expand Up @@ -251,10 +255,15 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
let executor = self.runtime.executor();

for (key, authority) in authorities {
let tasks_executor = {
let executor = executor.clone();
Box::new(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>| executor.spawn(fut.unit_error().compat()))
};
let node_config = node_config(
self.nodes,
&self.chain_spec,
Roles::AUTHORITY,
tasks_executor,
Some(key),
self.base_port,
&temp,
Expand All @@ -270,7 +279,11 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
}

for full in full {
let node_config = node_config(self.nodes, &self.chain_spec, Roles::FULL, None, self.base_port, &temp);
let tasks_executor = {
let executor = executor.clone();
Box::new(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>| executor.spawn(fut.unit_error().compat()))
};
let node_config = node_config(self.nodes, &self.chain_spec, Roles::FULL, tasks_executor, None, self.base_port, &temp);
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let (service, user_data) = full(node_config).expect("Error creating test node service");
let service = SyncService::from(service);
Expand All @@ -282,7 +295,11 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
}

for light in light {
let node_config = node_config(self.nodes, &self.chain_spec, Roles::LIGHT, None, self.base_port, &temp);
let tasks_executor = {
let executor = executor.clone();
Box::new(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>| executor.spawn(fut.unit_error().compat()))
};
let node_config = node_config(self.nodes, &self.chain_spec, Roles::LIGHT, tasks_executor, None, self.base_port, &temp);
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let service = SyncService::from(light(node_config).expect("Error creating test node service"));

Expand Down
3 changes: 3 additions & 0 deletions utils/browser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ where
allow_private_ipv4: true,
enable_mdns: false,
};
config.tasks_executor = Some(Box::new(move |fut| {
wasm_bindgen_futures::spawn_local(fut)
}));
config.telemetry_external_transport = Some(transport);
config.roles = Roles::LIGHT;
config.name = format!("{} (Browser)", name);
Expand Down