From 41e2d5dc4ad37af4b3256dc7c089188d1589e6e5 Mon Sep 17 00:00:00 2001 From: stormshield-frb <144998884+stormshield-frb@users.noreply.github.com> Date: Mon, 29 Jul 2024 21:43:39 +0200 Subject: [PATCH] fix(relay): wake the relay Listener on close When closing a relayed `Listener` manually, the `TransportEvent::ListenerClosed` generated by the `relay::priv_client::Transport` is never forwarded back up to the `Swarm`, causing the `Swarm` to never remove the corresponding listener and never emitting the `SwarmEvent::ListenerClosed` event. This happens because, when stopping a relayed listener manually, the call to the [`close()` function](https://github.com/libp2p/rust-libp2p/blob/master/protocols/relay/src/priv_client/transport.rs#L324), is done outside the `poll` function, which mean nothing is triggering a wake up call to wake up the polling. Unfortunately, even if the [`listeners` (`SelectAll`) is always polled](https://github.com/libp2p/rust-libp2p/blob/master/protocols/relay/src/priv_client/transport.rs#L241) after a call to the `close` method, since `SelectAll` uses a `FuturesUnordered` internally, the poll does nothing. Indeed, the `FuturesUnordered` states that: ```rust /// This structure is optimized to manage a large number of futures. /// Futures managed by [`FuturesUnordered`] will only be polled when they /// generate wake-up notifications. This reduces the required amount of work /// needed to poll large numbers of futures. ``` Since means that when closing a relayed listener manually (calling `swarm.remove_listener`), it is never removed. This PR fixes that by triggering a `waker` when calling the `close` function. Pull-Request: #5491. --- protocols/relay/CHANGELOG.md | 3 +++ protocols/relay/src/priv_client/transport.rs | 26 +++++++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index f49e57af0f7..97638d1ae6a 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,9 +1,12 @@ ## 0.17.3 - Use `web-time` instead of `instant`. See [PR 5347](https://github.com/libp2p/rust-libp2p/pull/5347). +- Fix manual closure of relayed listener. + See [PR 5491](https://github.com/libp2p/rust-libp2p/pull/5491) - Add resource limits to `CircuitReq` to be set See [PR 5493](https://github.com/libp2p/rust-libp2p/pull/5493) + ## 0.17.2 - Fix support for unlimited relay connection according to spec. diff --git a/protocols/relay/src/priv_client/transport.rs b/protocols/relay/src/priv_client/transport.rs index 7147f0b5e55..b4374aa4672 100644 --- a/protocols/relay/src/priv_client/transport.rs +++ b/protocols/relay/src/priv_client/transport.rs @@ -27,7 +27,6 @@ use crate::RequestId; use futures::channel::mpsc; use futures::channel::oneshot; use futures::future::{ready, BoxFuture, FutureExt, Ready}; -use futures::ready; use futures::sink::SinkExt; use futures::stream::SelectAll; use futures::stream::{Stream, StreamExt}; @@ -36,7 +35,7 @@ use libp2p_core::transport::{ListenerId, TransportError, TransportEvent}; use libp2p_identity::PeerId; use std::collections::VecDeque; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, Waker}; use thiserror::Error; /// A [`Transport`] enabling client relay capabilities. @@ -151,6 +150,7 @@ impl libp2p_core::Transport for Transport { queued_events: Default::default(), from_behaviour, is_closed: false, + waker: None, }; self.listeners.push(listener); Ok(()) @@ -313,6 +313,7 @@ pub(crate) struct Listener { /// The listener can be closed either manually with [`Transport::remove_listener`](libp2p_core::Transport) or if /// the sender side of the `from_behaviour` channel is dropped. is_closed: bool, + waker: Option, } impl Listener { @@ -328,6 +329,10 @@ impl Listener { reason, }); self.is_closed = true; + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } } @@ -337,18 +342,27 @@ impl Stream for Listener { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if let Some(event) = self.queued_events.pop_front() { + self.waker = None; return Poll::Ready(Some(event)); } if self.is_closed { // Terminate the stream if the listener closed and all remaining events have been reported. + self.waker = None; return Poll::Ready(None); } - let Some(msg) = ready!(self.from_behaviour.poll_next_unpin(cx)) else { - // Sender of `from_behaviour` has been dropped, signaling listener to close. - self.close(Ok(())); - continue; + let msg = match self.from_behaviour.poll_next_unpin(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => { + // Sender of `from_behaviour` has been dropped, signaling listener to close. + self.close(Ok(())); + continue; + } + Poll::Pending => { + self.waker = Some(cx.waker().clone()); + return Poll::Pending; + } }; match msg {