From 576a00caab30f2da5aea9fab2dcc7373a827ec2d Mon Sep 17 00:00:00 2001
From: Stan Bondi
Date: Mon, 3 Jan 2022 15:07:33 +0200
Subject: [PATCH] fix: end stale outbound queue immediately on disconnect,
 retry outbound messages (#3664)

Description
---
- Immediately end the outbound messaging stream on peer connection disconnect.
- If messages remain in the outbound channel, flush them to a retry queue.

Motivation and Context
---
Due to yamux substream internals, the outbound stream does not indicate that it has ended until we attempt to write a message. This causes the outbound stream to remain active (i.e. available for sending) even though it can no longer send; that message, and any others queued up behind it, will then be dropped.

This is rectified by cutting off the outbound queue channel as soon as a connection is disconnected and 'rerouting' any queued messages so that they can be retried. The retry attempts to reconnect to the peer and, failing that, drops the queued messages, since there is nothing more we can do in that case.

This increases messaging reliability.

How Has This Been Tested?
---
Manually, by sending transaction and ping messages between wallets and base nodes that are banned/shut down.
---
 .../src/connection_manager/peer_connection.rs |  15 +-
 comms/src/protocol/messaging/outbound.rs      | 136 +++++++++++-------
 comms/src/protocol/messaging/protocol.rs      |  34 ++++-
 3 files changed, 125 insertions(+), 60 deletions(-)

diff --git a/comms/src/connection_manager/peer_connection.rs b/comms/src/connection_manager/peer_connection.rs
index 65ee916c9c..912f3a141e 100644
--- a/comms/src/connection_manager/peer_connection.rs
+++ b/comms/src/connection_manager/peer_connection.rs
@@ -22,6 +22,7 @@ use std::{
     fmt,
+    future::Future,
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -185,6 +186,12 @@ impl PeerConnection {
         !self.request_tx.is_closed()
     }
 
+    /// Returns an owned future that resolves on disconnection
+    pub fn on_disconnect(&self) -> impl Future<Output = ()> + 'static {
+        let request_tx = self.request_tx.clone();
+        async move { request_tx.closed().await }
+    }
+
     pub fn age(&self) -> Duration {
         self.started_at.elapsed()
     }
@@ -354,7 +361,7 @@ impl PeerConnectionActor {
             match maybe_request {
                 Some(request) => self.handle_request(request).await,
                 None => {
-                    debug!(target: LOG_TARGET, "[{}] All peer connection handled dropped closing the connection", self);
+                    debug!(target: LOG_TARGET, "[{}] All peer connection handles dropped, closing the connection", self);
                     break;
                 }
             }
@@ -468,11 +475,7 @@ impl PeerConnectionActor {
     }
 
     async fn notify_event(&mut self, event: ConnectionManagerEvent) {
-        log_if_error!(
-            target: LOG_TARGET,
-            self.event_notifier.send(event).await,
-            "Failed to send connection manager notification because '{}'",
-        );
+        let _ = self.event_notifier.send(event).await;
     }
 
     /// Disconnect this peer connection.
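The `on_disconnect()` helper added above works because `PeerConnection` already holds a `tokio::sync::mpsc::Sender` to its actor, and tokio's `Sender::closed()` resolves once the receiving side has been dropped or closed. A minimal self-contained sketch of that pattern, assuming an illustrative `ActorHandle` type and `String` request payload that are not part of this patch:

```rust
use std::future::Future;

use tokio::sync::mpsc;

// Hypothetical handle type mirroring how PeerConnection exposes a disconnect future:
// it owns a Sender to the actor; when the actor's receiver goes away, `closed()` resolves.
struct ActorHandle {
    request_tx: mpsc::Sender<String>,
}

impl ActorHandle {
    /// Returns an owned future that resolves when the actor side is dropped.
    fn on_disconnect(&self) -> impl Future<Output = ()> + 'static {
        let request_tx = self.request_tx.clone();
        async move { request_tx.closed().await }
    }
}

#[tokio::main]
async fn main() {
    let (request_tx, request_rx) = mpsc::channel::<String>(1);
    let handle = ActorHandle { request_tx };

    let disconnected = handle.on_disconnect();
    // Dropping the receiver stands in for the peer connection actor shutting down.
    drop(request_rx);

    disconnected.await;
    println!("actor gone, disconnect future resolved");
}
```

Cloning the sender before moving it into the async block is what makes the returned future `'static`, so a caller such as the outbound messaging task can hold it without keeping a borrow of the connection handle alive.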
diff --git a/comms/src/protocol/messaging/outbound.rs b/comms/src/protocol/messaging/outbound.rs
index 55549740c9..2220373e2b 100644
--- a/comms/src/protocol/messaging/outbound.rs
+++ b/comms/src/protocol/messaging/outbound.rs
@@ -23,7 +23,7 @@ use std::time::{Duration, Instant};
 
 use futures::{future::Either, SinkExt, StreamExt, TryStreamExt};
-use tokio::sync::mpsc as tokiompsc;
+use tokio::sync::mpsc;
 use tracing::{debug, error, event, span, Instrument, Level};
 
 use super::{error::MessagingProtocolError, metrics, MessagingEvent, MessagingProtocol, SendFailReason};
@@ -44,8 +44,9 @@ const MAX_SEND_RETRIES: usize = 1;
 
 pub struct OutboundMessaging {
     connectivity: ConnectivityRequester,
-    request_rx: tokiompsc::UnboundedReceiver<OutboundMessage>,
-    messaging_events_tx: tokiompsc::Sender<MessagingEvent>,
+    messages_rx: mpsc::UnboundedReceiver<OutboundMessage>,
+    messaging_events_tx: mpsc::Sender<MessagingEvent>,
+    retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
     peer_node_id: NodeId,
     inactivity_timeout: Option<Duration>,
 }
@@ -53,15 +54,17 @@ impl OutboundMessaging {
     pub fn new(
         connectivity: ConnectivityRequester,
-        messaging_events_tx: tokiompsc::Sender<MessagingEvent>,
-        request_rx: tokiompsc::UnboundedReceiver<OutboundMessage>,
+        messaging_events_tx: mpsc::Sender<MessagingEvent>,
+        messages_rx: mpsc::UnboundedReceiver<OutboundMessage>,
+        retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
         peer_node_id: NodeId,
         inactivity_timeout: Option<Duration>,
     ) -> Self {
         Self {
             connectivity,
-            request_rx,
+            messages_rx,
             messaging_events_tx,
+            retry_queue_tx,
             peer_node_id,
             inactivity_timeout,
         }
     }
@@ -77,8 +80,7 @@ impl OutboundMessaging {
         async move {
             debug!(
                 target: LOG_TARGET,
-                "Attempting to dial peer '{}' if required",
-                self.peer_node_id.short_str()
+                "Attempting to dial peer '{}' if required", self.peer_node_id
             );
             let peer_node_id = self.peer_node_id.clone();
             let messaging_events_tx = self.messaging_events_tx.clone();
@@ -87,25 +89,23 @@ impl OutboundMessaging {
                     event!(
                         Level::DEBUG,
                         "Outbound messaging for peer '{}' has stopped because the stream was closed",
-                        peer_node_id.short_str()
+                        peer_node_id
                    );
                    debug!(
                        target: LOG_TARGET,
-                        "Outbound messaging for peer '{}' has stopped because the stream was closed",
-                        peer_node_id.short_str()
+                        "Outbound messaging for peer '{}' has stopped because the stream was closed", peer_node_id
                    );
                },
                Err(MessagingProtocolError::Inactivity) => {
                    event!(
                        Level::DEBUG,
                        "Outbound messaging for peer '{}' has stopped because it was inactive",
-                        peer_node_id.short_str()
+                        peer_node_id
                    );
                    debug!(
                        target: LOG_TARGET,
-                        "Outbound messaging for peer '{}' has stopped because it was inactive",
-                        peer_node_id.short_str()
+                        "Outbound messaging for peer '{}' has stopped because it was inactive", peer_node_id
                    );
                },
                Err(MessagingProtocolError::PeerDialFailed(err)) => {
@@ -135,11 +135,11 @@ impl OutboundMessaging {
     async fn run_inner(mut self) -> Result<(), MessagingProtocolError> {
         let mut attempts = 0;
-        let substream = loop {
+        let (conn, substream) = loop {
             match self.try_establish().await {
-                Ok(substream) => {
+                Ok(conn_and_substream) => {
                     event!(Level::DEBUG, "Substream established");
-                    break substream;
+                    break conn_and_substream;
                 },
                 Err(err) => {
                     if attempts >= MAX_SEND_RETRIES {
@@ -158,7 +158,7 @@ impl OutboundMessaging {
                 },
             }
         };
 
-        self.start_forwarding_messages(substream).await?;
+        self.start_forwarding_messages(conn, substream).await?;
         Ok(())
     }
@@ -178,16 +178,14 @@ impl OutboundMessaging {
                        target: LOG_TARGET,
                        "Dial was cancelled for peer '{}'. This is probably because of connection tie-breaking. \
                         Retrying...",
-                        self.peer_node_id.short_str(),
+                        self.peer_node_id,
                    );
                    continue;
                },
                Err(err) => {
                    debug!(
                        target: LOG_TARGET,
-                        "MessagingProtocol failed to dial peer '{}' because '{:?}'",
-                        self.peer_node_id.short_str(),
-                        err
+                        "MessagingProtocol failed to dial peer '{}' because '{:?}'", self.peer_node_id, err
                    );
 
                    break Err(MessagingProtocolError::PeerDialFailed(err));
@@ -199,7 +197,9 @@ impl OutboundMessaging {
         .await
     }
 
-    async fn try_establish(&mut self) -> Result<NegotiatedSubstream<Substream>, MessagingProtocolError> {
+    async fn try_establish(
+        &mut self,
+    ) -> Result<(PeerConnection, NegotiatedSubstream<Substream>), MessagingProtocolError> {
         let span = span!(
             Level::DEBUG,
             "establish_connection",
@@ -208,24 +208,22 @@ impl OutboundMessaging {
         async move {
             debug!(
                 target: LOG_TARGET,
-                "Attempting to establish messaging protocol connection to peer `{}`",
-                self.peer_node_id.short_str()
+                "Attempting to establish messaging protocol connection to peer `{}`", self.peer_node_id
             );
             let start = Instant::now();
-            let conn = self.try_dial_peer().await?;
+            let mut conn = self.try_dial_peer().await?;
             debug!(
                 target: LOG_TARGET,
                 "Connection succeeded for peer `{}` in {:.0?}",
-                self.peer_node_id.short_str(),
+                self.peer_node_id,
                 start.elapsed()
             );
-            let substream = self.try_open_substream(conn).await?;
+            let substream = self.try_open_substream(&mut conn).await?;
             debug!(
                 target: LOG_TARGET,
-                "Substream established for peer `{}`",
-                self.peer_node_id.short_str(),
+                "Substream established for peer `{}`", self.peer_node_id,
             );
-            Ok(substream)
+            Ok((conn, substream))
         }
         .instrument(span)
         .await
@@ -233,7 +231,7 @@ impl OutboundMessaging {
     async fn try_open_substream(
         &mut self,
-        mut conn: PeerConnection,
+        conn: &mut PeerConnection,
     ) -> Result<NegotiatedSubstream<Substream>, MessagingProtocolError> {
         let span = span!(
             Level::DEBUG,
@@ -247,7 +245,7 @@ impl OutboundMessaging {
                 debug!(
                     target: LOG_TARGET,
                     "MessagingProtocol failed to open a substream to peer '{}' because '{}'",
-                    self.peer_node_id.short_str(),
+                    self.peer_node_id,
                     err
                 );
                 Err(err.into())
@@ -260,44 +258,43 @@ impl OutboundMessaging {
     async fn start_forwarding_messages(
         self,
+        conn: PeerConnection,
         substream: NegotiatedSubstream<Substream>,
     ) -> Result<(), MessagingProtocolError> {
+        let Self {
+            mut messages_rx,
+            inactivity_timeout,
+            peer_node_id,
+            ..
+        } = self;
         let span = span!(
             Level::DEBUG,
             "start_forwarding_messages",
-            node_id = self.peer_node_id.to_string().as_str()
+            node_id = peer_node_id.to_string().as_str()
         );
         let _enter = span.enter();
         debug!(
             target: LOG_TARGET,
-            "Starting direct message forwarding for peer `{}`",
-            self.peer_node_id.short_str()
+            "Starting direct message forwarding for peer `{}`", peer_node_id
         );
-        let substream = substream.stream;
-
-        let framed = MessagingProtocol::framed(substream);
-        let Self {
-            request_rx,
-            inactivity_timeout,
-            ..
-        } = self;
+        let framed = MessagingProtocol::framed(substream.stream);
 
         // Convert unbounded channel to a stream
-        let stream = futures::stream::unfold(request_rx, |mut rx| async move {
+        let stream = futures::stream::unfold(&mut messages_rx, |rx| async move {
             let v = rx.recv().await;
             v.map(|v| (v, rx))
         });
 
-        let stream = match inactivity_timeout {
+        let outbound_stream = match inactivity_timeout {
             Some(timeout) => Either::Left(
                 tokio_stream::StreamExt::timeout(stream, timeout).map_err(|_| MessagingProtocolError::Inactivity),
             ),
             None => Either::Right(stream.map(Ok)),
         };
 
-        let outbound_count = metrics::outbound_message_count(&self.peer_node_id);
-        let stream = stream.map(|msg| {
+        let outbound_count = metrics::outbound_message_count(&peer_node_id);
+        let stream = outbound_stream.map(|msg| {
             outbound_count.inc();
             msg.map(|mut out_msg| {
                 event!(Level::DEBUG, "Message buffered for sending {}", out_msg);
@@ -306,12 +303,47 @@ impl OutboundMessaging {
             })
         });
 
+        // Stop the stream as soon as the disconnection occurs, this allows the outbound stream to terminate as soon as
+        // the connection terminates rather than detecting the disconnect on the next message send.
+        let stream = stream.take_until(async move {
+            let on_disconnect = conn.on_disconnect();
+            let peer_node_id = conn.peer_node_id().clone();
+            // We drop the conn handle here BEFORE awaiting a disconnect to ensure that the outbound messaging isn't
+            // holding onto the handle keeping the connection alive
+            drop(conn);
+            on_disconnect.await;
+            debug!(
+                target: LOG_TARGET,
+                "Peer connection closed. Ending outbound messaging stream for peer {}.", peer_node_id
+            )
+        });
+
         super::forward::Forward::new(stream, framed.sink_map_err(Into::into)).await?;
+        // Close so that the protocol handler does not resend to this session
+        messages_rx.close();
+        // The stream ended, perhaps due to a disconnect, but there could be more messages left on the queue. Collect
+        // any messages and queue them up for retry. If we cannot reconnect to the peer, the queued messages will be
+        // dropped.
+        let mut retried_messages_count = 0;
+        while let Some(msg) = messages_rx.recv().await {
+            if self.retry_queue_tx.send(msg).is_err() {
+                // The messaging protocol has shut down, so let's exit too
+                break;
+            }
+            retried_messages_count += 1;
+        }
+        if retried_messages_count > 0 {
+            debug!(
+                target: LOG_TARGET,
+                "{} pending message(s) were still queued after disconnect. \
Retrying them.", retried_messages_count + ); + } + debug!( target: LOG_TARGET, - "Direct message forwarding successfully completed for peer `{}`.", - self.peer_node_id.short_str() + "Direct message forwarding successfully completed for peer `{}`.", peer_node_id ); Ok(()) } @@ -319,8 +351,8 @@ impl OutboundMessaging { async fn fail_all_pending_messages(&mut self, reason: SendFailReason) { // Close the request channel so that we can read all the remaining messages and flush them // to a failed event - self.request_rx.close(); - while let Some(mut out_msg) = self.request_rx.recv().await { + self.messages_rx.close(); + while let Some(mut out_msg) = self.messages_rx.recv().await { out_msg.reply_fail(reason); let _ = self .messaging_events_tx diff --git a/comms/src/protocol/messaging/protocol.rs b/comms/src/protocol/messaging/protocol.rs index 2f9b031947..2564eb880a 100644 --- a/comms/src/protocol/messaging/protocol.rs +++ b/comms/src/protocol/messaging/protocol.rs @@ -116,6 +116,8 @@ pub struct MessagingProtocol { inbound_message_tx: mpsc::Sender, internal_messaging_event_tx: mpsc::Sender, internal_messaging_event_rx: mpsc::Receiver, + retry_queue_tx: mpsc::UnboundedSender, + retry_queue_rx: mpsc::UnboundedReceiver, shutdown_signal: ShutdownSignal, complete_trigger: Shutdown, } @@ -133,6 +135,8 @@ impl MessagingProtocol { ) -> Self { let (internal_messaging_event_tx, internal_messaging_event_rx) = mpsc::channel(INTERNAL_MESSAGING_EVENT_CHANNEL_SIZE); + let (retry_queue_tx, retry_queue_rx) = mpsc::unbounded_channel(); + Self { config, connectivity, @@ -142,6 +146,8 @@ impl MessagingProtocol { messaging_events_tx, internal_messaging_event_rx, internal_messaging_event_tx, + retry_queue_tx, + retry_queue_rx, inbound_message_tx, shutdown_signal, complete_trigger: Shutdown::new(), @@ -162,6 +168,16 @@ impl MessagingProtocol { self.handle_internal_messaging_event(event).await; }, + Some(msg) = self.retry_queue_rx.recv() => { + if let Err(err) = self.handle_retry_queue_messages(msg).await { + error!( + target: LOG_TARGET, + "Failed to retry outbound message because '{}'", + err + ); + } + }, + Some(req) = self.request_rx.recv() => { if let Err(err) = self.handle_request(req).await { error!( @@ -248,6 +264,12 @@ impl MessagingProtocol { Ok(()) } + async fn handle_retry_queue_messages(&mut self, msg: OutboundMessage) -> Result<(), MessagingProtocolError> { + debug!(target: LOG_TARGET, "Retrying outbound message ({})", msg); + self.send_message(msg).await?; + Ok(()) + } + // #[tracing::instrument(skip(self, out_msg), err)] async fn send_message(&mut self, out_msg: OutboundMessage) -> Result<(), MessagingProtocolError> { let peer_node_id = out_msg.peer_node_id.clone(); @@ -266,6 +288,7 @@ impl MessagingProtocol { self.internal_messaging_event_tx.clone(), peer_node_id, self.config.inactivity_timeout, + self.retry_queue_tx.clone(), ); break entry.insert(sender); }, @@ -294,10 +317,17 @@ impl MessagingProtocol { events_tx: mpsc::Sender, peer_node_id: NodeId, inactivity_timeout: Option, + retry_queue_tx: mpsc::UnboundedSender, ) -> mpsc::UnboundedSender { let (msg_tx, msg_rx) = mpsc::unbounded_channel(); - let outbound_messaging = - OutboundMessaging::new(connectivity, events_tx, msg_rx, peer_node_id, inactivity_timeout); + let outbound_messaging = OutboundMessaging::new( + connectivity, + events_tx, + msg_rx, + retry_queue_tx, + peer_node_id, + inactivity_timeout, + ); task::spawn(outbound_messaging.run()); msg_tx }