Skip to content

Commit

Permalink
Unify ChainSync actions under one enum (follow-up) (#2317)
Browse files Browse the repository at this point in the history
Get rid of public `ChainSync::..._requests()` functions and return all
requests as actions.

---------

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
  • Loading branch information
dmitry-markin and skunert authored Nov 15, 2023
1 parent f536043 commit 18165eb
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 47 deletions.
50 changes: 40 additions & 10 deletions substrate/client/network/sync/src/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ pub enum ChainSyncAction<B: BlockT> {
SendBlockRequest { peer_id: PeerId, request: BlockRequest<B> },
/// Drop stale block request.
CancelBlockRequest { peer_id: PeerId },
/// Send state request to peer.
SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest },
/// Send warp proof request to peer.
SendWarpProofRequest { peer_id: PeerId, request: WarpProofRequest<B> },
/// Peer misbehaved. Disconnect, report it and cancel the block request to it.
DropPeer(BadPeer),
/// Import blocks.
Expand Down Expand Up @@ -1420,11 +1424,6 @@ where
.any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
}

/// Check if the peer is known to the sync state machine. Used for sanity checks.
pub fn is_peer_known(&self, peer_id: &PeerId) -> bool {
self.peers.contains_key(peer_id)
}

/// Get the set of downloaded blocks that are ready to be queued for import.
fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
self.blocks
Expand Down Expand Up @@ -1537,7 +1536,7 @@ where
}

/// Get justification requests scheduled by sync to be sent out.
pub fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
let peers = &mut self.peers;
let mut matcher = self.extra_justifications.matcher();
std::iter::from_fn(move || {
Expand All @@ -1564,7 +1563,7 @@ where
}

/// Get block requests scheduled by sync to be sent out.
pub fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
if self.mode == SyncMode::Warp {
return self
.warp_target_block_request()
Expand Down Expand Up @@ -1691,7 +1690,7 @@ where
}

/// Get a state request scheduled by sync to be sent out (if any).
pub fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> {
fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> {
if self.allowed_requests.is_empty() {
return None
}
Expand Down Expand Up @@ -1737,7 +1736,7 @@ where
}

/// Get a warp proof request scheduled by sync to be sent out (if any).
pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> {
fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> {
if let Some(sync) = &self.warp_sync {
if self.allowed_requests.is_empty() ||
sync.is_complete() ||
Expand Down Expand Up @@ -2025,7 +2024,38 @@ where

/// Get pending actions to perform.
#[must_use]
pub fn take_actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> {
pub fn actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> {
let block_requests = self
.block_requests()
.into_iter()
.map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request });
self.actions.extend(block_requests);

let justification_requests = self
.justification_requests()
.into_iter()
.map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request });
self.actions.extend(justification_requests);

let state_request = self
.state_request()
.into_iter()
.map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request });
self.actions.extend(state_request);

let warp_proof_request = self
.warp_sync_request()
.into_iter()
.map(|(peer_id, request)| ChainSyncAction::SendWarpProofRequest { peer_id, request });
self.actions.extend(warp_proof_request);

std::mem::take(&mut self.actions).into_iter()
}

/// A version of `actions()` that doesn't schedule extra requests. For testing only.
#[cfg(test)]
#[must_use]
fn take_actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> {
std::mem::take(&mut self.actions).into_iter()
}
}
Expand Down
55 changes: 26 additions & 29 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
schema::v1::{StateRequest, StateResponse},
service::{
self,
chain_sync::{SyncingService, ToServiceCommand},
syncing_service::{SyncingService, ToServiceCommand},
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
Expand Down Expand Up @@ -713,16 +713,13 @@ where
self.is_major_syncing
.store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);

// Process actions requested by `ChainSync` during `select!`.
// Process actions requested by `ChainSync`.
self.process_chain_sync_actions();

// Send outbound requests on `ChanSync`'s behalf.
self.send_chain_sync_requests();
}
}

