From 4d29da2156d587a6b8e4efbe10e412efaedbbd4e Mon Sep 17 00:00:00 2001 From: Aaro Altonen <48052676+altonen@users.noreply.github.com> Date: Sat, 22 Oct 2022 15:36:26 +0300 Subject: [PATCH] Make `NetworkService` callable for `ChainSync` (#12542) Introduce a middleware called `NetworkServiceProvider` which the `ChainSync` can use to communicate with `NetworkService`. `ChainSync` is given a `NetworkServiceHandle` which it uses to call `NetworkServiceProvider` which then dispatches the calls to `NetworkService` on behalf of `ChainSync`. This change will allow `ChainSync` to disconnect and report peers and in the future it'll be possible to send requests and notifications through the `NetworkServiceProvider`. `NetworkServiceProvider` is needed only until the `ChainSync` object has been removed from `Protocol`. After that, a normal `NetworkService` handle can be passed onto `ChainSync` and these changes can be deprecated. Co-authored-by: parity-processbot <> --- .../network/src/service/tests/chain_sync.rs | 182 +++++++++++++++++- client/network/src/service/tests/mod.rs | 28 ++- client/network/sync/src/lib.rs | 41 +++- client/network/sync/src/service/mock.rs | 48 ++++- client/network/sync/src/service/mod.rs | 1 + client/network/sync/src/service/network.rs | 128 ++++++++++++ client/network/sync/src/tests.rs | 4 +- client/network/test/src/lib.rs | 12 +- client/service/src/builder.rs | 12 +- 9 files changed, 443 insertions(+), 13 deletions(-) create mode 100644 client/network/sync/src/service/network.rs diff --git a/client/network/src/service/tests/chain_sync.rs b/client/network/src/service/tests/chain_sync.rs index 7ff8c589d8550..21149459413f4 100644 --- a/client/network/src/service/tests/chain_sync.rs +++ b/client/network/src/service/tests/chain_sync.rs @@ -16,7 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::service::tests::TestNetworkBuilder; +use crate::{ + config, + service::tests::{TestNetworkBuilder, BLOCK_ANNOUNCE_PROTO_NAME}, +}; use futures::prelude::*; use libp2p::PeerId; @@ -24,16 +27,23 @@ use sc_block_builder::BlockBuilderProvider; use sc_client_api::HeaderBackend; use sc_consensus::JustificationSyncLink; use sc_network_common::{ + config::{MultiaddrWithPeerId, SetConfig}, + protocol::event::Event, service::NetworkSyncForkRequest, sync::{SyncState, SyncStatus}, }; -use sc_network_sync::{mock::MockChainSync, service::mock::MockChainSyncInterface}; +use sc_network_sync::{mock::MockChainSync, service::mock::MockChainSyncInterface, ChainSync}; use sp_core::H256; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header as _}, }; -use std::{iter, sync::Arc, task::Poll}; +use std::{ + iter, + sync::{Arc, RwLock}, + task::Poll, + time::Duration, +}; use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; fn set_default_expecations_no_peers( @@ -224,3 +234,169 @@ async fn on_block_finalized() { }) .await; } + +// report from mock import queue that importing a justification was not successful +// and verify that connection to the peer is closed +#[async_std::test] +async fn invalid_justification_imported() { + struct DummyImportQueue( + Arc< + RwLock< + Option<( + PeerId, + substrate_test_runtime_client::runtime::Hash, + sp_runtime::traits::NumberFor, + )>, + >, + >, + ); + + impl sc_consensus::ImportQueue for DummyImportQueue { + fn import_blocks( + &mut self, + _origin: sp_consensus::BlockOrigin, + _blocks: Vec< + sc_consensus::IncomingBlock, + >, + ) { + } + + fn import_justifications( + &mut self, + _who: sc_consensus::import_queue::RuntimeOrigin, + _hash: substrate_test_runtime_client::runtime::Hash, + _number: sp_runtime::traits::NumberFor, + _justifications: sp_runtime::Justifications, + ) { + } + + fn poll_actions( + &mut self, + _cx: &mut futures::task::Context, + link: &mut dyn sc_consensus::Link, + ) { + if let Some((peer, hash, number)) = *self.0.read().unwrap() { + link.justification_imported(peer, &hash, number, false); + } + } + } + + let justification_info = Arc::new(RwLock::new(None)); + let listen_addr = config::build_multiaddr![Memory(rand::random::())]; + + let (service1, mut event_stream1) = TestNetworkBuilder::new() + .with_import_queue(Box::new(DummyImportQueue(justification_info.clone()))) + .with_listen_addresses(vec![listen_addr.clone()]) + .build() + .start_network(); + + let (service2, mut event_stream2) = TestNetworkBuilder::new() + .with_set_config(SetConfig { + reserved_nodes: vec![MultiaddrWithPeerId { + multiaddr: listen_addr, + peer_id: service1.local_peer_id, + }], + ..Default::default() + }) + .build() + .start_network(); + + async fn wait_for_events(stream: &mut (impl Stream + std::marker::Unpin)) { + let mut notif_received = false; + let mut sync_received = false; + while !notif_received || !sync_received { + match stream.next().await.unwrap() { + Event::NotificationStreamOpened { .. } => notif_received = true, + Event::SyncConnected { .. } => sync_received = true, + _ => {}, + }; + } + } + + wait_for_events(&mut event_stream1).await; + wait_for_events(&mut event_stream2).await; + + { + let mut info = justification_info.write().unwrap(); + *info = Some((service2.local_peer_id, H256::random(), 1337u64)); + } + + let wait_disconnection = async { + while !std::matches!(event_stream1.next().await, Some(Event::SyncDisconnected { .. })) {} + }; + + if async_std::future::timeout(Duration::from_secs(5), wait_disconnection) + .await + .is_err() + { + panic!("did not receive disconnection event in time"); + } +} + +#[async_std::test] +async fn disconnect_peer_using_chain_sync_handle() { + let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); + let listen_addr = config::build_multiaddr![Memory(rand::random::())]; + + let (chain_sync_network_provider, chain_sync_network_handle) = + sc_network_sync::service::network::NetworkServiceProvider::new(); + let handle_clone = chain_sync_network_handle.clone(); + + let (chain_sync, chain_sync_service) = ChainSync::new( + sc_network_common::sync::SyncMode::Full, + client.clone(), + Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), + 1u32, + None, + chain_sync_network_handle.clone(), + ) + .unwrap(); + + let (node1, mut event_stream1) = TestNetworkBuilder::new() + .with_listen_addresses(vec![listen_addr.clone()]) + .with_chain_sync((Box::new(chain_sync), chain_sync_service)) + .with_chain_sync_network((chain_sync_network_provider, chain_sync_network_handle)) + .with_client(client.clone()) + .build() + .start_network(); + + let (node2, mut event_stream2) = TestNetworkBuilder::new() + .with_set_config(SetConfig { + reserved_nodes: vec![MultiaddrWithPeerId { + multiaddr: listen_addr, + peer_id: node1.local_peer_id, + }], + ..Default::default() + }) + .with_client(client.clone()) + .build() + .start_network(); + + async fn wait_for_events(stream: &mut (impl Stream + std::marker::Unpin)) { + let mut notif_received = false; + let mut sync_received = false; + while !notif_received || !sync_received { + match stream.next().await.unwrap() { + Event::NotificationStreamOpened { .. } => notif_received = true, + Event::SyncConnected { .. } => sync_received = true, + _ => {}, + }; + } + } + + wait_for_events(&mut event_stream1).await; + wait_for_events(&mut event_stream2).await; + + handle_clone.disconnect_peer(node2.local_peer_id, BLOCK_ANNOUNCE_PROTO_NAME.into()); + + let wait_disconnection = async { + while !std::matches!(event_stream1.next().await, Some(Event::SyncDisconnected { .. })) {} + }; + + if async_std::future::timeout(Duration::from_secs(5), wait_disconnection) + .await + .is_err() + { + panic!("did not receive disconnection event in time"); + } +} diff --git a/client/network/src/service/tests/mod.rs b/client/network/src/service/tests/mod.rs index f829d9d43090f..ef25616a07b0d 100644 --- a/client/network/src/service/tests/mod.rs +++ b/client/network/src/service/tests/mod.rs @@ -33,7 +33,9 @@ use sc_network_common::{ }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ - block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, + block_request_handler::BlockRequestHandler, + service::network::{NetworkServiceHandle, NetworkServiceProvider}, + state_request_handler::StateRequestHandler, ChainSync, }; use sp_runtime::traits::{Block as BlockT, Header as _, Zero}; @@ -93,6 +95,7 @@ struct TestNetworkBuilder { listen_addresses: Vec, set_config: Option, chain_sync: Option<(Box>, Box>)>, + chain_sync_network: Option<(NetworkServiceProvider, NetworkServiceHandle)>, config: Option, } @@ -104,6 +107,7 @@ impl TestNetworkBuilder { listen_addresses: Vec::new(), set_config: None, chain_sync: None, + chain_sync_network: None, config: None, } } @@ -136,6 +140,19 @@ impl TestNetworkBuilder { self } + pub fn with_chain_sync_network( + mut self, + chain_sync_network: (NetworkServiceProvider, NetworkServiceHandle), + ) -> Self { + self.chain_sync_network = Some(chain_sync_network); + self + } + + pub fn with_import_queue(mut self, import_queue: Box>) -> Self { + self.import_queue = Some(import_queue); + self + } + pub fn build(mut self) -> TestNetwork { let client = self.client.as_mut().map_or( Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0), @@ -199,6 +216,9 @@ impl TestNetworkBuilder { None, ))); + let (chain_sync_network_provider, chain_sync_network_handle) = + self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); + let (chain_sync, chain_sync_service) = self.chain_sync.unwrap_or({ let (chain_sync, chain_sync_service) = ChainSync::new( match network_config.sync_mode { @@ -214,6 +234,7 @@ impl TestNetworkBuilder { Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), network_config.max_parallel_downloads, None, + chain_sync_network_handle, ) .unwrap(); @@ -292,6 +313,11 @@ impl TestNetworkBuilder { }) .unwrap(); + let service = worker.service().clone(); + async_std::task::spawn(async move { + let _ = chain_sync_network_provider.run(service).await; + }); + TestNetwork::new(worker) } } diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 63f63f7188cfe..f369bdb47e1c6 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -271,6 +271,8 @@ pub struct ChainSync { gap_sync: Option>, /// Channel for receiving service commands service_rx: TracingUnboundedReceiver>, + /// Handle for communicating with `NetworkService` + _network_service: service::network::NetworkServiceHandle, } /// All the data we have about a Peer that we are trying to sync with @@ -1775,6 +1777,7 @@ where block_announce_validator: Box + Send>, max_parallel_downloads: u32, warp_sync_provider: Option>>, + _network_service: service::network::NetworkServiceHandle, ) -> Result<(Self, Box>), ClientError> { let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync"); @@ -1800,6 +1803,7 @@ where import_existing: false, gap_sync: None, service_rx, + _network_service, }; sync.reset_sync_start_point()?; Ok((sync, Box::new(ChainSyncInterfaceHandle::new(tx)))) @@ -2670,6 +2674,7 @@ fn validate_blocks( #[cfg(test)] mod test { use super::*; + use crate::service::network::NetworkServiceProvider; use futures::{executor::block_on, future::poll_fn}; use sc_block_builder::BlockBuilderProvider; use sc_network_common::sync::message::{BlockData, BlockState, FromBlock}; @@ -2691,9 +2696,17 @@ mod test { let block_announce_validator = Box::new(DefaultBlockAnnounceValidator); let peer_id = PeerId::random(); - let (mut sync, _) = - ChainSync::new(SyncMode::Full, client.clone(), block_announce_validator, 1, None) - .unwrap(); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let (mut sync, _) = ChainSync::new( + SyncMode::Full, + client.clone(), + block_announce_validator, + 1, + None, + chain_sync_network_handle, + ) + .unwrap(); let (a1_hash, a1_number) = { let a1 = client.new_block(Default::default()).unwrap().build().unwrap().block; @@ -2739,12 +2752,16 @@ mod test { #[test] fn restart_doesnt_affect_peers_downloading_finality_data() { let mut client = Arc::new(TestClientBuilder::new().build()); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let (mut sync, _) = ChainSync::new( SyncMode::Full, client.clone(), Box::new(DefaultBlockAnnounceValidator), 1, None, + chain_sync_network_handle, ) .unwrap(); @@ -2905,6 +2922,8 @@ mod test { sp_tracing::try_init_simple(); let mut client = Arc::new(TestClientBuilder::new().build()); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let (mut sync, _) = ChainSync::new( SyncMode::Full, @@ -2912,6 +2931,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + chain_sync_network_handle, ) .unwrap(); @@ -3019,6 +3039,8 @@ mod test { }; let mut client = Arc::new(TestClientBuilder::new().build()); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let info = client.info(); let (mut sync, _) = ChainSync::new( @@ -3027,6 +3049,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + chain_sync_network_handle, ) .unwrap(); @@ -3140,6 +3163,8 @@ mod test { fn can_sync_huge_fork() { sp_tracing::try_init_simple(); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4) .map(|_| build_block(&mut client, None, false)) @@ -3170,6 +3195,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + chain_sync_network_handle, ) .unwrap(); @@ -3269,6 +3295,8 @@ mod test { fn syncs_fork_without_duplicate_requests() { sp_tracing::try_init_simple(); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4) .map(|_| build_block(&mut client, None, false)) @@ -3299,6 +3327,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + chain_sync_network_handle, ) .unwrap(); @@ -3419,6 +3448,8 @@ mod test { #[test] fn removes_target_fork_on_disconnect() { sp_tracing::try_init_simple(); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::>(); @@ -3428,6 +3459,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 1, None, + chain_sync_network_handle, ) .unwrap(); @@ -3450,6 +3482,8 @@ mod test { #[test] fn can_import_response_with_missing_blocks() { sp_tracing::try_init_simple(); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let mut client2 = Arc::new(TestClientBuilder::new().build()); let blocks = (0..4).map(|_| build_block(&mut client2, None, false)).collect::>(); @@ -3461,6 +3495,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 1, None, + chain_sync_network_handle, ) .unwrap(); diff --git a/client/network/sync/src/service/mock.rs b/client/network/sync/src/service/mock.rs index e283907b392d1..c146e1ec07b48 100644 --- a/client/network/sync/src/service/mock.rs +++ b/client/network/sync/src/service/mock.rs @@ -16,10 +16,15 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use libp2p::PeerId; -use sc_network_common::service::NetworkSyncForkRequest; +use sc_network_common::service::{NetworkPeers, NetworkSyncForkRequest}; use sp_runtime::traits::{Block as BlockT, NumberFor}; +pub use libp2p::{identity::error::SigningError, kad::record::Key as KademliaKey}; +use libp2p::{Multiaddr, PeerId}; +use sc_network_common::{config::MultiaddrWithPeerId, protocol::ProtocolName}; +use sc_peerset::ReputationChange; +use std::collections::HashSet; + mockall::mock! { pub ChainSyncInterface {} @@ -29,3 +34,42 @@ mockall::mock! { fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor); } } + +mockall::mock! { + pub NetworkServiceHandle {} +} + +// Mocked `Network` for `ChainSync`-related tests +mockall::mock! { + pub Network {} + + impl NetworkPeers for Network { + fn set_authorized_peers(&self, peers: HashSet); + fn set_authorized_only(&self, reserved_only: bool); + fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr); + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange); + fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName); + fn accept_unreserved_peers(&self); + fn deny_unreserved_peers(&self); + fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String>; + fn remove_reserved_peer(&self, peer_id: PeerId); + fn set_reserved_peers( + &self, + protocol: ProtocolName, + peers: HashSet, + ) -> Result<(), String>; + fn add_peers_to_reserved_set( + &self, + protocol: ProtocolName, + peers: HashSet, + ) -> Result<(), String>; + fn remove_peers_from_reserved_set(&self, protocol: ProtocolName, peers: Vec); + fn add_to_peers_set( + &self, + protocol: ProtocolName, + peers: HashSet, + ) -> Result<(), String>; + fn remove_from_peers_set(&self, protocol: ProtocolName, peers: Vec); + fn sync_num_connected(&self) -> usize; + } +} diff --git a/client/network/sync/src/service/mod.rs b/client/network/sync/src/service/mod.rs index d64d9bbd1b01f..692aa26985458 100644 --- a/client/network/sync/src/service/mod.rs +++ b/client/network/sync/src/service/mod.rs @@ -20,3 +20,4 @@ pub mod chain_sync; pub mod mock; +pub mod network; diff --git a/client/network/sync/src/service/network.rs b/client/network/sync/src/service/network.rs new file mode 100644 index 0000000000000..44ed177661264 --- /dev/null +++ b/client/network/sync/src/service/network.rs @@ -0,0 +1,128 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use futures::StreamExt; +use libp2p::PeerId; +use sc_network_common::{protocol::ProtocolName, service::NetworkPeers}; +use sc_peerset::ReputationChange; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use std::sync::Arc; + +/// Network-related services required by `sc-network-sync` +pub trait Network: NetworkPeers {} + +impl Network for T where T: NetworkPeers {} + +/// Network service provider for `ChainSync` +/// +/// It runs as an asynchronous task and listens to commands coming from `ChainSync` and +/// calls the `NetworkService` on its behalf. +pub struct NetworkServiceProvider { + rx: TracingUnboundedReceiver, +} + +/// Commands that `ChainSync` wishes to send to `NetworkService` +pub enum ToServiceCommand { + /// Call `NetworkPeers::disconnect_peer()` + DisconnectPeer(PeerId, ProtocolName), + + /// Call `NetworkPeers::report_peer()` + ReportPeer(PeerId, ReputationChange), +} + +/// Handle that is (temporarily) passed to `ChainSync` so it can +/// communicate with `NetworkService` through `SyncingEngine` +#[derive(Clone)] +pub struct NetworkServiceHandle { + tx: TracingUnboundedSender, +} + +impl NetworkServiceHandle { + /// Create new service handle + pub fn new(tx: TracingUnboundedSender) -> NetworkServiceHandle { + Self { tx } + } + + /// Report peer + pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { + let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit)); + } + + /// Disconnect peer + pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) { + let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol)); + } +} + +impl NetworkServiceProvider { + /// Create new `NetworkServiceProvider` + pub fn new() -> (Self, NetworkServiceHandle) { + let (tx, rx) = tracing_unbounded("mpsc_network_service_provider"); + + (Self { rx }, NetworkServiceHandle::new(tx)) + } + + /// Run the `NetworkServiceProvider` + pub async fn run(mut self, service: Arc) { + while let Some(inner) = self.rx.next().await { + match inner { + ToServiceCommand::DisconnectPeer(peer, protocol_name) => + service.disconnect_peer(peer, protocol_name), + ToServiceCommand::ReportPeer(peer, reputation_change) => + service.report_peer(peer, reputation_change), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::service::mock::MockNetwork; + + // typical pattern in `Protocol` code where peer is disconnected + // and then reported + #[async_std::test] + async fn disconnect_and_report_peer() { + let (provider, handle) = NetworkServiceProvider::new(); + + let peer = PeerId::random(); + let proto = ProtocolName::from("test-protocol"); + let proto_clone = proto.clone(); + let change = sc_peerset::ReputationChange::new_fatal("test-change"); + + let mut mock_network = MockNetwork::new(); + mock_network + .expect_disconnect_peer() + .withf(move |in_peer, in_proto| &peer == in_peer && &proto == in_proto) + .once() + .returning(|_, _| ()); + mock_network + .expect_report_peer() + .withf(move |in_peer, in_change| &peer == in_peer && &change == in_change) + .once() + .returning(|_, _| ()); + + async_std::task::spawn(async move { + provider.run(Arc::new(mock_network)).await; + }); + + handle.disconnect_peer(peer, proto_clone); + handle.report_peer(peer, change); + } +} diff --git a/client/network/sync/src/tests.rs b/client/network/sync/src/tests.rs index 47483c4ac440d..479c78bfdea97 100644 --- a/client/network/sync/src/tests.rs +++ b/client/network/sync/src/tests.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{ChainSync, ForkTarget}; +use crate::{service::network::NetworkServiceProvider, ChainSync, ForkTarget}; use libp2p::PeerId; use sc_network_common::{service::NetworkSyncForkRequest, sync::ChainSync as ChainSyncT}; @@ -29,12 +29,14 @@ use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _ // poll `ChainSync` and verify that a new sync fork request has been registered #[async_std::test] async fn delegate_to_chainsync() { + let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (mut chain_sync, chain_sync_service) = ChainSync::new( sc_network_common::sync::SyncMode::Full, Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0), Box::new(DefaultBlockAnnounceValidator), 1u32, None, + chain_sync_network_handle, ) .unwrap(); diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 5460cc7d52461..c9b99fbc6af10 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -61,8 +61,8 @@ use sc_network_common::{ }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ - block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, - warp_request_handler, ChainSync, + block_request_handler::BlockRequestHandler, service::network::NetworkServiceProvider, + state_request_handler::StateRequestHandler, warp_request_handler, ChainSync, }; use sc_service::client::Client; use sp_blockchain::{ @@ -864,6 +864,8 @@ where let block_announce_validator = config .block_announce_validator .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)); + let (chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let (chain_sync, chain_sync_service) = ChainSync::new( match network_config.sync_mode { SyncMode::Full => sc_network_common::sync::SyncMode::Full, @@ -878,6 +880,7 @@ where block_announce_validator, network_config.max_parallel_downloads, Some(warp_sync), + chain_sync_network_handle, ) .unwrap(); let block_announce_config = chain_sync.get_block_announce_proto_config( @@ -915,6 +918,11 @@ where trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id()); + let service = network.service().clone(); + async_std::task::spawn(async move { + chain_sync_network_provider.run(service).await; + }); + self.mut_peers(move |peers| { for peer in peers.iter_mut() { peer.network diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 1a16268839054..3cb064ec814c5 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -46,7 +46,8 @@ use sc_network_common::{ }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ - block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, + block_request_handler::BlockRequestHandler, service::network::NetworkServiceProvider, + state_request_handler::StateRequestHandler, warp_request_handler::RequestHandler as WarpSyncRequestHandler, ChainSync, }; use sc_rpc::{ @@ -844,6 +845,7 @@ where protocol_config }; + let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (chain_sync, chain_sync_service) = ChainSync::new( match config.network.sync_mode { SyncMode::Full => sc_network_common::sync::SyncMode::Full, @@ -855,7 +857,9 @@ where block_announce_validator, config.network.max_parallel_downloads, warp_sync_provider, + chain_sync_network_handle, )?; + let block_announce_config = chain_sync.get_block_announce_proto_config( protocol_id.clone(), &config.chain_spec.fork_id().map(ToOwned::to_owned), @@ -926,7 +930,13 @@ where Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }), config.prometheus_config.as_ref().map(|config| &config.registry), )?; + spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); + spawn_handle.spawn( + "chain-sync-network-service-provider", + Some("networking"), + chain_sync_network_provider.run(network.clone()), + ); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");