From 432b4f67a5fa2c6492c34edfcdc9c720a004d91d Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 1 Nov 2023 17:26:09 +0200 Subject: [PATCH 01/12] Replace `futures_timer::Delay` with `tokio::time::Interval` --- substrate/client/network/sync/Cargo.toml | 1 + substrate/client/network/sync/src/engine.rs | 15 ++++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/substrate/client/network/sync/Cargo.toml b/substrate/client/network/sync/Cargo.toml index 39312cc4b327..038f993133d0 100644 --- a/substrate/client/network/sync/Cargo.toml +++ b/substrate/client/network/sync/Cargo.toml @@ -30,6 +30,7 @@ schnellru = "0.2.1" smallvec = "1.11.0" thiserror = "1.0" tokio-stream = "0.1.14" +tokio = { version = "1.32.0", features = ["time"] } fork-tree = { path = "../../../utils/fork-tree" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus" } sc-client-api = { path = "../../api" } diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 0f689742bc58..294f563965bb 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -47,7 +47,6 @@ use futures::{ future::{BoxFuture, Fuse}, FutureExt, StreamExt, }; -use futures_timer::Delay; use libp2p::{request_response::OutboundFailure, PeerId}; use log::{debug, trace}; use prometheus_endpoint::{ @@ -56,6 +55,7 @@ use prometheus_endpoint::{ }; use prost::Message; use schnellru::{ByLength, LruMap}; +use tokio::time::{Interval, MissedTickBehavior}; use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider}; use sc_consensus::import_queue::ImportQueueService; @@ -266,7 +266,7 @@ pub struct SyncingEngine { event_streams: Vec>, /// Interval at which we call `tick`. - tick_timeout: Delay, + tick_timeout: Interval, /// All connected peers. Contains both full and light node peers. peers: HashMap>, @@ -478,6 +478,12 @@ where let max_out_peers = net_config.network_config.default_peers_set.out_peers; let max_in_peers = (max_full_peers - max_out_peers) as usize; + let tick_timeout = { + let mut interval = tokio::time::interval(TICK_TIMEOUT); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + interval + }; + Ok(( Self { roles, @@ -505,7 +511,7 @@ where num_in_peers: 0usize, max_in_peers, event_streams: Vec::new(), - tick_timeout: Delay::new(TICK_TIMEOUT), + tick_timeout, syncing_started: None, last_notification_io: Instant::now(), metrics: if let Some(r) = metrics_registry { @@ -700,9 +706,8 @@ where self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); - while let Poll::Ready(()) = self.tick_timeout.poll_unpin(cx) { + while let Poll::Ready(_) = self.tick_timeout.poll_tick(cx) { self.report_metrics(); - self.tick_timeout.reset(TICK_TIMEOUT); // if `SyncingEngine` has just started, don't evict seemingly inactive peers right away // as they may not have produced blocks not because they've disconnected but because From c0d51e91fd99f338381b2cdb69e1c06bc73300d7 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 1 Nov 2023 17:49:11 +0200 Subject: [PATCH 02/12] Factor out handling commands and events --- substrate/client/network/sync/src/engine.rs | 385 ++++++++++---------- 1 file changed, 200 insertions(+), 185 deletions(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 294f563965bb..4b94a162003d 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -707,200 +707,21 @@ where .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); while let Poll::Ready(_) = self.tick_timeout.poll_tick(cx) { - self.report_metrics(); - - // if `SyncingEngine` has just started, don't evict seemingly inactive peers right away - // as they may not have produced blocks not because they've disconnected but because - // they're still waiting to receive enough relaychain blocks to start producing blocks. - if let Some(started) = self.syncing_started { - if started.elapsed() < INITIAL_EVICTION_WAIT_PERIOD { - continue - } - - self.syncing_started = None; - self.last_notification_io = Instant::now(); - } - - // if syncing hasn't sent or received any blocks within `INACTIVITY_EVICT_THRESHOLD`, - // it means the local node has stalled and is connected to peers who either don't - // consider it connected or are also all stalled. In order to unstall the node, - // disconnect all peers and allow `ProtocolController` to establish new connections. - if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD { - log::debug!( - target: LOG_TARGET, - "syncing has halted due to inactivity, evicting all peers", - ); - - for peer in self.peers.keys() { - self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM); - self.network_service - .disconnect_peer(*peer, self.block_announce_protocol_name.clone()); - } - - // after all the peers have been evicted, start timer again to prevent evicting - // new peers that join after the old peer have been evicted - self.last_notification_io = Instant::now(); - } + self.perform_periodic_actions(); } - while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) { - match event { - ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { - self.chain_sync.set_sync_fork_request(peers, &hash, number); - }, - ToServiceCommand::EventStream(tx) => self.event_streams.push(tx), - ToServiceCommand::RequestJustification(hash, number) => - self.chain_sync.request_justification(&hash, number), - ToServiceCommand::ClearJustificationRequests => - self.chain_sync.clear_justification_requests(), - ToServiceCommand::BlocksProcessed(imported, count, results) => { - for result in self.chain_sync.on_blocks_processed(imported, count, results) { - match result { - Ok(action) => match action { - BlockRequestAction::SendRequest { peer_id, request } => { - // drop obsolete pending response first - self.pending_responses.remove(&peer_id); - self.send_block_request(peer_id, request); - }, - BlockRequestAction::RemoveStale { peer_id } => { - self.pending_responses.remove(&peer_id); - }, - }, - Err(BadPeer(peer_id, repu)) => { - self.pending_responses.remove(&peer_id); - self.network_service.disconnect_peer( - peer_id, - self.block_announce_protocol_name.clone(), - ); - self.network_service.report_peer(peer_id, repu) - }, - } - } - }, - ToServiceCommand::JustificationImported(peer_id, hash, number, success) => { - self.chain_sync.on_justification_import(hash, number, success); - if !success { - log::info!( - target: LOG_TARGET, - "💔 Invalid justification provided by {peer_id} for #{hash}", - ); - self.network_service - .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); - self.network_service.report_peer( - peer_id, - ReputationChange::new_fatal("Invalid justification"), - ); - } - }, - ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data), - ToServiceCommand::NewBestBlockImported(hash, number) => - self.new_best_block_imported(hash, number), - ToServiceCommand::Status(tx) => { - let mut status = self.chain_sync.status(); - status.num_connected_peers = self.peers.len() as u32; - let _ = tx.send(status); - }, - ToServiceCommand::NumActivePeers(tx) => { - let _ = tx.send(self.num_active_peers()); - }, - ToServiceCommand::SyncState(tx) => { - let _ = tx.send(self.chain_sync.status()); - }, - ToServiceCommand::BestSeenBlock(tx) => { - let _ = tx.send(self.chain_sync.status().best_seen_block); - }, - ToServiceCommand::NumSyncPeers(tx) => { - let _ = tx.send(self.chain_sync.status().num_peers); - }, - ToServiceCommand::NumQueuedBlocks(tx) => { - let _ = tx.send(self.chain_sync.status().queued_blocks); - }, - ToServiceCommand::NumDownloadedBlocks(tx) => { - let _ = tx.send(self.chain_sync.num_downloaded_blocks()); - }, - ToServiceCommand::NumSyncRequests(tx) => { - let _ = tx.send(self.chain_sync.num_sync_requests()); - }, - ToServiceCommand::PeersInfo(tx) => { - let peers_info = self - .peers - .iter() - .map(|(peer_id, peer)| (*peer_id, peer.info.clone())) - .collect(); - let _ = tx.send(peers_info); - }, - ToServiceCommand::OnBlockFinalized(hash, header) => - self.chain_sync.on_block_finalized(&hash, *header.number()), - } + while let Poll::Ready(Some(command)) = self.service_rx.poll_next_unpin(cx) { + self.process_service_command(command); } while let Poll::Ready(Some(event)) = self.rx.poll_next_unpin(cx) { - match event { - sc_network::SyncEvent::NotificationStreamOpened { - remote, - received_handshake, - sink, - inbound, - tx, - } => match self.on_sync_peer_connected(remote, &received_handshake, sink, inbound) { - Ok(()) => { - let _ = tx.send(true); - }, - Err(()) => { - log::debug!( - target: LOG_TARGET, - "Failed to register peer {remote:?}: {received_handshake:?}", - ); - let _ = tx.send(false); - }, - }, - sc_network::SyncEvent::NotificationStreamClosed { remote } => { - if self.on_sync_peer_disconnected(remote).is_err() { - log::trace!( - target: LOG_TARGET, - "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", - remote - ); - } - }, - sc_network::SyncEvent::NotificationsReceived { remote, messages } => { - for message in messages { - if self.peers.contains_key(&remote) { - if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { - self.last_notification_io = Instant::now(); - self.push_block_announce_validation(remote, announce); - } else { - log::warn!(target: "sub-libp2p", "Failed to decode block announce"); - } - } else { - log::trace!( - target: LOG_TARGET, - "Received sync for peer earlier refused by sync layer: {remote}", - ); - } - } - }, - sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => { - if let Some(peer) = self.peers.get_mut(&remote) { - peer.sink = sink; - } - }, - } + self.process_sync_event(event); } // Retreive warp sync target block header just before polling `ChainSync` // to make progress as soon as we receive it. - match self.warp_sync_target_block_header_rx.poll_unpin(cx) { - Poll::Ready(Ok(target)) => { - self.chain_sync.set_warp_sync_target_block(target); - }, - Poll::Ready(Err(err)) => { - log::error!( - target: LOG_TARGET, - "Failed to get target block for warp sync. Error: {err:?}", - ); - }, - Poll::Pending => {}, + if let Poll::Ready(result) = self.warp_sync_target_block_header_rx.poll_unpin(cx) { + self.pass_warp_sync_target_block_header(result); } // Send outbound requests on `ChanSync`'s behalf. @@ -922,6 +743,200 @@ where Poll::Pending } + fn perform_periodic_actions(&mut self) { + self.report_metrics(); + + // if `SyncingEngine` has just started, don't evict seemingly inactive peers right away + // as they may not have produced blocks not because they've disconnected but because + // they're still waiting to receive enough relaychain blocks to start producing blocks. + if let Some(started) = self.syncing_started { + if started.elapsed() < INITIAL_EVICTION_WAIT_PERIOD { + return + } + + self.syncing_started = None; + self.last_notification_io = Instant::now(); + } + + // if syncing hasn't sent or received any blocks within `INACTIVITY_EVICT_THRESHOLD`, + // it means the local node has stalled and is connected to peers who either don't + // consider it connected or are also all stalled. In order to unstall the node, + // disconnect all peers and allow `ProtocolController` to establish new connections. + if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD { + log::debug!( + target: LOG_TARGET, + "syncing has halted due to inactivity, evicting all peers", + ); + + for peer in self.peers.keys() { + self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM); + self.network_service + .disconnect_peer(*peer, self.block_announce_protocol_name.clone()); + } + + // after all the peers have been evicted, start timer again to prevent evicting + // new peers that join after the old peer have been evicted + self.last_notification_io = Instant::now(); + } + } + + fn process_service_command(&mut self, command: ToServiceCommand) { + match command { + ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { + self.chain_sync.set_sync_fork_request(peers, &hash, number); + }, + ToServiceCommand::EventStream(tx) => self.event_streams.push(tx), + ToServiceCommand::RequestJustification(hash, number) => + self.chain_sync.request_justification(&hash, number), + ToServiceCommand::ClearJustificationRequests => + self.chain_sync.clear_justification_requests(), + ToServiceCommand::BlocksProcessed(imported, count, results) => { + for result in self.chain_sync.on_blocks_processed(imported, count, results) { + match result { + Ok(action) => match action { + BlockRequestAction::SendRequest { peer_id, request } => { + // drop obsolete pending response first + self.pending_responses.remove(&peer_id); + self.send_block_request(peer_id, request); + }, + BlockRequestAction::RemoveStale { peer_id } => { + self.pending_responses.remove(&peer_id); + }, + }, + Err(BadPeer(peer_id, repu)) => { + self.pending_responses.remove(&peer_id); + self.network_service.disconnect_peer( + peer_id, + self.block_announce_protocol_name.clone(), + ); + self.network_service.report_peer(peer_id, repu) + }, + } + } + }, + ToServiceCommand::JustificationImported(peer_id, hash, number, success) => { + self.chain_sync.on_justification_import(hash, number, success); + if !success { + log::info!( + target: LOG_TARGET, + "💔 Invalid justification provided by {peer_id} for #{hash}", + ); + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + self.network_service + .report_peer(peer_id, ReputationChange::new_fatal("Invalid justification")); + } + }, + ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data), + ToServiceCommand::NewBestBlockImported(hash, number) => + self.new_best_block_imported(hash, number), + ToServiceCommand::Status(tx) => { + let mut status = self.chain_sync.status(); + status.num_connected_peers = self.peers.len() as u32; + let _ = tx.send(status); + }, + ToServiceCommand::NumActivePeers(tx) => { + let _ = tx.send(self.num_active_peers()); + }, + ToServiceCommand::SyncState(tx) => { + let _ = tx.send(self.chain_sync.status()); + }, + ToServiceCommand::BestSeenBlock(tx) => { + let _ = tx.send(self.chain_sync.status().best_seen_block); + }, + ToServiceCommand::NumSyncPeers(tx) => { + let _ = tx.send(self.chain_sync.status().num_peers); + }, + ToServiceCommand::NumQueuedBlocks(tx) => { + let _ = tx.send(self.chain_sync.status().queued_blocks); + }, + ToServiceCommand::NumDownloadedBlocks(tx) => { + let _ = tx.send(self.chain_sync.num_downloaded_blocks()); + }, + ToServiceCommand::NumSyncRequests(tx) => { + let _ = tx.send(self.chain_sync.num_sync_requests()); + }, + ToServiceCommand::PeersInfo(tx) => { + let peers_info = self + .peers + .iter() + .map(|(peer_id, peer)| (*peer_id, peer.info.clone())) + .collect(); + let _ = tx.send(peers_info); + }, + ToServiceCommand::OnBlockFinalized(hash, header) => + self.chain_sync.on_block_finalized(&hash, *header.number()), + } + } + + fn process_sync_event(&mut self, event: sc_network::SyncEvent) { + match event { + sc_network::SyncEvent::NotificationStreamOpened { + remote, + received_handshake, + sink, + inbound, + tx, + } => match self.on_sync_peer_connected(remote, &received_handshake, sink, inbound) { + Ok(()) => { + let _ = tx.send(true); + }, + Err(()) => { + log::debug!( + target: LOG_TARGET, + "Failed to register peer {remote:?}: {received_handshake:?}", + ); + let _ = tx.send(false); + }, + }, + sc_network::SyncEvent::NotificationStreamClosed { remote } => { + if self.on_sync_peer_disconnected(remote).is_err() { + log::trace!( + target: LOG_TARGET, + "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", + remote + ); + } + }, + sc_network::SyncEvent::NotificationsReceived { remote, messages } => { + for message in messages { + if self.peers.contains_key(&remote) { + if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { + self.last_notification_io = Instant::now(); + self.push_block_announce_validation(remote, announce); + } else { + log::warn!(target: "sub-libp2p", "Failed to decode block announce"); + } + } else { + log::trace!( + target: LOG_TARGET, + "Received sync for peer earlier refused by sync layer: {remote}", + ); + } + } + }, + sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => { + if let Some(peer) = self.peers.get_mut(&remote) { + peer.sink = sink; + } + }, + } + } + + fn pass_warp_sync_target_block_header(&mut self, header: Result) { + match header { + Ok(header) => { + self.chain_sync.set_warp_sync_target_block(header); + }, + Err(err) => { + log::error!( + target: LOG_TARGET, + "Failed to get target block for warp sync. Error: {err:?}", + ); + }, + } + } + /// Called by peer when it is disconnecting. /// /// Returns a result if the handshake of this peer was indeed accepted. From c5ef26751965db07d4ce1496af1a0030fed11a88 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 2 Nov 2023 10:25:37 +0200 Subject: [PATCH 03/12] Implement `FusedStream` for `BlockAnnounceValidator` --- .../network/sync/src/block_announce_validator.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/sync/src/block_announce_validator.rs b/substrate/client/network/sync/src/block_announce_validator.rs index f083f9e29e44..961b581cddce 100644 --- a/substrate/client/network/sync/src/block_announce_validator.rs +++ b/substrate/client/network/sync/src/block_announce_validator.rs @@ -16,10 +16,11 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! `BlockAnnounceValidator` is responsible for async validation of block announcements. +//! [`BlockAnnounceValidator`] is responsible for async validation of block announcements. +//! [`Stream`] implemented by [`BlockAnnounceValidator`] never terminates. use crate::futures_stream::FuturesStream; -use futures::{Future, FutureExt, Stream, StreamExt}; +use futures::{stream::FusedStream, Future, FutureExt, Stream, StreamExt}; use libp2p::PeerId; use log::{debug, error, trace, warn}; use sc_network_common::sync::message::BlockAnnounce; @@ -300,6 +301,13 @@ impl Stream for BlockAnnounceValidator { } } +// As [`BlockAnnounceValidator`] never terminates, we can easily implement [`FusedStream`] for it. +impl FusedStream for BlockAnnounceValidator { + fn is_terminated(&self) -> bool { + false + } +} + #[cfg(test)] mod tests { use super::*; From 2fc1971239467b9fea3af21e709c981816b10dfd Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 2 Nov 2023 11:16:42 +0200 Subject: [PATCH 04/12] Make `PendingResponses` never terminate --- .../network/sync/src/pending_responses.rs | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs index 9e2fd5cfd674..d8ed8be45f08 100644 --- a/substrate/client/network/sync/src/pending_responses.rs +++ b/substrate/client/network/sync/src/pending_responses.rs @@ -17,20 +17,20 @@ // along with this program. If not, see . //! [`PendingResponses`] is responsible for keeping track of pending responses and -//! polling them. +//! polling them. [`Stream`] implemented by [`PendingResponses`] never terminates. use crate::types::PeerRequest; use futures::{ channel::oneshot, future::BoxFuture, - stream::{BoxStream, Stream}, + stream::{BoxStream, FusedStream, Stream}, FutureExt, StreamExt, }; use libp2p::PeerId; use log::error; use sc_network::request_responses::RequestFailure; use sp_runtime::traits::Block as BlockT; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, Waker}; use tokio_stream::StreamMap; /// Log target for this file. @@ -53,11 +53,13 @@ pub(crate) struct ResponseEvent { pub(crate) struct PendingResponses { /// Pending responses pending_responses: StreamMap, ResponseResult)>>, + /// Waker to implement never terminating stream + waker: Option, } impl PendingResponses { pub fn new() -> Self { - Self { pending_responses: StreamMap::new() } + Self { pending_responses: StreamMap::new(), waker: None } } pub fn insert( @@ -82,6 +84,10 @@ impl PendingResponses { ); debug_assert!(false); } + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } pub fn remove(&mut self, peer_id: &PeerId) -> bool { @@ -93,8 +99,6 @@ impl PendingResponses { } } -impl Unpin for PendingResponses {} - impl Stream for PendingResponses { type Item = ResponseEvent; @@ -102,16 +106,26 @@ impl Stream for PendingResponses { mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match futures::ready!(self.pending_responses.poll_next_unpin(cx)) { - Some((peer_id, (request, response))) => { - // We need to manually remove the stream, because `StreamMap` doesn't know yet that - // it's going to yield `None`, so may not remove it before the next request is made - // to the same peer. - self.pending_responses.remove(&peer_id); - - Poll::Ready(Some(ResponseEvent { peer_id, request, response })) - }, - None => Poll::Ready(None), + if let Poll::Ready(Some((peer_id, (request, response)))) = + self.pending_responses.poll_next_unpin(cx) + { + // We need to manually remove the stream, because `StreamMap` doesn't know yet that + // it's going to yield `None`, so may not remove it before the next request is made + // to the same peer. + self.pending_responses.remove(&peer_id); + + Poll::Ready(Some(ResponseEvent { peer_id, request, response })) + } else { + self.waker = Some(cx.waker().clone()); + + Poll::Pending } } } + +// As [`PendingResponses`] never terminates, we can easily implement [`FusedStream`] for it. +impl FusedStream for PendingResponses { + fn is_terminated(&self) -> bool { + false + } +} From 4e29cfbefc000748c4cacbaf5ae758b007b2c4b8 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 2 Nov 2023 12:01:54 +0200 Subject: [PATCH 05/12] Make `SyncingEngine::run` use `tokio::select!` --- substrate/client/network/sync/Cargo.toml | 2 +- substrate/client/network/sync/src/engine.rs | 73 +++++++-------------- 2 files changed, 26 insertions(+), 49 deletions(-) diff --git a/substrate/client/network/sync/Cargo.toml b/substrate/client/network/sync/Cargo.toml index 038f993133d0..a1ea39a852fc 100644 --- a/substrate/client/network/sync/Cargo.toml +++ b/substrate/client/network/sync/Cargo.toml @@ -30,7 +30,7 @@ schnellru = "0.2.1" smallvec = "1.11.0" thiserror = "1.0" tokio-stream = "0.1.14" -tokio = { version = "1.32.0", features = ["time"] } +tokio = { version = "1.32.0", features = ["time", "macros"] } fork-tree = { path = "../../../utils/fork-tree" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus" } sc-client-api = { path = "../../api" } diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 4b94a162003d..02a0dc2e9151 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -85,7 +85,6 @@ use std::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }, - task::Poll, time::{Duration, Instant}, }; @@ -254,7 +253,7 @@ pub struct SyncingEngine { service_rx: TracingUnboundedReceiver>, /// Channel for receiving inbound connections from `Protocol`. - rx: sc_utils::mpsc::TracingUnboundedReceiver>, + sync_events_rx: sc_utils::mpsc::TracingUnboundedReceiver>, /// Assigned roles. roles: Roles, @@ -304,7 +303,7 @@ pub struct SyncingEngine { boot_node_ids: HashSet, /// A channel to get target block header if we skip over proofs downloading during warp sync. - warp_sync_target_block_header_rx: + warp_sync_target_block_header_rx_fused: Fuse>>, /// Protocol name used for block announcements @@ -363,7 +362,7 @@ where block_downloader: Arc>, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option, - rx: sc_utils::mpsc::TracingUnboundedReceiver>, + sync_events_rx: sc_utils::mpsc::TracingUnboundedReceiver>, ) -> Result<(Self, SyncingService, NonDefaultSetConfig), ClientError> { let mode = net_config.network_config.sync_mode; let max_parallel_downloads = net_config.network_config.max_parallel_downloads; @@ -436,7 +435,7 @@ where // Make sure polling of the target block channel is a no-op if there is no block to // retrieve. - let warp_sync_target_block_header_rx = warp_sync_target_block_header_rx + let warp_sync_target_block_header_rx_fused = warp_sync_target_block_header_rx .map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse()); let block_announce_config = Self::get_block_announce_proto_config( @@ -499,11 +498,11 @@ where num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), service_rx, - rx, + sync_events_rx, genesis_hash, important_peers, default_peers_set_no_slot_connected_peers: HashSet::new(), - warp_sync_target_block_header_rx, + warp_sync_target_block_header_rx_fused, boot_node_ids, default_peers_set_no_slot_peers, default_peers_set_num_full, @@ -697,50 +696,28 @@ where self.syncing_started = Some(Instant::now()); loop { - futures::future::poll_fn(|cx| self.poll(cx)).await; - } - } - - pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { - self.num_connected.store(self.peers.len(), Ordering::Relaxed); - self.is_major_syncing - .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); - - while let Poll::Ready(_) = self.tick_timeout.poll_tick(cx) { - self.perform_periodic_actions(); - } - - while let Poll::Ready(Some(command)) = self.service_rx.poll_next_unpin(cx) { - self.process_service_command(command); - } - - while let Poll::Ready(Some(event)) = self.rx.poll_next_unpin(cx) { - self.process_sync_event(event); - } - - // Retreive warp sync target block header just before polling `ChainSync` - // to make progress as soon as we receive it. - if let Poll::Ready(result) = self.warp_sync_target_block_header_rx.poll_unpin(cx) { - self.pass_warp_sync_target_block_header(result); - } - - // Send outbound requests on `ChanSync`'s behalf. - self.send_chain_sync_requests(); + tokio::select! { + _ = self.tick_timeout.tick() => self.perform_periodic_actions(), + command = self.service_rx.select_next_some() => + self.process_service_command(command), + sync_event = self.sync_events_rx.select_next_some() => + self.process_sync_event(sync_event), + warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => + self.pass_warp_sync_target_block_header(warp_target_block_header), + response_event = self.pending_responses.select_next_some() => + self.process_response_event(response_event), + validation_result = self.block_announce_validator.select_next_some() => + self.process_block_announce_validation_result(validation_result), + } - // Poll & process pending responses. - while let Poll::Ready(Some(event)) = self.pending_responses.poll_next_unpin(cx) { - self.process_response_event(event); - } + // Update atomic variables + self.num_connected.store(self.peers.len(), Ordering::Relaxed); + self.is_major_syncing + .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); - // Poll block announce validations last, because if a block announcement was received - // through the event stream between `SyncingEngine` and `Protocol` and the validation - // finished right after it is queued, the resulting block request (if any) can be sent - // right away. - while let Poll::Ready(Some(result)) = self.block_announce_validator.poll_next_unpin(cx) { - self.process_block_announce_validation_result(result); + // Send outbound requests on `ChanSync`'s behalf. + self.send_chain_sync_requests(); } - - Poll::Pending } fn perform_periodic_actions(&mut self) { From 328312a32937ab1245b74bcd5ccf773b3f20e9a3 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 2 Nov 2023 17:50:13 +0200 Subject: [PATCH 06/12] Apply review suggestion: use `match` in `PendingResponses::poll_next` --- .../network/sync/src/pending_responses.rs | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs index d8ed8be45f08..55308dfc1ea9 100644 --- a/substrate/client/network/sync/src/pending_responses.rs +++ b/substrate/client/network/sync/src/pending_responses.rs @@ -106,19 +106,20 @@ impl Stream for PendingResponses { mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if let Poll::Ready(Some((peer_id, (request, response)))) = - self.pending_responses.poll_next_unpin(cx) - { - // We need to manually remove the stream, because `StreamMap` doesn't know yet that - // it's going to yield `None`, so may not remove it before the next request is made - // to the same peer. - self.pending_responses.remove(&peer_id); - - Poll::Ready(Some(ResponseEvent { peer_id, request, response })) - } else { - self.waker = Some(cx.waker().clone()); - - Poll::Pending + match self.pending_responses.poll_next_unpin(cx) { + Poll::Ready(Some((peer_id, (request, response)))) => { + // We need to manually remove the stream, because `StreamMap` doesn't know yet that + // it's going to yield `None`, so may not remove it before the next request is made + // to the same peer. + self.pending_responses.remove(&peer_id); + + Poll::Ready(Some(ResponseEvent { peer_id, request, response })) + }, + Poll::Ready(None) | Poll::Pending => { + self.waker = Some(cx.waker().clone()); + + Poll::Pending + }, } } } From 452f6a79861d25a48e848bab62502ffd8f89da7a Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 3 Nov 2023 11:08:34 +0200 Subject: [PATCH 07/12] Get rid of `NetworkService` in `ChainSync` --- .../client/network/sync/src/chain_sync.rs | 71 ++++------ .../network/sync/src/chain_sync/test.rs | 125 ++---------------- substrate/client/network/sync/src/engine.rs | 37 ++++-- 3 files changed, 69 insertions(+), 164 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 9cf0080e36ac..37eb5ffb3556 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -32,7 +32,6 @@ use crate::{ blocks::BlockCollection, extra_requests::ExtraRequests, schema::v1::StateResponse, - service::network::NetworkServiceHandle, state::{ImportResult, StateSync}, types::{ BadPeer, Metrics, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, SyncMode, SyncState, @@ -50,7 +49,6 @@ use log::{debug, error, info, trace, warn}; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; -use sc_network::types::ProtocolName; use sc_network_common::sync::message::{ BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock, }; @@ -237,7 +235,7 @@ pub enum OnBlockJustification { // Result of [`ChainSync::on_state_data`]. #[derive(Debug)] -pub enum OnStateData { +enum OnStateData { /// The block and state that should be imported. Import(BlockOrigin, IncomingBlock), /// A new state request needs to be made to the given peer. @@ -247,7 +245,7 @@ pub enum OnStateData { /// Action that the parent of [`ChainSync`] should perform after reporting block response with /// [`ChainSync::on_block_response`]. pub enum OnBlockResponse { - /// Nothing to do + /// Nothing to do. Nothing, /// Perform block request. SendBlockRequest { peer_id: PeerId, request: BlockRequest }, @@ -255,6 +253,19 @@ pub enum OnBlockResponse { ImportBlocks(ImportBlocksAction), /// Import justifications. ImportJustifications(ImportJustificationsAction), + /// Invalid block response, the peer should be disconnected and reported. + DisconnectPeer(BadPeer), +} + +/// Action that the parent of [`ChainSync`] should perform after reporting state response with +/// [`ChainSync::on_state_response`]. +pub enum OnStateResponse { + /// Nothing to do. + Nothing, + /// Import blocks. + ImportBlocks(ImportBlocksAction), + /// Invalid state response, the peer should be disconnected and reported. + DisconnectPeer(BadPeer), } /// The main data structure which contains all the state for a chains @@ -302,10 +313,6 @@ pub struct ChainSync { import_existing: bool, /// Gap download process. gap_sync: Option>, - /// Handle for communicating with `NetworkService` - network_service: NetworkServiceHandle, - /// Protocol name used for block announcements - block_announce_protocol_name: ProtocolName, } /// All the data we have about a Peer that we are trying to sync with @@ -396,11 +403,9 @@ where pub fn new( mode: SyncMode, client: Arc, - block_announce_protocol_name: ProtocolName, max_parallel_downloads: u32, max_blocks_per_request: u32, warp_sync_config: Option>, - network_service: NetworkServiceHandle, ) -> Result { let mut sync = Self { client, @@ -420,10 +425,8 @@ where warp_sync: None, import_existing: false, gap_sync: None, - network_service, warp_sync_config, warp_sync_target_block_header: None, - block_announce_protocol_name, }; sync.reset_sync_start_point()?; @@ -1539,12 +1542,7 @@ where number, justifications, }), - Err(BadPeer(id, repu)) => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - self.network_service.report_peer(id, repu); - OnBlockResponse::Nothing - }, + Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer), } } else { match self.on_block_data(&peer_id, Some(request), block_response) { @@ -1552,12 +1550,7 @@ where Ok(OnBlockData::Request(peer_id, request)) => OnBlockResponse::SendBlockRequest { peer_id, request }, Ok(OnBlockData::Continue) => OnBlockResponse::Nothing, - Err(BadPeer(id, repu)) => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - self.network_service.report_peer(id, repu); - OnBlockResponse::Nothing - }, + Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer), } } } @@ -1568,26 +1561,12 @@ where &mut self, peer_id: PeerId, response: OpaqueStateResponse, - ) -> Option> { + ) -> OnStateResponse { match self.on_state_data(&peer_id, response) { Ok(OnStateData::Import(origin, block)) => - Some(ImportBlocksAction { origin, blocks: vec![block] }), - Ok(OnStateData::Continue) => None, - Err(BadPeer(id, repu)) => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - self.network_service.report_peer(id, repu); - None - }, - } - } - - /// Submit a warp proof response received. - pub fn on_warp_sync_response(&mut self, peer_id: PeerId, response: EncodedProof) { - if let Err(BadPeer(id, repu)) = self.on_warp_sync_data(&peer_id, response) { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - self.network_service.report_peer(id, repu); + OnStateResponse::ImportBlocks(ImportBlocksAction { origin, blocks: vec![block] }), + Ok(OnStateData::Continue) => OnStateResponse::Nothing, + Err(bad_peer) => OnStateResponse::DisconnectPeer(bad_peer), } } @@ -1892,7 +1871,13 @@ where } } - fn on_warp_sync_data(&mut self, who: &PeerId, response: EncodedProof) -> Result<(), BadPeer> { + /// Submit a warp proof response received. + #[must_use] + pub fn on_warp_sync_response( + &mut self, + who: &PeerId, + response: EncodedProof, + ) -> Result<(), BadPeer> { if let Some(peer) = self.peers.get_mut(who) { if let PeerSyncState::DownloadingWarpProof = peer.state { peer.state = PeerSyncState::Available; diff --git a/substrate/client/network/sync/src/chain_sync/test.rs b/substrate/client/network/sync/src/chain_sync/test.rs index 6f9fea1b161b..da08fd06d968 100644 --- a/substrate/client/network/sync/src/chain_sync/test.rs +++ b/substrate/client/network/sync/src/chain_sync/test.rs @@ -19,7 +19,6 @@ //! Tests of [`ChainSync`]. use super::*; -use crate::service::network::NetworkServiceProvider; use futures::executor::block_on; use sc_block_builder::BlockBuilderProvider; use sc_network_common::sync::message::{BlockAnnounce, BlockData, BlockState, FromBlock}; @@ -39,17 +38,7 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { let client = Arc::new(TestClientBuilder::new().build()); let peer_id = PeerId::random(); - let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 1, - 64, - None, - chain_sync_network_handle, - ) - .unwrap(); + let mut sync = ChainSync::new(SyncMode::Full, client.clone(), 1, 64, None).unwrap(); let (a1_hash, a1_number) = { let a1 = client.new_block(Default::default()).unwrap().build().unwrap().block; @@ -96,18 +85,8 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { #[test] fn restart_doesnt_affect_peers_downloading_finality_data() { let mut client = Arc::new(TestClientBuilder::new().build()); - let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 1, - 64, - None, - chain_sync_network_handle, - ) - .unwrap(); + + let mut sync = ChainSync::new(SyncMode::Full, client.clone(), 1, 64, None).unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -259,18 +238,8 @@ fn do_not_report_peer_on_block_response_for_block_request() { sp_tracing::try_init_simple(); let mut client = Arc::new(TestClientBuilder::new().build()); - let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 5, - 64, - None, - chain_sync_network_handle, - ) - .unwrap(); + + let mut sync = ChainSync::new(SyncMode::Full, client.clone(), 5, 64, None).unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -379,19 +348,9 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { }; let mut client = Arc::new(TestClientBuilder::new().build()); - let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let info = client.info(); - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 5, - 64, - None, - chain_sync_network_handle, - ) - .unwrap(); + let mut sync = ChainSync::new(SyncMode::Full, client.clone(), 5, 64, None).unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -505,7 +464,6 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { fn can_sync_huge_fork() { sp_tracing::try_init_simple(); - let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4) .map(|_| build_block(&mut client, None, false)) @@ -530,16 +488,7 @@ fn can_sync_huge_fork() { let info = client.info(); - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 5, - 64, - None, - chain_sync_network_handle, - ) - .unwrap(); + let mut sync = ChainSync::new(SyncMode::Full, client.clone(), 5, 64, None).unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -633,7 +582,6 @@ fn can_sync_huge_fork() { fn syncs_fork_without_duplicate_requests() { sp_tracing::try_init_simple(); - let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4) .map(|_| build_block(&mut client, None, false)) @@ -658,16 +606,7 @@ fn syncs_fork_without_duplicate_requests() { let info = client.info(); - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 5, - 64, - None, - chain_sync_network_handle, - ) - .unwrap(); + let mut sync = ChainSync::new(SyncMode::Full, client.clone(), 5, 64, None).unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -784,20 +723,10 @@ fn syncs_fork_without_duplicate_requests() { #[test] fn removes_target_fork_on_disconnect() { sp_tracing::try_init_simple(); - let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::>(); - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 1, - 64, - None, - chain_sync_network_handle, - ) - .unwrap(); + let mut sync = ChainSync::new(SyncMode::Full, client.clone(), 1, 64, None).unwrap(); let peer_id1 = PeerId::random(); let common_block = blocks[1].clone(); @@ -818,22 +747,12 @@ fn removes_target_fork_on_disconnect() { #[test] fn can_import_response_with_missing_blocks() { sp_tracing::try_init_simple(); - let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client2 = Arc::new(TestClientBuilder::new().build()); let blocks = (0..4).map(|_| build_block(&mut client2, None, false)).collect::>(); let empty_client = Arc::new(TestClientBuilder::new().build()); - let mut sync = ChainSync::new( - SyncMode::Full, - empty_client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 1, - 64, - None, - chain_sync_network_handle, - ) - .unwrap(); + let mut sync = ChainSync::new(SyncMode::Full, empty_client.clone(), 1, 64, None).unwrap(); let peer_id1 = PeerId::random(); let best_block = blocks[3].clone(); @@ -865,17 +784,7 @@ fn ancestor_search_repeat() { #[test] fn sync_restart_removes_block_but_not_justification_requests() { let mut client = Arc::new(TestClientBuilder::new().build()); - let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 1, - 64, - None, - chain_sync_network_handle, - ) - .unwrap(); + let mut sync = ChainSync::new(SyncMode::Full, client.clone(), 1, 64, None).unwrap(); let peers = vec![PeerId::random(), PeerId::random()]; @@ -970,7 +879,6 @@ fn sync_restart_removes_block_but_not_justification_requests() { fn request_across_forks() { sp_tracing::try_init_simple(); - let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..100).map(|_| build_block(&mut client, None, false)).collect::>(); @@ -1006,16 +914,7 @@ fn request_across_forks() { fork_blocks }; - let mut sync = ChainSync::new( - SyncMode::Full, - client.clone(), - ProtocolName::from("test-block-announce-protocol"), - 5, - 64, - None, - chain_sync_network_handle, - ) - .unwrap(); + let mut sync = ChainSync::new(SyncMode::Full, client.clone(), 5, 64, None).unwrap(); // Add the peers, all at the common ancestor 100. let common_block = blocks.last().unwrap(); diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 02a0dc2e9151..21c14c375013 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -27,7 +27,7 @@ use crate::{ block_request_handler::MAX_BLOCKS_IN_RESPONSE, chain_sync::{ BlockRequestAction, ChainSync, ImportBlocksAction, ImportJustificationsAction, - OnBlockResponse, + OnBlockResponse, OnStateResponse, }, pending_responses::{PendingResponses, ResponseEvent}, schema::v1::{StateRequest, StateResponse}, @@ -455,11 +455,9 @@ where let chain_sync = ChainSync::new( mode, client.clone(), - block_announce_protocol_name.clone(), max_parallel_downloads, max_blocks_per_request, warp_sync_config, - network_service.clone(), )?; let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); @@ -1212,6 +1210,13 @@ where OnBlockResponse::ImportJustifications(action) => self.import_justifications(action), OnBlockResponse::Nothing => {}, + OnBlockResponse::DisconnectPeer(BadPeer(peer_id, rep)) => { + self.network_service.disconnect_peer( + peer_id, + self.block_announce_protocol_name.clone(), + ); + self.network_service.report_peer(peer_id, rep); + }, } }, Err(BlockResponseError::DecodeFailed(e)) => { @@ -1257,14 +1262,30 @@ where }, }; - if let Some(import_blocks_action) = - self.chain_sync.on_state_response(peer_id, response) - { - self.import_blocks(import_blocks_action); + match self.chain_sync.on_state_response(peer_id, response) { + OnStateResponse::ImportBlocks(import_blocks_action) => + self.import_blocks(import_blocks_action), + OnStateResponse::DisconnectPeer(BadPeer(peer_id, rep)) => { + self.network_service.disconnect_peer( + peer_id, + self.block_announce_protocol_name.clone(), + ); + self.network_service.report_peer(peer_id, rep); + }, + OnStateResponse::Nothing => {}, } }, PeerRequest::WarpProof => { - self.chain_sync.on_warp_sync_response(peer_id, EncodedProof(resp)); + match self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp)) { + Ok(()) => (), + Err(BadPeer(peer_id, rep)) => { + self.network_service.disconnect_peer( + peer_id, + self.block_announce_protocol_name.clone(), + ); + self.network_service.report_peer(peer_id, rep); + }, + } }, }, Ok(Err(e)) => { From feeb6000c049dd0353cf16ef4f0b252bc28278fd Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 3 Nov 2023 16:09:08 +0200 Subject: [PATCH 08/12] Remove unneeded `pub` declarations --- substrate/client/network/sync/src/chain_sync.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 37eb5ffb3556..844b58926c33 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -210,7 +210,7 @@ pub struct ImportJustificationsAction { /// Result of [`ChainSync::on_block_data`]. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum OnBlockData { +enum OnBlockData { /// The block should be imported. Import(ImportBlocksAction), /// A new block request needs to be made to the given peer. @@ -221,7 +221,7 @@ pub enum OnBlockData { /// Result of [`ChainSync::on_block_justification`]. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum OnBlockJustification { +enum OnBlockJustification { /// The justification needs no further handling. Nothing, /// The justification should be imported. @@ -715,7 +715,7 @@ where /// Submit a block response for processing. #[must_use] - pub fn on_block_data( + fn on_block_data( &mut self, who: &PeerId, request: Option>, @@ -981,7 +981,7 @@ where /// Submit a justification response for processing. #[must_use] - pub fn on_block_justification( + fn on_block_justification( &mut self, who: PeerId, response: BlockResponse, From e62af3ef89ca4cfc40887fb989bf95965d3aae55 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 3 Nov 2023 16:12:43 +0200 Subject: [PATCH 09/12] Remove more unneeded `pub` declarations --- substrate/client/network/sync/src/engine.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 21c14c375013..7b2e4be888aa 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -915,7 +915,7 @@ where /// Called by peer when it is disconnecting. /// /// Returns a result if the handshake of this peer was indeed accepted. - pub fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) -> Result<(), ()> { + fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) -> Result<(), ()> { if let Some(info) = self.peers.remove(&peer_id) { if self.important_peers.contains(&peer_id) { log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected"); @@ -959,7 +959,7 @@ where /// /// Returns `Ok` if the handshake is accepted and the peer added to the list of peers we sync /// from. - pub fn on_sync_peer_connected( + fn on_sync_peer_connected( &mut self, peer_id: PeerId, status: &BlockAnnouncesHandshake, From b780003f57fcba598658732612b2bb9eb1aaafe2 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 6 Nov 2023 10:35:33 +0200 Subject: [PATCH 10/12] Apply review suggestions --- substrate/client/network/sync/src/engine.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 7b2e4be888aa..560887132e3a 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -1276,15 +1276,12 @@ where } }, PeerRequest::WarpProof => { - match self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp)) { - Ok(()) => (), - Err(BadPeer(peer_id, rep)) => { - self.network_service.disconnect_peer( - peer_id, - self.block_announce_protocol_name.clone(), - ); - self.network_service.report_peer(peer_id, rep); - }, + if let Err(BadPeer(peer_id, rep)) = + self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp)) + { + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + self.network_service.report_peer(peer_id, rep); } }, }, From 1bea9072955711f56915195354ef176dc359125c Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 6 Nov 2023 10:43:41 +0200 Subject: [PATCH 11/12] Rename `who`->`peer_id` in `ChainSync` --- .../client/network/sync/src/chain_sync.rs | 247 +++++++++--------- 1 file changed, 126 insertions(+), 121 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 6fcbe7664429..858125f93f1f 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -434,9 +434,9 @@ where } /// Get peer's best hash & number. - pub fn peer_info(&self, who: &PeerId) -> Option> { + pub fn peer_info(&self, peer_id: &PeerId) -> Option> { self.peers - .get(who) + .get(peer_id) .map(|p| PeerInfo { best_hash: p.best_hash, best_number: p.best_number }) } @@ -512,7 +512,7 @@ where #[must_use] pub fn new_peer( &mut self, - who: PeerId, + peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor, ) -> Result>, BadPeer> { @@ -520,19 +520,21 @@ where match self.block_status(&best_hash) { Err(e) => { debug!(target:LOG_TARGET, "Error reading blockchain: {e}"); - Err(BadPeer(who, rep::BLOCKCHAIN_READ_ERROR)) + Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR)) }, Ok(BlockStatus::KnownBad) => { - info!("💔 New peer {who} with known bad best block {best_hash} ({best_number})."); - Err(BadPeer(who, rep::BAD_BLOCK)) + info!( + "💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})." + ); + Err(BadPeer(peer_id, rep::BAD_BLOCK)) }, Ok(BlockStatus::Unknown) => { if best_number.is_zero() { info!( "💔 New peer {} with unknown genesis hash {} ({}).", - who, best_hash, best_number, + peer_id, best_hash, best_number, ); - return Err(BadPeer(who, rep::GENESIS_MISMATCH)) + return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH)) } // If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have @@ -542,14 +544,14 @@ where debug!( target:LOG_TARGET, "New peer {} with unknown best hash {} ({}), assuming common block.", - who, + peer_id, self.best_queued_hash, self.best_queued_number ); self.peers.insert( - who, + peer_id, PeerSync { - peer_id: who, + peer_id, common_number: self.best_queued_number, best_hash, best_number, @@ -563,7 +565,7 @@ where let (state, req) = if self.best_queued_number.is_zero() { debug!( target:LOG_TARGET, - "New peer {who} with best hash {best_hash} ({best_number}).", + "New peer {peer_id} with best hash {best_hash} ({best_number}).", ); (PeerSyncState::Available, None) @@ -573,7 +575,7 @@ where debug!( target:LOG_TARGET, "New peer {} with unknown best hash {} ({}), searching for common ancestor.", - who, + peer_id, best_hash, best_number ); @@ -588,11 +590,11 @@ where ) }; - self.allowed_requests.add(&who); + self.allowed_requests.add(&peer_id); self.peers.insert( - who, + peer_id, PeerSync { - peer_id: who, + peer_id, common_number: Zero::zero(), best_hash, best_number, @@ -621,19 +623,19 @@ where Ok(BlockStatus::InChainPruned) => { debug!( target: LOG_TARGET, - "New peer {who} with known best hash {best_hash} ({best_number}).", + "New peer {peer_id} with known best hash {best_hash} ({best_number}).", ); self.peers.insert( - who, + peer_id, PeerSync { - peer_id: who, + peer_id, common_number: std::cmp::min(self.best_queued_number, best_number), best_hash, best_number, state: PeerSyncState::Available, }, ); - self.allowed_requests.add(&who); + self.allowed_requests.add(&peer_id); Ok(None) }, } @@ -722,39 +724,39 @@ where #[must_use] fn on_block_data( &mut self, - who: &PeerId, + peer_id: &PeerId, request: Option>, response: BlockResponse, ) -> Result, BadPeer> { self.downloaded_blocks += response.blocks.len(); let mut gap = false; - let new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(who) { + let new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(peer_id) { let mut blocks = response.blocks; if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) { trace!(target: LOG_TARGET, "Reversing incoming block list"); blocks.reverse() } - self.allowed_requests.add(who); + self.allowed_requests.add(peer_id); if let Some(request) = request { match &mut peer.state { PeerSyncState::DownloadingNew(_) => { - self.blocks.clear_peer_download(who); + self.blocks.clear_peer_download(peer_id); peer.state = PeerSyncState::Available; if let Some(start_block) = - validate_blocks::(&blocks, who, Some(request))? + validate_blocks::(&blocks, peer_id, Some(request))? { - self.blocks.insert(start_block, blocks, *who); + self.blocks.insert(start_block, blocks, *peer_id); } self.ready_blocks() }, PeerSyncState::DownloadingGap(_) => { peer.state = PeerSyncState::Available; if let Some(gap_sync) = &mut self.gap_sync { - gap_sync.blocks.clear_peer_download(who); + gap_sync.blocks.clear_peer_download(peer_id); if let Some(start_block) = - validate_blocks::(&blocks, who, Some(request))? + validate_blocks::(&blocks, peer_id, Some(request))? { - gap_sync.blocks.insert(start_block, blocks, *who); + gap_sync.blocks.insert(start_block, blocks, *peer_id); } gap = true; let blocks: Vec<_> = gap_sync @@ -790,17 +792,17 @@ where ); blocks } else { - debug!(target: LOG_TARGET, "Unexpected gap block response from {who}"); - return Err(BadPeer(*who, rep::NO_BLOCK)) + debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}"); + return Err(BadPeer(*peer_id, rep::NO_BLOCK)) } }, PeerSyncState::DownloadingStale(_) => { peer.state = PeerSyncState::Available; if blocks.is_empty() { - debug!(target: LOG_TARGET, "Empty block response from {who}"); - return Err(BadPeer(*who, rep::NO_BLOCK)) + debug!(target: LOG_TARGET, "Empty block response from {peer_id}"); + return Err(BadPeer(*peer_id, rep::NO_BLOCK)) } - validate_blocks::(&blocks, who, Some(request))?; + validate_blocks::(&blocks, peer_id, Some(request))?; blocks .into_iter() .map(|b| { @@ -813,7 +815,7 @@ where body: b.body, indexed_body: None, justifications, - origin: Some(*who), + origin: Some(*peer_id), allow_missing_state: true, import_existing: self.import_existing, skip_execution: self.skip_execution(), @@ -830,23 +832,23 @@ where "Got ancestry block #{} ({}) from peer {}", current, block.hash, - who, + peer_id, ); maybe_our_block_hash.filter(|x| x == &block.hash) }, (None, _) => { debug!( target: LOG_TARGET, - "Invalid response when searching for ancestor from {who}", + "Invalid response when searching for ancestor from {peer_id}", ); - return Err(BadPeer(*who, rep::UNKNOWN_ANCESTOR)) + return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR)) }, (_, Err(e)) => { info!( target: LOG_TARGET, "❌ Error answering legitimate blockchain query: {e}", ); - return Err(BadPeer(*who, rep::BLOCKCHAIN_READ_ERROR)) + return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR)) }, }; if matching_hash.is_some() { @@ -859,7 +861,7 @@ where trace!( target: LOG_TARGET, "Ancestry search: opportunistically updating peer {} common number from={} => to={}.", - *who, + *peer_id, peer.common_number, self.best_queued_number, ); @@ -868,7 +870,7 @@ where trace!( target: LOG_TARGET, "Ancestry search: updating peer {} common number from={} => to={}.", - *who, + *peer_id, peer.common_number, *current, ); @@ -878,9 +880,9 @@ where if matching_hash.is_none() && current.is_zero() { trace!( target:LOG_TARGET, - "Ancestry search: genesis mismatch for peer {who}", + "Ancestry search: genesis mismatch for peer {peer_id}", ); - return Err(BadPeer(*who, rep::GENESIS_MISMATCH)) + return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH)) } if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *current, matching_hash.is_some()) @@ -890,7 +892,10 @@ where start: *start, state: next_state, }; - return Ok(OnBlockData::Request(*who, ancestry_request::(next_num))) + return Ok(OnBlockData::Request( + *peer_id, + ancestry_request::(next_num), + )) } else { // Ancestry search is complete. Check if peer is on a stale fork unknown // to us and add it to sync targets if necessary. @@ -911,7 +916,7 @@ where target: LOG_TARGET, "Added fork target {} for {}", peer.best_hash, - who, + peer_id, ); self.fork_targets .entry(peer.best_hash) @@ -921,7 +926,7 @@ where peers: Default::default(), }) .peers - .insert(*who); + .insert(*peer_id); } peer.state = PeerSyncState::Available; Vec::new() @@ -931,32 +936,32 @@ where peer.state = PeerSyncState::Available; if let Some(warp_sync) = &mut self.warp_sync { if blocks.len() == 1 { - validate_blocks::(&blocks, who, Some(request))?; + validate_blocks::(&blocks, peer_id, Some(request))?; match warp_sync.import_target_block( blocks.pop().expect("`blocks` len checked above."), ) { warp::TargetBlockImportResult::Success => return Ok(OnBlockData::Continue), warp::TargetBlockImportResult::BadResponse => - return Err(BadPeer(*who, rep::VERIFICATION_FAIL)), + return Err(BadPeer(*peer_id, rep::VERIFICATION_FAIL)), } } else if blocks.is_empty() { - debug!(target: LOG_TARGET, "Empty block response from {who}"); - return Err(BadPeer(*who, rep::NO_BLOCK)) + debug!(target: LOG_TARGET, "Empty block response from {peer_id}"); + return Err(BadPeer(*peer_id, rep::NO_BLOCK)) } else { debug!( target: LOG_TARGET, "Too many blocks ({}) in warp target block response from {}", blocks.len(), - who, + peer_id, ); - return Err(BadPeer(*who, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) } } else { debug!( target: LOG_TARGET, "Logic error: we think we are downloading warp target block from {}, but no warp sync is happening.", - who, + peer_id, ); return Ok(OnBlockData::Continue) } @@ -968,7 +973,7 @@ where } } else { // When request.is_none() this is a block announcement. Just accept blocks. - validate_blocks::(&blocks, who, None)?; + validate_blocks::(&blocks, peer_id, None)?; blocks .into_iter() .map(|b| { @@ -981,7 +986,7 @@ where body: b.body, indexed_body: None, justifications, - origin: Some(*who), + origin: Some(*peer_id), allow_missing_state: true, import_existing: false, skip_execution: true, @@ -992,7 +997,7 @@ where } } else { // We don't know of this peer, so we also did not request anything from it. - return Err(BadPeer(*who, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) }; Ok(OnBlockData::Import(self.validate_and_queue_blocks(new_blocks, gap))) @@ -1002,10 +1007,10 @@ where #[must_use] fn on_block_justification( &mut self, - who: PeerId, + peer_id: PeerId, response: BlockResponse, ) -> Result, BadPeer> { - let peer = if let Some(peer) = self.peers.get_mut(&who) { + let peer = if let Some(peer) = self.peers.get_mut(&peer_id) { peer } else { error!( @@ -1015,7 +1020,7 @@ where return Ok(OnBlockJustification::Nothing) }; - self.allowed_requests.add(&who); + self.allowed_requests.add(&peer_id); if let PeerSyncState::DownloadingJustification(hash) = peer.state { peer.state = PeerSyncState::Available; @@ -1025,11 +1030,11 @@ where warn!( target: LOG_TARGET, "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", - who, + peer_id, hash, block.hash, ); - return Err(BadPeer(who, rep::BAD_JUSTIFICATION)) + return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)) } block @@ -1040,14 +1045,14 @@ where // had but didn't (regardless of whether it had a justification for it or not). trace!( target: LOG_TARGET, - "Peer {who:?} provided empty response for justification request {hash:?}", + "Peer {peer_id:?} provided empty response for justification request {hash:?}", ); None }; if let Some((peer_id, hash, number, justifications)) = - self.extra_justifications.on_response(who, justification) + self.extra_justifications.on_response(peer_id, justification) { return Ok(OnBlockJustification::Import { peer_id, hash, number, justifications }) } @@ -1108,7 +1113,7 @@ where pub fn on_validated_block_announce( &mut self, is_best: bool, - who: PeerId, + peer_id: PeerId, announce: &BlockAnnounce, ) { let number = *announce.header.number(); @@ -1119,7 +1124,7 @@ where let ancient_parent = parent_status == BlockStatus::InChainPruned; let known = self.is_known(&hash); - let peer = if let Some(peer) = self.peers.get_mut(&who) { + let peer = if let Some(peer) = self.peers.get_mut(&peer_id) { peer } else { error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID"); @@ -1127,7 +1132,7 @@ where }; if let PeerSyncState::AncestorSearch { .. } = peer.state { - trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", who); + trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id); return } @@ -1141,20 +1146,20 @@ where // is either one further ahead or it's the one they just announced, if we know about it. if is_best { if known && self.best_queued_number >= number { - self.update_peer_common_number(&who, number); + self.update_peer_common_number(&peer_id, number); } else if announce.header.parent_hash() == &self.best_queued_hash || known_parent && self.best_queued_number >= number { - self.update_peer_common_number(&who, number.saturating_sub(One::one())); + self.update_peer_common_number(&peer_id, number.saturating_sub(One::one())); } } - self.allowed_requests.add(&who); + self.allowed_requests.add(&peer_id); // known block case if known || self.is_already_downloading(&hash) { - trace!(target: "sync", "Known block announce from {}: {}", who, hash); + trace!(target: "sync", "Known block announce from {}: {}", peer_id, hash); if let Some(target) = self.fork_targets.get_mut(&hash) { - target.peers.insert(who); + target.peers.insert(peer_id); } return } @@ -1163,7 +1168,7 @@ where trace!( target: "sync", "Ignored ancient block announced from {}: {} {:?}", - who, + peer_id, hash, announce.header, ); @@ -1174,7 +1179,7 @@ where trace!( target: "sync", "Added sync target for block announced from {}: {} {:?}", - who, + peer_id, hash, announce.summary(), ); @@ -1186,22 +1191,22 @@ where peers: Default::default(), }) .peers - .insert(who); + .insert(peer_id); } } /// Notify that a sync peer has disconnected. #[must_use] - pub fn peer_disconnected(&mut self, who: &PeerId) -> Option> { - self.blocks.clear_peer_download(who); + pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Option> { + self.blocks.clear_peer_download(peer_id); if let Some(gap_sync) = &mut self.gap_sync { - gap_sync.blocks.clear_peer_download(who) + gap_sync.blocks.clear_peer_download(peer_id) } - self.peers.remove(who); - self.extra_justifications.peer_disconnected(who); + self.peers.remove(peer_id); + self.extra_justifications.peer_disconnected(peer_id); self.allowed_requests.set_all(); self.fork_targets.retain(|_, target| { - target.peers.remove(who); + target.peers.remove(peer_id); !target.peers.is_empty() }); @@ -1830,7 +1835,7 @@ where fn on_state_data( &mut self, - who: &PeerId, + peer_id: &PeerId, response: OpaqueStateResponse, ) -> Result, BadPeer> { let response: Box = response.0.downcast().map_err(|_error| { @@ -1839,10 +1844,10 @@ where "Failed to downcast opaque state response, this is an implementation bug." ); - BadPeer(*who, rep::BAD_RESPONSE) + BadPeer(*peer_id, rep::BAD_RESPONSE) })?; - if let Some(peer) = self.peers.get_mut(who) { + if let Some(peer) = self.peers.get_mut(peer_id) { if let PeerSyncState::DownloadingState = peer.state { peer.state = PeerSyncState::Available; self.allowed_requests.set_all(); @@ -1852,7 +1857,7 @@ where debug!( target: LOG_TARGET, "Importing state data from {} with {} keys, {} proof nodes.", - who, + peer_id, response.entries.len(), response.proof.len(), ); @@ -1861,14 +1866,14 @@ where debug!( target: LOG_TARGET, "Importing state data from {} with {} keys, {} proof nodes.", - who, + peer_id, response.entries.len(), response.proof.len(), ); sync.import_state(*response) } else { - debug!(target: LOG_TARGET, "Ignored obsolete state response from {who}"); - return Err(BadPeer(*who, rep::NOT_REQUESTED)) + debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}"); + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) }; match import_result { @@ -1891,8 +1896,8 @@ where }, ImportResult::Continue => Ok(OnStateData::Continue), ImportResult::BadResponse => { - debug!(target: LOG_TARGET, "Bad state data received from {who}"); - Err(BadPeer(*who, rep::BAD_BLOCK)) + debug!(target: LOG_TARGET, "Bad state data received from {peer_id}"); + Err(BadPeer(*peer_id, rep::BAD_BLOCK)) }, } } @@ -1901,10 +1906,10 @@ where #[must_use] pub fn on_warp_sync_response( &mut self, - who: &PeerId, + peer_id: &PeerId, response: EncodedProof, ) -> Result<(), BadPeer> { - if let Some(peer) = self.peers.get_mut(who) { + if let Some(peer) = self.peers.get_mut(peer_id) { if let PeerSyncState::DownloadingWarpProof = peer.state { peer.state = PeerSyncState::Available; self.allowed_requests.set_all(); @@ -1914,20 +1919,20 @@ where debug!( target: LOG_TARGET, "Importing warp proof data from {}, {} bytes.", - who, + peer_id, response.0.len(), ); sync.import_warp_proof(response) } else { - debug!(target: LOG_TARGET, "Ignored obsolete warp sync response from {who}"); - return Err(BadPeer(*who, rep::NOT_REQUESTED)) + debug!(target: LOG_TARGET, "Ignored obsolete warp sync response from {peer_id}"); + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) }; match import_result { WarpProofImportResult::Success => Ok(()), WarpProofImportResult::BadResponse => { - debug!(target: LOG_TARGET, "Bad proof data received from {who}"); - Err(BadPeer(*who, rep::BAD_BLOCK)) + debug!(target: LOG_TARGET, "Bad proof data received from {peer_id}"); + Err(BadPeer(*peer_id, rep::BAD_BLOCK)) }, } } @@ -1964,11 +1969,11 @@ where has_error |= result.is_err(); match result { - Ok(BlockImportStatus::ImportedKnown(number, who)) => - if let Some(peer) = who { + Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => + if let Some(peer) = peer_id { self.update_peer_common_number(&peer, number); }, - Ok(BlockImportStatus::ImportedUnknown(number, aux, who)) => { + Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { if aux.clear_justification_requests { trace!( target: LOG_TARGET, @@ -1986,13 +1991,13 @@ where } if aux.bad_justification { - if let Some(ref peer) = who { + if let Some(ref peer) = peer_id { warn!("💔 Sent block with bad justification to import"); output.push(Err(BadPeer(*peer, rep::BAD_JUSTIFICATION))); } } - if let Some(peer) = who { + if let Some(peer) = peer_id { self.update_peer_common_number(&peer, number); } let state_sync_complete = @@ -2031,8 +2036,8 @@ where self.gap_sync = None; } }, - Err(BlockImportError::IncompleteHeader(who)) => - if let Some(peer) = who { + Err(BlockImportError::IncompleteHeader(peer_id)) => + if let Some(peer) = peer_id { warn!( target: LOG_TARGET, "💔 Peer sent block with incomplete header to import", @@ -2040,23 +2045,23 @@ where output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER))); output.extend(self.restart()); }, - Err(BlockImportError::VerificationFailed(who, e)) => { - let extra_message = - who.map_or_else(|| "".into(), |peer| format!(" received from ({peer})")); + Err(BlockImportError::VerificationFailed(peer_id, e)) => { + let extra_message = peer_id + .map_or_else(|| "".into(), |peer| format!(" received from ({peer})")); warn!( target: LOG_TARGET, "💔 Verification failed for block {hash:?}{extra_message}: {e:?}", ); - if let Some(peer) = who { + if let Some(peer) = peer_id { output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL))); } output.extend(self.restart()); }, - Err(BlockImportError::BadBlock(who)) => - if let Some(peer) = who { + Err(BlockImportError::BadBlock(peer_id)) => + if let Some(peer) = peer_id { warn!( target: LOG_TARGET, "💔 Block {hash:?} received from peer {peer} has been blacklisted", @@ -2350,7 +2355,7 @@ where /// It is expected that `blocks` are in ascending order. fn validate_blocks( blocks: &Vec>, - who: &PeerId, + peer_id: &PeerId, request: Option>, ) -> Result>, BadPeer> { if let Some(request) = request { @@ -2358,12 +2363,12 @@ fn validate_blocks( debug!( target: LOG_TARGET, "Received more blocks than requested from {}. Expected in maximum {:?}, got {}.", - who, + peer_id, request.max, blocks.len(), ); - return Err(BadPeer(*who, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) } let block_header = @@ -2383,7 +2388,7 @@ fn validate_blocks( block_header, ); - return Err(BadPeer(*who, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) } if request.fields.contains(BlockAttributes::HEADER) && @@ -2391,20 +2396,20 @@ fn validate_blocks( { trace!( target: LOG_TARGET, - "Missing requested header for a block in response from {who}.", + "Missing requested header for a block in response from {peer_id}.", ); - return Err(BadPeer(*who, rep::BAD_RESPONSE)) + return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)) } if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none()) { trace!( target: LOG_TARGET, - "Missing requested body for a block in response from {who}.", + "Missing requested body for a block in response from {peer_id}.", ); - return Err(BadPeer(*who, rep::BAD_RESPONSE)) + return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)) } } @@ -2415,11 +2420,11 @@ fn validate_blocks( debug!( target:LOG_TARGET, "Bad header received from {}. Expected hash {:?}, got {:?}", - who, + peer_id, b.hash, hash, ); - return Err(BadPeer(*who, rep::BAD_BLOCK)) + return Err(BadPeer(*peer_id, rep::BAD_BLOCK)) } } if let (Some(header), Some(body)) = (&b.header, &b.body) { @@ -2433,11 +2438,11 @@ fn validate_blocks( target:LOG_TARGET, "Bad extrinsic root for a block {} received from {}. Expected {:?}, got {:?}", b.hash, - who, + peer_id, expected, got, ); - return Err(BadPeer(*who, rep::BAD_BLOCK)) + return Err(BadPeer(*peer_id, rep::BAD_BLOCK)) } } } From 855e10a84076cd18e3098f893ed7ee63b383f802 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 6 Nov 2023 12:03:07 +0200 Subject: [PATCH 12/12] Run `zepter` to fix feature propagation --- substrate/bin/minimal/runtime/Cargo.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/substrate/bin/minimal/runtime/Cargo.toml b/substrate/bin/minimal/runtime/Cargo.toml index 08feb445879f..b75816a5ea3b 100644 --- a/substrate/bin/minimal/runtime/Cargo.toml +++ b/substrate/bin/minimal/runtime/Cargo.toml @@ -28,6 +28,7 @@ substrate-wasm-builder = { path = "../../../utils/wasm-builder", optional = true [features] default = [ "std" ] std = [ + "frame-support/std", "frame/std", "pallet-balances/std", "pallet-sudo/std", @@ -36,5 +37,6 @@ std = [ "pallet-transaction-payment/std", "parity-scale-codec/std", "scale-info/std", + "sp-genesis-builder/std", "substrate-wasm-builder", ]