From 9db34684f129f674f5271d41e9b307c57f92518a Mon Sep 17 00:00:00 2001 From: sam elamin Date: Tue, 22 Nov 2022 17:20:30 +0000 Subject: [PATCH 01/50] add warp to target block for parachains --- Cargo.lock | 1 + bin/node-template/node/Cargo.toml | 1 + bin/node-template/node/src/service.rs | 3 +- bin/node/cli/src/service.rs | 4 +- client/network/common/src/sync/warp.rs | 11 +++++- client/network/sync/src/lib.rs | 28 +++++++------ client/network/sync/src/warp.rs | 15 ++++--- client/network/test/src/lib.rs | 4 +- client/service/src/builder.rs | 54 ++++++++++++++------------ 9 files changed, 75 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 68c144b3f177e..b9bce90cc396d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4880,6 +4880,7 @@ dependencies = [ "sc-executor", "sc-finality-grandpa", "sc-keystore", + "sc-network-common", "sc-rpc", "sc-rpc-api", "sc-service", diff --git a/bin/node-template/node/Cargo.toml b/bin/node-template/node/Cargo.toml index 364cfa25d3c6b..8caf3aa0c049f 100644 --- a/bin/node-template/node/Cargo.toml +++ b/bin/node-template/node/Cargo.toml @@ -23,6 +23,7 @@ futures = { version = "0.3.21", features = ["thread-pool"]} sc-cli = { version = "0.10.0-dev", path = "../../../client/cli" } sp-core = { version = "7.0.0", path = "../../../primitives/core" } sc-executor = { version = "0.10.0-dev", path = "../../../client/executor" } +sc-network-common = { version = "0.10.0-dev", path = "../../../client/network/common" } sc-service = { version = "0.10.0-dev", path = "../../../client/service" } sc-telemetry = { version = "4.0.0-dev", path = "../../../client/telemetry" } sc-keystore = { version = "4.0.0-dev", path = "../../../client/keystore" } diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index ee8464688c79c..687db46cf4ea8 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -7,6 +7,7 @@ pub use sc_executor::NativeElseWasmExecutor; use sc_finality_grandpa::SharedVoterState; use sc_keystore::LocalKeystore; use sc_service::{error::Error as ServiceError, Configuration, TaskManager}; +use sc_network_common::sync::warp::WarpSyncParams; use sc_telemetry::{Telemetry, TelemetryWorker}; use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; use std::{sync::Arc, time::Duration}; @@ -200,7 +201,7 @@ pub fn new_full(mut config: Configuration) -> Result spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync: Some(warp_sync), + warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), })?; if config.offchain_worker.enabled { diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index cdee61af3f500..cb7b3fde460e5 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -30,7 +30,7 @@ use sc_client_api::BlockBackend; use sc_consensus_babe::{self, SlotProportion}; use sc_executor::NativeElseWasmExecutor; use sc_network::NetworkService; -use sc_network_common::{protocol::event::Event, service::NetworkEventStream}; +use sc_network_common::{protocol::event::Event, service::NetworkEventStream, sync::warp::WarpSyncParams}; use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager}; use sc_telemetry::{Telemetry, TelemetryWorker}; use sp_api::ProvideRuntimeApi; @@ -362,7 +362,7 @@ pub fn new_full_base( spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync: Some(warp_sync), + warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), })?; if config.offchain_worker.enabled { diff --git a/client/network/common/src/sync/warp.rs b/client/network/common/src/sync/warp.rs index c9b9037542388..445af3b67d7a9 100644 --- a/client/network/common/src/sync/warp.rs +++ b/client/network/common/src/sync/warp.rs @@ -17,7 +17,8 @@ use codec::{Decode, Encode}; pub use sp_finality_grandpa::{AuthorityList, SetId}; use sp_runtime::traits::{Block as BlockT, NumberFor}; -use std::fmt; +use std::{fmt, sync::Arc}; +use futures::channel::oneshot; /// Scale-encoded warp sync proof response. pub struct EncodedProof(pub Vec); @@ -29,6 +30,14 @@ pub struct WarpProofRequest { pub begin: B::Hash, } +/// Defines different types of syncs +pub enum WarpSyncParams { + /// Standard warp sync for the relay chain + WithProvider(Arc>), + /// New sync that waits for a header from the relaychain + WaitForTarget(oneshot::Receiver<::Header>), +} + /// Proof verification result. pub enum VerificationResult { /// Proof is valid, but the target was not reached. diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 312fc6f5b7947..e1fc17bf08951 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -71,7 +71,7 @@ use sc_network_common::{ BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock, }, - warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, WarpSyncProvider}, + warp::{EncodedProof, WarpProofRequest, WarpSyncParams, WarpSyncPhase, WarpSyncProgress}, BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, SyncMode, @@ -318,8 +318,8 @@ pub struct ChainSync { state_sync: Option>, /// Warp sync in progress, if any. warp_sync: Option>, - /// Warp sync provider. - warp_sync_provider: Option>>, + /// Warp sync params. Provider vs TargetBlock. + warp_sync_params: Option>, /// Enable importing existing blocks. This is used used after the state download to /// catch up to the latest state while re-importing blocks. import_existing: bool, @@ -633,14 +633,20 @@ where if let SyncMode::Warp = &self.mode { if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() { - log::debug!(target: "sync", "Starting warp state sync."); - if let Some(provider) = &self.warp_sync_provider { - self.warp_sync = - Some(WarpSync::new(self.client.clone(), provider.clone())); - } + async move { + match self.warp_sync_params.as_mut().unwrap() { + WarpSyncParams::WithProvider(warp_with_provider) => { + log::debug!(target: "sync", "Starting warp state sync."); + self.warp_sync = Some(WarpSync::new(self.client.clone(), warp_with_provider.clone())); + } + WarpSyncParams::WaitForTarget(header) => { + log::debug!(target: "sync", "Waiting for target block."); + self.warp_sync = Some(WarpSync::new_with_target_block(self.client.clone(), header.await.unwrap())); + } + } + }.boxed(); } } - Ok(req) }, Ok(BlockStatus::Queued) | @@ -1427,8 +1433,8 @@ where roles: Roles, block_announce_validator: Box + Send>, max_parallel_downloads: u32, - warp_sync_provider: Option>>, metrics_registry: Option<&Registry>, + warp_sync_params: Option>, network_service: service::network::NetworkServiceHandle, import_queue: Box>, block_request_protocol_name: ProtocolName, @@ -1467,13 +1473,13 @@ where block_announce_validation_per_peer_stats: Default::default(), state_sync: None, warp_sync: None, - warp_sync_provider, import_existing: false, gap_sync: None, service_rx, network_service, block_request_protocol_name, state_request_protocol_name, + warp_sync_params, warp_sync_protocol_name, block_announce_protocol_name: block_announce_config .notifications_protocol diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index ab8a7c66b9856..632fee9aa6b36 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -36,7 +36,7 @@ use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero}; use std::sync::Arc; enum Phase { - WarpProof { set_id: SetId, authorities: AuthorityList, last_hash: B::Hash }, + WarpProof { set_id: SetId, authorities: AuthorityList, last_hash: B::Hash, warp_sync_provider: Arc> }, TargetBlock(B::Header), State(StateSync), } @@ -61,7 +61,6 @@ pub enum TargetBlockImportResult { pub struct WarpSync { phase: Phase, client: Arc, - warp_sync_provider: Arc>, total_proof_bytes: u64, } @@ -77,8 +76,14 @@ where set_id: 0, authorities: warp_sync_provider.current_authorities(), last_hash, + warp_sync_provider }; - Self { client, warp_sync_provider, phase, total_proof_bytes: 0 } + Self { client, phase, total_proof_bytes: 0 } + } + + pub fn new_with_target_block(client: Arc, target: ::Header) -> Self { + let phase = Phase::TargetBlock(target); + Self { client, phase, total_proof_bytes: 0 } } /// Validate and import a state response. @@ -99,8 +104,8 @@ where log::debug!(target: "sync", "Unexpected warp proof response"); WarpProofImportResult::BadResponse }, - Phase::WarpProof { set_id, authorities, last_hash } => { - match self.warp_sync_provider.verify(&response, *set_id, authorities.clone()) { + Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } => { + match warp_sync_provider.verify(&response, *set_id, authorities.clone()) { Err(e) => { log::debug!(target: "sync", "Bad warp proof response: {}", e); WarpProofImportResult::BadResponse diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index b3653ac7c0f88..bdeb42f2d24db 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -56,7 +56,7 @@ use sc_network_common::{ }, protocol::{role::Roles, ProtocolName}, service::{NetworkBlock, NetworkStateInfo, NetworkSyncForkRequest}, - sync::warp::{AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncProvider}, + sync::warp::{AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider}, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ @@ -903,7 +903,7 @@ where Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), block_announce_validator, network_config.max_parallel_downloads, - Some(warp_sync), + Some(WarpSyncParams::WithProvider(warp_sync)), None, chain_sync_network_handle, import_queue.service(), diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 1f94f96fae89e..cba6786b4995f 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -43,7 +43,7 @@ use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::{ protocol::role::Roles, service::{NetworkStateInfo, NetworkStatusProvider}, - sync::warp::WarpSyncProvider, + sync::warp::WarpSyncParams, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ @@ -757,8 +757,8 @@ pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> { /// A block announce validator builder. pub block_announce_validator_builder: Option) -> Box + Send> + Send>>, - /// An optional warp sync provider. - pub warp_sync: Option>>, + /// Optional warp sync params. + pub warp_sync_params: Option>, } /// Build the network service, the network status sinks and an RPC sender. pub fn build_network( @@ -793,12 +793,12 @@ where spawn_handle, import_queue, block_announce_validator_builder, - warp_sync, + warp_sync_params, } = params; let mut request_response_protocol_configs = Vec::new(); - if warp_sync.is_none() && config.network.sync_mode.is_warp() { + if warp_sync_params.is_none() && config.network.sync_mode.is_warp() { return Err("Warp sync enabled, but no warp sync provider configured.".into()) } @@ -843,23 +843,30 @@ where protocol_config }; - let (warp_sync_provider, warp_sync_protocol_config) = warp_sync - .map(|provider| { - // Allow both outgoing and incoming requests. - let (handler, protocol_config) = WarpSyncRequestHandler::new( - protocol_id.clone(), - client - .block_hash(0u32.into()) - .ok() - .flatten() - .expect("Genesis block exists; qed"), - config.chain_spec.fork_id(), - provider.clone(), - ); - spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); - (Some(provider), Some(protocol_config)) - }) - .unwrap_or_default(); + let mut warp_sync_protocol_config = None; + match warp_sync_params.as_ref().unwrap() { + WarpSyncParams::WithProvider(warp_with_provider) => { + (_, warp_sync_protocol_config) = Some(warp_with_provider) + .map(|provider| { + // Allow both outgoing and incoming requests. + let (handler, protocol_config) = WarpSyncRequestHandler::new( + protocol_id.clone(), + client + .block_hash(0u32.into()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + config.chain_spec.fork_id(), + provider.clone(), + ); + spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); + (Some(provider), Some(protocol_config)) + }) + .unwrap_or_default(); + }, + _ => { + } + } let light_client_request_protocol_config = { // Allow both outgoing and incoming requests. @@ -886,10 +893,9 @@ where Roles::from(&config.role), block_announce_validator, config.network.max_parallel_downloads, - warp_sync_provider, + warp_sync_params, config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(), chain_sync_network_handle, - import_queue.service(), block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), warp_sync_protocol_config.as_ref().map(|config| config.name.clone()), From f46d68b1bf7623d6bf1a1d1fdaf8d52eebd70ca9 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Wed, 23 Nov 2022 09:47:39 +0000 Subject: [PATCH 02/50] fix for failing tests --- client/network/sync/Cargo.toml | 1 + client/network/sync/src/lib.rs | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml index d2cbeb5ec1f1d..2372237eeea63 100644 --- a/client/network/sync/Cargo.toml +++ b/client/network/sync/Cargo.toml @@ -18,6 +18,7 @@ prost-build = "0.11" [dependencies] array-bytes = "4.1" async-trait = "0.1.58" +async-std = { version = "1.11.0", default-features = false } codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] } futures = "0.3.21" libp2p = "0.50.0" diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index e1fc17bf08951..d2f4b47059081 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -633,7 +633,6 @@ where if let SyncMode::Warp = &self.mode { if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() { - async move { match self.warp_sync_params.as_mut().unwrap() { WarpSyncParams::WithProvider(warp_with_provider) => { log::debug!(target: "sync", "Starting warp state sync."); @@ -641,10 +640,11 @@ where } WarpSyncParams::WaitForTarget(header) => { log::debug!(target: "sync", "Waiting for target block."); - self.warp_sync = Some(WarpSync::new_with_target_block(self.client.clone(), header.await.unwrap())); + async_std::task::block_on(async { + self.warp_sync = Some(WarpSync::new_with_target_block(self.client.clone(), header.await.unwrap())); + }); } } - }.boxed(); } } Ok(req) From 5b814bc10d0901a745cb16a32b034a5f0fe1e4e8 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Wed, 23 Nov 2022 09:51:13 +0000 Subject: [PATCH 03/50] format using `Cargo +nightly fmt` --- bin/node-template/node/src/service.rs | 2 +- bin/node/cli/src/service.rs | 4 +++- client/network/common/src/sync/warp.rs | 4 ++-- client/network/sync/Cargo.toml | 1 - client/network/sync/src/lib.rs | 32 ++++++++++++++++---------- client/network/sync/src/warp.rs | 20 ++++++++++------ client/network/test/src/lib.rs | 4 +++- client/service/src/builder.rs | 19 ++++++++------- 8 files changed, 53 insertions(+), 33 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 687db46cf4ea8..ae6ad867583d8 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -6,8 +6,8 @@ use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams}; pub use sc_executor::NativeElseWasmExecutor; use sc_finality_grandpa::SharedVoterState; use sc_keystore::LocalKeystore; -use sc_service::{error::Error as ServiceError, Configuration, TaskManager}; use sc_network_common::sync::warp::WarpSyncParams; +use sc_service::{error::Error as ServiceError, Configuration, TaskManager}; use sc_telemetry::{Telemetry, TelemetryWorker}; use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; use std::{sync::Arc, time::Duration}; diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index cb7b3fde460e5..83d9651af24c4 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -30,7 +30,9 @@ use sc_client_api::BlockBackend; use sc_consensus_babe::{self, SlotProportion}; use sc_executor::NativeElseWasmExecutor; use sc_network::NetworkService; -use sc_network_common::{protocol::event::Event, service::NetworkEventStream, sync::warp::WarpSyncParams}; +use sc_network_common::{ + protocol::event::Event, service::NetworkEventStream, sync::warp::WarpSyncParams, +}; use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager}; use sc_telemetry::{Telemetry, TelemetryWorker}; use sp_api::ProvideRuntimeApi; diff --git a/client/network/common/src/sync/warp.rs b/client/network/common/src/sync/warp.rs index 445af3b67d7a9..9beb6f861276c 100644 --- a/client/network/common/src/sync/warp.rs +++ b/client/network/common/src/sync/warp.rs @@ -15,10 +15,10 @@ // along with Substrate. If not, see . use codec::{Decode, Encode}; +use futures::channel::oneshot; pub use sp_finality_grandpa::{AuthorityList, SetId}; use sp_runtime::traits::{Block as BlockT, NumberFor}; use std::{fmt, sync::Arc}; -use futures::channel::oneshot; /// Scale-encoded warp sync proof response. pub struct EncodedProof(pub Vec); @@ -34,7 +34,7 @@ pub struct WarpProofRequest { pub enum WarpSyncParams { /// Standard warp sync for the relay chain WithProvider(Arc>), - /// New sync that waits for a header from the relaychain + /// Skip downloading proofs and wait for a header from the relaychain WaitForTarget(oneshot::Receiver<::Header>), } diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml index 2372237eeea63..d2cbeb5ec1f1d 100644 --- a/client/network/sync/Cargo.toml +++ b/client/network/sync/Cargo.toml @@ -18,7 +18,6 @@ prost-build = "0.11" [dependencies] array-bytes = "4.1" async-trait = "0.1.58" -async-std = { version = "1.11.0", default-features = false } codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] } futures = "0.3.21" libp2p = "0.50.0" diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index d2f4b47059081..24470e21a80e4 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -565,6 +565,7 @@ where info!("💔 New peer with unknown genesis hash {} ({}).", best_hash, best_number); return Err(BadPeer(who, rep::GENESIS_MISMATCH)) } + // If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have // enough to do in the import queue that it's not worth kicking off // an ancestor search, which is what we do in the next match case below. @@ -633,18 +634,25 @@ where if let SyncMode::Warp = &self.mode { if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() { - match self.warp_sync_params.as_mut().unwrap() { - WarpSyncParams::WithProvider(warp_with_provider) => { - log::debug!(target: "sync", "Starting warp state sync."); - self.warp_sync = Some(WarpSync::new(self.client.clone(), warp_with_provider.clone())); - } - WarpSyncParams::WaitForTarget(header) => { - log::debug!(target: "sync", "Waiting for target block."); - async_std::task::block_on(async { - self.warp_sync = Some(WarpSync::new_with_target_block(self.client.clone(), header.await.unwrap())); - }); - } - } + match self.warp_sync_params.as_mut() { + Some(WarpSyncParams::WithProvider(warp_with_provider)) => { + log::debug!(target: "sync", "Starting warp state sync."); + self.warp_sync = Some(WarpSync::new( + self.client.clone(), + warp_with_provider.clone(), + )); + }, + Some(WarpSyncParams::WaitForTarget(target_block)) => { + log::debug!(target: "sync", "Waiting for target block."); + futures::executor::block_on(async { + self.warp_sync = Some(WarpSync::new_with_target_block( + self.client.clone(), + target_block.await.unwrap(), + )); + }); + }, + None => {}, + } } } Ok(req) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 632fee9aa6b36..5d59dfd21622f 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -36,7 +36,12 @@ use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero}; use std::sync::Arc; enum Phase { - WarpProof { set_id: SetId, authorities: AuthorityList, last_hash: B::Hash, warp_sync_provider: Arc> }, + WarpProof { + set_id: SetId, + authorities: AuthorityList, + last_hash: B::Hash, + warp_sync_provider: Arc>, + }, TargetBlock(B::Header), State(StateSync), } @@ -76,13 +81,15 @@ where set_id: 0, authorities: warp_sync_provider.current_authorities(), last_hash, - warp_sync_provider + warp_sync_provider, }; Self { client, phase, total_proof_bytes: 0 } } - pub fn new_with_target_block(client: Arc, target: ::Header) -> Self { - let phase = Phase::TargetBlock(target); + /// Create a new instance, skip the proof downloading and verification and go directly to + /// target block + pub fn new_with_target_block(client: Arc, target_block: ::Header) -> Self { + let phase = Phase::TargetBlock(target_block); Self { client, phase, total_proof_bytes: 0 } } @@ -104,7 +111,7 @@ where log::debug!(target: "sync", "Unexpected warp proof response"); WarpProofImportResult::BadResponse }, - Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } => { + Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } => match warp_sync_provider.verify(&response, *set_id, authorities.clone()) { Err(e) => { log::debug!(target: "sync", "Bad warp proof response: {}", e); @@ -124,8 +131,7 @@ where self.phase = Phase::TargetBlock(header); WarpProofImportResult::Success }, - } - }, + }, } } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index bdeb42f2d24db..c4023a4e034d0 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -56,7 +56,9 @@ use sc_network_common::{ }, protocol::{role::Roles, ProtocolName}, service::{NetworkBlock, NetworkStateInfo, NetworkSyncForkRequest}, - sync::warp::{AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider}, + sync::warp::{ + AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider, + }, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index cba6786b4995f..5d04153b7ef0d 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -843,10 +843,9 @@ where protocol_config }; - let mut warp_sync_protocol_config = None; - match warp_sync_params.as_ref().unwrap() { - WarpSyncParams::WithProvider(warp_with_provider) => { - (_, warp_sync_protocol_config) = Some(warp_with_provider) + let warp_sync_protocol_config = match warp_sync_params.as_ref() { + Some(WarpSyncParams::WithProvider(warp_with_provider)) => { + let (_, warp_sync_protocol_config) = Some(warp_with_provider) .map(|provider| { // Allow both outgoing and incoming requests. let (handler, protocol_config) = WarpSyncRequestHandler::new( @@ -859,14 +858,18 @@ where config.chain_spec.fork_id(), provider.clone(), ); - spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn( + "warp-sync-request-handler", + Some("networking"), + handler.run(), + ); (Some(provider), Some(protocol_config)) }) .unwrap_or_default(); + warp_sync_protocol_config }, - _ => { - } - } + _ => None, + }; let light_client_request_protocol_config = { // Allow both outgoing and incoming requests. From c12da8902f00b463a75c4c595f795414cbc3b19c Mon Sep 17 00:00:00 2001 From: sam elamin Date: Mon, 28 Nov 2022 14:43:17 +0000 Subject: [PATCH 04/50] Remove blocking based on PR comments and create new `WarpSync` on poll --- client/network/common/src/sync.rs | 10 ++++++++++ client/network/sync/src/lib.rs | 28 ++++++++++++++++++---------- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index 802cf75fc4709..b687abf517828 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -368,6 +368,16 @@ pub trait ChainSync: Send { cx: &mut std::task::Context<'a>, ) -> Poll>; + /// Poll warp sync target block + /// + /// This should be polled until it returns [`target_block`]. + /// + /// If [`target_block`] is returned, then `WarpSync::new` is called with a target header + fn poll_warp_sync_target_block<'a>( + &mut self, + cx: &mut std::task::Context<'a>, + ) -> Poll; + /// Call when a peer has disconnected. /// Canceled obsolete block request may result in some blocks being ready for /// import, so this functions checks for such blocks and returns them. diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 24470e21a80e4..31f42ff8f13a2 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -642,16 +642,7 @@ where warp_with_provider.clone(), )); }, - Some(WarpSyncParams::WaitForTarget(target_block)) => { - log::debug!(target: "sync", "Waiting for target block."); - futures::executor::block_on(async { - self.warp_sync = Some(WarpSync::new_with_target_block( - self.client.clone(), - target_block.await.unwrap(), - )); - }); - }, - None => {}, + _ => {}, } } } @@ -1007,6 +998,17 @@ where Ok(self.validate_and_queue_blocks(new_blocks, gap)) } + + fn poll_warp_sync_target_block(&mut self, cx: &mut std::task::Context) -> Poll { + if let Some(WarpSyncParams::WaitForTarget(target_block)) = self.warp_sync_params.as_mut() { + return match target_block.poll_unpin(cx) { + Poll::Ready(Ok(target_block)) => Poll::Ready(target_block), + _ => Poll::Pending, + } + } + Poll::Pending + } + fn process_block_response_data(&mut self, blocks_to_import: Result, BadPeer>) { match blocks_to_import { Ok(OnBlockData::Import(origin, blocks)) => self.import_blocks(origin, blocks), @@ -1374,6 +1376,12 @@ where } } self.process_outbound_requests(); + match self.poll_warp_sync_target_block(cx) { + Poll::Ready(target_block) => + self.warp_sync = + Some(WarpSync::new_with_target_block(self.client.clone(), target_block)), + Poll::Pending => (), + }; while let Poll::Ready(result) = self.poll_pending_responses(cx) { match result { From 57cc0908852d34ba0b530cfe447bb7dd1b649b86 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Thu, 1 Dec 2022 09:05:13 +0000 Subject: [PATCH 05/50] remove method from trait --- client/network/common/src/sync.rs | 10 ---------- client/network/sync/src/lib.rs | 15 +++++++++------ 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index b687abf517828..802cf75fc4709 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -368,16 +368,6 @@ pub trait ChainSync: Send { cx: &mut std::task::Context<'a>, ) -> Poll>; - /// Poll warp sync target block - /// - /// This should be polled until it returns [`target_block`]. - /// - /// If [`target_block`] is returned, then `WarpSync::new` is called with a target header - fn poll_warp_sync_target_block<'a>( - &mut self, - cx: &mut std::task::Context<'a>, - ) -> Poll; - /// Call when a peer has disconnected. /// Canceled obsolete block request may result in some blocks being ready for /// import, so this functions checks for such blocks and returns them. diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 31f42ff8f13a2..16d55f321d0a2 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1376,12 +1376,15 @@ where } } self.process_outbound_requests(); - match self.poll_warp_sync_target_block(cx) { - Poll::Ready(target_block) => - self.warp_sync = - Some(WarpSync::new_with_target_block(self.client.clone(), target_block)), - Poll::Pending => (), - }; + + if let Some(WarpSyncParams::WaitForTarget(target_block)) = self.warp_sync_params.as_mut() { + match target_block.poll_unpin(cx) { + Poll::Ready(Ok(target_block)) => + self.warp_sync = + Some(WarpSync::new_with_target_block(self.client.clone(), target_block)), + _ => (), + } + } while let Poll::Ready(result) = self.poll_pending_responses(cx) { match result { From a3205b4cafe4325bec8bdf4e726a7706389d24e2 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Wed, 7 Dec 2022 20:19:42 +0000 Subject: [PATCH 06/50] add tests for wait for target --- client/network/test/src/lib.rs | 21 ++++++++++++++++++-- client/network/test/src/sync.rs | 34 ++++++++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index c4023a4e034d0..6283319360558 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -31,7 +31,7 @@ use std::{ time::Duration, }; -use futures::{future::BoxFuture, prelude::*}; +use futures::{channel::oneshot, future::BoxFuture, prelude::*}; use libp2p::{build_multiaddr, PeerId}; use log::trace; use parking_lot::Mutex; @@ -424,6 +424,11 @@ where self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx) } + pub fn get_best_header(&mut self) -> ::Header { + let info = self.client.info(); + self.client.header(&BlockId::hash(info.best_hash)).unwrap().unwrap() + } + /// Push blocks to the peer (simplified: with or without a TX) pub fn push_headers(&mut self, count: usize) -> Vec { let best_hash = self.client.info().best_hash; @@ -724,6 +729,8 @@ pub struct FullPeerConfig { pub extra_storage: Option, /// Enable transaction indexing. pub storage_chain: bool, + /// Optional target block header to sync to + pub target_block: Option<::Header>, } #[async_trait::async_trait] @@ -869,6 +876,15 @@ where let warp_sync = Arc::new(TestWarpSyncProvider(client.clone())); + let warp_sync_params = match config.target_block { + Some(target_block) => { + let (sender, receiver) = oneshot::channel::<::Header>(); + let _ = sender.send(target_block); + WarpSyncParams::WaitForTarget(receiver) + }, + _ => WarpSyncParams::WithProvider(warp_sync.clone()), + }; + let warp_protocol_config = { let (handler, protocol_config) = warp_request_handler::RequestHandler::new( protocol_id.clone(), @@ -889,6 +905,7 @@ where .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)); let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); + let (chain_sync, chain_sync_service, block_announce_config) = ChainSync::new( match network_config.sync_mode { SyncMode::Full => sc_network_common::sync::SyncMode::Full, @@ -905,7 +922,7 @@ where Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), block_announce_validator, network_config.max_parallel_downloads, - Some(WarpSyncParams::WithProvider(warp_sync)), + Some(warp_sync_params), None, chain_sync_network_handle, import_queue.service(), diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index b629574fe9874..942ad0a798508 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1250,7 +1250,39 @@ async fn warp_sync() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn syncs_huge_blocks() { +#[test] +fn warp_sync_to_target_block() { + sp_tracing::try_init_simple(); + let runtime = Runtime::new().unwrap(); + let mut net = TestNet::new(runtime.handle().clone(), 0); + // Create 3 synced peers and 1 peer trying to warp sync. + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(Default::default()); + + net.peer(0).push_blocks(63, false); + net.peer(0).push_blocks(1, false); + + let target_block = net.peer(0).get_best_header(); + + net.peer(1).push_blocks(64, false); + net.peer(2).push_blocks(64, false); + + net.add_full_peer_with_config(FullPeerConfig { + sync_mode: SyncMode::Warp, + target_block: Some(target_block), + ..Default::default() + }); + + // Wait for peer 1 to sync state. + runtime.block_on(net.wait_until_sync()); + assert!(!net.peer(3).client().has_state_at(&BlockId::Number(1))); + assert!(net.peer(3).client().has_state_at(&BlockId::Number(64))); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[test] +fn syncs_huge_blocks() { use sp_core::storage::well_known_keys::HEAP_PAGES; use sp_runtime::codec::Encode; use substrate_test_runtime_client::BlockBuilderExt; From e57569eda8b1476b4e87c219d6f88dcad9aca5bc Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 7 Dec 2022 22:00:01 +0000 Subject: [PATCH 07/50] Update client/network/common/src/sync/warp.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/common/src/sync/warp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/common/src/sync/warp.rs b/client/network/common/src/sync/warp.rs index 9beb6f861276c..662cc5b7c0e20 100644 --- a/client/network/common/src/sync/warp.rs +++ b/client/network/common/src/sync/warp.rs @@ -30,7 +30,7 @@ pub struct WarpProofRequest { pub begin: B::Hash, } -/// Defines different types of syncs +/// The different types of warp syncing. pub enum WarpSyncParams { /// Standard warp sync for the relay chain WithProvider(Arc>), From 18647227780c3d2f22a13c4a0c7bf3f1dd82f57a Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 7 Dec 2022 22:00:09 +0000 Subject: [PATCH 08/50] Update client/network/common/src/sync/warp.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/common/src/sync/warp.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/network/common/src/sync/warp.rs b/client/network/common/src/sync/warp.rs index 662cc5b7c0e20..9e3dcf2e92bc8 100644 --- a/client/network/common/src/sync/warp.rs +++ b/client/network/common/src/sync/warp.rs @@ -34,7 +34,9 @@ pub struct WarpProofRequest { pub enum WarpSyncParams { /// Standard warp sync for the relay chain WithProvider(Arc>), - /// Skip downloading proofs and wait for a header from the relaychain + /// Skip downloading proofs and wait for a header of the state that should be downloaded. + /// + /// It is expected that the header provider ensures that the header is trusted. WaitForTarget(oneshot::Receiver<::Header>), } From 6d5eef82c087631e5de06536a85830e4649c662d Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 7 Dec 2022 22:00:16 +0000 Subject: [PATCH 09/50] Update client/network/test/src/sync.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/test/src/sync.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 942ad0a798508..46e306081d865 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1254,11 +1254,8 @@ async fn warp_sync() { fn warp_sync_to_target_block() { sp_tracing::try_init_simple(); let runtime = Runtime::new().unwrap(); - let mut net = TestNet::new(runtime.handle().clone(), 0); // Create 3 synced peers and 1 peer trying to warp sync. - net.add_full_peer_with_config(Default::default()); - net.add_full_peer_with_config(Default::default()); - net.add_full_peer_with_config(Default::default()); + let mut net = TestNet::new(runtime.handle().clone(), 3); net.peer(0).push_blocks(63, false); net.peer(0).push_blocks(1, false); From 02e5885130593011d4269b33b7141899feed09b6 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 7 Dec 2022 22:00:38 +0000 Subject: [PATCH 10/50] Update client/network/test/src/sync.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/test/src/sync.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 46e306081d865..daed73be3ac8b 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1262,9 +1262,6 @@ fn warp_sync_to_target_block() { let target_block = net.peer(0).get_best_header(); - net.peer(1).push_blocks(64, false); - net.peer(2).push_blocks(64, false); - net.add_full_peer_with_config(FullPeerConfig { sync_mode: SyncMode::Warp, target_block: Some(target_block), From 2a9f68acd72b9b25b633d63e470abcb6e10cbdbd Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 7 Dec 2022 22:00:43 +0000 Subject: [PATCH 11/50] Update client/network/test/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/test/src/lib.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 6283319360558..ccaebc976135b 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -424,11 +424,6 @@ where self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx) } - pub fn get_best_header(&mut self) -> ::Header { - let info = self.client.info(); - self.client.header(&BlockId::hash(info.best_hash)).unwrap().unwrap() - } - /// Push blocks to the peer (simplified: with or without a TX) pub fn push_headers(&mut self, count: usize) -> Vec { let best_hash = self.client.info().best_hash; From 9c2f8f511130af4a3a245ee47851a4255308832d Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 7 Dec 2022 22:00:51 +0000 Subject: [PATCH 12/50] Update client/network/test/src/sync.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/test/src/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index daed73be3ac8b..3fe09f480cb7a 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1257,7 +1257,7 @@ fn warp_sync_to_target_block() { // Create 3 synced peers and 1 peer trying to warp sync. let mut net = TestNet::new(runtime.handle().clone(), 3); - net.peer(0).push_blocks(63, false); + net.peer(0).push_blocks(64, false); net.peer(0).push_blocks(1, false); let target_block = net.peer(0).get_best_header(); From f36c21575db6c65873ca4a576993eb1bbf02ae2a Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 7 Dec 2022 22:01:01 +0000 Subject: [PATCH 13/50] Update client/network/test/src/sync.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/test/src/sync.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 3fe09f480cb7a..eed6790e8a5ab 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1258,7 +1258,6 @@ fn warp_sync_to_target_block() { let mut net = TestNet::new(runtime.handle().clone(), 3); net.peer(0).push_blocks(64, false); - net.peer(0).push_blocks(1, false); let target_block = net.peer(0).get_best_header(); From 709608e84d18e0ecb5c064b434810e141eaeb1ef Mon Sep 17 00:00:00 2001 From: sam elamin Date: Wed, 7 Dec 2022 23:08:30 +0000 Subject: [PATCH 14/50] code refactor based on pr comments --- client/network/sync/src/lib.rs | 11 ++++------- client/network/sync/src/warp.rs | 25 +++++++++++++++++++---- client/network/test/src/lib.rs | 6 ++++++ client/service/src/builder.rs | 35 ++++++++++++--------------------- 4 files changed, 44 insertions(+), 33 deletions(-) diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 16d55f321d0a2..82455e840f821 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1377,13 +1377,10 @@ where } self.process_outbound_requests(); - if let Some(WarpSyncParams::WaitForTarget(target_block)) = self.warp_sync_params.as_mut() { - match target_block.poll_unpin(cx) { - Poll::Ready(Ok(target_block)) => - self.warp_sync = - Some(WarpSync::new_with_target_block(self.client.clone(), target_block)), - _ => (), - } + if let Poll::Ready(warp_sync) = + WarpSync::poll_target_block(self.client.clone(), self.warp_sync_params.as_mut(), cx) + { + self.warp_sync = Some(warp_sync) } while let Poll::Ready(result) = self.poll_pending_responses(cx) { diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 5d59dfd21622f..a9f344c999934 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -22,18 +22,19 @@ use crate::{ schema::v1::{StateRequest, StateResponse}, state::{ImportResult, StateSync}, }; +use futures::FutureExt; use sc_client_api::ProofProvider; use sc_network_common::sync::{ message::{BlockAttributes, BlockData, BlockRequest, Direction, FromBlock}, warp::{ - EncodedProof, VerificationResult, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, - WarpSyncProvider, + EncodedProof, VerificationResult, WarpProofRequest, WarpSyncParams, WarpSyncPhase, + WarpSyncProgress, WarpSyncProvider, }, }; use sp_blockchain::HeaderBackend; use sp_finality_grandpa::{AuthorityList, SetId}; use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero}; -use std::sync::Arc; +use std::{sync::Arc, task::Poll}; enum Phase { WarpProof { @@ -88,11 +89,27 @@ where /// Create a new instance, skip the proof downloading and verification and go directly to /// target block - pub fn new_with_target_block(client: Arc, target_block: ::Header) -> Self { + fn new_with_target_block(client: Arc, target_block: ::Header) -> Self { let phase = Phase::TargetBlock(target_block); Self { client, phase, total_proof_bytes: 0 } } + /// Poll and wait for target block + pub fn poll_target_block( + client: Arc, + warp_sync_params: Option<&mut WarpSyncParams>, + cx: &mut std::task::Context, + ) -> Poll { + if let Some(WarpSyncParams::WaitForTarget(target_block)) = warp_sync_params { + return match target_block.poll_unpin(cx) { + Poll::Ready(Ok(target_block)) => + Poll::Ready(WarpSync::new_with_target_block(client.clone(), target_block)), + _ => Poll::Pending, + } + } + Poll::Pending + } + /// Validate and import a state response. pub fn import_state(&mut self, response: StateResponse) -> ImportResult { match &mut self.phase { diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index ccaebc976135b..1e75d6a0799c9 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -424,6 +424,12 @@ where self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx) } + /// Get best header of the current peer + pub fn get_best_header(&mut self) -> ::Header { + let info = self.client.info(); + self.client.header(&BlockId::hash(info.best_hash)).unwrap().unwrap() + } + /// Push blocks to the peer (simplified: with or without a TX) pub fn push_headers(&mut self, count: usize) -> Vec { let best_hash = self.client.info().best_hash; diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 5d04153b7ef0d..84185e8e1cd16 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -845,28 +845,19 @@ where let warp_sync_protocol_config = match warp_sync_params.as_ref() { Some(WarpSyncParams::WithProvider(warp_with_provider)) => { - let (_, warp_sync_protocol_config) = Some(warp_with_provider) - .map(|provider| { - // Allow both outgoing and incoming requests. - let (handler, protocol_config) = WarpSyncRequestHandler::new( - protocol_id.clone(), - client - .block_hash(0u32.into()) - .ok() - .flatten() - .expect("Genesis block exists; qed"), - config.chain_spec.fork_id(), - provider.clone(), - ); - spawn_handle.spawn( - "warp-sync-request-handler", - Some("networking"), - handler.run(), - ); - (Some(provider), Some(protocol_config)) - }) - .unwrap_or_default(); - warp_sync_protocol_config + // Allow both outgoing and incoming requests. + let (handler, protocol_config) = WarpSyncRequestHandler::new( + protocol_id.clone(), + client + .block_hash(0u32.into()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + config.chain_spec.fork_id(), + warp_with_provider.clone(), + ); + spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); + Some(protocol_config) }, _ => None, }; From 0adecb91a036c7e7fd23a618bb2d45be7f43fb31 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Mon, 12 Dec 2022 14:52:12 +0000 Subject: [PATCH 15/50] Second round of PR comments --- Cargo.lock | 1 - bin/node-template/node/Cargo.toml | 1 - bin/node-template/node/src/service.rs | 3 +-- client/network/sync/src/lib.rs | 19 +++++++------------ client/network/test/src/lib.rs | 6 ------ client/network/test/src/sync.rs | 3 ++- client/service/src/builder.rs | 1 + client/service/src/lib.rs | 1 + 8 files changed, 12 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9bce90cc396d..68c144b3f177e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4880,7 +4880,6 @@ dependencies = [ "sc-executor", "sc-finality-grandpa", "sc-keystore", - "sc-network-common", "sc-rpc", "sc-rpc-api", "sc-service", diff --git a/bin/node-template/node/Cargo.toml b/bin/node-template/node/Cargo.toml index 8caf3aa0c049f..364cfa25d3c6b 100644 --- a/bin/node-template/node/Cargo.toml +++ b/bin/node-template/node/Cargo.toml @@ -23,7 +23,6 @@ futures = { version = "0.3.21", features = ["thread-pool"]} sc-cli = { version = "0.10.0-dev", path = "../../../client/cli" } sp-core = { version = "7.0.0", path = "../../../primitives/core" } sc-executor = { version = "0.10.0-dev", path = "../../../client/executor" } -sc-network-common = { version = "0.10.0-dev", path = "../../../client/network/common" } sc-service = { version = "0.10.0-dev", path = "../../../client/service" } sc-telemetry = { version = "4.0.0-dev", path = "../../../client/telemetry" } sc-keystore = { version = "4.0.0-dev", path = "../../../client/keystore" } diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index ae6ad867583d8..d7d075efd98ab 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -6,8 +6,7 @@ use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams}; pub use sc_executor::NativeElseWasmExecutor; use sc_finality_grandpa::SharedVoterState; use sc_keystore::LocalKeystore; -use sc_network_common::sync::warp::WarpSyncParams; -use sc_service::{error::Error as ServiceError, Configuration, TaskManager}; +use sc_service::{error::Error as ServiceError, Configuration, TaskManager, WarpSyncParams}; use sc_telemetry::{Telemetry, TelemetryWorker}; use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; use std::{sync::Arc, time::Duration}; diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 82455e840f821..e4fab0eb138aa 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -998,17 +998,6 @@ where Ok(self.validate_and_queue_blocks(new_blocks, gap)) } - - fn poll_warp_sync_target_block(&mut self, cx: &mut std::task::Context) -> Poll { - if let Some(WarpSyncParams::WaitForTarget(target_block)) = self.warp_sync_params.as_mut() { - return match target_block.poll_unpin(cx) { - Poll::Ready(Ok(target_block)) => Poll::Ready(target_block), - _ => Poll::Pending, - } - } - Poll::Pending - } - fn process_block_response_data(&mut self, blocks_to_import: Result, BadPeer>) { match blocks_to_import { Ok(OnBlockData::Import(origin, blocks)) => self.import_blocks(origin, blocks), @@ -1380,6 +1369,12 @@ where if let Poll::Ready(warp_sync) = WarpSync::poll_target_block(self.client.clone(), self.warp_sync_params.as_mut(), cx) { + let target_block = warp_sync.target_block_number().unwrap(); + info!( + target: "sync", + "Waiting for target block complete. Target block reached {:?}", + target_block, + ); self.warp_sync = Some(warp_sync) } @@ -1449,8 +1444,8 @@ where roles: Roles, block_announce_validator: Box + Send>, max_parallel_downloads: u32, - metrics_registry: Option<&Registry>, warp_sync_params: Option>, + metrics_registry: Option<&Registry>, network_service: service::network::NetworkServiceHandle, import_queue: Box>, block_request_protocol_name: ProtocolName, diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 1e75d6a0799c9..ccaebc976135b 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -424,12 +424,6 @@ where self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx) } - /// Get best header of the current peer - pub fn get_best_header(&mut self) -> ::Header { - let info = self.client.info(); - self.client.header(&BlockId::hash(info.best_hash)).unwrap().unwrap() - } - /// Push blocks to the peer (simplified: with or without a TX) pub fn push_headers(&mut self, count: usize) -> Vec { let best_hash = self.client.info().best_hash; diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index eed6790e8a5ab..7a619c3e70d9b 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1259,7 +1259,8 @@ fn warp_sync_to_target_block() { net.peer(0).push_blocks(64, false); - let target_block = net.peer(0).get_best_header(); + let info = net.peer(0).client.info(); + let target_block = net.peer(0).client.header(&BlockId::hash(info.best_hash)).unwrap().unwrap(); net.add_full_peer_with_config(FullPeerConfig { sync_mode: SyncMode::Warp, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 84185e8e1cd16..7890b0c5022d5 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -890,6 +890,7 @@ where warp_sync_params, config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(), chain_sync_network_handle, + import_queue.service(), block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), warp_sync_protocol_config.as_ref().map(|config| config.name.clone()), diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 1529b822ade32..e3dcf012892c3 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -73,6 +73,7 @@ pub use sc_chain_spec::{ pub use sc_consensus::ImportQueue; pub use sc_executor::NativeExecutionDispatch; +pub use sc_network_common::sync::warp::WarpSyncParams; #[doc(hidden)] pub use sc_network_transactions::config::{TransactionImport, TransactionImportFuture}; pub use sc_rpc::{ From db0a3a49dab0ce7c827e62ce0dc50155f1287299 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Tue, 13 Dec 2022 10:15:18 +0000 Subject: [PATCH 16/50] Third round of pr comments --- client/network/common/src/sync/warp.rs | 3 + client/network/sync/src/lib.rs | 39 ++++++----- client/network/sync/src/warp.rs | 89 ++++++++++++++------------ 3 files changed, 70 insertions(+), 61 deletions(-) diff --git a/client/network/common/src/sync/warp.rs b/client/network/common/src/sync/warp.rs index 9e3dcf2e92bc8..89e096c88588e 100644 --- a/client/network/common/src/sync/warp.rs +++ b/client/network/common/src/sync/warp.rs @@ -73,6 +73,8 @@ pub trait WarpSyncProvider: Send + Sync { pub enum WarpSyncPhase { /// Waiting for peers to connect. AwaitingPeers, + /// Waiting for target block to be received + AwaitingTargetBlock, /// Downloading and verifying grandpa warp proofs. DownloadingWarpProofs, /// Downloading target block. @@ -89,6 +91,7 @@ impl fmt::Display for WarpSyncPhase { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Self::AwaitingPeers => write!(f, "Waiting for peers"), + Self::AwaitingTargetBlock => write!(f, "Waiting for target block to be reached"), Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"), Self::DownloadingTargetBlock => write!(f, "Downloading target block"), Self::DownloadingState => write!(f, "Downloading state"), diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index e4fab0eb138aa..333671ed6a5d7 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -631,19 +631,15 @@ where }, ); - if let SyncMode::Warp = &self.mode { + if let SyncMode::Warp = self.mode { if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() { - match self.warp_sync_params.as_mut() { - Some(WarpSyncParams::WithProvider(warp_with_provider)) => { - log::debug!(target: "sync", "Starting warp state sync."); - self.warp_sync = Some(WarpSync::new( - self.client.clone(), - warp_with_provider.clone(), - )); - }, - _ => {}, - } + log::debug!(target: "sync", "Starting warp state sync."); + self.warp_sync = Some(WarpSync::new( + self.client.clone(), + self.warp_sync_params.as_mut().unwrap(), + None, + )); } } Ok(req) @@ -1366,16 +1362,17 @@ where } self.process_outbound_requests(); - if let Poll::Ready(warp_sync) = - WarpSync::poll_target_block(self.client.clone(), self.warp_sync_params.as_mut(), cx) - { - let target_block = warp_sync.target_block_number().unwrap(); - info!( - target: "sync", - "Waiting for target block complete. Target block reached {:?}", - target_block, - ); - self.warp_sync = Some(warp_sync) + if let Some(WarpSyncParams::WaitForTarget(_)) = self.warp_sync_params { + if self.warp_sync.is_none() || + self.warp_sync.as_ref().unwrap().progress().phase == + WarpSyncPhase::AwaitingTargetBlock + { + self.warp_sync = Some(WarpSync::new( + self.client.clone(), + self.warp_sync_params.as_mut().unwrap(), + Some(cx), + )) + } } while let Poll::Ready(result) = self.poll_pending_responses(cx) { diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index a9f344c999934..fb4d51143ea47 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -19,6 +19,7 @@ //! Warp sync support. use crate::{ + oneshot, schema::v1::{StateRequest, StateResponse}, state::{ImportResult, StateSync}, }; @@ -43,6 +44,7 @@ enum Phase { last_hash: B::Hash, warp_sync_provider: Arc>, }, + PendingTargetBlock, TargetBlock(B::Header), State(StateSync), } @@ -76,44 +78,52 @@ where Client: HeaderBackend + ProofProvider + 'static, { /// Create a new instance. - pub fn new(client: Arc, warp_sync_provider: Arc>) -> Self { + pub fn new( + client: Arc, + warp_sync_params: &mut WarpSyncParams, + cx: Option<&mut std::task::Context>, + ) -> Self { let last_hash = client.hash(Zero::zero()).unwrap().expect("Genesis header always exists"); - let phase = Phase::WarpProof { - set_id: 0, - authorities: warp_sync_provider.current_authorities(), - last_hash, - warp_sync_provider, - }; - Self { client, phase, total_proof_bytes: 0 } - } - - /// Create a new instance, skip the proof downloading and verification and go directly to - /// target block - fn new_with_target_block(client: Arc, target_block: ::Header) -> Self { - let phase = Phase::TargetBlock(target_block); - Self { client, phase, total_proof_bytes: 0 } + match warp_sync_params { + WarpSyncParams::WithProvider(warp_sync_provider) => { + let phase = Phase::WarpProof { + set_id: 0, + authorities: warp_sync_provider.current_authorities(), + last_hash, + warp_sync_provider: warp_sync_provider.clone(), + }; + Self { client, phase, total_proof_bytes: 0 } + }, + WarpSyncParams::WaitForTarget(block) => { + match Self::poll_target_block(block, cx.unwrap()) { + Poll::Ready(target_block) => { + let phase = Phase::TargetBlock(target_block); + Self { client, phase, total_proof_bytes: 0 } + }, + Poll::Pending => { + let phase = Phase::PendingTargetBlock; + Self { client, phase, total_proof_bytes: 0 } + }, + } + }, + } } /// Poll and wait for target block - pub fn poll_target_block( - client: Arc, - warp_sync_params: Option<&mut WarpSyncParams>, + fn poll_target_block( + target_block: &mut oneshot::Receiver, cx: &mut std::task::Context, - ) -> Poll { - if let Some(WarpSyncParams::WaitForTarget(target_block)) = warp_sync_params { - return match target_block.poll_unpin(cx) { - Poll::Ready(Ok(target_block)) => - Poll::Ready(WarpSync::new_with_target_block(client.clone(), target_block)), - _ => Poll::Pending, - } + ) -> Poll { + return match target_block.poll_unpin(cx) { + Poll::Ready(Ok(target_block)) => Poll::Ready(target_block), + _ => Poll::Pending, } - Poll::Pending } /// Validate and import a state response. pub fn import_state(&mut self, response: StateResponse) -> ImportResult { match &mut self.phase { - Phase::WarpProof { .. } | Phase::TargetBlock(_) => { + Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock => { log::debug!(target: "sync", "Unexpected state response"); ImportResult::BadResponse }, @@ -124,7 +134,7 @@ where /// Validate and import a warp proof response. pub fn import_warp_proof(&mut self, response: EncodedProof) -> WarpProofImportResult { match &mut self.phase { - Phase::State(_) | Phase::TargetBlock(_) => { + Phase::State(_) | Phase::TargetBlock(_) | Phase::PendingTargetBlock => { log::debug!(target: "sync", "Unexpected warp proof response"); WarpProofImportResult::BadResponse }, @@ -155,7 +165,7 @@ where /// Import the target block body. pub fn import_target_block(&mut self, block: BlockData) -> TargetBlockImportResult { match &mut self.phase { - Phase::WarpProof { .. } | Phase::State(_) => { + Phase::WarpProof { .. } | Phase::State(_) | Phase::PendingTargetBlock => { log::debug!(target: "sync", "Unexpected target block response"); TargetBlockImportResult::BadResponse }, @@ -196,8 +206,7 @@ where /// Produce next state request. pub fn next_state_request(&self) -> Option { match &self.phase { - Phase::WarpProof { .. } => None, - Phase::TargetBlock(_) => None, + Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock => None, Phase::State(sync) => Some(sync.next_request()), } } @@ -206,15 +215,14 @@ where pub fn next_warp_proof_request(&self) -> Option> { match &self.phase { Phase::WarpProof { last_hash, .. } => Some(WarpProofRequest { begin: *last_hash }), - Phase::TargetBlock(_) => None, - Phase::State(_) => None, + Phase::TargetBlock(_) | Phase::State(_) | Phase::PendingTargetBlock => None, } } /// Produce next target block request. pub fn next_target_block_request(&self) -> Option<(NumberFor, BlockRequest)> { match &self.phase { - Phase::WarpProof { .. } => None, + Phase::WarpProof { .. } | Phase::State(_) | Phase::PendingTargetBlock => None, Phase::TargetBlock(header) => { let request = BlockRequest:: { id: 0, @@ -226,15 +234,13 @@ where }; Some((*header.number(), request)) }, - Phase::State(_) => None, } } /// Return target block hash if it is known. pub fn target_block_hash(&self) -> Option { match &self.phase { - Phase::WarpProof { .. } => None, - Phase::TargetBlock(_) => None, + Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock => None, Phase::State(s) => Some(s.target()), } } @@ -242,7 +248,7 @@ where /// Return target block number if it is known. pub fn target_block_number(&self) -> Option> { match &self.phase { - Phase::WarpProof { .. } => None, + Phase::WarpProof { .. } | Phase::PendingTargetBlock => None, Phase::TargetBlock(header) => Some(*header.number()), Phase::State(s) => Some(s.target_block_num()), } @@ -251,8 +257,7 @@ where /// Check if the state is complete. pub fn is_complete(&self) -> bool { match &self.phase { - Phase::WarpProof { .. } => false, - Phase::TargetBlock(_) => false, + Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock => false, Phase::State(sync) => sync.is_complete(), } } @@ -268,6 +273,10 @@ where phase: WarpSyncPhase::DownloadingTargetBlock, total_bytes: self.total_proof_bytes, }, + Phase::PendingTargetBlock => WarpSyncProgress { + phase: WarpSyncPhase::AwaitingTargetBlock, + total_bytes: self.total_proof_bytes, + }, Phase::State(sync) => WarpSyncProgress { phase: if self.is_complete() { WarpSyncPhase::ImportingState From fdbe062a49146aee38dbc72ca20b33b224453b45 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Tue, 13 Dec 2022 10:47:01 +0000 Subject: [PATCH 17/50] add comments to explain logic --- client/network/sync/src/warp.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index fb4d51143ea47..3013a9c305ccd 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -77,7 +77,9 @@ where B: BlockT, Client: HeaderBackend + ProofProvider + 'static, { - /// Create a new instance. + /// Create a new instance. When passing a warp sync provider we will be checking for proof and + /// authorities. Alternatively we can pass a target block when we want to skip downloading + /// proofs, in this case we will continue polling until the target block is known. pub fn new( client: Arc, warp_sync_params: &mut WarpSyncParams, From 958fe3fbfb018e5ed9239a48532044c45ada7729 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 14 Dec 2022 19:34:06 +0000 Subject: [PATCH 18/50] Update client/network/sync/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 333671ed6a5d7..976fbea8c9372 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -318,7 +318,9 @@ pub struct ChainSync { state_sync: Option>, /// Warp sync in progress, if any. warp_sync: Option>, - /// Warp sync params. Provider vs TargetBlock. + /// Warp sync params. + /// + /// Will be `None` after `self.warp_sync` is `Some(_)`. warp_sync_params: Option>, /// Enable importing existing blocks. This is used used after the state download to /// catch up to the latest state while re-importing blocks. From 1eac0da5e92862bb17bab713be4ebfe3d130cff9 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 14 Dec 2022 19:35:09 +0000 Subject: [PATCH 19/50] Update client/network/sync/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/lib.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 976fbea8c9372..e8cf13a6a7d8d 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -637,11 +637,13 @@ where if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() { log::debug!(target: "sync", "Starting warp state sync."); - self.warp_sync = Some(WarpSync::new( - self.client.clone(), - self.warp_sync_params.as_mut().unwrap(), - None, - )); + if let Some(params) = self.warp_sync_params.take() { + self.warp_sync = Some(WarpSync::new( + self.client.clone(), + params, + None, + )); + } } } Ok(req) From 312ed8c97777a96c47cf1a2a7d21ebc09e59f287 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 14 Dec 2022 19:35:19 +0000 Subject: [PATCH 20/50] Update client/network/sync/src/warp.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/warp.rs | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 3013a9c305ccd..f78ec9487d8f5 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -111,15 +111,28 @@ where } } - /// Poll and wait for target block - fn poll_target_block( - target_block: &mut oneshot::Receiver, + /// Poll to make progress. + /// + /// This only makes progress when `phase = Phase::PendingTargetBlock` and the pending block was send. + fn poll( + &mut self, cx: &mut std::task::Context, - ) -> Poll { - return match target_block.poll_unpin(cx) { - Poll::Ready(Ok(target_block)) => Poll::Ready(target_block), - _ => Poll::Pending, + ) -> Poll<()> { + let new_phase = if let Phase::PendingTarget { target_block } = &mut self.phase { + if let Poll::Ready(target_block) = target_block.poll_unpin(cx) { + Some(Phase::TargetBlock(target_block)) + } else { + None + } + } else { + None + }; + + if let Some(new_phase) = phase { + self.phase = new_phase; } + + Poll::Pending } /// Validate and import a state response. From aae4229519910838bcdbdb823a2610338fbeffd2 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 14 Dec 2022 19:35:33 +0000 Subject: [PATCH 21/50] Update client/network/sync/src/warp.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/warp.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index f78ec9487d8f5..fd1eb4cb5fb16 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -97,16 +97,7 @@ where Self { client, phase, total_proof_bytes: 0 } }, WarpSyncParams::WaitForTarget(block) => { - match Self::poll_target_block(block, cx.unwrap()) { - Poll::Ready(target_block) => { - let phase = Phase::TargetBlock(target_block); - Self { client, phase, total_proof_bytes: 0 } - }, - Poll::Pending => { - let phase = Phase::PendingTargetBlock; - Self { client, phase, total_proof_bytes: 0 } - }, - } + Self { client, phase: Phase::PendingTargetBlock { target_block: block }, total_proof_bytes: 0 } }, } } From eaa6be380c3b60aa3f20aea3b2a526ce7342afa9 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 14 Dec 2022 19:35:42 +0000 Subject: [PATCH 22/50] Update client/network/sync/src/warp.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/warp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index fd1eb4cb5fb16..979b80db7c5d1 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -44,7 +44,7 @@ enum Phase { last_hash: B::Hash, warp_sync_provider: Arc>, }, - PendingTargetBlock, + PendingTargetBlock { target_block: oneshot::Receiver }, TargetBlock(B::Header), State(StateSync), } From c4602b20974006fc041fb5b40b7fdd7098afbdd3 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Wed, 14 Dec 2022 19:35:57 +0000 Subject: [PATCH 23/50] Update client/network/sync/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/lib.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index e8cf13a6a7d8d..b10c542a0bb82 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1366,17 +1366,8 @@ where } self.process_outbound_requests(); - if let Some(WarpSyncParams::WaitForTarget(_)) = self.warp_sync_params { - if self.warp_sync.is_none() || - self.warp_sync.as_ref().unwrap().progress().phase == - WarpSyncPhase::AwaitingTargetBlock - { - self.warp_sync = Some(WarpSync::new( - self.client.clone(), - self.warp_sync_params.as_mut().unwrap(), - Some(cx), - )) - } + if let Some(warp_sync) = &mut self.warp_sync { + let _ = warp_sync.poll(cx); } while let Poll::Ready(result) = self.poll_pending_responses(cx) { From a692ca362f10d7ca34f47a94a3fa14c286ba1cf0 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Wed, 14 Dec 2022 19:57:37 +0000 Subject: [PATCH 24/50] code refactor based on last PR comments --- client/informant/src/display.rs | 5 +++ client/network/sync/src/lib.rs | 6 +--- client/network/sync/src/warp.rs | 57 +++++++++++++++++---------------- client/network/test/src/sync.rs | 9 ++++-- 4 files changed, 42 insertions(+), 35 deletions(-) diff --git a/client/informant/src/display.rs b/client/informant/src/display.rs index 3d585a9985134..4da46ebb65edb 100644 --- a/client/informant/src/display.rs +++ b/client/informant/src/display.rs @@ -100,6 +100,11 @@ impl InformantDisplay { _, Some(WarpSyncProgress { phase: WarpSyncPhase::DownloadingBlocks(n), .. }), ) => ("⏩", "Block history".into(), format!(", #{}", n)), + ( + _, + _, + Some(WarpSyncProgress { phase: WarpSyncPhase::AwaitingTargetBlock, .. }), + ) => ("⏩", "Waiting for pending target block".into(), "".into()), (_, _, Some(warp)) => ( "⏩", "Warping".into(), diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index b10c542a0bb82..7a5d7cfef0e8c 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -638,11 +638,7 @@ where { log::debug!(target: "sync", "Starting warp state sync."); if let Some(params) = self.warp_sync_params.take() { - self.warp_sync = Some(WarpSync::new( - self.client.clone(), - params, - None, - )); + self.warp_sync = Some(WarpSync::new(self.client.clone(), params)); } } } diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 979b80db7c5d1..d06f5652286e6 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -44,7 +44,9 @@ enum Phase { last_hash: B::Hash, warp_sync_provider: Arc>, }, - PendingTargetBlock { target_block: oneshot::Receiver }, + PendingTargetBlock { + target_block: oneshot::Receiver, + }, TargetBlock(B::Header), State(StateSync), } @@ -80,11 +82,7 @@ where /// Create a new instance. When passing a warp sync provider we will be checking for proof and /// authorities. Alternatively we can pass a target block when we want to skip downloading /// proofs, in this case we will continue polling until the target block is known. - pub fn new( - client: Arc, - warp_sync_params: &mut WarpSyncParams, - cx: Option<&mut std::task::Context>, - ) -> Self { + pub fn new(client: Arc, warp_sync_params: WarpSyncParams) -> Self { let last_hash = client.hash(Zero::zero()).unwrap().expect("Genesis header always exists"); match warp_sync_params { WarpSyncParams::WithProvider(warp_sync_provider) => { @@ -96,21 +94,21 @@ where }; Self { client, phase, total_proof_bytes: 0 } }, - WarpSyncParams::WaitForTarget(block) => { - Self { client, phase: Phase::PendingTargetBlock { target_block: block }, total_proof_bytes: 0 } + WarpSyncParams::WaitForTarget(block) => Self { + client, + phase: Phase::PendingTargetBlock { target_block: block }, + total_proof_bytes: 0, }, } } /// Poll to make progress. /// - /// This only makes progress when `phase = Phase::PendingTargetBlock` and the pending block was send. - fn poll( - &mut self, - cx: &mut std::task::Context, - ) -> Poll<()> { - let new_phase = if let Phase::PendingTarget { target_block } = &mut self.phase { - if let Poll::Ready(target_block) = target_block.poll_unpin(cx) { + /// This only makes progress when `phase = Phase::PendingTargetBlock` and the pending block was + /// send. + pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { + let new_phase = if let Phase::PendingTargetBlock { target_block } = &mut self.phase { + if let Poll::Ready(Ok(target_block)) = target_block.poll_unpin(cx) { Some(Phase::TargetBlock(target_block)) } else { None @@ -118,18 +116,18 @@ where } else { None }; - - if let Some(new_phase) = phase { + + if let Some(new_phase) = new_phase { self.phase = new_phase; } - + Poll::Pending } /// Validate and import a state response. pub fn import_state(&mut self, response: StateResponse) -> ImportResult { match &mut self.phase { - Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock => { + Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock { .. } => { log::debug!(target: "sync", "Unexpected state response"); ImportResult::BadResponse }, @@ -140,7 +138,7 @@ where /// Validate and import a warp proof response. pub fn import_warp_proof(&mut self, response: EncodedProof) -> WarpProofImportResult { match &mut self.phase { - Phase::State(_) | Phase::TargetBlock(_) | Phase::PendingTargetBlock => { + Phase::State(_) | Phase::TargetBlock(_) | Phase::PendingTargetBlock { .. } => { log::debug!(target: "sync", "Unexpected warp proof response"); WarpProofImportResult::BadResponse }, @@ -171,7 +169,7 @@ where /// Import the target block body. pub fn import_target_block(&mut self, block: BlockData) -> TargetBlockImportResult { match &mut self.phase { - Phase::WarpProof { .. } | Phase::State(_) | Phase::PendingTargetBlock => { + Phase::WarpProof { .. } | Phase::State(_) | Phase::PendingTargetBlock { .. } => { log::debug!(target: "sync", "Unexpected target block response"); TargetBlockImportResult::BadResponse }, @@ -212,7 +210,8 @@ where /// Produce next state request. pub fn next_state_request(&self) -> Option { match &self.phase { - Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock => None, + Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock { .. } => + None, Phase::State(sync) => Some(sync.next_request()), } } @@ -221,14 +220,14 @@ where pub fn next_warp_proof_request(&self) -> Option> { match &self.phase { Phase::WarpProof { last_hash, .. } => Some(WarpProofRequest { begin: *last_hash }), - Phase::TargetBlock(_) | Phase::State(_) | Phase::PendingTargetBlock => None, + Phase::TargetBlock(_) | Phase::State(_) | Phase::PendingTargetBlock { .. } => None, } } /// Produce next target block request. pub fn next_target_block_request(&self) -> Option<(NumberFor, BlockRequest)> { match &self.phase { - Phase::WarpProof { .. } | Phase::State(_) | Phase::PendingTargetBlock => None, + Phase::WarpProof { .. } | Phase::State(_) | Phase::PendingTargetBlock { .. } => None, Phase::TargetBlock(header) => { let request = BlockRequest:: { id: 0, @@ -246,7 +245,8 @@ where /// Return target block hash if it is known. pub fn target_block_hash(&self) -> Option { match &self.phase { - Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock => None, + Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock { .. } => + None, Phase::State(s) => Some(s.target()), } } @@ -254,7 +254,7 @@ where /// Return target block number if it is known. pub fn target_block_number(&self) -> Option> { match &self.phase { - Phase::WarpProof { .. } | Phase::PendingTargetBlock => None, + Phase::WarpProof { .. } | Phase::PendingTargetBlock { .. } => None, Phase::TargetBlock(header) => Some(*header.number()), Phase::State(s) => Some(s.target_block_num()), } @@ -263,7 +263,8 @@ where /// Check if the state is complete. pub fn is_complete(&self) -> bool { match &self.phase { - Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock => false, + Phase::WarpProof { .. } | Phase::TargetBlock(_) | Phase::PendingTargetBlock { .. } => + false, Phase::State(sync) => sync.is_complete(), } } @@ -279,7 +280,7 @@ where phase: WarpSyncPhase::DownloadingTargetBlock, total_bytes: self.total_proof_bytes, }, - Phase::PendingTargetBlock => WarpSyncProgress { + Phase::PendingTargetBlock { .. } => WarpSyncProgress { phase: WarpSyncPhase::AwaitingTargetBlock, total_bytes: self.total_proof_bytes, }, diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 7a619c3e70d9b..624e856dd7b29 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1258,6 +1258,12 @@ fn warp_sync_to_target_block() { let mut net = TestNet::new(runtime.handle().clone(), 3); net.peer(0).push_blocks(64, false); + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(Default::default()); + + net.peer(1).push_blocks(64, false); + net.peer(2).push_blocks(64, false); let info = net.peer(0).client.info(); let target_block = net.peer(0).client.header(&BlockId::hash(info.best_hash)).unwrap().unwrap(); @@ -1270,8 +1276,7 @@ fn warp_sync_to_target_block() { // Wait for peer 1 to sync state. runtime.block_on(net.wait_until_sync()); - assert!(!net.peer(3).client().has_state_at(&BlockId::Number(1))); - assert!(net.peer(3).client().has_state_at(&BlockId::Number(64))); + assert!(net.peer(5).client().has_state_at(&BlockId::Number(64))); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] From a198affa31d9eec3dd13367f2006ccda781bd365 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Wed, 14 Dec 2022 22:46:39 +0000 Subject: [PATCH 25/50] move warp sync polling before `process_outbound_requests` Add error message if target block fails to be retreived --- client/network/sync/src/lib.rs | 3 ++- client/network/sync/src/warp.rs | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 7a5d7cfef0e8c..1f711cdcfd27b 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1360,12 +1360,13 @@ where }, } } - self.process_outbound_requests(); if let Some(warp_sync) = &mut self.warp_sync { let _ = warp_sync.poll(cx); } + self.process_outbound_requests(); + while let Poll::Ready(result) = self.poll_pending_responses(cx) { match result { ImportResult::BlockImport(origin, blocks) => self.import_blocks(origin, blocks), diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index d06f5652286e6..6c0c4c26b9af1 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -24,6 +24,7 @@ use crate::{ state::{ImportResult, StateSync}, }; use futures::FutureExt; +use log::error; use sc_client_api::ProofProvider; use sc_network_common::sync::{ message::{BlockAttributes, BlockData, BlockRequest, Direction, FromBlock}, @@ -110,6 +111,9 @@ where let new_phase = if let Phase::PendingTargetBlock { target_block } = &mut self.phase { if let Poll::Ready(Ok(target_block)) = target_block.poll_unpin(cx) { Some(Phase::TargetBlock(target_block)) + } else if let Poll::Ready(Err(e)) = target_block.poll_unpin(cx) { + error!(target: "warp_sync", "Failed to get target block. Error: {:?}",e); + None } else { None } From c7112bca8328a9d192f7f109a89ea20c32fced36 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Thu, 15 Dec 2022 20:18:00 +0400 Subject: [PATCH 26/50] Update client/network/sync/src/warp.rs Co-authored-by: Arkadiy Paronyan --- client/network/sync/src/warp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 6c0c4c26b9af1..575ede401a2d5 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -112,7 +112,7 @@ where if let Poll::Ready(Ok(target_block)) = target_block.poll_unpin(cx) { Some(Phase::TargetBlock(target_block)) } else if let Poll::Ready(Err(e)) = target_block.poll_unpin(cx) { - error!(target: "warp_sync", "Failed to get target block. Error: {:?}",e); + error!(target: "sync", "Failed to get target block. Error: {:?}",e); None } else { None From 226c8948f683384e60fc43617553113af2cfd9ed Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Thu, 15 Dec 2022 20:18:17 +0400 Subject: [PATCH 27/50] Update client/network/sync/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 1f711cdcfd27b..ed6a00a745a84 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1361,6 +1361,8 @@ where } } + /// Should be called before `process_outbound_requests` to ensure + /// that a potential target block is directly leading to requests. if let Some(warp_sync) = &mut self.warp_sync { let _ = warp_sync.poll(cx); } From b5dac7926d8c88cc4a2ac32d6b3c6aa89e3cdcbe Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Thu, 15 Dec 2022 20:18:30 +0400 Subject: [PATCH 28/50] Update client/network/sync/src/warp.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/warp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 575ede401a2d5..68540b0ee7c29 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -80,7 +80,7 @@ where B: BlockT, Client: HeaderBackend + ProofProvider + 'static, { - /// Create a new instance. When passing a warp sync provider we will be checking for proof and + /// Create a new instance. When passing a warp sync provider we will be checking for proof and /// authorities. Alternatively we can pass a target block when we want to skip downloading /// proofs, in this case we will continue polling until the target block is known. pub fn new(client: Arc, warp_sync_params: WarpSyncParams) -> Self { From 06cdb67cd6f188e3ce39aabc87c1e93087ea15c4 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Thu, 15 Dec 2022 19:09:57 +0100 Subject: [PATCH 29/50] fmt after code suggestions --- client/network/sync/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index ed6a00a745a84..ffc0edaf31e72 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1361,8 +1361,8 @@ where } } - /// Should be called before `process_outbound_requests` to ensure - /// that a potential target block is directly leading to requests. + // Should be called before `process_outbound_requests` to ensure + // that a potential target block is directly leading to requests. if let Some(warp_sync) = &mut self.warp_sync { let _ = warp_sync.poll(cx); } From de295794f9fa6e86b1d10b077b9b76102c6ac03f Mon Sep 17 00:00:00 2001 From: sam elamin Date: Tue, 20 Dec 2022 22:40:51 +0400 Subject: [PATCH 30/50] rebase changes --- client/network/test/src/sync.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 624e856dd7b29..6ac3913467eea 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1250,14 +1250,10 @@ async fn warp_sync() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -#[test] -fn warp_sync_to_target_block() { +async fn warp_sync_to_target_block() { sp_tracing::try_init_simple(); - let runtime = Runtime::new().unwrap(); + let mut net = TestNet::new(0); // Create 3 synced peers and 1 peer trying to warp sync. - let mut net = TestNet::new(runtime.handle().clone(), 3); - - net.peer(0).push_blocks(64, false); net.add_full_peer_with_config(Default::default()); net.add_full_peer_with_config(Default::default()); net.add_full_peer_with_config(Default::default()); @@ -1266,7 +1262,7 @@ fn warp_sync_to_target_block() { net.peer(2).push_blocks(64, false); let info = net.peer(0).client.info(); - let target_block = net.peer(0).client.header(&BlockId::hash(info.best_hash)).unwrap().unwrap(); + let target_block = net.peer(0).client.header(info.best_hash).unwrap().unwrap(); net.add_full_peer_with_config(FullPeerConfig { sync_mode: SyncMode::Warp, @@ -1274,14 +1270,12 @@ fn warp_sync_to_target_block() { ..Default::default() }); - // Wait for peer 1 to sync state. - runtime.block_on(net.wait_until_sync()); - assert!(net.peer(5).client().has_state_at(&BlockId::Number(64))); + net.run_until_sync().await; + assert!(net.peer(3).client().has_state_at(&BlockId::Number(64))); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -#[test] -fn syncs_huge_blocks() { +async fn syncs_huge_blocks() { use sp_core::storage::well_known_keys::HEAP_PAGES; use sp_runtime::codec::Encode; use substrate_test_runtime_client::BlockBuilderExt; From c0ecb220d66dd8e7b1a5ee29831b776f4f18d024 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Fri, 13 Jan 2023 17:04:49 +0000 Subject: [PATCH 31/50] Bring down the node if the target block fails to return --- bin/node-template/node/src/service.rs | 2 +- bin/node/cli/src/service.rs | 2 +- client/network/sync/src/warp.rs | 10 +++- client/service/src/builder.rs | 74 +++++++++++++++++++-------- 4 files changed, 63 insertions(+), 25 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index d7d075efd98ab..2a37bdd893b38 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -197,7 +197,7 @@ pub fn new_full(mut config: Configuration) -> Result config: &config, client: client.clone(), transaction_pool: transaction_pool.clone(), - spawn_handle: task_manager.spawn_handle(), + spawn_handle: Box::new(task_manager.spawn_essential_handle()), import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 83d9651af24c4..f120d2b567b7e 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -361,7 +361,7 @@ pub fn new_full_base( config: &config, client: client.clone(), transaction_pool: transaction_pool.clone(), - spawn_handle: task_manager.spawn_handle(), + spawn_handle: Box::new(task_manager.spawn_essential_handle()), import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 68540b0ee7c29..1545e6963647d 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -23,7 +23,7 @@ use crate::{ schema::v1::{StateRequest, StateResponse}, state::{ImportResult, StateSync}, }; -use futures::FutureExt; +use futures::{executor::block_on, FutureExt}; use log::error; use sc_client_api::ProofProvider; use sc_network_common::sync::{ @@ -112,7 +112,13 @@ where if let Poll::Ready(Ok(target_block)) = target_block.poll_unpin(cx) { Some(Phase::TargetBlock(target_block)) } else if let Poll::Ready(Err(e)) = target_block.poll_unpin(cx) { - error!(target: "sync", "Failed to get target block. Error: {:?}",e); + block_on(async move { + error!(target: "sync", "Failed to get target block. Error: {:?}",e); + // This `return` might seem unnecessary, but we don't want to make it look like + // everything is working as normal even though the target block failed to be + // retrieved + return + }); None } else { None diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 7890b0c5022d5..aba6ee2a06cc8 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -68,7 +68,7 @@ use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_consensus::block_validation::{ BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator, }; -use sp_core::traits::{CodeExecutor, SpawnNamed}; +use sp_core::traits::{CodeExecutor, SpawnEssentialNamed, SpawnNamed}; use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr}; use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero}; use std::{str::FromStr, sync::Arc, time::SystemTime}; @@ -751,7 +751,7 @@ pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> { /// A shared transaction pool. pub transaction_pool: Arc, /// A handle for spawning tasks. - pub spawn_handle: SpawnTaskHandle, + pub spawn_handle: Box, /// An import queue. pub import_queue: TImpQu, /// A block announce validator builder. @@ -827,7 +827,11 @@ where config.network.default_peers_set.in_peers as usize + config.network.default_peers_set.out_peers as usize, ); - spawn_handle.spawn("block-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn_essential( + "block-request-handler", + Some("networking"), + Box::pin(handler.run()), + ); protocol_config }; @@ -839,7 +843,11 @@ where client.clone(), config.network.default_peers_set_num_full as usize, ); - spawn_handle.spawn("state-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn_essential( + "state-request-handler", + Some("networking"), + Box::pin(handler.run()), + ); protocol_config }; @@ -856,7 +864,11 @@ where config.chain_spec.fork_id(), warp_with_provider.clone(), ); - spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn_essential( + "warp-sync-request-handler", + Some("networking"), + Box::pin(handler.run()), + ); Some(protocol_config) }, _ => None, @@ -869,7 +881,11 @@ where config.chain_spec.fork_id(), client.clone(), ); - spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn_essential( + "light-client-request-handler", + Some("networking"), + Box::pin(handler.run()), + ); protocol_config }; @@ -898,7 +914,11 @@ where request_response_protocol_configs.push(config.network.ipfs_server.then(|| { let (handler, protocol_config) = BitswapRequestHandler::new(client.clone()); - spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler.run()); + spawn_handle.spawn_essential( + "bitswap-request-handler", + Some("networking"), + Box::pin(handler.run()), + ); protocol_config })); @@ -907,7 +927,7 @@ where executor: { let spawn_handle = Clone::clone(&spawn_handle); Box::new(move |fut| { - spawn_handle.spawn("libp2p-node", Some("networking"), fut); + spawn_handle.spawn_essential("libp2p-node", Some("networking"), fut); }) }, network_config: config.network.clone(), @@ -955,13 +975,21 @@ where config.prometheus_config.as_ref().map(|config| &config.registry), )?; - spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); - spawn_handle.spawn( + spawn_handle.spawn_essential( + "network-transactions-handler", + Some("networking"), + Box::pin(tx_handler.run()), + ); + spawn_handle.spawn_essential( "chain-sync-network-service-provider", Some("networking"), - chain_sync_network_provider.run(network.clone()), + Box::pin(chain_sync_network_provider.run(network.clone())), + ); + spawn_handle.spawn_essential( + "import-queue", + None, + import_queue.run(Box::new(chain_sync_service)), ); - spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service))); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); @@ -997,18 +1025,22 @@ where // issue, and ideally we would like to fix the network future to take as little time as // possible, but we also take the extra harm-prevention measure to execute the networking // future using `spawn_blocking`. - spawn_handle.spawn_blocking("network-worker", Some("networking"), async move { - if network_start_rx.await.is_err() { - log::warn!( + spawn_handle.spawn_essential_blocking( + "network-worker", + Some("networking"), + Box::pin(async move { + if network_start_rx.await.is_err() { + log::warn!( "The NetworkStart returned as part of `build_network` has been silently dropped" ); - // This `return` might seem unnecessary, but we don't want to make it look like - // everything is working as normal even though the user is clearly misusing the API. - return - } + // This `return` might seem unnecessary, but we don't want to make it look like + // everything is working as normal even though the user is clearly misusing the API. + return + } - future.await - }); + future.await + }), + ); Ok((network, system_rpc_tx, tx_handler_controller, NetworkStarter(network_start_tx))) } From 5ed2e511d3df524bd050c7cb9a22ecf686066f95 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Fri, 20 Jan 2023 22:10:30 +0000 Subject: [PATCH 32/50] Revert "Bring down the node if the target block fails to return" This reverts commit c0ecb220d66dd8e7b1a5ee29831b776f4f18d024. --- bin/node-template/node/src/service.rs | 2 +- bin/node/cli/src/service.rs | 2 +- client/network/sync/src/warp.rs | 10 +--- client/service/src/builder.rs | 74 ++++++++------------------- 4 files changed, 25 insertions(+), 63 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 2a37bdd893b38..d7d075efd98ab 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -197,7 +197,7 @@ pub fn new_full(mut config: Configuration) -> Result config: &config, client: client.clone(), transaction_pool: transaction_pool.clone(), - spawn_handle: Box::new(task_manager.spawn_essential_handle()), + spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index f120d2b567b7e..83d9651af24c4 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -361,7 +361,7 @@ pub fn new_full_base( config: &config, client: client.clone(), transaction_pool: transaction_pool.clone(), - spawn_handle: Box::new(task_manager.spawn_essential_handle()), + spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 1545e6963647d..68540b0ee7c29 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -23,7 +23,7 @@ use crate::{ schema::v1::{StateRequest, StateResponse}, state::{ImportResult, StateSync}, }; -use futures::{executor::block_on, FutureExt}; +use futures::FutureExt; use log::error; use sc_client_api::ProofProvider; use sc_network_common::sync::{ @@ -112,13 +112,7 @@ where if let Poll::Ready(Ok(target_block)) = target_block.poll_unpin(cx) { Some(Phase::TargetBlock(target_block)) } else if let Poll::Ready(Err(e)) = target_block.poll_unpin(cx) { - block_on(async move { - error!(target: "sync", "Failed to get target block. Error: {:?}",e); - // This `return` might seem unnecessary, but we don't want to make it look like - // everything is working as normal even though the target block failed to be - // retrieved - return - }); + error!(target: "sync", "Failed to get target block. Error: {:?}",e); None } else { None diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index aba6ee2a06cc8..7890b0c5022d5 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -68,7 +68,7 @@ use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_consensus::block_validation::{ BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator, }; -use sp_core::traits::{CodeExecutor, SpawnEssentialNamed, SpawnNamed}; +use sp_core::traits::{CodeExecutor, SpawnNamed}; use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr}; use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero}; use std::{str::FromStr, sync::Arc, time::SystemTime}; @@ -751,7 +751,7 @@ pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> { /// A shared transaction pool. pub transaction_pool: Arc, /// A handle for spawning tasks. - pub spawn_handle: Box, + pub spawn_handle: SpawnTaskHandle, /// An import queue. pub import_queue: TImpQu, /// A block announce validator builder. @@ -827,11 +827,7 @@ where config.network.default_peers_set.in_peers as usize + config.network.default_peers_set.out_peers as usize, ); - spawn_handle.spawn_essential( - "block-request-handler", - Some("networking"), - Box::pin(handler.run()), - ); + spawn_handle.spawn("block-request-handler", Some("networking"), handler.run()); protocol_config }; @@ -843,11 +839,7 @@ where client.clone(), config.network.default_peers_set_num_full as usize, ); - spawn_handle.spawn_essential( - "state-request-handler", - Some("networking"), - Box::pin(handler.run()), - ); + spawn_handle.spawn("state-request-handler", Some("networking"), handler.run()); protocol_config }; @@ -864,11 +856,7 @@ where config.chain_spec.fork_id(), warp_with_provider.clone(), ); - spawn_handle.spawn_essential( - "warp-sync-request-handler", - Some("networking"), - Box::pin(handler.run()), - ); + spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); Some(protocol_config) }, _ => None, @@ -881,11 +869,7 @@ where config.chain_spec.fork_id(), client.clone(), ); - spawn_handle.spawn_essential( - "light-client-request-handler", - Some("networking"), - Box::pin(handler.run()), - ); + spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run()); protocol_config }; @@ -914,11 +898,7 @@ where request_response_protocol_configs.push(config.network.ipfs_server.then(|| { let (handler, protocol_config) = BitswapRequestHandler::new(client.clone()); - spawn_handle.spawn_essential( - "bitswap-request-handler", - Some("networking"), - Box::pin(handler.run()), - ); + spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler.run()); protocol_config })); @@ -927,7 +907,7 @@ where executor: { let spawn_handle = Clone::clone(&spawn_handle); Box::new(move |fut| { - spawn_handle.spawn_essential("libp2p-node", Some("networking"), fut); + spawn_handle.spawn("libp2p-node", Some("networking"), fut); }) }, network_config: config.network.clone(), @@ -975,21 +955,13 @@ where config.prometheus_config.as_ref().map(|config| &config.registry), )?; - spawn_handle.spawn_essential( - "network-transactions-handler", - Some("networking"), - Box::pin(tx_handler.run()), - ); - spawn_handle.spawn_essential( + spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); + spawn_handle.spawn( "chain-sync-network-service-provider", Some("networking"), - Box::pin(chain_sync_network_provider.run(network.clone())), - ); - spawn_handle.spawn_essential( - "import-queue", - None, - import_queue.run(Box::new(chain_sync_service)), + chain_sync_network_provider.run(network.clone()), ); + spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service))); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); @@ -1025,22 +997,18 @@ where // issue, and ideally we would like to fix the network future to take as little time as // possible, but we also take the extra harm-prevention measure to execute the networking // future using `spawn_blocking`. - spawn_handle.spawn_essential_blocking( - "network-worker", - Some("networking"), - Box::pin(async move { - if network_start_rx.await.is_err() { - log::warn!( + spawn_handle.spawn_blocking("network-worker", Some("networking"), async move { + if network_start_rx.await.is_err() { + log::warn!( "The NetworkStart returned as part of `build_network` has been silently dropped" ); - // This `return` might seem unnecessary, but we don't want to make it look like - // everything is working as normal even though the user is clearly misusing the API. - return - } + // This `return` might seem unnecessary, but we don't want to make it look like + // everything is working as normal even though the user is clearly misusing the API. + return + } - future.await - }), - ); + future.await + }); Ok((network, system_rpc_tx, tx_handler_controller, NetworkStarter(network_start_tx))) } From 2dadb0dbd651635a8637b9f6ad7a60cac17e0527 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Tue, 24 Jan 2023 13:58:25 +0000 Subject: [PATCH 33/50] Update client/network/common/src/sync/warp.rs Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> --- client/network/common/src/sync/warp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/common/src/sync/warp.rs b/client/network/common/src/sync/warp.rs index 89e096c88588e..ca15921996a11 100644 --- a/client/network/common/src/sync/warp.rs +++ b/client/network/common/src/sync/warp.rs @@ -91,7 +91,7 @@ impl fmt::Display for WarpSyncPhase { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Self::AwaitingPeers => write!(f, "Waiting for peers"), - Self::AwaitingTargetBlock => write!(f, "Waiting for target block to be reached"), + Self::AwaitingTargetBlock => write!(f, "Waiting for target block to be received"), Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"), Self::DownloadingTargetBlock => write!(f, "Downloading target block"), Self::DownloadingState => write!(f, "Downloading state"), From fac921455575aaffb363ac4db10ad59eed4be441 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Tue, 24 Jan 2023 13:58:37 +0000 Subject: [PATCH 34/50] Update client/network/common/src/sync/warp.rs Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> --- client/network/common/src/sync/warp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/common/src/sync/warp.rs b/client/network/common/src/sync/warp.rs index ca15921996a11..020642fb48c82 100644 --- a/client/network/common/src/sync/warp.rs +++ b/client/network/common/src/sync/warp.rs @@ -73,7 +73,7 @@ pub trait WarpSyncProvider: Send + Sync { pub enum WarpSyncPhase { /// Waiting for peers to connect. AwaitingPeers, - /// Waiting for target block to be received + /// Waiting for target block to be received. AwaitingTargetBlock, /// Downloading and verifying grandpa warp proofs. DownloadingWarpProofs, From c6390a12234587319cb5ffac74012e7270f671f2 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Thu, 26 Jan 2023 13:37:37 +0000 Subject: [PATCH 35/50] use matching on polling to avoid calling poll more than once --- client/network/sync/src/warp.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 68540b0ee7c29..3fac4191d7729 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -109,13 +109,13 @@ where /// send. pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { let new_phase = if let Phase::PendingTargetBlock { target_block } = &mut self.phase { - if let Poll::Ready(Ok(target_block)) = target_block.poll_unpin(cx) { - Some(Phase::TargetBlock(target_block)) - } else if let Poll::Ready(Err(e)) = target_block.poll_unpin(cx) { - error!(target: "sync", "Failed to get target block. Error: {:?}",e); - None - } else { - None + match target_block.poll_unpin(cx) { + Poll::Ready(Ok(target_block)) => Some(Phase::TargetBlock(target_block)), + Poll::Ready(Err(e)) => { + error!(target: "sync", "Failed to get target block. Error: {:?}",e); + None + }, + _ => None, } } else { None From 7941ee6194e6813d722fdd8595db718f3479ca78 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Fri, 27 Jan 2023 07:22:58 +0000 Subject: [PATCH 36/50] Update client/network/sync/src/warp.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/warp.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 3fac4191d7729..2429366965747 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -107,18 +107,18 @@ where /// /// This only makes progress when `phase = Phase::PendingTargetBlock` and the pending block was /// send. - pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { + pub fn poll(&mut self, cx: &mut std::task::Context) { let new_phase = if let Phase::PendingTargetBlock { target_block } = &mut self.phase { match target_block.poll_unpin(cx) { - Poll::Ready(Ok(target_block)) => Some(Phase::TargetBlock(target_block)), + Poll::Ready(Ok(target_block)) => Phase::TargetBlock(target_block), Poll::Ready(Err(e)) => { error!(target: "sync", "Failed to get target block. Error: {:?}",e); - None + return }, - _ => None, + _ => return, } } else { - None + return }; if let Some(new_phase) = new_phase { From fcc7216f0b1f4e49a6b15fdba1ad145313614dc7 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Fri, 27 Jan 2023 07:23:07 +0000 Subject: [PATCH 37/50] Update client/network/sync/src/warp.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/warp.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 2429366965747..5e1d363e9be55 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -124,8 +124,6 @@ where if let Some(new_phase) = new_phase { self.phase = new_phase; } - - Poll::Pending } /// Validate and import a state response. From da38da3be72be88e63e3801f37d8ce77379451a1 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Fri, 27 Jan 2023 07:23:16 +0000 Subject: [PATCH 38/50] Update client/network/sync/src/warp.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/sync/src/warp.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 5e1d363e9be55..5871f6d165f7d 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -121,9 +121,7 @@ where return }; - if let Some(new_phase) = new_phase { - self.phase = new_phase; - } + self.phase = new_phase; } /// Validate and import a state response. From d7d16ba8532102248b29a25973f3beab7735ebfc Mon Sep 17 00:00:00 2001 From: sam elamin Date: Fri, 27 Jan 2023 07:28:47 +0000 Subject: [PATCH 39/50] fix typo on comment --- client/network/sync/src/warp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 5871f6d165f7d..5b28b8bd52eee 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -106,7 +106,7 @@ where /// Poll to make progress. /// /// This only makes progress when `phase = Phase::PendingTargetBlock` and the pending block was - /// send. + /// sent. pub fn poll(&mut self, cx: &mut std::task::Context) { let new_phase = if let Phase::PendingTargetBlock { target_block } = &mut self.phase { match target_block.poll_unpin(cx) { From f960907f7573d0fb6e9791214ef11cd0960f004f Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Mon, 6 Feb 2023 19:54:28 +0000 Subject: [PATCH 40/50] update snapshot with new folder structure --- zombienet/0001-basic-warp-sync/test-warp-sync.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zombienet/0001-basic-warp-sync/test-warp-sync.toml b/zombienet/0001-basic-warp-sync/test-warp-sync.toml index ae2810b6ecccd..591d07cf64a79 100644 --- a/zombienet/0001-basic-warp-sync/test-warp-sync.toml +++ b/zombienet/0001-basic-warp-sync/test-warp-sync.toml @@ -11,18 +11,18 @@ chain_spec_path = "zombienet/0001-basic-warp-sync/chain-spec.json" [[relaychain.nodes]] name = "alice" validator = false - db_snapshot="https://storage.googleapis.com/zombienet-db-snaps/substrate/0001-basic-warp-sync/chains-0bb3f0be2ce41b5615b224215bcc8363aa0416a6.tgz" + db_snapshot="https://imbue-test-us-east-2.s3.us-east-2.amazonaws.com/chains-b37ea1fc1eba94880fdf431a769cbaad75a89199.tgz" [[relaychain.nodes]] name = "bob" validator = false - db_snapshot="https://storage.googleapis.com/zombienet-db-snaps/substrate/0001-basic-warp-sync/chains-0bb3f0be2ce41b5615b224215bcc8363aa0416a6.tgz" + db_snapshot="https://imbue-test-us-east-2.s3.us-east-2.amazonaws.com/chains-b37ea1fc1eba94880fdf431a769cbaad75a89199.tgz" #we need at least 3 nodes for warp sync [[relaychain.nodes]] name = "charlie" validator = false - db_snapshot="https://storage.googleapis.com/zombienet-db-snaps/substrate/0001-basic-warp-sync/chains-0bb3f0be2ce41b5615b224215bcc8363aa0416a6.tgz" + db_snapshot="https://imbue-test-us-east-2.s3.us-east-2.amazonaws.com/chains-b37ea1fc1eba94880fdf431a769cbaad75a89199.tgz" [[relaychain.nodes]] name = "dave" From ba4e7e6432b638ca289de161f6a27752fd59a485 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 7 Feb 2023 16:40:23 +0100 Subject: [PATCH 41/50] Upload snapshot --- zombienet/0001-basic-warp-sync/test-warp-sync.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zombienet/0001-basic-warp-sync/test-warp-sync.toml b/zombienet/0001-basic-warp-sync/test-warp-sync.toml index 591d07cf64a79..272b5862e8e89 100644 --- a/zombienet/0001-basic-warp-sync/test-warp-sync.toml +++ b/zombienet/0001-basic-warp-sync/test-warp-sync.toml @@ -11,18 +11,18 @@ chain_spec_path = "zombienet/0001-basic-warp-sync/chain-spec.json" [[relaychain.nodes]] name = "alice" validator = false - db_snapshot="https://imbue-test-us-east-2.s3.us-east-2.amazonaws.com/chains-b37ea1fc1eba94880fdf431a769cbaad75a89199.tgz" + db_snapshot="https://storage.googleapis.com/zombienet-db-snaps/substrate/0001-basic-warp-sync/chains-a233bbacff650aac6bcb56b4e4ef7db7dc143cfb.tgz" [[relaychain.nodes]] name = "bob" validator = false - db_snapshot="https://imbue-test-us-east-2.s3.us-east-2.amazonaws.com/chains-b37ea1fc1eba94880fdf431a769cbaad75a89199.tgz" + db_snapshot="https://storage.googleapis.com/zombienet-db-snaps/substrate/0001-basic-warp-sync/chains-a233bbacff650aac6bcb56b4e4ef7db7dc143cfb.tgz" #we need at least 3 nodes for warp sync [[relaychain.nodes]] name = "charlie" validator = false - db_snapshot="https://imbue-test-us-east-2.s3.us-east-2.amazonaws.com/chains-b37ea1fc1eba94880fdf431a769cbaad75a89199.tgz" + db_snapshot="https://storage.googleapis.com/zombienet-db-snaps/substrate/0001-basic-warp-sync/chains-a233bbacff650aac6bcb56b4e4ef7db7dc143cfb.tgz" [[relaychain.nodes]] name = "dave" From 5f24b3eea2015a72603f8c14c82225a22624d7d3 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 7 Feb 2023 17:21:12 +0100 Subject: [PATCH 42/50] Bump zombienet --- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index ecc27d016a811..9087e60538b0c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -55,7 +55,7 @@ variables: RUSTY_CACHIER_COMPRESSION_METHOD: zstd NEXTEST_FAILURE_OUTPUT: immediate-final NEXTEST_SUCCESS_OUTPUT: final - ZOMBIENET_IMAGE: "docker.io/paritytech/zombienet:v1.3.22" + ZOMBIENET_IMAGE: "docker.io/paritytech/zombienet:v1.3.33" .shared-default: &shared-default retry: From be945366d3a334883c7d5d43f3eda59f25f47e59 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Mon, 13 Feb 2023 15:10:03 +0000 Subject: [PATCH 43/50] bump zombienet again --- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 9087e60538b0c..47090407fe5bb 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -55,7 +55,7 @@ variables: RUSTY_CACHIER_COMPRESSION_METHOD: zstd NEXTEST_FAILURE_OUTPUT: immediate-final NEXTEST_SUCCESS_OUTPUT: final - ZOMBIENET_IMAGE: "docker.io/paritytech/zombienet:v1.3.33" + ZOMBIENET_IMAGE: "docker.io/paritytech/zombienet:v1.3.34" .shared-default: &shared-default retry: From de75501954bbeb1972db88d6d2ef5064de0c47f2 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 13 Feb 2023 18:00:59 +0100 Subject: [PATCH 44/50] Improve test --- client/network/test/src/sync.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 6ac3913467eea..c3e5d13476a57 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1258,11 +1258,12 @@ async fn warp_sync_to_target_block() { net.add_full_peer_with_config(Default::default()); net.add_full_peer_with_config(Default::default()); + let gap_end = net.peer(0).push_blocks(63, false).pop().unwrap(); + let target = net.peer(0).push_blocks(1, false).pop().unwrap(); net.peer(1).push_blocks(64, false); net.peer(2).push_blocks(64, false); - let info = net.peer(0).client.info(); - let target_block = net.peer(0).client.header(info.best_hash).unwrap().unwrap(); + let target_block = net.peer(0).client.header(target).unwrap().unwrap(); net.add_full_peer_with_config(FullPeerConfig { sync_mode: SyncMode::Warp, @@ -1272,6 +1273,17 @@ async fn warp_sync_to_target_block() { net.run_until_sync().await; assert!(net.peer(3).client().has_state_at(&BlockId::Number(64))); + + // Wait for peer 1 download block history + futures::future::poll_fn::<(), _>(|cx| { + net.poll(cx); + if net.peer(3).has_body(gap_end) && net.peer(3).has_body(target) { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] From 501c4f46ce29ac1b782131c637c1fbc1431cd20a Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Tue, 14 Feb 2023 09:04:28 +0000 Subject: [PATCH 45/50] Update client/network/test/src/sync.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/test/src/sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index c3e5d13476a57..d279230cbf78a 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1258,8 +1258,8 @@ async fn warp_sync_to_target_block() { net.add_full_peer_with_config(Default::default()); net.add_full_peer_with_config(Default::default()); - let gap_end = net.peer(0).push_blocks(63, false).pop().unwrap(); - let target = net.peer(0).push_blocks(1, false).pop().unwrap(); + let blocks = net.peer(0).push_blocks(64, false); + let target = *blocks[63]; net.peer(1).push_blocks(64, false); net.peer(2).push_blocks(64, false); From 4cc074eba1d02cb9afa7a6229563787db3f62f09 Mon Sep 17 00:00:00 2001 From: Sam Elamin Date: Tue, 14 Feb 2023 09:04:37 +0000 Subject: [PATCH 46/50] Update client/network/test/src/sync.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/test/src/sync.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index d279230cbf78a..b8e559b9e3f0a 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1277,7 +1277,8 @@ async fn warp_sync_to_target_block() { // Wait for peer 1 download block history futures::future::poll_fn::<(), _>(|cx| { net.poll(cx); - if net.peer(3).has_body(gap_end) && net.peer(3).has_body(target) { + let peer = net.peer(3); + if blocks.iter().all(|b| peer.has_bodyl(*b)) { Poll::Ready(()) } else { Poll::Pending From 03a3b725245c41e582deede99950e74367f56697 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Tue, 14 Feb 2023 09:35:11 +0000 Subject: [PATCH 47/50] fix tests --- client/network/test/src/sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index b8e559b9e3f0a..8c230640355c9 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1259,7 +1259,7 @@ async fn warp_sync_to_target_block() { net.add_full_peer_with_config(Default::default()); let blocks = net.peer(0).push_blocks(64, false); - let target = *blocks[63]; + let target = blocks[63]; net.peer(1).push_blocks(64, false); net.peer(2).push_blocks(64, false); @@ -1278,7 +1278,7 @@ async fn warp_sync_to_target_block() { futures::future::poll_fn::<(), _>(|cx| { net.poll(cx); let peer = net.peer(3); - if blocks.iter().all(|b| peer.has_bodyl(*b)) { + if blocks.iter().all(|b| peer.has_body(*b)) { Poll::Ready(()) } else { Poll::Pending From 3d8ecac82e507ada29af1e5293414f7a5b58c8f8 Mon Sep 17 00:00:00 2001 From: sam elamin Date: Tue, 14 Feb 2023 10:41:28 +0000 Subject: [PATCH 48/50] dummy commit to restart builds --- zombienet/0001-basic-warp-sync/test-warp-sync.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zombienet/0001-basic-warp-sync/test-warp-sync.toml b/zombienet/0001-basic-warp-sync/test-warp-sync.toml index 272b5862e8e89..e2b27e0d6cf7e 100644 --- a/zombienet/0001-basic-warp-sync/test-warp-sync.toml +++ b/zombienet/0001-basic-warp-sync/test-warp-sync.toml @@ -27,4 +27,4 @@ chain_spec_path = "zombienet/0001-basic-warp-sync/chain-spec.json" [[relaychain.nodes]] name = "dave" validator = false - args = ["--sync warp"] + args = ["--sync warp"] \ No newline at end of file From f12c16a79a85154ff140e2b42c883f9e0850580a Mon Sep 17 00:00:00 2001 From: sam elamin Date: Tue, 14 Feb 2023 15:20:02 +0000 Subject: [PATCH 49/50] Converted the target block to an optional value that is set to `None` when an error occurs --- client/network/sync/src/warp.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/client/network/sync/src/warp.rs b/client/network/sync/src/warp.rs index 5b28b8bd52eee..bb64ff6a15890 100644 --- a/client/network/sync/src/warp.rs +++ b/client/network/sync/src/warp.rs @@ -46,7 +46,7 @@ enum Phase { warp_sync_provider: Arc>, }, PendingTargetBlock { - target_block: oneshot::Receiver, + target_block: Option>, }, TargetBlock(B::Header), State(StateSync), @@ -97,7 +97,7 @@ where }, WarpSyncParams::WaitForTarget(block) => Self { client, - phase: Phase::PendingTargetBlock { target_block: block }, + phase: Phase::PendingTargetBlock { target_block: Some(block) }, total_proof_bytes: 0, }, } @@ -108,12 +108,14 @@ where /// This only makes progress when `phase = Phase::PendingTargetBlock` and the pending block was /// sent. pub fn poll(&mut self, cx: &mut std::task::Context) { - let new_phase = if let Phase::PendingTargetBlock { target_block } = &mut self.phase { + let new_phase = if let Phase::PendingTargetBlock { target_block: Some(target_block) } = + &mut self.phase + { match target_block.poll_unpin(cx) { - Poll::Ready(Ok(target_block)) => Phase::TargetBlock(target_block), + Poll::Ready(Ok(target)) => Phase::TargetBlock(target), Poll::Ready(Err(e)) => { error!(target: "sync", "Failed to get target block. Error: {:?}",e); - return + Phase::PendingTargetBlock { target_block: None } }, _ => return, } From 10e248f8ce159af36ff3148a2634d39d80d256ef Mon Sep 17 00:00:00 2001 From: sam elamin Date: Tue, 14 Feb 2023 15:37:11 +0000 Subject: [PATCH 50/50] dummy commit to restart builds --- zombienet/0001-basic-warp-sync/test-warp-sync.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zombienet/0001-basic-warp-sync/test-warp-sync.toml b/zombienet/0001-basic-warp-sync/test-warp-sync.toml index e2b27e0d6cf7e..272b5862e8e89 100644 --- a/zombienet/0001-basic-warp-sync/test-warp-sync.toml +++ b/zombienet/0001-basic-warp-sync/test-warp-sync.toml @@ -27,4 +27,4 @@ chain_spec_path = "zombienet/0001-basic-warp-sync/chain-spec.json" [[relaychain.nodes]] name = "dave" validator = false - args = ["--sync warp"] \ No newline at end of file + args = ["--sync warp"]