diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index d7d075efd98ab..2a37bdd893b38 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -197,7 +197,7 @@ pub fn new_full(mut config: Configuration) -> Result config: &config, client: client.clone(), transaction_pool: transaction_pool.clone(), - spawn_handle: task_manager.spawn_handle(), + spawn_handle: Box::new(task_manager.spawn_essential_handle()), import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 83d9651af24c4..f120d2b567b7e 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -361,7 +361,7 @@ pub fn new_full_base( config: &config, client: client.clone(), transaction_pool: transaction_pool.clone(), - spawn_handle: task_manager.spawn_handle(), + spawn_handle: Box::new(task_manager.spawn_essential_handle()), import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 68540b0ee7c29..1545e6963647d 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -23,7 +23,7 @@ use crate::{ schema::v1::{StateRequest, StateResponse}, state::{ImportResult, StateSync}, }; -use futures::FutureExt; +use futures::{executor::block_on, FutureExt}; use log::error; use sc_client_api::ProofProvider; use sc_network_common::sync::{ @@ -112,7 +112,13 @@ where if let Poll::Ready(Ok(target_block)) = target_block.poll_unpin(cx) { Some(Phase::TargetBlock(target_block)) } else if let Poll::Ready(Err(e)) = target_block.poll_unpin(cx) { - error!(target: "sync", "Failed to get target block. Error: {:?}",e); + block_on(async move { + error!(target: "sync", "Failed to get target block. Error: {:?}",e); + // This `return` might seem unnecessary, but we don't want to make it look like + // everything is working as normal even though the target block failed to be + // retrieved + return + }); None } else { None diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 7890b0c5022d5..aba6ee2a06cc8 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -68,7 +68,7 @@ use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_consensus::block_validation::{ BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator, }; -use sp_core::traits::{CodeExecutor, SpawnNamed}; +use sp_core::traits::{CodeExecutor, SpawnEssentialNamed, SpawnNamed}; use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr}; use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero}; use std::{str::FromStr, sync::Arc, time::SystemTime}; @@ -751,7 +751,7 @@ pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> { /// A shared transaction pool. pub transaction_pool: Arc, /// A handle for spawning tasks. - pub spawn_handle: SpawnTaskHandle, + pub spawn_handle: Box, /// An import queue. pub import_queue: TImpQu, /// A block announce validator builder. @@ -827,7 +827,11 @@ where config.network.default_peers_set.in_peers as usize + config.network.default_peers_set.out_peers as usize, ); - spawn_handle.spawn("block-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn_essential( + "block-request-handler", + Some("networking"), + Box::pin(handler.run()), + ); protocol_config }; @@ -839,7 +843,11 @@ where client.clone(), config.network.default_peers_set_num_full as usize, ); - spawn_handle.spawn("state-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn_essential( + "state-request-handler", + Some("networking"), + Box::pin(handler.run()), + ); protocol_config }; @@ -856,7 +864,11 @@ where config.chain_spec.fork_id(), warp_with_provider.clone(), ); - spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn_essential( + "warp-sync-request-handler", + Some("networking"), + Box::pin(handler.run()), + ); Some(protocol_config) }, _ => None, @@ -869,7 +881,11 @@ where config.chain_spec.fork_id(), client.clone(), ); - spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn_essential( + "light-client-request-handler", + Some("networking"), + Box::pin(handler.run()), + ); protocol_config }; @@ -898,7 +914,11 @@ where request_response_protocol_configs.push(config.network.ipfs_server.then(|| { let (handler, protocol_config) = BitswapRequestHandler::new(client.clone()); - spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn_essential( + "bitswap-request-handler", + Some("networking"), + Box::pin(handler.run()), + ); protocol_config })); @@ -907,7 +927,7 @@ where executor: { let spawn_handle = Clone::clone(&spawn_handle); Box::new(move |fut| { - spawn_handle.spawn("libp2p-node", Some("networking"), fut); + spawn_handle.spawn_essential("libp2p-node", Some("networking"), fut); }) }, network_config: config.network.clone(), @@ -955,13 +975,21 @@ where config.prometheus_config.as_ref().map(|config| &config.registry), )?; - spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); - spawn_handle.spawn( + spawn_handle.spawn_essential( + "network-transactions-handler", + Some("networking"), + Box::pin(tx_handler.run()), + ); + spawn_handle.spawn_essential( "chain-sync-network-service-provider", Some("networking"), - chain_sync_network_provider.run(network.clone()), + Box::pin(chain_sync_network_provider.run(network.clone())), + ); + spawn_handle.spawn_essential( + "import-queue", + None, + import_queue.run(Box::new(chain_sync_service)), ); - spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service))); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); @@ -997,18 +1025,22 @@ where // issue, and ideally we would like to fix the network future to take as little time as // possible, but we also take the extra harm-prevention measure to execute the networking // future using `spawn_blocking`. - spawn_handle.spawn_blocking("network-worker", Some("networking"), async move { - if network_start_rx.await.is_err() { - log::warn!( + spawn_handle.spawn_essential_blocking( + "network-worker", + Some("networking"), + Box::pin(async move { + if network_start_rx.await.is_err() { + log::warn!( "The NetworkStart returned as part of `build_network` has been silently dropped" ); - // This `return` might seem unnecessary, but we don't want to make it look like - // everything is working as normal even though the user is clearly misusing the API. - return - } + // This `return` might seem unnecessary, but we don't want to make it look like + // everything is working as normal even though the user is clearly misusing the API. + return + } - future.await - }); + future.await + }), + ); Ok((network, system_rpc_tx, tx_handler_controller, NetworkStarter(network_start_tx))) }