fn process_chain_sync_actions(&mut self) {
self.chain_sync.take_actions().for_each(|action| match action {
self.chain_sync.actions().for_each(|action| match action {
ChainSyncAction::SendBlockRequest { peer_id, request } => {
// Sending block request implies dropping obsolete pending response as we are not
// interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]).
Expand All @@ -741,7 +738,25 @@ where
ChainSyncAction::CancelBlockRequest { peer_id } => {
let removed = self.pending_responses.remove(&peer_id);

trace!(target: LOG_TARGET, "Processed {action:?}., response removed: {removed}.");
trace!(target: LOG_TARGET, "Processed {action:?}, response removed: {removed}.");
},
ChainSyncAction::SendStateRequest { peer_id, request } => {
self.send_state_request(peer_id, request);

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.",
);
},
ChainSyncAction::SendWarpProofRequest { peer_id, request } => {
self.send_warp_proof_request(peer_id, request.clone());

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendWarpProofRequest` to {}, request: {:?}.",
peer_id,
request,
);
},
ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => {
self.pending_responses.remove(&peer_id);
Expand Down Expand Up @@ -1104,26 +1119,8 @@ where
Ok(())
}

fn send_chain_sync_requests(&mut self) {
for (peer_id, request) in self.chain_sync.block_requests() {
self.send_block_request(peer_id, request);
}

if let Some((peer_id, request)) = self.chain_sync.state_request() {
self.send_state_request(peer_id, request);
}

for (peer_id, request) in self.chain_sync.justification_requests() {
self.send_block_request(peer_id, request);
}

if let Some((peer_id, request)) = self.chain_sync.warp_sync_request() {
self.send_warp_sync_request(peer_id, request);
}
}

fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest<B>) {
if !self.chain_sync.is_peer_known(&peer_id) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
debug_assert!(false);
return
Expand All @@ -1139,7 +1136,7 @@ where
}

fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) {
if !self.chain_sync.is_peer_known(&peer_id) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
debug_assert!(false);
return
Expand Down Expand Up @@ -1168,8 +1165,8 @@ where
}
}

fn send_warp_sync_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
if !self.chain_sync.is_peer_known(&peer_id) {
fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
debug_assert!(false);
return
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

//! Blockchain syncing implementation in Substrate.

pub use service::chain_sync::SyncingService;
pub use service::syncing_service::SyncingService;
pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider};

mod block_announce_validator;
Expand Down
4 changes: 2 additions & 2 deletions substrate/client/network/sync/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! `ChainSync`-related service code
//! `SyncingEngine`-related service code

pub mod chain_sync;
pub mod mock;
pub mod network;
pub mod syncing_service;
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::{
},
};

/// Commands send to `ChainSync`
/// Commands send to `SyncingEngine`
pub enum ToServiceCommand<B: BlockT> {
SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
RequestJustification(B::Hash, NumberFor<B>),
Expand Down Expand Up @@ -63,7 +63,7 @@ pub enum ToServiceCommand<B: BlockT> {
// },
}

/// Handle for communicating with `ChainSync` asynchronously
/// Handle for communicating with `SyncingEngine` asynchronously
#[derive(Clone)]
pub struct SyncingService<B: BlockT> {
tx: TracingUnboundedSender<ToServiceCommand<B>>,
Expand Down Expand Up @@ -148,7 +148,7 @@ impl<B: BlockT> SyncingService<B> {

/// Get sync status
///
/// Returns an error if `ChainSync` has terminated.
/// Returns an error if `SyncingEngine` has terminated.
pub async fn status(&self) -> Result<SyncStatus<B>, ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/warp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const LOG_TARGET: &'static str = "sync";
pub struct EncodedProof(pub Vec<u8>);

/// Warp sync request
#[derive(Encode, Decode, Debug)]
#[derive(Encode, Decode, Debug, Clone)]
pub struct WarpProofRequest<B: BlockT> {
/// Start collecting proofs from this block.
pub begin: B::Hash,
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use sc_network_common::role::Roles;
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_request_handler::BlockRequestHandler,
service::{chain_sync::SyncingService, network::NetworkServiceProvider},
service::{network::NetworkServiceProvider, syncing_service::SyncingService},
state_request_handler::StateRequestHandler,
warp::{
AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider,
Expand Down

0 comments on commit 18165eb

Please sign in to comment.