From 2a97b8a954565716f087d7678e8c757669c3fa16 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 28 Sep 2023 09:17:02 +0300 Subject: [PATCH 1/3] Wake the task up when pushing a new response into `PendingResponses` --- .../network/sync/src/pending_responses.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs index c863267e7808..441451483989 100644 --- a/substrate/client/network/sync/src/pending_responses.rs +++ b/substrate/client/network/sync/src/pending_responses.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . //! [`PendingResponses`] is responsible for keeping track of pending responses and -//! polling them. +//! polling them. The [`Stream`] implemented by [`PendingResponses`] never terminates. use futures::{ channel::oneshot, @@ -50,6 +50,7 @@ pub(crate) struct ResponseEvent { pub(crate) struct PendingResponses { /// Pending responses pending_responses: StreamMap, ResponseResult)>>, + waker: Option, } impl PendingResponses { @@ -79,6 +80,10 @@ impl PendingResponses { ); debug_assert!(false); } + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } pub fn remove(&mut self, peer_id: &PeerId) -> bool { @@ -99,8 +104,8 @@ impl Stream for PendingResponses { mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match futures::ready!(self.pending_responses.poll_next_unpin(cx)) { - Some((peer_id, (request, response))) => { + match self.pending_responses.poll_next_unpin(cx) { + Poll::Ready(Some((peer_id, (request, response)))) => { // We need to manually remove the stream, because `StreamMap` doesn't know yet that // it's going to yield `None`, so may not remove it before the next request is made // to the same peer. @@ -108,7 +113,11 @@ impl Stream for PendingResponses { Poll::Ready(Some(ResponseEvent { peer_id, request, response })) }, - None => Poll::Ready(None), + Poll::Ready(None) | Poll::Pending => { + self.waker = Some(cx.waker().clone()); + + Poll::Pending + }, } } } From 6b9eca8a46797249bee5b710215a92b58582e369 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 28 Sep 2023 09:38:27 +0300 Subject: [PATCH 2/3] minor: add missing import --- substrate/client/network/sync/src/pending_responses.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs index 441451483989..917f5d0f1f00 100644 --- a/substrate/client/network/sync/src/pending_responses.rs +++ b/substrate/client/network/sync/src/pending_responses.rs @@ -30,7 +30,7 @@ use log::error; use sc_network::request_responses::RequestFailure; use sc_network_common::sync::PeerRequest; use sp_runtime::traits::Block as BlockT; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, Waker}; use tokio_stream::StreamMap; /// Response result. From a61f47d679552c5c5c1aba6a599ca92dcfe946da Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 28 Sep 2023 11:12:04 +0300 Subject: [PATCH 3/3] minor: add missing init --- substrate/client/network/sync/src/pending_responses.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs index 917f5d0f1f00..1706242c2e5f 100644 --- a/substrate/client/network/sync/src/pending_responses.rs +++ b/substrate/client/network/sync/src/pending_responses.rs @@ -55,7 +55,7 @@ pub(crate) struct PendingResponses { impl PendingResponses { pub fn new() -> Self { - Self { pending_responses: StreamMap::new() } + Self { pending_responses: StreamMap::new(), waker: None } } pub fn insert(