From 64a538d025882c6127f445d4524e47ae83c96178 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20RIBEAU?= Date: Thu, 11 Jul 2024 11:59:59 +0200 Subject: [PATCH] fix(relay): wake the relay Listener on close --- protocols/relay/src/priv_client/transport.rs | 24 +++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/protocols/relay/src/priv_client/transport.rs b/protocols/relay/src/priv_client/transport.rs index 7147f0b5e55f..c4ece79a0a6f 100644 --- a/protocols/relay/src/priv_client/transport.rs +++ b/protocols/relay/src/priv_client/transport.rs @@ -27,7 +27,6 @@ use crate::RequestId; use futures::channel::mpsc; use futures::channel::oneshot; use futures::future::{ready, BoxFuture, FutureExt, Ready}; -use futures::ready; use futures::sink::SinkExt; use futures::stream::SelectAll; use futures::stream::{Stream, StreamExt}; @@ -36,7 +35,7 @@ use libp2p_core::transport::{ListenerId, TransportError, TransportEvent}; use libp2p_identity::PeerId; use std::collections::VecDeque; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, Waker}; use thiserror::Error; /// A [`Transport`] enabling client relay capabilities. @@ -151,6 +150,7 @@ impl libp2p_core::Transport for Transport { queued_events: Default::default(), from_behaviour, is_closed: false, + waker: None, }; self.listeners.push(listener); Ok(()) @@ -313,6 +313,7 @@ pub(crate) struct Listener { /// The listener can be closed either manually with [`Transport::remove_listener`](libp2p_core::Transport) or if /// the sender side of the `from_behaviour` channel is dropped. is_closed: bool, + waker: Option, } impl Listener { @@ -328,6 +329,10 @@ impl Listener { reason, }); self.is_closed = true; + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } } @@ -345,10 +350,17 @@ impl Stream for Listener { return Poll::Ready(None); } - let Some(msg) = ready!(self.from_behaviour.poll_next_unpin(cx)) else { - // Sender of `from_behaviour` has been dropped, signaling listener to close. - self.close(Ok(())); - continue; + let msg = match self.from_behaviour.poll_next_unpin(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => { + // Sender of `from_behaviour` has been dropped, signaling listener to close. + self.close(Ok(())); + continue; + } + Poll::Pending => { + self.waker = Some(cx.waker().clone()); + return Poll::Pending; + } }; match msg {