Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify ChainSync actions under one enum (follow-up) #2317

Merged
merged 7 commits into from
Nov 15, 2023
43 changes: 33 additions & 10 deletions substrate/client/network/sync/src/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ pub enum ChainSyncAction<B: BlockT> {
SendBlockRequest { peer_id: PeerId, request: BlockRequest<B> },
/// Drop stale block request.
CancelBlockRequest { peer_id: PeerId },
/// Send state request to peer.
SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest },
/// Send warp proof request to peer.
SendWarpProofRequest { peer_id: PeerId, request: WarpProofRequest<B> },
/// Peer misbehaved. Disconnect, report it and cancel the block request to it.
DropPeer(BadPeer),
/// Import blocks.
Expand Down Expand Up @@ -1420,11 +1424,6 @@ where
.any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
}

/// Check if the peer is known to the sync state machine. Used for sanity checks.
pub fn is_peer_known(&self, peer_id: &PeerId) -> bool {
self.peers.contains_key(peer_id)
}

/// Get the set of downloaded blocks that are ready to be queued for import.
fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
self.blocks
Expand Down Expand Up @@ -1537,7 +1536,7 @@ where
}

/// Get justification requests scheduled by sync to be sent out.
pub fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
let peers = &mut self.peers;
let mut matcher = self.extra_justifications.matcher();
std::iter::from_fn(move || {
Expand All @@ -1564,7 +1563,7 @@ where
}

/// Get block requests scheduled by sync to be sent out.
pub fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
if self.mode == SyncMode::Warp {
return self
.warp_target_block_request()
Expand Down Expand Up @@ -1691,7 +1690,7 @@ where
}

/// Get a state request scheduled by sync to be sent out (if any).
pub fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> {
fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> {
if self.allowed_requests.is_empty() {
return None
}
Expand Down Expand Up @@ -1737,7 +1736,7 @@ where
}

/// Get a warp proof request scheduled by sync to be sent out (if any).
pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> {
fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> {
if let Some(sync) = &self.warp_sync {
if self.allowed_requests.is_empty() ||
sync.is_complete() ||
Expand Down Expand Up @@ -2025,7 +2024,31 @@ where

/// Get pending actions to perform.
#[must_use]
pub fn take_actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> {
pub fn actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> {
let block_requests = self
.block_requests()
.into_iter()
.map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request });
self.actions.extend(block_requests);

let justification_requests = self
.justification_requests()
.into_iter()
.map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request });
self.actions.extend(justification_requests);

let state_request = self
.state_request()
.into_iter()
.map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request });
self.actions.extend(state_request);

let warp_proof_request = self
.warp_sync_request()
.into_iter()
.map(|(peer_id, request)| ChainSyncAction::SendWarpProofRequest { peer_id, request });
self.actions.extend(warp_proof_request);

std::mem::take(&mut self.actions).into_iter()
}
}
Expand Down
55 changes: 26 additions & 29 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
schema::v1::{StateRequest, StateResponse},
service::{
self,
chain_sync::{SyncingService, ToServiceCommand},
engine::{SyncingService, ToServiceCommand},
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
Expand Down Expand Up @@ -713,16 +713,13 @@ where
self.is_major_syncing
.store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);

// Process actions requested by `ChainSync` during `select!`.
// Process actions requested by `ChainSync`.
self.process_chain_sync_actions();

// Send outbound requests on `ChanSync`'s behalf.
self.send_chain_sync_requests();
}
}

