diff --git a/Cargo.lock b/Cargo.lock index 7a11c5b7f196..55a8e0c748f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15380,6 +15380,7 @@ dependencies = [ "substrate-test-runtime-client", "thiserror", "tokio", + "tokio-stream", ] [[package]] diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 8b2cf10409d2..bdcca08b10dd 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -579,7 +579,7 @@ where query!( ParaBackingState, para_backing_state(para), - ver = Request::STAGING_BACKING_STATE, + ver = Request::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT, sender ) }, @@ -587,7 +587,7 @@ where query!( AsyncBackingParams, async_backing_params(), - ver = Request::STAGING_BACKING_STATE, + ver = Request::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT, sender ) }, diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 98eaf8974c26..6bc874384594 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -727,7 +727,7 @@ impl RuntimeApiRequest { pub const MINIMUM_BACKING_VOTES_RUNTIME_REQUIREMENT: u32 = 6; /// Minimum version to enable asynchronous backing: `AsyncBackingParams` and `ParaBackingState`. - pub const STAGING_BACKING_STATE: u32 = 7; + pub const ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT: u32 = 7; /// `DisabledValidators` pub const DISABLED_VALIDATORS_RUNTIME_REQUIREMENT: u32 = 8; diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index fad29500bfaf..a5f3e9d4a0c0 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -227,7 +227,6 @@ specialize_requests! { fn request_key_ownership_proof(validator_id: ValidatorId) -> Option; KeyOwnershipProof; fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost; fn request_disabled_validators() -> Vec; DisabledValidators; - fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams; } diff --git a/substrate/client/network/common/src/sync.rs b/substrate/client/network/common/src/sync.rs index 5a6f90b290d2..8ef0fafa1c79 100644 --- a/substrate/client/network/common/src/sync.rs +++ b/substrate/client/network/common/src/sync.rs @@ -36,7 +36,7 @@ use sp_runtime::{ }; use warp::WarpSyncProgress; -use std::{any::Any, fmt, fmt::Formatter, pin::Pin, sync::Arc, task::Poll}; +use std::{any::Any, fmt, fmt::Formatter, pin::Pin, sync::Arc}; /// The sync status of a peer we are trying to sync with #[derive(Debug)] @@ -204,6 +204,23 @@ pub enum PeerRequest { WarpProof, } +#[derive(Debug)] +pub enum PeerRequestType { + Block, + State, + WarpProof, +} + +impl PeerRequest { + pub fn get_type(&self) -> PeerRequestType { + match self { + PeerRequest::Block(_) => PeerRequestType::Block, + PeerRequest::State => PeerRequestType::State, + PeerRequest::WarpProof => PeerRequestType::WarpProof, + } + } +} + /// Wrapper for implementation-specific state request. /// /// NOTE: Implementation must be able to encode and decode it for network purposes. @@ -289,9 +306,6 @@ pub trait ChainSync: Send { /// Returns the current number of peers stored within this state machine. fn num_peers(&self) -> usize; - /// Returns the number of peers we're connected to and that are being queried. - fn num_active_peers(&self) -> usize; - /// Handle a new connected peer. /// /// Call this method whenever we connect to a new peer. @@ -369,10 +383,4 @@ pub trait ChainSync: Send { /// Return some key metrics. fn metrics(&self) -> Metrics; - - /// Advance the state of `ChainSync` - fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()>; - - /// Send block request to peer - fn send_block_request(&mut self, who: PeerId, request: BlockRequest); } diff --git a/substrate/client/network/sync/Cargo.toml b/substrate/client/network/sync/Cargo.toml index f10dd7869bbb..39312cc4b327 100644 --- a/substrate/client/network/sync/Cargo.toml +++ b/substrate/client/network/sync/Cargo.toml @@ -29,6 +29,7 @@ prost = "0.11" schnellru = "0.2.1" smallvec = "1.11.0" thiserror = "1.0" +tokio-stream = "0.1.14" fork-tree = { path = "../../../utils/fork-tree" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus" } sc-client-api = { path = "../../api" } diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index c93ba89c6220..8b8874ad4ebd 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -23,10 +23,12 @@ use crate::{ block_announce_validator::{ BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream, }, - block_relay_protocol::BlockDownloader, + block_relay_protocol::{BlockDownloader, BlockResponseError}, + pending_responses::{PendingResponses, ResponseEvent}, + schema::v1::{StateRequest, StateResponse}, service::{self, chain_sync::ToServiceCommand}, warp::WarpSyncParams, - ChainSync, ClientError, SyncingService, + BlockRequestEvent, ChainSync, ClientError, SyncingService, }; use codec::{Decode, Encode}; @@ -36,24 +38,32 @@ use futures::{ FutureExt, StreamExt, }; use futures_timer::Delay; -use libp2p::PeerId; +use libp2p::{request_response::OutboundFailure, PeerId}; +use log::{debug, trace}; use prometheus_endpoint::{ register, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64, }; +use prost::Message; use schnellru::{ByLength, LruMap}; use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider}; use sc_consensus::import_queue::ImportQueueService; use sc_network::{ - config::{FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId}, + config::{ + FullNetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, + ProtocolId, SetConfig, + }, + request_responses::{IfDisconnected, RequestFailure}, utils::LruHashSet, NotificationsSink, ProtocolName, ReputationChange, }; use sc_network_common::{ role::Roles, sync::{ - message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState}, - BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, SyncEvent, + message::{BlockAnnounce, BlockAnnouncesHandshake, BlockRequest, BlockState}, + warp::{EncodedProof, WarpProofRequest}, + BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, OpaqueStateRequest, + OpaqueStateResponse, PeerRequest, SyncEvent, }, }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; @@ -63,6 +73,7 @@ use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero}; use std::{ collections::{HashMap, HashSet}, + iter, num::NonZeroUsize, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, @@ -98,6 +109,9 @@ const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(30); /// before it starts evicting peers. const INITIAL_EVICTION_WAIT_PERIOD: Duration = Duration::from_secs(2 * 60); +/// Maximum allowed size for a block announce. +const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024; + mod rep { use sc_network::ReputationChange as Rep; /// Peer has different genesis. @@ -106,6 +120,14 @@ mod rep { pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement"); /// Block announce substream with the peer has been inactive too long pub const INACTIVE_SUBSTREAM: Rep = Rep::new(-(1 << 10), "Inactive block announce substream"); + /// We received a message that failed to decode. + pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); + /// Peer is on unsupported protocol version. + pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol"); + /// Reputation change when a peer refuses a request. + pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused"); + /// Reputation change when a peer doesn't respond in time to our messages. + pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout"); } struct Metrics { @@ -277,6 +299,18 @@ pub struct SyncingEngine { /// Instant when the last notification was sent or received. last_notification_io: Instant, + + /// Pending responses + pending_responses: PendingResponses, + + /// Block downloader + block_downloader: Arc>, + + /// Protocol name used to send out state requests + state_request_protocol_name: ProtocolName, + + /// Protocol name used to send out warp sync requests + warp_sync_protocol_name: Option, } impl SyncingEngine @@ -381,24 +415,32 @@ where let warp_sync_target_block_header_rx = warp_sync_target_block_header_rx .map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse()); - let (chain_sync, block_announce_config) = ChainSync::new( - mode, - client.clone(), + let block_announce_config = Self::get_block_announce_proto_config( protocol_id, fork_id, roles, + client.info().best_number, + client.info().best_hash, + client + .block_hash(Zero::zero()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + ); + let block_announce_protocol_name = block_announce_config.notifications_protocol.clone(); + + let chain_sync = ChainSync::new( + mode, + client.clone(), + block_announce_protocol_name.clone(), max_parallel_downloads, max_blocks_per_request, warp_sync_config, metrics_registry, network_service.clone(), import_queue, - block_downloader, - state_request_protocol_name, - warp_sync_protocol_name, )?; - let block_announce_protocol_name = block_announce_config.notifications_protocol.clone(); let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); let num_connected = Arc::new(AtomicUsize::new(0)); let is_major_syncing = Arc::new(AtomicBool::new(false)); @@ -455,6 +497,10 @@ where } else { None }, + pending_responses: PendingResponses::new(), + block_downloader, + state_request_protocol_name, + warp_sync_protocol_name, }, SyncingService::new(tx, num_connected, is_major_syncing), block_announce_config, @@ -682,11 +728,23 @@ where ToServiceCommand::BlocksProcessed(imported, count, results) => { for result in self.chain_sync.on_blocks_processed(imported, count, results) { match result { - Ok((id, req)) => self.chain_sync.send_block_request(id, req), - Err(BadPeer(id, repu)) => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - self.network_service.report_peer(id, repu) + Ok(event) => match event { + BlockRequestEvent::SendRequest { peer_id, request } => { + // drop obsolete pending response first + self.pending_responses.remove(&peer_id); + self.send_block_request(peer_id, request); + }, + BlockRequestEvent::RemoveStale { peer_id } => { + self.pending_responses.remove(&peer_id); + }, + }, + Err(BadPeer(peer_id, repu)) => { + self.pending_responses.remove(&peer_id); + self.network_service.disconnect_peer( + peer_id, + self.block_announce_protocol_name.clone(), + ); + self.network_service.report_peer(peer_id, repu) }, } } @@ -715,7 +773,7 @@ where let _ = tx.send(status); }, ToServiceCommand::NumActivePeers(tx) => { - let _ = tx.send(self.chain_sync.num_active_peers()); + let _ = tx.send(self.num_active_peers()); }, ToServiceCommand::SyncState(tx) => { let _ = tx.send(self.chain_sync.status()); @@ -817,8 +875,13 @@ where Poll::Pending => {}, } - // Drive `ChainSync`. - while let Poll::Ready(()) = self.chain_sync.poll(cx) {} + // Send outbound requests on `ChanSync`'s behalf. + self.send_chain_sync_requests(); + + // Poll & process pending responses. + while let Poll::Ready(Some(event)) = self.pending_responses.poll_next_unpin(cx) { + self.process_response_event(event); + } // Poll block announce validations last, because if a block announcement was received // through the event stream between `SyncingEngine` and `Protocol` and the validation @@ -860,6 +923,7 @@ where } self.chain_sync.peer_disconnected(&peer_id); + self.pending_responses.remove(&peer_id); self.event_streams.retain(|stream| { stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok() }); @@ -991,7 +1055,7 @@ where } if let Some(req) = req { - self.chain_sync.send_block_request(peer_id, req); + self.send_block_request(peer_id, req); } self.event_streams @@ -999,4 +1063,278 @@ where Ok(()) } + + fn send_chain_sync_requests(&mut self) { + for (peer_id, request) in self.chain_sync.block_requests() { + self.send_block_request(peer_id, request); + } + + if let Some((peer_id, request)) = self.chain_sync.state_request() { + self.send_state_request(peer_id, request); + } + + for (peer_id, request) in self.chain_sync.justification_requests() { + self.send_block_request(peer_id, request); + } + + if let Some((peer_id, request)) = self.chain_sync.warp_sync_request() { + self.send_warp_sync_request(peer_id, request); + } + } + + fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest) { + if !self.chain_sync.is_peer_known(&peer_id) { + trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}"); + debug_assert!(false); + return + } + + let downloader = self.block_downloader.clone(); + + self.pending_responses.insert( + peer_id, + PeerRequest::Block(request.clone()), + async move { downloader.download_blocks(peer_id, request).await }.boxed(), + ); + } + + fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) { + if !self.chain_sync.is_peer_known(&peer_id) { + trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}"); + debug_assert!(false); + return + } + + let (tx, rx) = oneshot::channel(); + + self.pending_responses.insert(peer_id, PeerRequest::State, rx.boxed()); + + match Self::encode_state_request(&request) { + Ok(data) => { + self.network_service.start_request( + peer_id, + self.state_request_protocol_name.clone(), + data, + tx, + IfDisconnected::ImmediateError, + ); + }, + Err(err) => { + log::warn!( + target: LOG_TARGET, + "Failed to encode state request {request:?}: {err:?}", + ); + }, + } + } + + fn send_warp_sync_request(&mut self, peer_id: PeerId, request: WarpProofRequest) { + if !self.chain_sync.is_peer_known(&peer_id) { + trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}"); + debug_assert!(false); + return + } + + let (tx, rx) = oneshot::channel(); + + self.pending_responses.insert(peer_id, PeerRequest::WarpProof, rx.boxed()); + + match &self.warp_sync_protocol_name { + Some(name) => self.network_service.start_request( + peer_id, + name.clone(), + request.encode(), + tx, + IfDisconnected::ImmediateError, + ), + None => { + log::warn!( + target: LOG_TARGET, + "Trying to send warp sync request when no protocol is configured {request:?}", + ); + }, + } + } + + fn encode_state_request(request: &OpaqueStateRequest) -> Result, String> { + let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| { + "Failed to downcast opaque state response during encoding, this is an \ + implementation bug." + .to_string() + })?; + + Ok(request.encode_to_vec()) + } + + fn decode_state_response(response: &[u8]) -> Result { + let response = StateResponse::decode(response) + .map_err(|error| format!("Failed to decode state response: {error}"))?; + + Ok(OpaqueStateResponse(Box::new(response))) + } + + fn process_response_event(&mut self, response_event: ResponseEvent) { + let ResponseEvent { peer_id, request, response } = response_event; + + match response { + Ok(Ok(resp)) => match request { + PeerRequest::Block(req) => { + match self.block_downloader.block_response_into_blocks(&req, resp) { + Ok(blocks) => { + if let Some((peer_id, new_req)) = + self.chain_sync.on_block_response(peer_id, req, blocks) + { + self.send_block_request(peer_id, new_req); + } + }, + Err(BlockResponseError::DecodeFailed(e)) => { + debug!( + target: LOG_TARGET, + "Failed to decode block response from peer {:?}: {:?}.", + peer_id, + e + ); + self.network_service.report_peer(peer_id, rep::BAD_MESSAGE); + self.network_service.disconnect_peer( + peer_id, + self.block_announce_protocol_name.clone(), + ); + return + }, + Err(BlockResponseError::ExtractionFailed(e)) => { + debug!( + target: LOG_TARGET, + "Failed to extract blocks from peer response {:?}: {:?}.", + peer_id, + e + ); + self.network_service.report_peer(peer_id, rep::BAD_MESSAGE); + return + }, + } + }, + PeerRequest::State => { + let response = match Self::decode_state_response(&resp[..]) { + Ok(proto) => proto, + Err(e) => { + debug!( + target: LOG_TARGET, + "Failed to decode state response from peer {peer_id:?}: {e:?}.", + ); + self.network_service.report_peer(peer_id, rep::BAD_MESSAGE); + self.network_service.disconnect_peer( + peer_id, + self.block_announce_protocol_name.clone(), + ); + return + }, + }; + + self.chain_sync.on_state_response(peer_id, response); + }, + PeerRequest::WarpProof => { + self.chain_sync.on_warp_sync_response(peer_id, EncodedProof(resp)); + }, + }, + Ok(Err(e)) => { + debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}."); + + match e { + RequestFailure::Network(OutboundFailure::Timeout) => { + self.network_service.report_peer(peer_id, rep::TIMEOUT); + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + }, + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => { + self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL); + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + }, + RequestFailure::Network(OutboundFailure::DialFailure) => { + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + }, + RequestFailure::Refused => { + self.network_service.report_peer(peer_id, rep::REFUSED); + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + }, + RequestFailure::Network(OutboundFailure::ConnectionClosed) | + RequestFailure::NotConnected => { + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + }, + RequestFailure::UnknownProtocol => { + debug_assert!(false, "Block request protocol should always be known."); + }, + RequestFailure::Obsolete => { + debug_assert!( + false, + "Can not receive `RequestFailure::Obsolete` after dropping the \ + response receiver.", + ); + }, + } + }, + Err(oneshot::Canceled) => { + trace!( + target: LOG_TARGET, + "Request to peer {peer_id:?} failed due to oneshot being canceled.", + ); + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + }, + } + } + + /// Returns the number of peers we're connected to and that are being queried. + fn num_active_peers(&self) -> usize { + self.pending_responses.len() + } + + /// Get config for the block announcement protocol + fn get_block_announce_proto_config( + protocol_id: ProtocolId, + fork_id: &Option, + roles: Roles, + best_number: NumberFor, + best_hash: B::Hash, + genesis_hash: B::Hash, + ) -> NonDefaultSetConfig { + let block_announces_protocol = { + let genesis_hash = genesis_hash.as_ref(); + if let Some(ref fork_id) = fork_id { + format!( + "/{}/{}/block-announces/1", + array_bytes::bytes2hex("", genesis_hash), + fork_id + ) + } else { + format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash)) + } + }; + + NonDefaultSetConfig { + notifications_protocol: block_announces_protocol.into(), + fallback_names: iter::once( + format!("/{}/block-announces/1", protocol_id.as_ref()).into(), + ) + .collect(), + max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE, + handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::::build( + roles, + best_number, + best_hash, + genesis_hash, + ))), + // NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement + // protocol is still hardcoded into the peerset. + set_config: SetConfig { + in_peers: 0, + out_peers: 0, + reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Deny, + }, + } + } } diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 20c0034966d5..ff00e8f90f82 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -29,44 +29,31 @@ //! order to update it. use crate::{ - block_relay_protocol::{BlockDownloader, BlockResponseError}, blocks::BlockCollection, - schema::v1::{StateRequest, StateResponse}, + schema::v1::StateResponse, state::StateSync, warp::{WarpProofImportResult, WarpSync, WarpSyncConfig}, }; use codec::Encode; use extra_requests::ExtraRequests; -use futures::{channel::oneshot, task::Poll, Future, FutureExt}; -use libp2p::{request_response::OutboundFailure, PeerId}; +use libp2p::PeerId; use log::{debug, error, info, trace, warn}; -use prost::Message; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{ import_queue::ImportQueueService, BlockImportError, BlockImportStatus, IncomingBlock, }; -use sc_network::{ - config::{ - NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig, - }, - request_responses::{IfDisconnected, RequestFailure}, - types::ProtocolName, -}; -use sc_network_common::{ - role::Roles, - sync::{ - message::{ - BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest, - BlockResponse, Direction, FromBlock, - }, - warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress}, - BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification, - OnStateData, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, PeerRequest, SyncMode, - SyncState, SyncStatus, +use sc_network::types::ProtocolName; +use sc_network_common::sync::{ + message::{ + BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, + FromBlock, }, + warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress}, + BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, OnStateData, + OpaqueStateRequest, OpaqueStateResponse, PeerInfo, SyncMode, SyncState, SyncStatus, }; use sp_arithmetic::traits::Saturating; use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; @@ -81,9 +68,7 @@ use sp_runtime::{ use std::{ collections::{HashMap, HashSet}, - iter, ops::Range, - pin::Pin, sync::Arc, }; @@ -92,6 +77,7 @@ pub use service::chain_sync::SyncingService; mod block_announce_validator; mod extra_requests; mod futures_stream; +mod pending_responses; mod schema; pub mod block_relay_protocol; @@ -131,9 +117,6 @@ const MAJOR_SYNC_BLOCKS: u8 = 5; /// Number of peers that need to be connected before warp sync is started. const MIN_PEERS_TO_START_WARP_SYNC: usize = 3; -/// Maximum allowed size for a block announce. -const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024; - /// Maximum blocks per response. pub(crate) const MAX_BLOCKS_IN_RESPONSE: usize = 128; @@ -170,18 +153,6 @@ mod rep { /// Peer response data does not have requested bits. pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response"); - - /// Reputation change when a peer doesn't respond in time to our messages. - pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout"); - - /// Peer is on unsupported protocol version. - pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol"); - - /// Reputation change when a peer refuses a request. - pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused"); - - /// We received a message that failed to decode. - pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); } enum AllowedRequests { @@ -261,17 +232,12 @@ struct GapSync { target: NumberFor, } -type PendingResponse = Pin< - Box< - dyn Future< - Output = ( - PeerId, - PeerRequest, - Result, RequestFailure>, oneshot::Canceled>, - ), - > + Send, - >, ->; +/// An event used to notify [`engine::SyncingEngine`] if we want to perform a block request +/// or drop an obsolete pending response. +enum BlockRequestEvent { + SendRequest { peer_id: PeerId, request: BlockRequest }, + RemoveStale { peer_id: PeerId }, +} /// The main data structure which contains all the state for a chains /// active syncing strategy. @@ -322,14 +288,6 @@ pub struct ChainSync { network_service: service::network::NetworkServiceHandle, /// Protocol name used for block announcements block_announce_protocol_name: ProtocolName, - /// Block downloader stub - block_downloader: Arc>, - /// Protocol name used to send out state requests - state_request_protocol_name: ProtocolName, - /// Protocol name used to send out warp sync requests - warp_sync_protocol_name: Option, - /// Pending responses - pending_responses: HashMap>, /// Handle to import queue. import_queue: Box>, /// Metrics. @@ -491,10 +449,6 @@ where self.peers.len() } - fn num_active_peers(&self) -> usize { - self.pending_responses.len() - } - fn new_peer( &mut self, who: PeerId, @@ -1145,7 +1099,6 @@ where gap_sync.blocks.clear_peer_download(who) } self.peers.remove(who); - self.pending_responses.remove(who); self.extra_justifications.peer_disconnected(who); self.allowed_requests.set_all(); self.fork_targets.retain(|_, target| { @@ -1168,36 +1121,6 @@ where justifications: self.extra_justifications.metrics(), } } - - fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { - self.process_outbound_requests(); - - while let Poll::Ready(result) = self.poll_pending_responses(cx) { - match result { - ImportResult::BlockImport(origin, blocks) => self.import_blocks(origin, blocks), - ImportResult::JustificationImport(who, hash, number, justifications) => - self.import_justifications(who, hash, number, justifications), - } - } - - Poll::Pending - } - - fn send_block_request(&mut self, who: PeerId, request: BlockRequest) { - if self.peers.contains_key(&who) { - let downloader = self.block_downloader.clone(); - self.pending_responses.insert( - who, - Box::pin(async move { - ( - who, - PeerRequest::Block(request.clone()), - downloader.download_blocks(who, request).await, - ) - }), - ); - } - } } impl ChainSync @@ -1216,32 +1139,14 @@ where pub fn new( mode: SyncMode, client: Arc, - protocol_id: ProtocolId, - fork_id: &Option, - roles: Roles, + block_announce_protocol_name: ProtocolName, max_parallel_downloads: u32, max_blocks_per_request: u32, warp_sync_config: Option>, metrics_registry: Option<&Registry>, network_service: service::network::NetworkServiceHandle, import_queue: Box>, - block_downloader: Arc>, - state_request_protocol_name: ProtocolName, - warp_sync_protocol_name: Option, - ) -> Result<(Self, NonDefaultSetConfig), ClientError> { - let block_announce_config = Self::get_block_announce_proto_config( - protocol_id, - fork_id, - roles, - client.info().best_number, - client.info().best_hash, - client - .block_hash(Zero::zero()) - .ok() - .flatten() - .expect("Genesis block exists; qed"), - ); - + ) -> Result { let mut sync = Self { client, peers: HashMap::new(), @@ -1261,16 +1166,9 @@ where import_existing: false, gap_sync: None, network_service, - block_downloader, - state_request_protocol_name, warp_sync_config, warp_sync_target_block_header: None, - warp_sync_protocol_name, - block_announce_protocol_name: block_announce_config - .notifications_protocol - .clone() - .into(), - pending_responses: HashMap::new(), + block_announce_protocol_name, import_queue, metrics: if let Some(r) = &metrics_registry { match SyncingMetrics::register(r) { @@ -1289,7 +1187,7 @@ where }; sync.reset_sync_start_point()?; - Ok((sync, block_announce_config)) + Ok(sync) } /// Returns the median seen block number. @@ -1413,7 +1311,7 @@ where /// Restart the sync process. This will reset all pending block requests and return an iterator /// of new block requests to make to peers. Peers that were downloading finality data (i.e. /// their state was `DownloadingJustification`) are unaffected and will stay in the same state. - fn restart(&mut self) -> impl Iterator), BadPeer>> + '_ { + fn restart(&mut self) -> impl Iterator, BadPeer>> + '_ { self.blocks.clear(); if let Err(e) = self.reset_sync_start_point() { warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}"); @@ -1427,23 +1325,23 @@ where ); let old_peers = std::mem::take(&mut self.peers); - old_peers.into_iter().filter_map(move |(id, mut p)| { + old_peers.into_iter().filter_map(move |(peer_id, mut p)| { // peers that were downloading justifications // should be kept in that state. if let PeerSyncState::DownloadingJustification(_) = p.state { // We make sure our commmon number is at least something we have. p.common_number = self.best_queued_number; - self.peers.insert(id, p); + self.peers.insert(peer_id, p); return None } - // since the request is not a justification, remove it from pending responses - self.pending_responses.remove(&id); - // handle peers that were in other states. - match self.new_peer(id, p.best_hash, p.best_number) { - Ok(None) => None, - Ok(Some(x)) => Some(Ok((id, x))), + match self.new_peer(peer_id, p.best_hash, p.best_number) { + // since the request is not a justification, remove it from pending responses + Ok(None) => Some(Ok(BlockRequestEvent::RemoveStale { peer_id })), + // update the request if the new one is available + Ok(Some(request)) => Some(Ok(BlockRequestEvent::SendRequest { peer_id, request })), + // this implies that we need to drop pending response from the peer Err(e) => Some(Err(e)), } }) @@ -1524,6 +1422,11 @@ where .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) } + /// Is the peer know to the sync state machine? + pub fn is_peer_known(&self, peer_id: &PeerId) -> bool { + self.peers.contains_key(peer_id) + } + /// Get the set of downloaded blocks that are ready to be queued for import. fn ready_blocks(&mut self) -> Vec> { self.blocks @@ -1588,117 +1491,12 @@ where None } - /// Get config for the block announcement protocol - pub fn get_block_announce_proto_config( - protocol_id: ProtocolId, - fork_id: &Option, - roles: Roles, - best_number: NumberFor, - best_hash: B::Hash, - genesis_hash: B::Hash, - ) -> NonDefaultSetConfig { - let block_announces_protocol = { - let genesis_hash = genesis_hash.as_ref(); - if let Some(ref fork_id) = fork_id { - format!( - "/{}/{}/block-announces/1", - array_bytes::bytes2hex("", genesis_hash), - fork_id - ) - } else { - format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash)) - } - }; - - NonDefaultSetConfig { - notifications_protocol: block_announces_protocol.into(), - fallback_names: iter::once( - format!("/{}/block-announces/1", protocol_id.as_ref()).into(), - ) - .collect(), - max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE, - handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::::build( - roles, - best_number, - best_hash, - genesis_hash, - ))), - // NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement - // protocol is still hardcoded into the peerset. - set_config: SetConfig { - in_peers: 0, - out_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: NonReservedPeerMode::Deny, - }, - } - } - - fn decode_state_response(response: &[u8]) -> Result { - let response = StateResponse::decode(response) - .map_err(|error| format!("Failed to decode state response: {error}"))?; - - Ok(OpaqueStateResponse(Box::new(response))) - } - - fn send_state_request(&mut self, who: PeerId, request: OpaqueStateRequest) { - let (tx, rx) = oneshot::channel(); - - if self.peers.contains_key(&who) { - self.pending_responses - .insert(who, Box::pin(async move { (who, PeerRequest::State, rx.await) })); - } - - match self.encode_state_request(&request) { - Ok(data) => { - self.network_service.start_request( - who, - self.state_request_protocol_name.clone(), - data, - tx, - IfDisconnected::ImmediateError, - ); - }, - Err(err) => { - log::warn!( - target: LOG_TARGET, - "Failed to encode state request {request:?}: {err:?}", - ); - }, - } - } - - fn send_warp_sync_request(&mut self, who: PeerId, request: WarpProofRequest) { - let (tx, rx) = oneshot::channel(); - - if self.peers.contains_key(&who) { - self.pending_responses - .insert(who, Box::pin(async move { (who, PeerRequest::WarpProof, rx.await) })); - } - - match &self.warp_sync_protocol_name { - Some(name) => self.network_service.start_request( - who, - name.clone(), - request.encode(), - tx, - IfDisconnected::ImmediateError, - ), - None => { - log::warn!( - target: LOG_TARGET, - "Trying to send warp sync request when no protocol is configured {request:?}", - ); - }, - } - } - - fn on_block_response( + pub(crate) fn on_block_response( &mut self, peer_id: PeerId, request: BlockRequest, blocks: Vec>, - ) -> Option> { + ) -> Option<(PeerId, BlockRequest)> { let block_response = BlockResponse:: { id: request.id, blocks }; let blocks_range = || match ( @@ -1741,10 +1539,7 @@ where self.import_blocks(origin, blocks); None }, - Ok(OnBlockData::Request(peer, req)) => { - self.send_block_request(peer, req); - None - }, + Ok(OnBlockData::Request(peer, req)) => Some((peer, req)), Ok(OnBlockData::Continue) => None, Err(BadPeer(id, repu)) => { self.network_service @@ -1756,20 +1551,14 @@ where } } - pub fn on_state_response( - &mut self, - peer_id: PeerId, - response: OpaqueStateResponse, - ) -> Option> { + pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { match self.on_state_data(&peer_id, response) { - Ok(OnStateData::Import(origin, block)) => - Some(ImportResult::BlockImport(origin, vec![block])), - Ok(OnStateData::Continue) => None, + Ok(OnStateData::Import(origin, block)) => self.import_blocks(origin, vec![block]), + Ok(OnStateData::Continue) => {}, Err(BadPeer(id, repu)) => { self.network_service .disconnect_peer(id, self.block_announce_protocol_name.clone()); self.network_service.report_peer(id, repu); - None }, } } @@ -1782,165 +1571,10 @@ where } } - fn process_outbound_requests(&mut self) { - for (id, request) in self.block_requests() { - self.send_block_request(id, request); - } - - if let Some((id, request)) = self.state_request() { - self.send_state_request(id, request); - } - - for (id, request) in self.justification_requests().collect::>() { - self.send_block_request(id, request); - } - - if let Some((id, request)) = self.warp_sync_request() { - self.send_warp_sync_request(id, request); - } - } - - fn poll_pending_responses(&mut self, cx: &mut std::task::Context) -> Poll> { - let ready_responses = self - .pending_responses - .values_mut() - .filter_map(|future| match future.poll_unpin(cx) { - Poll::Pending => None, - Poll::Ready(result) => Some(result), - }) - .collect::>(); - - for (id, request, response) in ready_responses { - self.pending_responses - .remove(&id) - .expect("Logic error: peer id from pending response is missing in the map."); - - match response { - Ok(Ok(resp)) => match request { - PeerRequest::Block(req) => { - match self.block_downloader.block_response_into_blocks(&req, resp) { - Ok(blocks) => { - if let Some(import) = self.on_block_response(id, req, blocks) { - return Poll::Ready(import) - } - }, - Err(BlockResponseError::DecodeFailed(e)) => { - debug!( - target: LOG_TARGET, - "Failed to decode block response from peer {:?}: {:?}.", - id, - e - ); - self.network_service.report_peer(id, rep::BAD_MESSAGE); - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - continue - }, - Err(BlockResponseError::ExtractionFailed(e)) => { - debug!( - target: LOG_TARGET, - "Failed to extract blocks from peer response {:?}: {:?}.", - id, - e - ); - self.network_service.report_peer(id, rep::BAD_MESSAGE); - continue - }, - } - }, - PeerRequest::State => { - let response = match Self::decode_state_response(&resp[..]) { - Ok(proto) => proto, - Err(e) => { - debug!( - target: LOG_TARGET, - "Failed to decode state response from peer {id:?}: {e:?}.", - ); - self.network_service.report_peer(id, rep::BAD_MESSAGE); - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - continue - }, - }; - - if let Some(import) = self.on_state_response(id, response) { - return Poll::Ready(import) - } - }, - PeerRequest::WarpProof => { - self.on_warp_sync_response(id, EncodedProof(resp)); - }, - }, - Ok(Err(e)) => { - debug!(target: LOG_TARGET, "Request to peer {id:?} failed: {e:?}."); - - match e { - RequestFailure::Network(OutboundFailure::Timeout) => { - self.network_service.report_peer(id, rep::TIMEOUT); - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - }, - RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => { - self.network_service.report_peer(id, rep::BAD_PROTOCOL); - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - }, - RequestFailure::Network(OutboundFailure::DialFailure) => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - }, - RequestFailure::Refused => { - self.network_service.report_peer(id, rep::REFUSED); - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - }, - RequestFailure::Network(OutboundFailure::ConnectionClosed) | - RequestFailure::NotConnected => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - }, - RequestFailure::UnknownProtocol => { - debug_assert!(false, "Block request protocol should always be known."); - }, - RequestFailure::Obsolete => { - debug_assert!( - false, - "Can not receive `RequestFailure::Obsolete` after dropping the \ - response receiver.", - ); - }, - } - }, - Err(oneshot::Canceled) => { - trace!( - target: LOG_TARGET, - "Request to peer {id:?} failed due to oneshot being canceled.", - ); - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - }, - } - } - - Poll::Pending - } - - fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result, String> { - let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| { - "Failed to downcast opaque state response during encoding, this is an \ - implementation bug." - .to_string() - })?; - - Ok(request.encode_to_vec()) - } - - fn justification_requests<'a>( - &'a mut self, - ) -> Box)> + 'a> { + fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { let peers = &mut self.peers; let mut matcher = self.extra_justifications.matcher(); - Box::new(std::iter::from_fn(move || { + std::iter::from_fn(move || { if let Some((peer, request)) = matcher.next(peers) { peers .get_mut(&peer) @@ -1959,7 +1593,8 @@ where } else { None } - })) + }) + .collect() } fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { @@ -2086,7 +1721,6 @@ where } }) .collect() - // Box::new(iter) } fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { @@ -2288,13 +1922,14 @@ where /// A batch of blocks have been processed, with or without errors. /// /// Call this when a batch of blocks have been processed by the import - /// queue, with or without errors. + /// queue, with or without errors. If an error is returned, the pending response + /// from the peer must be dropped. fn on_blocks_processed( &mut self, imported: usize, count: usize, results: Vec<(Result>, BlockImportError>, B::Hash)>, - ) -> Box), BadPeer>>> { + ) -> Box, BadPeer>>> { trace!(target: LOG_TARGET, "Imported {imported} of {count}"); let mut output = Vec::new(); @@ -2799,13 +2434,10 @@ fn validate_blocks( #[cfg(test)] mod test { use super::*; - use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider}; + use crate::service::network::NetworkServiceProvider; use futures::executor::block_on; use sc_block_builder::BlockBuilderProvider; - use sc_network_common::{ - role::Role, - sync::message::{BlockAnnounce, BlockData, BlockState, FromBlock}, - }; + use sc_network_common::sync::message::{BlockAnnounce, BlockData, BlockState, FromBlock}; use sp_blockchain::HeaderBackend; use substrate_test_runtime_client::{ runtime::{Block, Hash, Header}, @@ -2825,21 +2457,16 @@ mod test { let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (mut sync, _) = ChainSync::new( + let mut sync = ChainSync::new( SyncMode::Full, client.clone(), - ProtocolId::from("test-protocol-name"), - &Some(String::from("test-fork-id")), - Roles::from(&Role::Full), + ProtocolName::from("test-block-announce-protocol"), 1, 64, None, None, chain_sync_network_handle, import_queue, - Arc::new(MockBlockDownloader::new()), - ProtocolName::from("state-request"), - None, ) .unwrap(); @@ -2857,7 +2484,8 @@ mod test { // the justification request should be scheduled to that peer assert!(sync .justification_requests() - .any(|(who, request)| { who == peer_id && request.from == FromBlock::Hash(a1_hash) })); + .iter() + .any(|(who, request)| { *who == peer_id && request.from == FromBlock::Hash(a1_hash) })); // there are no extra pending requests assert_eq!(sync.extra_justifications.pending_requests().count(), 0); @@ -2891,21 +2519,16 @@ mod test { let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (mut sync, _) = ChainSync::new( + let mut sync = ChainSync::new( SyncMode::Full, client.clone(), - ProtocolId::from("test-protocol-name"), - &Some(String::from("test-fork-id")), - Roles::from(&Role::Full), + ProtocolName::from("test-block-announce-protocol"), 1, 64, None, None, chain_sync_network_handle, import_queue, - Arc::new(MockBlockDownloader::new()), - ProtocolName::from("state-request"), - None, ) .unwrap(); @@ -2944,8 +2567,8 @@ mod test { // the justification request should be scheduled to the // new peer which is at the given block - assert!(sync.justification_requests().any(|(p, r)| { - p == peer_id3 && + assert!(sync.justification_requests().iter().any(|(p, r)| { + *p == peer_id3 && r.fields == BlockAttributes::JUSTIFICATION && r.from == FromBlock::Hash(b1_hash) })); @@ -2959,9 +2582,11 @@ mod test { let block_requests = sync.restart(); // which should make us send out block requests to the first two peers - assert!(block_requests - .map(|r| r.unwrap()) - .all(|(p, _)| { p == peer_id1 || p == peer_id2 })); + assert!(block_requests.map(|r| r.unwrap()).all(|event| match event { + BlockRequestEvent::SendRequest { peer_id, .. } => + peer_id == peer_id1 || peer_id == peer_id2, + BlockRequestEvent::RemoveStale { .. } => false, + })); // peer 3 should be unaffected it was downloading finality data assert_eq!( @@ -3065,21 +2690,16 @@ mod test { let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (mut sync, _) = ChainSync::new( + let mut sync = ChainSync::new( SyncMode::Full, client.clone(), - ProtocolId::from("test-protocol-name"), - &Some(String::from("test-fork-id")), - Roles::from(&Role::Full), + ProtocolName::from("test-block-announce-protocol"), 5, 64, None, None, chain_sync_network_handle, import_queue, - Arc::new(MockBlockDownloader::new()), - ProtocolName::from("state-request"), - None, ) .unwrap(); @@ -3191,21 +2811,16 @@ mod test { NetworkServiceProvider::new(); let info = client.info(); - let (mut sync, _) = ChainSync::new( + let mut sync = ChainSync::new( SyncMode::Full, client.clone(), - ProtocolId::from("test-protocol-name"), - &Some(String::from("test-fork-id")), - Roles::from(&Role::Full), + ProtocolName::from("test-block-announce-protocol"), 5, 64, None, None, chain_sync_network_handle, import_queue, - Arc::new(MockBlockDownloader::new()), - ProtocolName::from("state-request"), - None, ) .unwrap(); @@ -3348,21 +2963,16 @@ mod test { let info = client.info(); - let (mut sync, _) = ChainSync::new( + let mut sync = ChainSync::new( SyncMode::Full, client.clone(), - ProtocolId::from("test-protocol-name"), - &Some(String::from("test-fork-id")), - Roles::from(&Role::Full), + ProtocolName::from("test-block-announce-protocol"), 5, 64, None, None, chain_sync_network_handle, import_queue, - Arc::new(MockBlockDownloader::new()), - ProtocolName::from("state-request"), - None, ) .unwrap(); @@ -3490,21 +3100,16 @@ mod test { let info = client.info(); - let (mut sync, _) = ChainSync::new( + let mut sync = ChainSync::new( SyncMode::Full, client.clone(), - ProtocolId::from("test-protocol-name"), - &Some(String::from("test-fork-id")), - Roles::from(&Role::Full), + ProtocolName::from("test-block-announce-protocol"), 5, 64, None, None, chain_sync_network_handle, import_queue, - Arc::new(MockBlockDownloader::new()), - ProtocolName::from("state-request"), - None, ) .unwrap(); @@ -3634,21 +3239,16 @@ mod test { let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::>(); - let (mut sync, _) = ChainSync::new( + let mut sync = ChainSync::new( SyncMode::Full, client.clone(), - ProtocolId::from("test-protocol-name"), - &Some(String::from("test-fork-id")), - Roles::from(&Role::Full), + ProtocolName::from("test-block-announce-protocol"), 1, 64, None, None, chain_sync_network_handle, import_queue, - Arc::new(MockBlockDownloader::new()), - ProtocolName::from("state-request"), - None, ) .unwrap(); @@ -3679,21 +3279,16 @@ mod test { let empty_client = Arc::new(TestClientBuilder::new().build()); - let (mut sync, _) = ChainSync::new( + let mut sync = ChainSync::new( SyncMode::Full, empty_client.clone(), - ProtocolId::from("test-protocol-name"), - &Some(String::from("test-fork-id")), - Roles::from(&Role::Full), + ProtocolName::from("test-block-announce-protocol"), 1, 64, None, None, chain_sync_network_handle, import_queue, - Arc::new(MockBlockDownloader::new()), - ProtocolName::from("state-request"), - None, ) .unwrap(); @@ -3731,21 +3326,16 @@ mod test { let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (mut sync, _) = ChainSync::new( + let mut sync = ChainSync::new( SyncMode::Full, client.clone(), - ProtocolId::from("test-protocol-name"), - &Some(String::from("test-fork-id")), - Roles::from(&Role::Full), + ProtocolName::from("test-block-announce-protocol"), 1, 64, None, None, chain_sync_network_handle, import_queue, - Arc::new(MockBlockDownloader::new()), - ProtocolName::from("state-request"), - None, ) .unwrap(); @@ -3766,10 +3356,14 @@ mod test { // add new peer and request blocks from them sync.new_peer(peers[0], Hash::random(), 42).unwrap(); + // we don't actually perform any requests, just keep track of peers waiting for a response + let mut pending_responses = HashSet::new(); + // we wil send block requests to these peers // for these blocks we don't know about - for (peer, request) in sync.block_requests() { - sync.send_block_request(peer, request); + for (peer, _request) in sync.block_requests() { + // "send" request + pending_responses.insert(peer); } // add a new peer at a known block @@ -3780,10 +3374,11 @@ mod test { // the justification request should be scheduled to the // new peer which is at the given block - let mut requests = sync.justification_requests().collect::>(); + let mut requests = sync.justification_requests(); assert_eq!(requests.len(), 1); - let (peer, request) = requests.remove(0); - sync.send_block_request(peer, request); + let (peer, _request) = requests.remove(0); + // "send" request + assert!(pending_responses.insert(peer)); assert!(!std::matches!( sync.peers.get(&peers[0]).unwrap().state, @@ -3793,18 +3388,37 @@ mod test { sync.peers.get(&peers[1]).unwrap().state, PeerSyncState::DownloadingJustification(b1_hash), ); - assert_eq!(sync.pending_responses.len(), 2); - - let requests = sync.restart().collect::>(); - assert!(requests.iter().any(|res| res.as_ref().unwrap().0 == peers[0])); + assert_eq!(pending_responses.len(), 2); + + // restart sync + let request_events = sync.restart().collect::>(); + for event in request_events.iter() { + match event.as_ref().unwrap() { + BlockRequestEvent::RemoveStale { peer_id } => { + pending_responses.remove(&peer_id); + }, + BlockRequestEvent::SendRequest { peer_id, .. } => { + // we drop obsolete response, but don't register a new request, it's checked in + // the `assert!` below + pending_responses.remove(&peer_id); + }, + } + } + assert!(request_events.iter().any(|event| { + match event.as_ref().unwrap() { + BlockRequestEvent::RemoveStale { .. } => false, + BlockRequestEvent::SendRequest { peer_id, .. } => peer_id == &peers[0], + } + })); - assert_eq!(sync.pending_responses.len(), 1); - assert!(sync.pending_responses.get(&peers[1]).is_some()); + assert_eq!(pending_responses.len(), 1); + assert!(pending_responses.contains(&peers[1])); assert_eq!( sync.peers.get(&peers[1]).unwrap().state, PeerSyncState::DownloadingJustification(b1_hash), ); sync.peer_disconnected(&peers[1]); - assert_eq!(sync.pending_responses.len(), 0); + pending_responses.remove(&peers[1]); + assert_eq!(pending_responses.len(), 0); } } diff --git a/substrate/client/network/sync/src/mock.rs b/substrate/client/network/sync/src/mock.rs index 859f9fb9c54b..b51eec8d1499 100644 --- a/substrate/client/network/sync/src/mock.rs +++ b/substrate/client/network/sync/src/mock.rs @@ -20,7 +20,7 @@ use crate::block_relay_protocol::{BlockDownloader as BlockDownloaderT, BlockResponseError}; -use futures::{channel::oneshot, task::Poll}; +use futures::channel::oneshot; use libp2p::PeerId; use sc_network::RequestFailure; use sc_network_common::sync::{ @@ -39,7 +39,6 @@ mockall::mock! { fn num_sync_requests(&self) -> usize; fn num_downloaded_blocks(&self) -> usize; fn num_peers(&self) -> usize; - fn num_active_peers(&self) -> usize; fn new_peer( &mut self, who: PeerId, @@ -81,15 +80,6 @@ mockall::mock! { ); fn peer_disconnected(&mut self, who: &PeerId); fn metrics(&self) -> Metrics; - fn poll<'a>( - &mut self, - cx: &mut std::task::Context<'a>, - ) -> Poll<()>; - fn send_block_request( - &mut self, - who: PeerId, - request: BlockRequest, - ); } } diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs new file mode 100644 index 000000000000..c863267e7808 --- /dev/null +++ b/substrate/client/network/sync/src/pending_responses.rs @@ -0,0 +1,114 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! [`PendingResponses`] is responsible for keeping track of pending responses and +//! polling them. + +use futures::{ + channel::oneshot, + future::BoxFuture, + stream::{BoxStream, Stream}, + FutureExt, StreamExt, +}; +use libp2p::PeerId; +use log::error; +use sc_network::request_responses::RequestFailure; +use sc_network_common::sync::PeerRequest; +use sp_runtime::traits::Block as BlockT; +use std::task::{Context, Poll}; +use tokio_stream::StreamMap; + +/// Response result. +type ResponseResult = Result, RequestFailure>, oneshot::Canceled>; + +/// A future yielding [`ResponseResult`]. +type ResponseFuture = BoxFuture<'static, ResponseResult>; + +/// An event we receive once a pending response future resolves. +pub(crate) struct ResponseEvent { + pub peer_id: PeerId, + pub request: PeerRequest, + pub response: ResponseResult, +} + +/// Stream taking care of polling pending responses. +pub(crate) struct PendingResponses { + /// Pending responses + pending_responses: StreamMap, ResponseResult)>>, +} + +impl PendingResponses { + pub fn new() -> Self { + Self { pending_responses: StreamMap::new() } + } + + pub fn insert( + &mut self, + peer_id: PeerId, + request: PeerRequest, + response_future: ResponseFuture, + ) { + let request_type = request.get_type(); + + if self + .pending_responses + .insert( + peer_id, + Box::pin(async move { (request, response_future.await) }.into_stream()), + ) + .is_some() + { + error!( + target: crate::LOG_TARGET, + "Discarded pending response from peer {peer_id}, request type: {request_type:?}.", + ); + debug_assert!(false); + } + } + + pub fn remove(&mut self, peer_id: &PeerId) -> bool { + self.pending_responses.remove(peer_id).is_some() + } + + pub fn len(&self) -> usize { + self.pending_responses.len() + } +} + +impl Unpin for PendingResponses {} + +impl Stream for PendingResponses { + type Item = ResponseEvent; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match futures::ready!(self.pending_responses.poll_next_unpin(cx)) { + Some((peer_id, (request, response))) => { + // We need to manually remove the stream, because `StreamMap` doesn't know yet that + // it's going to yield `None`, so may not remove it before the next request is made + // to the same peer. + self.pending_responses.remove(&peer_id); + + Poll::Ready(Some(ResponseEvent { peer_id, request, response })) + }, + None => Poll::Ready(None), + } + } +} diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index c93006753afb..f4068d1bc594 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -31,7 +31,7 @@ serde = "1.0" hex = "0.4" futures = "0.3.21" parking_lot = "0.12.1" -tokio-stream = { version = "0.1", features = ["sync"] } +tokio-stream = { version = "0.1.14", features = ["sync"] } tokio = { version = "1.22.0", features = ["sync"] } array-bytes = "6.1" log = "0.4.17" diff --git a/substrate/frame/support/procedural/src/no_bound/default.rs b/substrate/frame/support/procedural/src/no_bound/default.rs index da05f19e0f81..35d0eaeecf56 100644 --- a/substrate/frame/support/procedural/src/no_bound/default.rs +++ b/substrate/frame/support/procedural/src/no_bound/default.rs @@ -66,15 +66,12 @@ pub fn derive_default_no_bound(input: proc_macro::TokenStream) -> proc_macro::To .collect::>(); match &*default_variants { - [] => { - return syn::Error::new( - name.clone().span(), - // writing this as a regular string breaks rustfmt for some reason - r#"no default declared, make a variant default by placing `#[default]` above it"#, - ) - .into_compile_error() - .into() - }, + [] => return syn::Error::new( + name.clone().span(), + "no default declared, make a variant default by placing `#[default]` above it", + ) + .into_compile_error() + .into(), // only one variant with the #[default] attribute set [default_variant] => { let variant_attrs = default_variant diff --git a/substrate/frame/support/src/storage/storage_noop_guard.rs b/substrate/frame/support/src/storage/storage_noop_guard.rs index d00e6e18ecc4..c4d40fa99a35 100644 --- a/substrate/frame/support/src/storage/storage_noop_guard.rs +++ b/substrate/frame/support/src/storage/storage_noop_guard.rs @@ -37,15 +37,38 @@ /// }); /// ``` #[must_use] -pub struct StorageNoopGuard(sp_std::vec::Vec); +pub struct StorageNoopGuard<'a> { + storage_root: sp_std::vec::Vec, + error_message: &'a str, +} -impl Default for StorageNoopGuard { +impl<'a> Default for StorageNoopGuard<'a> { fn default() -> Self { - Self(sp_io::storage::root(sp_runtime::StateVersion::V1)) + Self { + storage_root: sp_io::storage::root(sp_runtime::StateVersion::V1), + error_message: "`StorageNoopGuard` detected an attempted storage change.", + } + } +} + +impl<'a> StorageNoopGuard<'a> { + /// Alias to `default()`. + pub fn new() -> Self { + Self::default() + } + + /// Creates a new [`StorageNoopGuard`] with a custom error message. + pub fn from_error_message(error_message: &'a str) -> Self { + Self { storage_root: sp_io::storage::root(sp_runtime::StateVersion::V1), error_message } + } + + /// Sets a custom error message for a [`StorageNoopGuard`]. + pub fn set_error_message(&mut self, error_message: &'a str) { + self.error_message = error_message; } } -impl Drop for StorageNoopGuard { +impl<'a> Drop for StorageNoopGuard<'a> { fn drop(&mut self) { // No need to double panic, eg. inside a test assertion failure. if sp_std::thread::panicking() { @@ -53,8 +76,9 @@ impl Drop for StorageNoopGuard { } assert_eq!( sp_io::storage::root(sp_runtime::StateVersion::V1), - self.0, - "StorageNoopGuard detected wrongful storage changes.", + self.storage_root, + "{}", + self.error_message, ); } } @@ -65,7 +89,7 @@ mod tests { use sp_io::TestExternalities; #[test] - #[should_panic(expected = "StorageNoopGuard detected wrongful storage changes.")] + #[should_panic(expected = "`StorageNoopGuard` detected an attempted storage change.")] fn storage_noop_guard_panics_on_changed() { TestExternalities::default().execute_with(|| { let _guard = StorageNoopGuard::default(); @@ -83,7 +107,7 @@ mod tests { } #[test] - #[should_panic(expected = "StorageNoopGuard detected wrongful storage changes.")] + #[should_panic(expected = "`StorageNoopGuard` detected an attempted storage change.")] fn storage_noop_guard_panics_on_early_drop() { TestExternalities::default().execute_with(|| { let guard = StorageNoopGuard::default(); @@ -111,4 +135,34 @@ mod tests { panic!("Something else"); }); } + + #[test] + #[should_panic(expected = "`StorageNoopGuard` found unexpected storage changes.")] + fn storage_noop_guard_panics_created_from_error_message() { + TestExternalities::default().execute_with(|| { + let _guard = StorageNoopGuard::from_error_message( + "`StorageNoopGuard` found unexpected storage changes.", + ); + frame_support::storage::unhashed::put(b"key", b"value"); + }); + } + + #[test] + #[should_panic(expected = "`StorageNoopGuard` found unexpected storage changes.")] + fn storage_noop_guard_panics_with_set_error_message() { + TestExternalities::default().execute_with(|| { + let mut guard = StorageNoopGuard::default(); + guard.set_error_message("`StorageNoopGuard` found unexpected storage changes."); + frame_support::storage::unhashed::put(b"key", b"value"); + }); + } + + #[test] + #[should_panic(expected = "`StorageNoopGuard` detected an attempted storage change.")] + fn storage_noop_guard_panics_new_alias() { + TestExternalities::default().execute_with(|| { + let _guard = StorageNoopGuard::new(); + frame_support::storage::unhashed::put(b"key", b"value"); + }); + } }