Skip to content

Commit

Permalink
Move block/state/warpc sync requests/responses to ChainSync (parity…
Browse files Browse the repository at this point in the history
…tech#12739)

* Move block/state/warpc sync requests/responses to `ChainSync`

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Apply review suggestions

* cargo-fmt + doc fix

* Fix tests

Co-authored-by: Bastian Köcher <git@kchr.de>
  • Loading branch information
2 people authored and ark0f committed Feb 27, 2023
1 parent 4bb3d42 commit 6bfa731
Show file tree
Hide file tree
Showing 16 changed files with 1,213 additions and 1,245 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 33 additions & 48 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ pub mod warp;

use libp2p::PeerId;
use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse};
use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
use sc_consensus::{
import_queue::RuntimeOrigin, BlockImportError, BlockImportStatus, IncomingBlock,
};
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Justifications,
};
use std::{any::Any, fmt, fmt::Formatter, task::Poll};
use warp::{EncodedProof, WarpProofRequest, WarpSyncProgress};
use warp::WarpSyncProgress;

/// The sync status of a peer we are trying to sync with
#[derive(Debug)]
Expand Down Expand Up @@ -123,7 +125,7 @@ pub enum OnBlockJustification<Block: BlockT> {
},
}

/// Result of [`ChainSync::on_state_data`].
/// Result of `ChainSync::on_state_data`.
#[derive(Debug)]
pub enum OnStateData<Block: BlockT> {
/// The block and state that should be imported.
Expand All @@ -132,6 +134,20 @@ pub enum OnStateData<Block: BlockT> {
Continue,
}

/// Block or justification request polled from `ChainSync`
#[derive(Debug)]
pub enum ImportResult<B: BlockT> {
BlockImport(BlockOrigin, Vec<IncomingBlock<B>>),
JustificationImport(RuntimeOrigin, B::Hash, NumberFor<B>, Justifications),
}

/// Value polled from `ChainSync`
#[derive(Debug)]
pub enum PollResult<B: BlockT> {
Import(ImportResult<B>),
Announce(PollBlockAnnounceValidation<B::Header>),
}

/// Result of [`ChainSync::poll_block_announce_validation`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PollBlockAnnounceValidation<H> {
Expand Down Expand Up @@ -186,6 +202,13 @@ pub struct Metrics {
pub justifications: metrics::Metrics,
}

#[derive(Debug)]
pub enum PeerRequest<B: BlockT> {
Block(BlockRequest<B>),
State,
WarpProof,
}

/// Wrapper for implementation-specific state request.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
Expand Down Expand Up @@ -250,6 +273,9 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Returns the current number of peers stored within this state machine.
fn num_peers(&self) -> usize;

/// Returns the number of peers we're connected to and that are being queried.
fn num_active_peers(&self) -> usize;

/// Handle a new connected peer.
///
/// Call this method whenever we connect to a new peer.
Expand Down Expand Up @@ -277,22 +303,6 @@ pub trait ChainSync<Block: BlockT>: Send {
number: NumberFor<Block>,
);

/// Get an iterator over all scheduled justification requests.
fn justification_requests<'a>(
&'a mut self,
) -> Box<dyn Iterator<Item = (PeerId, BlockRequest<Block>)> + 'a>;

/// Get an iterator over all block requests of all peers.
fn block_requests<'a>(
&'a mut self,
) -> Box<dyn Iterator<Item = (PeerId, BlockRequest<Block>)> + 'a>;

/// Get a state request, if any.
fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)>;

/// Get a warp sync request, if any.
fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<Block>)>;

/// Handle a response from the remote to a block request that we made.
///
/// `request` must be the original request that triggered `response`.
Expand All @@ -307,16 +317,6 @@ pub trait ChainSync<Block: BlockT>: Send {
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;

/// Handle a response from the remote to a state request that we made.
fn on_state_data(
&mut self,
who: &PeerId,
response: OpaqueStateResponse,
) -> Result<OnStateData<Block>, BadPeer>;

/// Handle a response from the remote to a warp proof request that we made.
fn on_warp_sync_data(&mut self, who: &PeerId, response: EncodedProof) -> Result<(), BadPeer>;

/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
Expand Down Expand Up @@ -383,35 +383,20 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Return some key metrics.
fn metrics(&self) -> Metrics;

/// Create implementation-specific block request.
fn create_opaque_block_request(&self, request: &BlockRequest<Block>) -> OpaqueBlockRequest;

/// Encode implementation-specific block request.
fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result<Vec<u8>, String>;

/// Decode implementation-specific block response.
fn decode_block_response(&self, response: &[u8]) -> Result<OpaqueBlockResponse, String>;

/// Access blocks from implementation-specific block response.
fn block_response_into_blocks(
&self,
request: &BlockRequest<Block>,
response: OpaqueBlockResponse,
) -> Result<Vec<BlockData<Block>>, String>;

/// Encode implementation-specific state request.
fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String>;

/// Decode implementation-specific state response.
fn decode_state_response(&self, response: &[u8]) -> Result<OpaqueStateResponse, String>;

/// Advance the state of `ChainSync`
///
/// Internally calls [`ChainSync::poll_block_announce_validation()`] and
/// this function should be polled until it returns [`Poll::Pending`] to
/// consume all pending events.
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<PollResult<Block>>;

/// Send block request to peer
fn send_block_request(&mut self, who: PeerId, request: BlockRequest<Block>);
}
51 changes: 1 addition & 50 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use sc_network_common::{
ProtocolName,
},
request_responses::{IfDisconnected, ProtocolConfig, RequestFailure},
sync::{warp::WarpProofRequest, OpaqueBlockRequest, OpaqueStateRequest},
};
use sc_peerset::{PeersetHandle, ReputationChange};
use sp_blockchain::HeaderBackend;
Expand Down Expand Up @@ -163,36 +162,6 @@ pub enum BehaviourOut<B: BlockT> {
messages: Vec<(ProtocolName, Bytes)>,
},

