From 1b748927656fdb440fceb9469cf4b97ba0332bba Mon Sep 17 00:00:00 2001 From: sam elamin Date: Mon, 28 Nov 2022 14:43:17 +0000 Subject: [PATCH] Remove blocking based on PR comments and create new `WarpSync` on poll --- client/network/common/src/sync.rs | 10 ++++++++++ client/network/sync/src/lib.rs | 27 +++++++++++++++++---------- client/network/sync/src/mock.rs | 4 ++++ 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index bed9935698769..592ef690b5657 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -375,6 +375,16 @@ pub trait ChainSync: Send { cx: &mut std::task::Context<'a>, ) -> Poll>; + /// Poll warp sync target block + /// + /// This should be polled until it returns [`target_block`]. + /// + /// If [`target_block`] is returned, then `WarpSync::new` is called with a target header + fn poll_warp_sync_target_block<'a>( + &mut self, + cx: &mut std::task::Context<'a>, + ) -> Poll; + /// Call when a peer has disconnected. /// Canceled obsolete block request may result in some blocks being ready for /// import, so this functions checks for such blocks and returns them. diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index c4dbabaa6e3ce..34bf821d9d6f8 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -610,16 +610,7 @@ where warp_with_provider.clone(), )); }, - Some(WarpSyncParams::WaitForTarget(target_block)) => { - log::debug!(target: "sync", "Waiting for target block."); - futures::executor::block_on(async { - self.warp_sync = Some(WarpSync::new_with_target_block( - self.client.clone(), - target_block.await.unwrap(), - )); - }); - }, - None => {}, + _ => {}, } } } @@ -1345,6 +1336,16 @@ where } } + fn poll_warp_sync_target_block(&mut self, cx: &mut std::task::Context) -> Poll { + if let Some(WarpSyncParams::WaitForTarget(target_block)) = self.warp_sync_params.as_mut() { + return match target_block.poll_unpin(cx) { + Poll::Ready(Ok(target_block)) => Poll::Ready(target_block), + _ => Poll::Pending, + }; + } + Poll::Pending + } + fn peer_disconnected(&mut self, who: &PeerId) -> Option> { self.blocks.clear_peer_download(who); if let Some(gap_sync) = &mut self.gap_sync { @@ -1444,6 +1445,12 @@ where } } self.process_outbound_requests(); + match self.poll_warp_sync_target_block(cx) { + Poll::Ready(target_block) => + self.warp_sync = + Some(WarpSync::new_with_target_block(self.client.clone(), target_block)), + Poll::Pending => (), + }; if let Poll::Ready(result) = self.poll_pending_responses(cx) { return Poll::Ready(PollResult::Import(result)) diff --git a/client/network/sync/src/mock.rs b/client/network/sync/src/mock.rs index 48d72c425bd03..4ea8a7a5a8279 100644 --- a/client/network/sync/src/mock.rs +++ b/client/network/sync/src/mock.rs @@ -89,6 +89,10 @@ mockall::mock! { &mut self, cx: &mut std::task::Context<'a>, ) -> Poll>; + fn poll_warp_sync_target_block<'a>( + &mut self, + cx: &mut std::task::Context<'a>, + ) -> Poll; fn peer_disconnected(&mut self, who: &PeerId) -> Option>; fn metrics(&self) -> Metrics; fn block_response_into_blocks(