Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wake the task up when inserting a new response into PendingResponses #1735

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions substrate/client/network/sync/src/pending_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! [`PendingResponses`] is responsible for keeping track of pending responses and
//! polling them.
//! polling them. The [`Stream`] implemented by [`PendingResponses`] never terminates.

use futures::{
channel::oneshot,
Expand All @@ -30,7 +30,7 @@ use log::error;
use sc_network::request_responses::RequestFailure;
use sc_network_common::sync::PeerRequest;
use sp_runtime::traits::Block as BlockT;
use std::task::{Context, Poll};
use std::task::{Context, Poll, Waker};
use tokio_stream::StreamMap;

/// Response result.
Expand All @@ -50,11 +50,12 @@ pub(crate) struct ResponseEvent<B: BlockT> {
pub(crate) struct PendingResponses<B: BlockT> {
/// Pending responses
pending_responses: StreamMap<PeerId, BoxStream<'static, (PeerRequest<B>, ResponseResult)>>,
waker: Option<Waker>,
}

impl<B: BlockT> PendingResponses<B> {
pub fn new() -> Self {
Self { pending_responses: StreamMap::new() }
Self { pending_responses: StreamMap::new(), waker: None }
}

pub fn insert(
Expand All @@ -79,6 +80,10 @@ impl<B: BlockT> PendingResponses<B> {
);
debug_assert!(false);
}

if let Some(waker) = self.waker.take() {
waker.wake();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires that we called poll before. Not sure that this is removing any footgun.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the code we are already polling pending_responses after we inserted any kind of element, so why is the task not waking up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires that we called poll before. Not sure that this is removing any footgun.

Yeah, but we definitely called it at least once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the code we are already polling pending_responses after we inserted any kind of element, so why is the task not waking up?

There is no problem with waking up right now, but it can shoot us once the polling in SyncingEngine is converted into select! and the order of inserting / polling pending_responses is not strictly defined.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But for a select!. Once you insert data to the pending_responses your match arm will be finished and you start again from the top of the loop, aka calling select! again. This will poll all futures, including your pending_responses futures. So, not sure why we could miss there to poll it.

}
}

pub fn remove(&mut self, peer_id: &PeerId) -> bool {
Expand All @@ -99,16 +104,20 @@ impl<B: BlockT> Stream for PendingResponses<B> {
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match futures::ready!(self.pending_responses.poll_next_unpin(cx)) {
Some((peer_id, (request, response))) => {
match self.pending_responses.poll_next_unpin(cx) {
Poll::Ready(Some((peer_id, (request, response)))) => {
// We need to manually remove the stream, because `StreamMap` doesn't know yet that
// it's going to yield `None`, so may not remove it before the next request is made
// to the same peer.
self.pending_responses.remove(&peer_id);

Poll::Ready(Some(ResponseEvent { peer_id, request, response }))
},
None => Poll::Ready(None),
Poll::Ready(None) | Poll::Pending => {
self.waker = Some(cx.waker().clone());

Poll::Pending
},
}
}
}
Loading