fn process_chain_sync_actions(&mut self) {
self.chain_sync.take_actions().for_each(|action| match action {
self.chain_sync.actions().for_each(|action| match action {
ChainSyncAction::SendBlockRequest { peer_id, request } => {
// Sending block request implies dropping obsolete pending response as we are not
// interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]).
Expand All @@ -741,7 +738,25 @@ where
ChainSyncAction::CancelBlockRequest { peer_id } => {
let removed = self.pending_responses.remove(&peer_id);

trace!(target: LOG_TARGET, "Processed {action:?}., response removed: {removed}.");
trace!(target: LOG_TARGET, "Processed {action:?}, response removed: {removed}.");
},
ChainSyncAction::SendStateRequest { peer_id, request } => {
self.send_state_request(peer_id, request);

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.",
);
},
ChainSyncAction::SendWarpProofRequest { peer_id, request } => {
self.send_warp_proof_request(peer_id, request.clone());

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendWarpProofRequest` to {}, request: {:?}.",
peer_id,
request,
altonen marked this conversation as resolved.
Show resolved Hide resolved
);
},
ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => {
self.pending_responses.remove(&peer_id);
Expand Down Expand Up @@ -1104,26 +1119,8 @@ where
Ok(())
}

fn send_chain_sync_requests(&mut self) {
for (peer_id, request) in self.chain_sync.block_requests() {
self.send_block_request(peer_id, request);
}

if let Some((peer_id, request)) = self.chain_sync.state_request() {
self.send_state_request(peer_id, request);
}

for (peer_id, request) in self.chain_sync.justification_requests() {
self.send_block_request(peer_id, request);
}

if let Some((peer_id, request)) = self.chain_sync.warp_sync_request() {
self.send_warp_sync_request(peer_id, request);
}
}

fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest<B>) {
if !self.chain_sync.is_peer_known(&peer_id) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
debug_assert!(false);
return
Expand All @@ -1139,7 +1136,7 @@ where
}

fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) {
if !self.chain_sync.is_peer_known(&peer_id) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
debug_assert!(false);
return
Expand Down Expand Up @@ -1168,8 +1165,8 @@ where
}
}

fn send_warp_sync_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
if !self.chain_sync.is_peer_known(&peer_id) {
fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
debug_assert!(false);
return
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

//! Blockchain syncing implementation in Substrate.

pub use service::chain_sync::SyncingService;
pub use service::engine::SyncingService;
pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider};

mod block_announce_validator;
Expand Down
dmitry-markin marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::{
},
};

/// Commands send to `ChainSync`
/// Commands send to `SyncingEngine`
pub enum ToServiceCommand<B: BlockT> {
SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
RequestJustification(B::Hash, NumberFor<B>),
Expand Down Expand Up @@ -63,7 +63,7 @@ pub enum ToServiceCommand<B: BlockT> {
// },
}

/// Handle for communicating with `ChainSync` asynchronously
/// Handle for communicating with `SyncingEngine` asynchronously
#[derive(Clone)]
pub struct SyncingService<B: BlockT> {
tx: TracingUnboundedSender<ToServiceCommand<B>>,
Expand Down Expand Up @@ -148,7 +148,7 @@ impl<B: BlockT> SyncingService<B> {

/// Get sync status
///
/// Returns an error if `ChainSync` has terminated.
/// Returns an error if `SyncingEngine` has terminated.
pub async fn status(&self) -> Result<SyncStatus<B>, ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

//! `ChainSync`-related service code
dmitry-markin marked this conversation as resolved.
Show resolved Hide resolved

pub mod chain_sync;
pub mod engine;
altonen marked this conversation as resolved.
Show resolved Hide resolved
pub mod mock;
pub mod network;
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/warp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const LOG_TARGET: &'static str = "sync";
pub struct EncodedProof(pub Vec<u8>);

/// Warp sync request
#[derive(Encode, Decode, Debug)]
#[derive(Encode, Decode, Debug, Clone)]
pub struct WarpProofRequest<B: BlockT> {
/// Start collecting proofs from this block.
pub begin: B::Hash,
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use sc_network_common::role::Roles;
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_request_handler::BlockRequestHandler,
service::{chain_sync::SyncingService, network::NetworkServiceProvider},
service::{engine::SyncingService, network::NetworkServiceProvider},
state_request_handler::StateRequestHandler,
warp::{
AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider,
Expand Down