/// A new block request must be emitted.
BlockRequest {
/// Node we send the request to.
target: PeerId,
/// Opaque implementation-specific block request.
request: OpaqueBlockRequest,
/// One-shot channel to receive the response.
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},

/// A new state request must be emitted.
StateRequest {
/// Node we send the request to.
target: PeerId,
/// Opaque implementation-specific state request.
request: OpaqueStateRequest,
/// One-shot channel to receive the response.
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},

/// A new warp sync request must be emitted.
WarpSyncRequest {
/// Node we send the request to.
target: PeerId,
/// Warp sync request.
request: WarpProofRequest<B>,
/// One-shot channel to receive the response.
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},

/// Now connected to a new peer for syncing purposes.
SyncConnected(PeerId),

Expand Down Expand Up @@ -230,21 +199,9 @@ where
user_agent: String,
local_public_key: PublicKey,
disco_config: DiscoveryConfig,
block_request_protocol_config: ProtocolConfig,
state_request_protocol_config: ProtocolConfig,
warp_sync_protocol_config: Option<ProtocolConfig>,
light_client_request_protocol_config: ProtocolConfig,
// All remaining request protocol configs.
mut request_response_protocols: Vec<ProtocolConfig>,
request_response_protocols: Vec<ProtocolConfig>,
peerset: PeersetHandle,
) -> Result<Self, request_responses::RegisterError> {
if let Some(config) = warp_sync_protocol_config {
request_response_protocols.push(config);
}
request_response_protocols.push(block_request_protocol_config);
request_response_protocols.push(state_request_protocol_config);
request_response_protocols.push(light_client_request_protocol_config);

Ok(Self {
substrate,
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key),
Expand Down Expand Up @@ -356,12 +313,6 @@ impl<B: BlockT> From<CustomMessageOutcome<B>> for BehaviourOut<B> {
BehaviourOut::BlockImport(origin, blocks),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) =>
BehaviourOut::JustificationImport(origin, hash, nb, justification),
CustomMessageOutcome::BlockRequest { target, request, pending_response } =>
BehaviourOut::BlockRequest { target, request, pending_response },
CustomMessageOutcome::StateRequest { target, request, pending_response } =>
BehaviourOut::StateRequest { target, request, pending_response },
CustomMessageOutcome::WarpSyncRequest { target, request, pending_response } =>
BehaviourOut::WarpSyncRequest { target, request, pending_response },
CustomMessageOutcome::NotificationStreamOpened {
remote,
protocol,
Expand Down
33 changes: 0 additions & 33 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,39 +101,6 @@ where
/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,

/// Request response configuration for the block request protocol.
///
/// [`RequestResponseConfig::name`] is used to tag outgoing block requests with the correct
/// protocol name. In addition all of [`RequestResponseConfig`] is used to handle incoming
/// block requests, if enabled.
///
/// Can be constructed either via
/// `sc_network_sync::block_request_handler::generate_protocol_config` allowing outgoing but
/// not incoming requests, or constructed via `sc_network_sync::block_request_handler::
/// BlockRequestHandler::new` allowing both outgoing and incoming requests.
pub block_request_protocol_config: RequestResponseConfig,

/// Request response configuration for the light client request protocol.
///
/// Can be constructed either via
/// `sc_network_light::light_client_requests::generate_protocol_config` allowing outgoing but
/// not incoming requests, or constructed via
/// `sc_network_light::light_client_requests::handler::LightClientRequestHandler::new`
/// allowing both outgoing and incoming requests.
pub light_client_request_protocol_config: RequestResponseConfig,

/// Request response configuration for the state request protocol.
///
/// Can be constructed either via
/// `sc_network_sync::state_request_handler::generate_protocol_config` allowing outgoing but
/// not incoming requests, or constructed via
/// `sc_network_sync::state_request_handler::StateRequestHandler::new` allowing
/// both outgoing and incoming requests.
pub state_request_protocol_config: RequestResponseConfig,

/// Optional warp sync protocol config.
pub warp_sync_protocol_config: Option<RequestResponseConfig>,

/// Request response protocol configurations
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
}
Expand Down
Loading

0 comments on commit 6bfa731

Please sign in to comment.