diff --git a/Cargo.lock b/Cargo.lock index 41c641cf05963..8225e557141d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7328,6 +7328,7 @@ dependencies = [ "futures-timer", "libp2p", "log", + "mockall", "parking_lot 0.12.1", "sc-client-api", "sc-utils", @@ -7929,6 +7930,7 @@ dependencies = [ "sp-runtime", "sp-test-primitives", "sp-tracing", + "substrate-prometheus-endpoint", "substrate-test-runtime-client", "thiserror", "tokio", diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml index 971ee71ab8040..b61c6a4334285 100644 --- a/client/consensus/common/Cargo.toml +++ b/client/consensus/common/Cargo.toml @@ -18,6 +18,7 @@ futures = { version = "0.3.21", features = ["thread-pool"] } futures-timer = "3.0.1" libp2p = { version = "0.49.0", default-features = false } log = "0.4.17" +mockall = "0.11.2" parking_lot = "0.12.1" serde = { version = "1.0", features = ["derive"] } thiserror = "1.0.30" diff --git a/client/consensus/common/src/import_queue.rs b/client/consensus/common/src/import_queue.rs index 3741fa99663cd..d49b240ef3489 100644 --- a/client/consensus/common/src/import_queue.rs +++ b/client/consensus/common/src/import_queue.rs @@ -53,6 +53,7 @@ pub type DefaultImportQueue = mod basic_queue; pub mod buffered_link; +pub mod mock; /// Shared block import struct used by the queue. pub type BoxBlockImport = @@ -105,10 +106,10 @@ pub trait Verifier: Send + Sync { /// Blocks import queue API. /// /// The `import_*` methods can be called in order to send elements for the import queue to verify. -/// Afterwards, call `poll_actions` to determine how to respond to these elements. -pub trait ImportQueue: Send { +pub trait ImportQueueService: Send { /// Import bunch of blocks. fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>); + /// Import block justifications. fn import_justifications( &mut self, @@ -117,12 +118,26 @@ pub trait ImportQueue: Send { number: NumberFor, justifications: Justifications, ); - /// Polls for actions to perform on the network. - /// +} + +#[async_trait::async_trait] +pub trait ImportQueue: Send { + /// Get a copy of the handle to [`ImportQueueService`]. + fn service(&self) -> Box>; + + /// Get a reference to the handle to [`ImportQueueService`]. + fn service_ref(&mut self) -> &mut dyn ImportQueueService; + /// This method should behave in a way similar to `Future::poll`. It can register the current /// task and notify later when more actions are ready to be polled. To continue the comparison, /// it is as if this method always returned `Poll::Pending`. fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &mut dyn Link); + + /// Start asynchronous runner for import queue. + /// + /// Takes an object implementing [`Link`] which allows the import queue to + /// influece the synchronization process. + async fn run(self, link: Box>); } /// Hooks that the verification queue can use to influence the synchronization diff --git a/client/consensus/common/src/import_queue/basic_queue.rs b/client/consensus/common/src/import_queue/basic_queue.rs index 0e607159b75c3..20e8d262cacda 100644 --- a/client/consensus/common/src/import_queue/basic_queue.rs +++ b/client/consensus/common/src/import_queue/basic_queue.rs @@ -34,7 +34,8 @@ use crate::{ import_queue::{ buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender}, import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport, - BoxJustificationImport, ImportQueue, IncomingBlock, Link, RuntimeOrigin, Verifier, + BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link, + RuntimeOrigin, Verifier, }, metrics::Metrics, }; @@ -42,10 +43,8 @@ use crate::{ /// Interface to a basic block import queue that is importing blocks sequentially in a separate /// task, with plugable verification. pub struct BasicQueue { - /// Channel to send justification import messages to the background task. - justification_sender: TracingUnboundedSender>, - /// Channel to send block import messages to the background task. - block_import_sender: TracingUnboundedSender>, + /// Handle for sending justification and block import messages to the background task. + handle: BasicQueueHandle, /// Results coming from the worker task. result_port: BufferedLinkReceiver, _phantom: PhantomData, @@ -54,8 +53,7 @@ pub struct BasicQueue { impl Drop for BasicQueue { fn drop(&mut self) { // Flush the queue and close the receiver to terminate the future. - self.justification_sender.close_channel(); - self.block_import_sender.close_channel(); + self.handle.close(); self.result_port.close(); } } @@ -95,11 +93,37 @@ impl BasicQueue { future.boxed(), ); - Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData } + Self { + handle: BasicQueueHandle::new(justification_sender, block_import_sender), + result_port, + _phantom: PhantomData, + } } } -impl ImportQueue for BasicQueue { +#[derive(Clone)] +struct BasicQueueHandle { + /// Channel to send justification import messages to the background task. + justification_sender: TracingUnboundedSender>, + /// Channel to send block import messages to the background task. + block_import_sender: TracingUnboundedSender>, +} + +impl BasicQueueHandle { + pub fn new( + justification_sender: TracingUnboundedSender>, + block_import_sender: TracingUnboundedSender>, + ) -> Self { + Self { justification_sender, block_import_sender } + } + + pub fn close(&mut self) { + self.justification_sender.close_channel(); + self.block_import_sender.close_channel(); + } +} + +impl ImportQueueService for BasicQueueHandle { fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { if blocks.is_empty() { return @@ -138,12 +162,39 @@ impl ImportQueue for BasicQueue } } } +} + +#[async_trait::async_trait] +impl ImportQueue for BasicQueue { + /// Get handle to [`ImportQueueService`]. + fn service(&self) -> Box> { + Box::new(self.handle.clone()) + } + /// Get a reference to the handle to [`ImportQueueService`]. + fn service_ref(&mut self) -> &mut dyn ImportQueueService { + &mut self.handle + } + + /// Poll actions from network. fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link) { if self.result_port.poll_actions(cx, link).is_err() { log::error!(target: "sync", "poll_actions: Background import task is no longer alive"); } } + + /// Start asynchronous runner for import queue. + /// + /// Takes an object implementing [`Link`] which allows the import queue to + /// influece the synchronization process. + async fn run(mut self, mut link: Box>) { + loop { + if let Err(_) = self.result_port.next_action(&mut *link).await { + log::error!(target: "sync", "poll_actions: Background import task is no longer alive"); + return + } + } + } } /// Messages destinated to the background worker. diff --git a/client/consensus/common/src/import_queue/buffered_link.rs b/client/consensus/common/src/import_queue/buffered_link.rs index 5d418dddf0853..e6d3b212fdbac 100644 --- a/client/consensus/common/src/import_queue/buffered_link.rs +++ b/client/consensus/common/src/import_queue/buffered_link.rs @@ -80,7 +80,7 @@ impl Clone for BufferedLinkSender { } /// Internal buffered message. -enum BlockImportWorkerMsg { +pub enum BlockImportWorkerMsg { BlocksProcessed(usize, usize, Vec<(BlockImportResult, B::Hash)>), JustificationImported(RuntimeOrigin, B::Hash, NumberFor, bool), RequestJustification(B::Hash, NumberFor), @@ -122,6 +122,18 @@ pub struct BufferedLinkReceiver { } impl BufferedLinkReceiver { + /// Send action for the synchronization to perform. + pub fn send_actions(&mut self, msg: BlockImportWorkerMsg, link: &mut dyn Link) { + match msg { + BlockImportWorkerMsg::BlocksProcessed(imported, count, results) => + link.blocks_processed(imported, count, results), + BlockImportWorkerMsg::JustificationImported(who, hash, number, success) => + link.justification_imported(who, &hash, number, success), + BlockImportWorkerMsg::RequestJustification(hash, number) => + link.request_justification(&hash, number), + } + } + /// Polls for the buffered link actions. Any enqueued action will be propagated to the link /// passed as parameter. /// @@ -138,15 +150,17 @@ impl BufferedLinkReceiver { Poll::Pending => break Ok(()), }; - match msg { - BlockImportWorkerMsg::BlocksProcessed(imported, count, results) => - link.blocks_processed(imported, count, results), - BlockImportWorkerMsg::JustificationImported(who, hash, number, success) => - link.justification_imported(who, &hash, number, success), - BlockImportWorkerMsg::RequestJustification(hash, number) => - link.request_justification(&hash, number), - } + self.send_actions(msg, &mut *link); + } + } + + /// Poll next element from import queue and send the corresponding action command over the link. + pub async fn next_action(&mut self, link: &mut dyn Link) -> Result<(), ()> { + if let Some(msg) = self.rx.next().await { + self.send_actions(msg, link); + return Ok(()) } + Err(()) } /// Close the channel. diff --git a/client/consensus/common/src/import_queue/mock.rs b/client/consensus/common/src/import_queue/mock.rs new file mode 100644 index 0000000000000..67deee9514a1c --- /dev/null +++ b/client/consensus/common/src/import_queue/mock.rs @@ -0,0 +1,46 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use super::*; + +mockall::mock! { + pub ImportQueueHandle {} + + impl ImportQueueService for ImportQueueHandle { + fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>); + fn import_justifications( + &mut self, + who: RuntimeOrigin, + hash: B::Hash, + number: NumberFor, + justifications: Justifications, + ); + } +} + +mockall::mock! { + pub ImportQueue {} + + #[async_trait::async_trait] + impl ImportQueue for ImportQueue { + fn service(&self) -> Box>; + fn service_ref(&mut self) -> &mut dyn ImportQueueService; + fn poll_actions<'a>(&mut self, cx: &mut futures::task::Context<'a>, link: &mut dyn Link); + async fn run(self, link: Box>); + } +} diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index bed9935698769..5e8219c550d19 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -24,9 +24,7 @@ pub mod warp; use libp2p::PeerId; use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}; -use sc_consensus::{ - import_queue::RuntimeOrigin, BlockImportError, BlockImportStatus, IncomingBlock, -}; +use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock}; use sp_consensus::BlockOrigin; use sp_runtime::{ traits::{Block as BlockT, NumberFor}, @@ -317,6 +315,12 @@ pub trait ChainSync: Send { response: BlockResponse, ) -> Result, BadPeer>; + /// Procss received block data. + fn process_block_response_data( + &mut self, + blocks_to_import: Result, BadPeer>, + ); + /// Handle a response from the remote to a justification request that we made. /// /// `request` must be the original request that triggered `response`. @@ -326,17 +330,6 @@ pub trait ChainSync: Send { response: BlockResponse, ) -> Result, BadPeer>; - /// A batch of blocks have been processed, with or without errors. - /// - /// Call this when a batch of blocks have been processed by the import - /// queue, with or without errors. - fn on_blocks_processed( - &mut self, - imported: usize, - count: usize, - results: Vec<(Result>, BlockImportError>, Block::Hash)>, - ) -> Box), BadPeer>>>; - /// Call this when a justification has been processed by the import queue, /// with or without errors. fn on_justification_import( @@ -378,7 +371,7 @@ pub trait ChainSync: Send { /// Call when a peer has disconnected. /// Canceled obsolete block request may result in some blocks being ready for /// import, so this functions checks for such blocks and returns them. - fn peer_disconnected(&mut self, who: &PeerId) -> Option>; + fn peer_disconnected(&mut self, who: &PeerId); /// Return some key metrics. fn metrics(&self) -> Metrics; @@ -395,7 +388,10 @@ pub trait ChainSync: Send { /// Internally calls [`ChainSync::poll_block_announce_validation()`] and /// this function should be polled until it returns [`Poll::Pending`] to /// consume all pending events. - fn poll(&mut self, cx: &mut std::task::Context) -> Poll>; + fn poll( + &mut self, + cx: &mut std::task::Context, + ) -> Poll>; /// Send block request to peer fn send_block_request(&mut self, who: PeerId, request: BlockRequest); diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 48d6127f642c3..3a977edbca574 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -32,7 +32,6 @@ use libp2p::{ NetworkBehaviour, }; -use sc_consensus::import_queue::{IncomingBlock, RuntimeOrigin}; use sc_network_common::{ protocol::{ event::DhtEvent, @@ -43,18 +42,14 @@ use sc_network_common::{ }; use sc_peerset::{PeersetHandle, ReputationChange}; use sp_blockchain::HeaderBackend; -use sp_consensus::BlockOrigin; -use sp_runtime::{ - traits::{Block as BlockT, NumberFor}, - Justifications, -}; +use sp_runtime::traits::Block as BlockT; use std::{collections::HashSet, time::Duration}; pub use crate::request_responses::{InboundFailure, OutboundFailure, RequestId, ResponseFailure}; /// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourOut")] +#[behaviour(out_event = "BehaviourOut")] pub struct Behaviour where B: BlockT, @@ -72,10 +67,7 @@ where } /// Event generated by `Behaviour`. -pub enum BehaviourOut { - BlockImport(BlockOrigin, Vec>), - JustificationImport(RuntimeOrigin, B::Hash, NumberFor, Justifications), - +pub enum BehaviourOut { /// Started a random iterative Kademlia discovery query. RandomKademliaStarted, @@ -107,10 +99,7 @@ pub enum BehaviourOut { }, /// A request protocol handler issued reputation changes for the given peer. - ReputationChanges { - peer: PeerId, - changes: Vec, - }, + ReputationChanges { peer: PeerId, changes: Vec }, /// Opened a substream with the given node with the given notifications protocol. /// @@ -306,13 +295,9 @@ fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole { } } -impl From> for BehaviourOut { +impl From> for BehaviourOut { fn from(event: CustomMessageOutcome) -> Self { match event { - CustomMessageOutcome::BlockImport(origin, blocks) => - BehaviourOut::BlockImport(origin, blocks), - CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => - BehaviourOut::JustificationImport(origin, hash, nb, justification), CustomMessageOutcome::NotificationStreamOpened { remote, protocol, @@ -344,7 +329,7 @@ impl From> for BehaviourOut { } } -impl From for BehaviourOut { +impl From for BehaviourOut { fn from(event: request_responses::Event) -> Self { match event { request_responses::Event::InboundRequest { peer, protocol, result } => @@ -357,14 +342,14 @@ impl From for BehaviourOut { } } -impl From for BehaviourOut { +impl From for BehaviourOut { fn from(event: peer_info::PeerInfoEvent) -> Self { let peer_info::PeerInfoEvent::Identified { peer_id, info } = event; BehaviourOut::PeerIdentify { peer_id, info } } } -impl From for BehaviourOut { +impl From for BehaviourOut { fn from(event: DiscoveryOut) -> Self { match event { DiscoveryOut::UnroutablePeer(_peer_id) => { diff --git a/client/network/src/config.rs b/client/network/src/config.rs index b10612dd17094..52993e2519400 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -40,7 +40,6 @@ use libp2p::{ multiaddr, Multiaddr, }; use prometheus_endpoint::Registry; -use sc_consensus::ImportQueue; use sc_network_common::{ config::{MultiaddrWithPeerId, NonDefaultSetConfig, SetConfig, TransportConfig}, sync::ChainSync, @@ -82,12 +81,6 @@ where /// name on the wire. pub fork_id: Option, - /// Import queue to use. - /// - /// The import queue is the component that verifies that blocks received from other nodes are - /// valid. - pub import_queue: Box>, - /// Instance of chain sync implementation. pub chain_sync: Box>, diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index f3faa44ee6dbd..f185458e0dace 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -258,6 +258,7 @@ pub mod network_state; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; pub use protocol::PeerInfo; +use sc_consensus::{JustificationSyncLink, Link}; pub use sc_network_common::{ protocol::{ event::{DhtEvent, Event}, @@ -297,11 +298,15 @@ const MAX_CONNECTIONS_ESTABLISHED_INCOMING: u32 = 10_000; /// Abstraction over syncing-related services pub trait ChainSyncInterface: - NetworkSyncForkRequest> + Send + Sync + NetworkSyncForkRequest> + JustificationSyncLink + Link + Send + Sync { } impl ChainSyncInterface for T where - T: NetworkSyncForkRequest> + Send + Sync + T: NetworkSyncForkRequest> + + JustificationSyncLink + + Link + + Send + + Sync { } diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 8c1dd39b49be3..10eb31b595253 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -29,32 +29,26 @@ use libp2p::{ }, Multiaddr, PeerId, }; -use log::{debug, error, info, log, trace, warn, Level}; +use log::{debug, error, log, trace, warn, Level}; use lru::LruCache; use message::{generic::Message as GenericMessage, Message}; use notifications::{Notifications, NotificationsOut}; use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64}; use sc_client_api::HeaderBackend; -use sc_consensus::import_queue::{ - BlockImportError, BlockImportStatus, IncomingBlock, RuntimeOrigin, -}; use sc_network_common::{ config::NonReservedPeerMode, error, protocol::{role::Roles, ProtocolName}, sync::{ message::{BlockAnnounce, BlockAnnouncesHandshake, BlockData, BlockResponse, BlockState}, - BadPeer, ChainSync, ImportResult, OnBlockData, PollBlockAnnounceValidation, PollResult, - SyncStatus, + BadPeer, ChainSync, PollBlockAnnounceValidation, SyncStatus, }, utils::{interval, LruHashSet}, }; use sp_arithmetic::traits::SaturatedConversion; -use sp_consensus::BlockOrigin; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, Zero}, - Justifications, }; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -481,12 +475,7 @@ where } if let Some(_peer_data) = self.peers.remove(&peer) { - if let Some(OnBlockData::Import(origin, blocks)) = - self.chain_sync.peer_disconnected(&peer) - { - self.pending_messages - .push_back(CustomMessageOutcome::BlockImport(origin, blocks)); - } + self.chain_sync.peer_disconnected(&peer); self.default_peers_set_no_slot_connected_peers.remove(&peer); Ok(()) } else { @@ -785,25 +774,13 @@ where }], }, ); + self.chain_sync.process_block_response_data(blocks_to_import); if is_best { self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who, number)); } - match blocks_to_import { - Ok(OnBlockData::Import(origin, blocks)) => - CustomMessageOutcome::BlockImport(origin, blocks), - Ok(OnBlockData::Request(peer, req)) => { - self.chain_sync.send_block_request(peer, req); - CustomMessageOutcome::None - }, - Ok(OnBlockData::Continue) => CustomMessageOutcome::None, - Err(BadPeer(id, repu)) => { - self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); - self.peerset_handle.report_peer(id, repu); - CustomMessageOutcome::None - }, - } + CustomMessageOutcome::None } /// Call this when a block has been finalized. The sync layer may have some additional @@ -812,58 +789,6 @@ where self.chain_sync.on_block_finalized(&hash, *header.number()) } - /// Request a justification for the given block. - /// - /// Uses `protocol` to queue a new justification request and tries to dispatch all pending - /// requests. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - self.chain_sync.request_justification(hash, number) - } - - /// Clear all pending justification requests. - pub fn clear_justification_requests(&mut self) { - self.chain_sync.clear_justification_requests(); - } - - /// A batch of blocks have been processed, with or without errors. - /// Call this when a batch of blocks have been processed by the importqueue, with or without - /// errors. - pub fn on_blocks_processed( - &mut self, - imported: usize, - count: usize, - results: Vec<(Result>, BlockImportError>, B::Hash)>, - ) { - let results = self.chain_sync.on_blocks_processed(imported, count, results); - for result in results { - match result { - Ok((id, req)) => self.chain_sync.send_block_request(id, req), - Err(BadPeer(id, repu)) => { - self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); - self.peerset_handle.report_peer(id, repu) - }, - } - } - } - - /// Call this when a justification has been processed by the import queue, with or without - /// errors. - pub fn justification_import_result( - &mut self, - who: PeerId, - hash: B::Hash, - number: NumberFor, - success: bool, - ) { - self.chain_sync.on_justification_import(hash, number, success); - if !success { - info!("💔 Invalid justification provided by {} for #{}", who, hash); - self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); - self.peerset_handle - .report_peer(who, sc_peerset::ReputationChange::new_fatal("Invalid justification")); - } - } - /// Set whether the syncing peers set is in reserved-only mode. pub fn set_reserved_only(&self, reserved_only: bool) { self.peerset_handle.set_reserved_only(HARDCODED_PEERSETS_SYNC, reserved_only); @@ -997,8 +922,6 @@ where #[derive(Debug)] #[must_use] pub enum CustomMessageOutcome { - BlockImport(BlockOrigin, Vec>), - JustificationImport(RuntimeOrigin, B::Hash, NumberFor, Justifications), /// Notification protocols have been opened with a remote. NotificationStreamOpened { remote: PeerId, @@ -1106,23 +1029,9 @@ where // Process any received requests received from `NetworkService` and // check if there is any block announcement validation finished. while let Poll::Ready(result) = self.chain_sync.poll(cx) { - match result { - PollResult::Import(import) => self.pending_messages.push_back(match import { - ImportResult::BlockImport(origin, blocks) => - CustomMessageOutcome::BlockImport(origin, blocks), - ImportResult::JustificationImport(origin, hash, number, justifications) => - CustomMessageOutcome::JustificationImport( - origin, - hash, - number, - justifications, - ), - }), - PollResult::Announce(announce) => - match self.process_block_announce_validation_result(announce) { - CustomMessageOutcome::None => {}, - outcome => self.pending_messages.push_back(outcome), - }, + match self.process_block_announce_validation_result(result) { + CustomMessageOutcome::None => {}, + outcome => self.pending_messages.push_back(outcome), } } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index d35594a07e38a..08e498299a1d3 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -54,7 +54,6 @@ use libp2p::{ use log::{debug, error, info, trace, warn}; use metrics::{Histogram, HistogramVec, MetricSources, Metrics}; use parking_lot::Mutex; -use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link}; use sc_network_common::{ config::{MultiaddrWithPeerId, TransportConfig}, error::Error, @@ -450,7 +449,6 @@ where is_major_syncing, network_service: swarm, service, - import_queue: params.import_queue, from_service, event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?, peers_notifications_sinks, @@ -748,13 +746,11 @@ impl sc_consensus::JustificationSyncLink for NetworkSe /// On success, the justification will be passed to the import queue that was part at /// initialization as part of the configuration. fn request_justification(&self, hash: &B::Hash, number: NumberFor) { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::RequestJustification(*hash, number)); + let _ = self.chain_sync_service.request_justification(hash, number); } fn clear_justification_requests(&self) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::ClearJustificationRequests); + let _ = self.chain_sync_service.clear_justification_requests(); } } @@ -1208,8 +1204,6 @@ impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> { /// /// Each entry corresponds to a method of `NetworkService`. enum ServiceToWorkerMsg { - RequestJustification(B::Hash, NumberFor), - ClearJustificationRequests, AnnounceBlock(B::Hash, Option>), GetValue(KademliaKey), PutValue(KademliaKey, Vec), @@ -1261,8 +1255,6 @@ where service: Arc>, /// The *actual* network. network_service: Swarm>, - /// The import queue that was passed at initialization. - import_queue: Box>, /// Messages from the [`NetworkService`] that must be processed. from_service: TracingUnboundedReceiver>, /// Senders for events that happen on the network. @@ -1290,10 +1282,6 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { let this = &mut *self; - // Poll the import queue for actions to perform. - this.import_queue - .poll_actions(cx, &mut NetworkLink { protocol: &mut this.network_service }); - // At the time of writing of this comment, due to a high volume of messages, the network // worker sometimes takes a long time to process the loop below. When that happens, the // rest of the polling is frozen. In order to avoid negative side-effects caused by this @@ -1322,16 +1310,6 @@ where .behaviour_mut() .user_protocol_mut() .announce_block(hash, data), - ServiceToWorkerMsg::RequestJustification(hash, number) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .request_justification(&hash, number), - ServiceToWorkerMsg::ClearJustificationRequests => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .clear_justification_requests(), ServiceToWorkerMsg::GetValue(key) => this.network_service.behaviour_mut().get_value(key), ServiceToWorkerMsg::PutValue(key, value) => @@ -1435,23 +1413,6 @@ where match poll_value { Poll::Pending => break, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BlockImport(origin, blocks))) => { - if let Some(metrics) = this.metrics.as_ref() { - metrics.import_queue_blocks_submitted.inc(); - } - this.import_queue.import_blocks(origin, blocks); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::JustificationImport( - origin, - hash, - nb, - justifications, - ))) => { - if let Some(metrics) = this.metrics.as_ref() { - metrics.import_queue_justifications_submitted.inc(); - } - this.import_queue.import_justifications(origin, hash, nb, justifications); - }, Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, @@ -1952,51 +1913,6 @@ where { } -// Implementation of `import_queue::Link` trait using the available local variables. -struct NetworkLink<'a, B, Client> -where - B: BlockT, - Client: HeaderBackend + 'static, -{ - protocol: &'a mut Swarm>, -} - -impl<'a, B, Client> Link for NetworkLink<'a, B, Client> -where - B: BlockT, - Client: HeaderBackend + 'static, -{ - fn blocks_processed( - &mut self, - imported: usize, - count: usize, - results: Vec<(Result>, BlockImportError>, B::Hash)>, - ) { - self.protocol - .behaviour_mut() - .user_protocol_mut() - .on_blocks_processed(imported, count, results) - } - fn justification_imported( - &mut self, - who: PeerId, - hash: &B::Hash, - number: NumberFor, - success: bool, - ) { - self.protocol - .behaviour_mut() - .user_protocol_mut() - .justification_import_result(who, *hash, number, success); - } - fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol - .behaviour_mut() - .user_protocol_mut() - .request_justification(hash, number) - } -} - fn ensure_addresses_consistent_with_transport<'a>( addresses: impl Iterator, transport: &TransportConfig, diff --git a/client/network/src/service/metrics.rs b/client/network/src/service/metrics.rs index db1b6f7f6500d..a099bba716eb9 100644 --- a/client/network/src/service/metrics.rs +++ b/client/network/src/service/metrics.rs @@ -53,8 +53,6 @@ pub struct Metrics { pub connections_opened_total: CounterVec, pub distinct_peers_connections_closed_total: Counter, pub distinct_peers_connections_opened_total: Counter, - pub import_queue_blocks_submitted: Counter, - pub import_queue_justifications_submitted: Counter, pub incoming_connections_errors_total: CounterVec, pub incoming_connections_total: Counter, pub issued_light_requests: Counter, @@ -103,14 +101,6 @@ impl Metrics { "substrate_sub_libp2p_distinct_peers_connections_opened_total", "Total number of connections opened with distinct peers" )?, registry)?, - import_queue_blocks_submitted: prometheus::register(Counter::new( - "substrate_import_queue_blocks_submitted", - "Number of blocks submitted to the import queue.", - )?, registry)?, - import_queue_justifications_submitted: prometheus::register(Counter::new( - "substrate_import_queue_justifications_submitted", - "Number of justifications submitted to the import queue.", - )?, registry)?, incoming_connections_errors_total: prometheus::register(CounterVec::new( Opts::new( "substrate_sub_libp2p_incoming_connections_handshake_errors_total", diff --git a/client/network/src/service/tests/chain_sync.rs b/client/network/src/service/tests/chain_sync.rs index bd4967f25973a..0f47b64c352f2 100644 --- a/client/network/src/service/tests/chain_sync.rs +++ b/client/network/src/service/tests/chain_sync.rs @@ -86,27 +86,26 @@ async fn normal_network_poll_no_peers() { #[tokio::test] async fn request_justification() { - // build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be - // called) - let chain_sync_service = - Box::new(MockChainSyncInterface::::new()); - - // build `ChainSync` and verify that call to `request_justification()` is made - let mut chain_sync = - Box::new(MockChainSync::::new()); - let hash = H256::random(); let number = 1337u64; - chain_sync - .expect_request_justification() + // build `ChainSyncInterface` provider and and expect + // `JustificationSyncLink::request_justification() to be called once + let mut chain_sync_service = + Box::new(MockChainSyncInterface::::new()); + + chain_sync_service + .expect_justification_sync_link_request_justification() .withf(move |in_hash, in_number| &hash == in_hash && &number == in_number) .once() .returning(|_, _| ()); + // build `ChainSync` and set default expecations for it + let mut chain_sync = MockChainSync::::new(); + set_default_expecations_no_peers(&mut chain_sync); let mut network = TestNetworkBuilder::new(Handle::current()) - .with_chain_sync((chain_sync, chain_sync_service)) + .with_chain_sync((Box::new(chain_sync), chain_sync_service)) .build(); // send "request justifiction" message and poll the network @@ -121,17 +120,20 @@ async fn request_justification() { #[tokio::test] async fn clear_justification_requests() { - // build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be - // called) - let chain_sync_service = + // build `ChainSyncInterface` provider and expect + // `JustificationSyncLink::clear_justification_requests()` to be called + let mut chain_sync_service = Box::new(MockChainSyncInterface::::new()); - // build `ChainSync` and verify that call to `clear_justification_requests()` is made + chain_sync_service + .expect_justification_sync_link_clear_justification_requests() + .once() + .returning(|| ()); + + // build `ChainSync` and set default expecations for it let mut chain_sync = Box::new(MockChainSync::::new()); - chain_sync.expect_clear_justification_requests().once().returning(|| ()); - set_default_expecations_no_peers(&mut chain_sync); let mut network = TestNetworkBuilder::new(Handle::current()) .with_chain_sync((chain_sync, chain_sync_service)) @@ -235,19 +237,13 @@ async fn on_block_finalized() { // and verify that connection to the peer is closed #[tokio::test] async fn invalid_justification_imported() { - struct DummyImportQueue( - Arc< - RwLock< - Option<( - PeerId, - substrate_test_runtime_client::runtime::Hash, - sp_runtime::traits::NumberFor, - )>, - >, - >, - ); + struct DummyImportQueueHandle; - impl sc_consensus::ImportQueue for DummyImportQueue { + impl + sc_consensus::import_queue::ImportQueueService< + substrate_test_runtime_client::runtime::Block, + > for DummyImportQueueHandle + { fn import_blocks( &mut self, _origin: sp_consensus::BlockOrigin, @@ -265,7 +261,23 @@ async fn invalid_justification_imported() { _justifications: sp_runtime::Justifications, ) { } + } + struct DummyImportQueue( + Arc< + RwLock< + Option<( + PeerId, + substrate_test_runtime_client::runtime::Hash, + sp_runtime::traits::NumberFor, + )>, + >, + >, + DummyImportQueueHandle, + ); + + #[async_trait::async_trait] + impl sc_consensus::ImportQueue for DummyImportQueue { fn poll_actions( &mut self, _cx: &mut futures::task::Context, @@ -275,13 +287,40 @@ async fn invalid_justification_imported() { link.justification_imported(peer, &hash, number, false); } } + + fn service( + &self, + ) -> Box< + dyn sc_consensus::import_queue::ImportQueueService< + substrate_test_runtime_client::runtime::Block, + >, + > { + Box::new(DummyImportQueueHandle {}) + } + + fn service_ref( + &mut self, + ) -> &mut dyn sc_consensus::import_queue::ImportQueueService< + substrate_test_runtime_client::runtime::Block, + > { + &mut self.1 + } + + async fn run( + self, + _link: Box>, + ) { + } } let justification_info = Arc::new(RwLock::new(None)); let listen_addr = config::build_multiaddr![Memory(rand::random::())]; let (service1, mut event_stream1) = TestNetworkBuilder::new(Handle::current()) - .with_import_queue(Box::new(DummyImportQueue(justification_info.clone()))) + .with_import_queue(Box::new(DummyImportQueue( + justification_info.clone(), + DummyImportQueueHandle {}, + ))) .with_listen_addresses(vec![listen_addr.clone()]) .build() .start_network(); @@ -331,6 +370,7 @@ async fn disconnect_peer_using_chain_sync_handle() { let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); let listen_addr = config::build_multiaddr![Memory(rand::random::())]; + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (chain_sync_network_provider, chain_sync_network_handle) = sc_network_sync::service::network::NetworkServiceProvider::new(); let handle_clone = chain_sync_network_handle.clone(); @@ -344,7 +384,9 @@ async fn disconnect_peer_using_chain_sync_handle() { Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), 1u32, None, + None, chain_sync_network_handle.clone(), + import_queue, ProtocolName::from("block-request"), ProtocolName::from("state-request"), None, @@ -353,7 +395,7 @@ async fn disconnect_peer_using_chain_sync_handle() { let (node1, mut event_stream1) = TestNetworkBuilder::new(Handle::current()) .with_listen_addresses(vec![listen_addr.clone()]) - .with_chain_sync((Box::new(chain_sync), chain_sync_service)) + .with_chain_sync((Box::new(chain_sync), Box::new(chain_sync_service))) .with_chain_sync_network((chain_sync_network_provider, chain_sync_network_handle)) .with_client(client.clone()) .build() diff --git a/client/network/src/service/tests/mod.rs b/client/network/src/service/tests/mod.rs index f8635e39e9da9..fa1486a791213 100644 --- a/client/network/src/service/tests/mod.rs +++ b/client/network/src/service/tests/mod.rs @@ -21,7 +21,7 @@ use crate::{config, ChainSyncInterface, NetworkService, NetworkWorker}; use futures::prelude::*; use libp2p::Multiaddr; use sc_client_api::{BlockBackend, HeaderBackend}; -use sc_consensus::ImportQueue; +use sc_consensus::{ImportQueue, Link}; use sc_network_common::{ config::{ NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig, @@ -93,6 +93,7 @@ impl TestNetwork { struct TestNetworkBuilder { import_queue: Option>>, + link: Option>>, client: Option>, listen_addresses: Vec, set_config: Option, @@ -106,6 +107,7 @@ impl TestNetworkBuilder { pub fn new(rt_handle: Handle) -> Self { Self { import_queue: None, + link: None, client: None, listen_addresses: Vec::new(), set_config: None, @@ -212,13 +214,14 @@ impl TestNetworkBuilder { } } - let import_queue = self.import_queue.unwrap_or(Box::new(sc_consensus::BasicQueue::new( - PassThroughVerifier(false), - Box::new(client.clone()), - None, - &sp_core::testing::TaskExecutor::new(), - None, - ))); + let mut import_queue = + self.import_queue.unwrap_or(Box::new(sc_consensus::BasicQueue::new( + PassThroughVerifier(false), + Box::new(client.clone()), + None, + &sp_core::testing::TaskExecutor::new(), + None, + ))); let protocol_id = ProtocolId::from("test-protocol-name"); let fork_id = Some(String::from("test-fork-id")); @@ -289,15 +292,23 @@ impl TestNetworkBuilder { Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), network_config.max_parallel_downloads, None, + None, chain_sync_network_handle, + import_queue.service(), block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), None, ) .unwrap(); - (Box::new(chain_sync), chain_sync_service) + if let None = self.link { + self.link = Some(Box::new(chain_sync_service.clone())); + } + (Box::new(chain_sync), Box::new(chain_sync_service)) }); + let mut link = self + .link + .unwrap_or(Box::new(sc_network_sync::service::mock::MockChainSyncInterface::new())); let handle = self.rt_handle.clone(); let executor = move |f| { @@ -316,7 +327,6 @@ impl TestNetworkBuilder { chain: client.clone(), protocol_id, fork_id, - import_queue, chain_sync, chain_sync_service, metrics_registry: None, @@ -333,6 +343,16 @@ impl TestNetworkBuilder { self.rt_handle.spawn(async move { let _ = chain_sync_network_provider.run(service).await; }); + self.rt_handle.spawn(async move { + loop { + futures::future::poll_fn(|cx| { + import_queue.poll_actions(cx, &mut *link); + std::task::Poll::Ready(()) + }) + .await; + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + } + }); TestNetwork::new(worker, self.rt_handle) } diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml index 086ab3c30cc25..e29d8047161ce 100644 --- a/client/network/sync/Cargo.toml +++ b/client/network/sync/Cargo.toml @@ -28,6 +28,7 @@ prost = "0.11" smallvec = "1.8.0" thiserror = "1.0" fork-tree = { version = "3.0.0", path = "../../../utils/fork-tree" } +prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } sc-client-api = { version = "4.0.0-dev", path = "../../api" } sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-network-common = { version = "0.10.0-dev", path = "../common" } diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 697445334a073..75eda91219ec8 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -54,9 +54,12 @@ use futures::{ }; use libp2p::{request_response::OutboundFailure, PeerId}; use log::{debug, error, info, trace, warn}; +use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use prost::Message; use sc_client_api::{BlockBackend, ProofProvider}; -use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; +use sc_consensus::{ + import_queue::ImportQueueService, BlockImportError, BlockImportStatus, IncomingBlock, +}; use sc_network_common::{ config::{ NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig, @@ -71,8 +74,8 @@ use sc_network_common::{ warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, WarpSyncProvider}, BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, - OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, PollResult, - SyncMode, SyncState, SyncStatus, + OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, SyncMode, + SyncState, SyncStatus, }, }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; @@ -233,6 +236,32 @@ impl Default for AllowedRequests { } } +struct SyncingMetrics { + pub import_queue_blocks_submitted: Counter, + pub import_queue_justifications_submitted: Counter, +} + +impl SyncingMetrics { + fn register(registry: &Registry) -> Result { + Ok(Self { + import_queue_blocks_submitted: register( + Counter::new( + "substrate_sync_import_queue_blocks_submitted", + "Number of blocks submitted to the import queue.", + )?, + registry, + )?, + import_queue_justifications_submitted: register( + Counter::new( + "substrate_sync_import_queue_justifications_submitted", + "Number of justifications submitted to the import queue.", + )?, + registry, + )?, + }) + } +} + struct GapSync { blocks: BlockCollection, best_queued_number: NumberFor, @@ -311,6 +340,10 @@ pub struct ChainSync { warp_sync_protocol_name: Option, /// Pending responses pending_responses: FuturesUnordered>, + /// Handle to import queue. + import_queue: Box>, + /// Metrics. + metrics: Option, } /// All the data we have about a Peer that we are trying to sync with @@ -961,6 +994,19 @@ where Ok(self.validate_and_queue_blocks(new_blocks, gap)) } + fn process_block_response_data(&mut self, blocks_to_import: Result, BadPeer>) { + match blocks_to_import { + Ok(OnBlockData::Import(origin, blocks)) => self.import_blocks(origin, blocks), + Ok(OnBlockData::Request(peer, req)) => self.send_block_request(peer, req), + Ok(OnBlockData::Continue) => {}, + Err(BadPeer(id, repu)) => { + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + self.network_service.report_peer(id, repu); + }, + } + } + fn on_block_justification( &mut self, who: PeerId, @@ -1016,156 +1062,6 @@ where Ok(OnBlockJustification::Nothing) } - fn on_blocks_processed( - &mut self, - imported: usize, - count: usize, - results: Vec<(Result>, BlockImportError>, B::Hash)>, - ) -> Box), BadPeer>>> { - trace!(target: "sync", "Imported {} of {}", imported, count); - - let mut output = Vec::new(); - - let mut has_error = false; - for (_, hash) in &results { - self.queue_blocks.remove(hash); - self.blocks.clear_queued(hash); - if let Some(gap_sync) = &mut self.gap_sync { - gap_sync.blocks.clear_queued(hash); - } - } - for (result, hash) in results { - if has_error { - break - } - - if result.is_err() { - has_error = true; - } - - match result { - Ok(BlockImportStatus::ImportedKnown(number, who)) => - if let Some(peer) = who { - self.update_peer_common_number(&peer, number); - }, - Ok(BlockImportStatus::ImportedUnknown(number, aux, who)) => { - if aux.clear_justification_requests { - trace!( - target: "sync", - "Block imported clears all pending justification requests {}: {:?}", - number, - hash, - ); - self.clear_justification_requests(); - } - - if aux.needs_justification { - trace!( - target: "sync", - "Block imported but requires justification {}: {:?}", - number, - hash, - ); - self.request_justification(&hash, number); - } - - if aux.bad_justification { - if let Some(ref peer) = who { - warn!("💔 Sent block with bad justification to import"); - output.push(Err(BadPeer(*peer, rep::BAD_JUSTIFICATION))); - } - } - - if let Some(peer) = who { - self.update_peer_common_number(&peer, number); - } - let state_sync_complete = - self.state_sync.as_ref().map_or(false, |s| s.target() == hash); - if state_sync_complete { - info!( - target: "sync", - "State sync is complete ({} MiB), restarting block sync.", - self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), - ); - self.state_sync = None; - self.mode = SyncMode::Full; - output.extend(self.restart()); - } - let warp_sync_complete = self - .warp_sync - .as_ref() - .map_or(false, |s| s.target_block_hash() == Some(hash)); - if warp_sync_complete { - info!( - target: "sync", - "Warp sync is complete ({} MiB), restarting block sync.", - self.warp_sync.as_ref().map_or(0, |s| s.progress().total_bytes / (1024 * 1024)), - ); - self.warp_sync = None; - self.mode = SyncMode::Full; - output.extend(self.restart()); - } - let gap_sync_complete = - self.gap_sync.as_ref().map_or(false, |s| s.target == number); - if gap_sync_complete { - info!( - target: "sync", - "Block history download is complete." - ); - self.gap_sync = None; - } - }, - Err(BlockImportError::IncompleteHeader(who)) => - if let Some(peer) = who { - warn!( - target: "sync", - "💔 Peer sent block with incomplete header to import", - ); - output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER))); - output.extend(self.restart()); - }, - Err(BlockImportError::VerificationFailed(who, e)) => - if let Some(peer) = who { - warn!( - target: "sync", - "💔 Verification failed for block {:?} received from peer: {}, {:?}", - hash, - peer, - e, - ); - output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL))); - output.extend(self.restart()); - }, - Err(BlockImportError::BadBlock(who)) => - if let Some(peer) = who { - warn!( - target: "sync", - "💔 Block {:?} received from peer {} has been blacklisted", - hash, - peer, - ); - output.push(Err(BadPeer(peer, rep::BAD_BLOCK))); - }, - Err(BlockImportError::MissingState) => { - // This may happen if the chain we were requesting upon has been discarded - // in the meantime because other chain has been finalized. - // Don't mark it as bad as it still may be synced if explicitly requested. - trace!(target: "sync", "Obsolete block {:?}", hash); - }, - e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { - warn!(target: "sync", "💔 Error importing block {:?}: {}", hash, e.unwrap_err()); - self.state_sync = None; - self.warp_sync = None; - output.extend(self.restart()); - }, - Err(BlockImportError::Cancelled) => {}, - }; - } - - self.allowed_requests.set_all(); - Box::new(output.into_iter()) - } - fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; self.extra_justifications @@ -1331,7 +1227,7 @@ where } } - fn peer_disconnected(&mut self, who: &PeerId) -> Option> { + fn peer_disconnected(&mut self, who: &PeerId) { self.blocks.clear_peer_download(who); if let Some(gap_sync) = &mut self.gap_sync { gap_sync.blocks.clear_peer_download(who) @@ -1343,8 +1239,13 @@ where target.peers.remove(who); !target.peers.is_empty() }); + let blocks = self.ready_blocks(); - (!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false)) + if let Some(OnBlockData::Import(origin, blocks)) = + (!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false)) + { + self.import_blocks(origin, blocks); + } } fn metrics(&self) -> Metrics { @@ -1421,22 +1322,56 @@ where .map_err(|error: codec::Error| error.to_string()) } - fn poll(&mut self, cx: &mut std::task::Context) -> Poll> { + fn poll( + &mut self, + cx: &mut std::task::Context, + ) -> Poll> { while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) { match event { ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { self.set_sync_fork_request(peers, &hash, number); }, + ToServiceCommand::RequestJustification(hash, number) => + self.request_justification(&hash, number), + ToServiceCommand::ClearJustificationRequests => self.clear_justification_requests(), + ToServiceCommand::BlocksProcessed(imported, count, results) => { + for result in self.on_blocks_processed(imported, count, results) { + match result { + Ok((id, req)) => self.send_block_request(id, req), + Err(BadPeer(id, repu)) => { + self.network_service + .disconnect_peer(id, self.block_announce_protocol_name.clone()); + self.network_service.report_peer(id, repu) + }, + } + } + }, + ToServiceCommand::JustificationImported(peer, hash, number, success) => { + self.on_justification_import(hash, number, success); + if !success { + info!(target: "sync", "💔 Invalid justification provided by {} for #{}", peer, hash); + self.network_service + .disconnect_peer(peer, self.block_announce_protocol_name.clone()); + self.network_service.report_peer( + peer, + sc_peerset::ReputationChange::new_fatal("Invalid justification"), + ); + } + }, } } self.process_outbound_requests(); - if let Poll::Ready(result) = self.poll_pending_responses(cx) { - return Poll::Ready(PollResult::Import(result)) + while let Poll::Ready(result) = self.poll_pending_responses(cx) { + match result { + ImportResult::BlockImport(origin, blocks) => self.import_blocks(origin, blocks), + ImportResult::JustificationImport(who, hash, number, justifications) => + self.import_justifications(who, hash, number, justifications), + } } if let Poll::Ready(announce) = self.poll_block_announce_validation(cx) { - return Poll::Ready(PollResult::Announce(announce)) + return Poll::Ready(announce) } Poll::Pending @@ -1494,11 +1429,13 @@ where block_announce_validator: Box + Send>, max_parallel_downloads: u32, warp_sync_provider: Option>>, + metrics_registry: Option<&Registry>, network_service: service::network::NetworkServiceHandle, + import_queue: Box>, block_request_protocol_name: ProtocolName, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option, - ) -> Result<(Self, Box>, NonDefaultSetConfig), ClientError> { + ) -> Result<(Self, ChainSyncInterfaceHandle, NonDefaultSetConfig), ClientError> { let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync"); let block_announce_config = Self::get_block_announce_proto_config( protocol_id, @@ -1544,10 +1481,22 @@ where .clone() .into(), pending_responses: Default::default(), + import_queue, + metrics: if let Some(r) = &metrics_registry { + match SyncingMetrics::register(r) { + Ok(metrics) => Some(metrics), + Err(err) => { + error!(target: "sync", "Failed to register metrics for ChainSync: {err:?}"); + None + }, + } + } else { + None + }, }; sync.reset_sync_start_point()?; - Ok((sync, Box::new(ChainSyncInterfaceHandle::new(tx)), block_announce_config)) + Ok((sync, ChainSyncInterfaceHandle::new(tx), block_announce_config)) } /// Returns the median seen block number. @@ -2173,8 +2122,10 @@ where if request.fields == BlockAttributes::JUSTIFICATION { match self.on_block_justification(peer_id, block_response) { Ok(OnBlockJustification::Nothing) => None, - Ok(OnBlockJustification::Import { peer, hash, number, justifications }) => - Some(ImportResult::JustificationImport(peer, hash, number, justifications)), + Ok(OnBlockJustification::Import { peer, hash, number, justifications }) => { + self.import_justifications(peer, hash, number, justifications); + None + }, Err(BadPeer(id, repu)) => { self.network_service .disconnect_peer(id, self.block_announce_protocol_name.clone()); @@ -2184,8 +2135,10 @@ where } } else { match self.on_block_data(&peer_id, Some(request), block_response) { - Ok(OnBlockData::Import(origin, blocks)) => - Some(ImportResult::BlockImport(origin, blocks)), + Ok(OnBlockData::Import(origin, blocks)) => { + self.import_blocks(origin, blocks); + None + }, Ok(OnBlockData::Request(peer, req)) => { self.send_block_request(peer, req); None @@ -2712,6 +2665,182 @@ where }, } } + + fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { + if let Some(metrics) = &self.metrics { + metrics.import_queue_blocks_submitted.inc(); + } + + self.import_queue.import_blocks(origin, blocks); + } + + fn import_justifications( + &mut self, + peer: PeerId, + hash: B::Hash, + number: NumberFor, + justifications: Justifications, + ) { + if let Some(metrics) = &self.metrics { + metrics.import_queue_justifications_submitted.inc(); + } + + self.import_queue.import_justifications(peer, hash, number, justifications); + } + + /// A batch of blocks have been processed, with or without errors. + /// + /// Call this when a batch of blocks have been processed by the import + /// queue, with or without errors. + fn on_blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)>, + ) -> Box), BadPeer>>> { + trace!(target: "sync", "Imported {} of {}", imported, count); + + let mut output = Vec::new(); + + let mut has_error = false; + for (_, hash) in &results { + self.queue_blocks.remove(hash); + self.blocks.clear_queued(hash); + if let Some(gap_sync) = &mut self.gap_sync { + gap_sync.blocks.clear_queued(hash); + } + } + for (result, hash) in results { + if has_error { + break + } + + if result.is_err() { + has_error = true; + } + + match result { + Ok(BlockImportStatus::ImportedKnown(number, who)) => + if let Some(peer) = who { + self.update_peer_common_number(&peer, number); + }, + Ok(BlockImportStatus::ImportedUnknown(number, aux, who)) => { + if aux.clear_justification_requests { + trace!( + target: "sync", + "Block imported clears all pending justification requests {}: {:?}", + number, + hash, + ); + self.clear_justification_requests(); + } + + if aux.needs_justification { + trace!( + target: "sync", + "Block imported but requires justification {}: {:?}", + number, + hash, + ); + self.request_justification(&hash, number); + } + + if aux.bad_justification { + if let Some(ref peer) = who { + warn!("💔 Sent block with bad justification to import"); + output.push(Err(BadPeer(*peer, rep::BAD_JUSTIFICATION))); + } + } + + if let Some(peer) = who { + self.update_peer_common_number(&peer, number); + } + let state_sync_complete = + self.state_sync.as_ref().map_or(false, |s| s.target() == hash); + if state_sync_complete { + info!( + target: "sync", + "State sync is complete ({} MiB), restarting block sync.", + self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), + ); + self.state_sync = None; + self.mode = SyncMode::Full; + output.extend(self.restart()); + } + let warp_sync_complete = self + .warp_sync + .as_ref() + .map_or(false, |s| s.target_block_hash() == Some(hash)); + if warp_sync_complete { + info!( + target: "sync", + "Warp sync is complete ({} MiB), restarting block sync.", + self.warp_sync.as_ref().map_or(0, |s| s.progress().total_bytes / (1024 * 1024)), + ); + self.warp_sync = None; + self.mode = SyncMode::Full; + output.extend(self.restart()); + } + let gap_sync_complete = + self.gap_sync.as_ref().map_or(false, |s| s.target == number); + if gap_sync_complete { + info!( + target: "sync", + "Block history download is complete." + ); + self.gap_sync = None; + } + }, + Err(BlockImportError::IncompleteHeader(who)) => + if let Some(peer) = who { + warn!( + target: "sync", + "💔 Peer sent block with incomplete header to import", + ); + output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER))); + output.extend(self.restart()); + }, + Err(BlockImportError::VerificationFailed(who, e)) => + if let Some(peer) = who { + warn!( + target: "sync", + "💔 Verification failed for block {:?} received from peer: {}, {:?}", + hash, + peer, + e, + ); + output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL))); + output.extend(self.restart()); + }, + Err(BlockImportError::BadBlock(who)) => + if let Some(peer) = who { + warn!( + target: "sync", + "💔 Block {:?} received from peer {} has been blacklisted", + hash, + peer, + ); + output.push(Err(BadPeer(peer, rep::BAD_BLOCK))); + }, + Err(BlockImportError::MissingState) => { + // This may happen if the chain we were requesting upon has been discarded + // in the meantime because other chain has been finalized. + // Don't mark it as bad as it still may be synced if explicitly requested. + trace!(target: "sync", "Obsolete block {:?}", hash); + }, + e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { + warn!(target: "sync", "💔 Error importing block {:?}: {}", hash, e.unwrap_err()); + self.state_sync = None; + self.warp_sync = None; + output.extend(self.restart()); + }, + Err(BlockImportError::Cancelled) => {}, + }; + } + + self.allowed_requests.set_all(); + Box::new(output.into_iter()) + } } // This is purely during a backwards compatible transitionary period and should be removed @@ -3089,6 +3218,7 @@ mod test { let block_announce_validator = Box::new(DefaultBlockAnnounceValidator); let peer_id = PeerId::random(); + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (mut sync, _, _) = ChainSync::new( @@ -3100,7 +3230,9 @@ mod test { block_announce_validator, 1, None, + None, chain_sync_network_handle, + import_queue, ProtocolName::from("block-request"), ProtocolName::from("state-request"), None, @@ -3151,6 +3283,7 @@ mod test { #[test] fn restart_doesnt_affect_peers_downloading_finality_data() { let mut client = Arc::new(TestClientBuilder::new().build()); + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); @@ -3163,7 +3296,9 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 1, None, + None, chain_sync_network_handle, + import_queue, ProtocolName::from("block-request"), ProtocolName::from("state-request"), None, @@ -3330,6 +3465,7 @@ mod test { sp_tracing::try_init_simple(); let mut client = Arc::new(TestClientBuilder::new().build()); + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); @@ -3342,7 +3478,9 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + None, chain_sync_network_handle, + import_queue, ProtocolName::from("block-request"), ProtocolName::from("state-request"), None, @@ -3453,6 +3591,7 @@ mod test { }; let mut client = Arc::new(TestClientBuilder::new().build()); + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let info = client.info(); @@ -3466,7 +3605,9 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + None, chain_sync_network_handle, + import_queue, ProtocolName::from("block-request"), ProtocolName::from("state-request"), None, @@ -3584,6 +3725,7 @@ mod test { fn can_sync_huge_fork() { sp_tracing::try_init_simple(); + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); @@ -3619,7 +3761,9 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + None, chain_sync_network_handle, + import_queue, ProtocolName::from("block-request"), ProtocolName::from("state-request"), None, @@ -3722,6 +3866,7 @@ mod test { fn syncs_fork_without_duplicate_requests() { sp_tracing::try_init_simple(); + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); @@ -3757,7 +3902,9 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + None, chain_sync_network_handle, + import_queue, ProtocolName::from("block-request"), ProtocolName::from("state-request"), None, @@ -3881,6 +4028,7 @@ mod test { #[test] fn removes_target_fork_on_disconnect() { sp_tracing::try_init_simple(); + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); @@ -3895,7 +4043,9 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 1, None, + None, chain_sync_network_handle, + import_queue, ProtocolName::from("block-request"), ProtocolName::from("state-request"), None, @@ -3921,6 +4071,7 @@ mod test { #[test] fn can_import_response_with_missing_blocks() { sp_tracing::try_init_simple(); + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client2 = Arc::new(TestClientBuilder::new().build()); @@ -3937,7 +4088,9 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 1, None, + None, chain_sync_network_handle, + import_queue, ProtocolName::from("block-request"), ProtocolName::from("state-request"), None, diff --git a/client/network/sync/src/mock.rs b/client/network/sync/src/mock.rs index 48d72c425bd03..b59ea7e4fea70 100644 --- a/client/network/sync/src/mock.rs +++ b/client/network/sync/src/mock.rs @@ -21,11 +21,10 @@ use futures::task::Poll; use libp2p::PeerId; -use sc_consensus::{BlockImportError, BlockImportStatus}; use sc_network_common::sync::{ message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}, BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, - OpaqueBlockResponse, PeerInfo, PollBlockAnnounceValidation, PollResult, SyncStatus, + OpaqueBlockResponse, PeerInfo, PollBlockAnnounceValidation, SyncStatus, }; use sp_runtime::traits::{Block as BlockT, NumberFor}; @@ -60,17 +59,12 @@ mockall::mock! { request: Option>, response: BlockResponse, ) -> Result, BadPeer>; + fn process_block_response_data(&mut self, blocks_to_import: Result, BadPeer>); fn on_block_justification( &mut self, who: PeerId, response: BlockResponse, ) -> Result, BadPeer>; - fn on_blocks_processed( - &mut self, - imported: usize, - count: usize, - results: Vec<(Result>, BlockImportError>, Block::Hash)>, - ) -> Box), BadPeer>>>; fn on_justification_import( &mut self, hash: Block::Hash, @@ -89,7 +83,7 @@ mockall::mock! { &mut self, cx: &mut std::task::Context<'a>, ) -> Poll>; - fn peer_disconnected(&mut self, who: &PeerId) -> Option>; + fn peer_disconnected(&mut self, who: &PeerId); fn metrics(&self) -> Metrics; fn block_response_into_blocks( &self, @@ -99,7 +93,7 @@ mockall::mock! { fn poll<'a>( &mut self, cx: &mut std::task::Context<'a>, - ) -> Poll>; + ) -> Poll>; fn send_block_request( &mut self, who: PeerId, diff --git a/client/network/sync/src/service/chain_sync.rs b/client/network/sync/src/service/chain_sync.rs index cf07c65ee3109..50ded5b643dea 100644 --- a/client/network/sync/src/service/chain_sync.rs +++ b/client/network/sync/src/service/chain_sync.rs @@ -17,6 +17,7 @@ // along with this program. If not, see . use libp2p::PeerId; +use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link}; use sc_network_common::service::NetworkSyncForkRequest; use sc_utils::mpsc::TracingUnboundedSender; use sp_runtime::traits::{Block as BlockT, NumberFor}; @@ -25,9 +26,18 @@ use sp_runtime::traits::{Block as BlockT, NumberFor}; #[derive(Debug)] pub enum ToServiceCommand { SetSyncForkRequest(Vec, B::Hash, NumberFor), + RequestJustification(B::Hash, NumberFor), + ClearJustificationRequests, + BlocksProcessed( + usize, + usize, + Vec<(Result>, BlockImportError>, B::Hash)>, + ), + JustificationImported(PeerId, B::Hash, NumberFor, bool), } /// Handle for communicating with `ChainSync` asynchronously +#[derive(Clone)] pub struct ChainSyncInterfaceHandle { tx: TracingUnboundedSender>, } @@ -56,3 +66,46 @@ impl NetworkSyncForkRequest> .unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number)); } } + +impl JustificationSyncLink for ChainSyncInterfaceHandle { + /// Request a justification for the given block from the network. + /// + /// On success, the justification will be passed to the import queue that was part at + /// initialization as part of the configuration. + fn request_justification(&self, hash: &B::Hash, number: NumberFor) { + let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number)); + } + + fn clear_justification_requests(&self) { + let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests); + } +} + +impl Link for ChainSyncInterfaceHandle { + fn blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)>, + ) { + let _ = self + .tx + .unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results)); + } + + fn justification_imported( + &mut self, + who: PeerId, + hash: &B::Hash, + number: NumberFor, + success: bool, + ) { + let _ = self + .tx + .unbounded_send(ToServiceCommand::JustificationImported(who, *hash, number, success)); + } + + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number)); + } +} diff --git a/client/network/sync/src/service/mock.rs b/client/network/sync/src/service/mock.rs index c8a29e1fba8ea..d8aad2fa7bac1 100644 --- a/client/network/sync/src/service/mock.rs +++ b/client/network/sync/src/service/mock.rs @@ -18,6 +18,7 @@ use futures::channel::oneshot; use libp2p::{Multiaddr, PeerId}; +use sc_consensus::{BlockImportError, BlockImportStatus}; use sc_network_common::{ config::MultiaddrWithPeerId, protocol::ProtocolName, @@ -29,13 +30,43 @@ use sp_runtime::traits::{Block as BlockT, NumberFor}; use std::collections::HashSet; mockall::mock! { - pub ChainSyncInterface {} + pub ChainSyncInterface { + pub fn justification_sync_link_request_justification(&self, hash: &B::Hash, number: NumberFor); + pub fn justification_sync_link_clear_justification_requests(&self); + } impl NetworkSyncForkRequest> for ChainSyncInterface { fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor); } + + impl sc_consensus::Link for ChainSyncInterface { + fn blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)>, + ); + fn justification_imported( + &mut self, + who: PeerId, + hash: &B::Hash, + number: NumberFor, + success: bool, + ); + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor); + } +} + +impl sc_consensus::JustificationSyncLink for MockChainSyncInterface { + fn request_justification(&self, hash: &B::Hash, number: NumberFor) { + self.justification_sync_link_request_justification(hash, number); + } + + fn clear_justification_requests(&self) { + self.justification_sync_link_clear_justification_requests(); + } } mockall::mock! { diff --git a/client/network/sync/src/tests.rs b/client/network/sync/src/tests.rs index a03e657f03ab2..61de08443a6c2 100644 --- a/client/network/sync/src/tests.rs +++ b/client/network/sync/src/tests.rs @@ -37,6 +37,7 @@ use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _ // poll `ChainSync` and verify that a new sync fork request has been registered #[tokio::test] async fn delegate_to_chainsync() { + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (mut chain_sync, chain_sync_service, _) = ChainSync::new( sc_network_common::sync::SyncMode::Full, @@ -47,7 +48,9 @@ async fn delegate_to_chainsync() { Box::new(DefaultBlockAnnounceValidator), 1u32, None, + None, chain_sync_network_handle, + import_queue, ProtocolName::from("block-request"), ProtocolName::from("state-request"), None, diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index d3642e69cb632..173ca81653b1a 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -43,8 +43,8 @@ use sc_client_api::{ }; use sc_consensus::{ BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, - ForkChoiceStrategy, ImportResult, JustificationImport, JustificationSyncLink, LongestChain, - Verifier, + ForkChoiceStrategy, ImportQueue, ImportResult, JustificationImport, JustificationSyncLink, + LongestChain, Verifier, }; use sc_network::{ config::{NetworkConfiguration, RequestResponseConfig, Role, SyncMode}, @@ -896,7 +896,9 @@ where block_announce_validator, network_config.max_parallel_downloads, Some(warp_sync), + None, chain_sync_network_handle, + import_queue.service(), block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), Some(warp_protocol_config.name.clone()), @@ -915,9 +917,8 @@ where chain: client.clone(), protocol_id, fork_id, - import_queue, chain_sync: Box::new(chain_sync), - chain_sync_service, + chain_sync_service: Box::new(chain_sync_service.clone()), metrics_registry: None, block_announce_config, request_response_protocol_configs: [ @@ -936,6 +937,9 @@ where self.rt_handle().spawn(async move { chain_sync_network_provider.run(service).await; }); + self.rt_handle().spawn(async move { + import_queue.run(Box::new(chain_sync_service)).await; + }); self.mut_peers(move |peers| { for peer in peers.iter_mut() { diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index dd89ce6dff10a..7153672030d6a 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -853,7 +853,9 @@ where block_announce_validator, config.network.max_parallel_downloads, warp_sync_provider, + config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(), chain_sync_network_handle, + import_queue.service(), block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), warp_sync_protocol_config.as_ref().map(|config| config.name.clone()), @@ -877,9 +879,8 @@ where chain: client.clone(), protocol_id: protocol_id.clone(), fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), - import_queue: Box::new(import_queue), chain_sync: Box::new(chain_sync), - chain_sync_service, + chain_sync_service: Box::new(chain_sync_service.clone()), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, request_response_protocol_configs: request_response_protocol_configs @@ -925,6 +926,7 @@ where Some("networking"), chain_sync_network_provider.run(network.clone()), ); + spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service))); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); diff --git a/client/service/src/chain_ops/import_blocks.rs b/client/service/src/chain_ops/import_blocks.rs index c0612124dd0c2..ca09c1658d72f 100644 --- a/client/service/src/chain_ops/import_blocks.rs +++ b/client/service/src/chain_ops/import_blocks.rs @@ -157,7 +157,7 @@ fn import_block_to_queue( let (header, extrinsics) = signed_block.block.deconstruct(); let hash = header.hash(); // import queue handles verification and importing it into the client. - queue.import_blocks( + queue.service_ref().import_blocks( BlockOrigin::File, vec![IncomingBlock:: { hash,