Skip to content

Commit

Permalink
fix: end stale outbound queue immediately on disconnect, retry outbou…
Browse files Browse the repository at this point in the history
…nd messages (#3664)

Description
---
- immediately end the outbound messaging stream on peer connection disconnect
- if messages remain in the outbound channel, flush them to a retry queue 

Motivation and Context
---

Due to yamux substream internals, the outbound stream does not indicate it has ended until we attempt to write a message. Causing the outbound stream to remain active (i.e. available for sending) when it cannot send a message. This message and potentially any others that are queued up will then be dropped. This is rectified by cutting off the outbound queue channel as soon as a connection is disconnected and 'rerouting' the queued messages (if any) to be retried. Retry will attempt to reconnect to the peer and failing so, will drop the queued messages since there is nothing we can do in this case.

This increases messaging reliability.

How Has This Been Tested?
---

Manually by sending transaction and ping messages between wallets and base nodes that are banned/shutdown
  • Loading branch information
sdbondi authored Jan 3, 2022
1 parent 18f6e29 commit 576a00c
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 60 deletions.
15 changes: 9 additions & 6 deletions comms/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

use std::{
fmt,
future::Future,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Expand Down Expand Up @@ -185,6 +186,12 @@ impl PeerConnection {
!self.request_tx.is_closed()
}

/// Returns a owned future that resolves on disconnection
pub fn on_disconnect(&self) -> impl Future<Output = ()> + 'static {
let request_tx = self.request_tx.clone();
async move { request_tx.closed().await }
}

pub fn age(&self) -> Duration {
self.started_at.elapsed()
}
Expand Down Expand Up @@ -354,7 +361,7 @@ impl PeerConnectionActor {
match maybe_request {
Some(request) => self.handle_request(request).await,
None => {
debug!(target: LOG_TARGET, "[{}] All peer connection handled dropped closing the connection", self);
debug!(target: LOG_TARGET, "[{}] All peer connection handles dropped closing the connection", self);
break;
}
}
Expand Down Expand Up @@ -468,11 +475,7 @@ impl PeerConnectionActor {
}

async fn notify_event(&mut self, event: ConnectionManagerEvent) {
log_if_error!(
target: LOG_TARGET,
self.event_notifier.send(event).await,
"Failed to send connection manager notification because '{}'",
);
let _ = self.event_notifier.send(event).await;
}

/// Disconnect this peer connection.
Expand Down
136 changes: 84 additions & 52 deletions comms/src/protocol/messaging/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use std::time::{Duration, Instant};

use futures::{future::Either, SinkExt, StreamExt, TryStreamExt};
use tokio::sync::mpsc as tokiompsc;
use tokio::sync::mpsc;
use tracing::{debug, error, event, span, Instrument, Level};

use super::{error::MessagingProtocolError, metrics, MessagingEvent, MessagingProtocol, SendFailReason};
Expand All @@ -44,24 +44,27 @@ const MAX_SEND_RETRIES: usize = 1;

pub struct OutboundMessaging {
connectivity: ConnectivityRequester,
request_rx: tokiompsc::UnboundedReceiver<OutboundMessage>,
messaging_events_tx: tokiompsc::Sender<MessagingEvent>,
messages_rx: mpsc::UnboundedReceiver<OutboundMessage>,
messaging_events_tx: mpsc::Sender<MessagingEvent>,
retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
peer_node_id: NodeId,
inactivity_timeout: Option<Duration>,
}

impl OutboundMessaging {
pub fn new(
connectivity: ConnectivityRequester,
messaging_events_tx: tokiompsc::Sender<MessagingEvent>,
request_rx: tokiompsc::UnboundedReceiver<OutboundMessage>,
messaging_events_tx: mpsc::Sender<MessagingEvent>,
messages_rx: mpsc::UnboundedReceiver<OutboundMessage>,
retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
peer_node_id: NodeId,
inactivity_timeout: Option<Duration>,
) -> Self {
Self {
connectivity,
request_rx,
messages_rx,
messaging_events_tx,
retry_queue_tx,
peer_node_id,
inactivity_timeout,
}
Expand All @@ -77,8 +80,7 @@ impl OutboundMessaging {
async move {
debug!(
target: LOG_TARGET,
"Attempting to dial peer '{}' if required",
self.peer_node_id.short_str()
"Attempting to dial peer '{}' if required", self.peer_node_id
);
let peer_node_id = self.peer_node_id.clone();
let messaging_events_tx = self.messaging_events_tx.clone();
Expand All @@ -87,25 +89,23 @@ impl OutboundMessaging {
event!(
Level::DEBUG,
"Outbound messaging for peer '{}' has stopped because the stream was closed",
peer_node_id.short_str()
peer_node_id
);

debug!(
target: LOG_TARGET,
"Outbound messaging for peer '{}' has stopped because the stream was closed",
peer_node_id.short_str()
"Outbound messaging for peer '{}' has stopped because the stream was closed", peer_node_id
);
},
Err(MessagingProtocolError::Inactivity) => {
event!(
Level::DEBUG,
"Outbound messaging for peer '{}' has stopped because it was inactive",
peer_node_id.short_str()
peer_node_id
);
debug!(
target: LOG_TARGET,
"Outbound messaging for peer '{}' has stopped because it was inactive",
peer_node_id.short_str()
"Outbound messaging for peer '{}' has stopped because it was inactive", peer_node_id
);
},
Err(MessagingProtocolError::PeerDialFailed(err)) => {
Expand Down Expand Up @@ -135,11 +135,11 @@ impl OutboundMessaging {
async fn run_inner(mut self) -> Result<(), MessagingProtocolError> {
let mut attempts = 0;

let substream = loop {
let (conn, substream) = loop {
match self.try_establish().await {
Ok(substream) => {
Ok(conn_and_substream) => {
event!(Level::DEBUG, "Substream established");
break substream;
break conn_and_substream;
},
Err(err) => {
if attempts >= MAX_SEND_RETRIES {
Expand All @@ -158,7 +158,7 @@ impl OutboundMessaging {
},
}
};
self.start_forwarding_messages(substream).await?;
self.start_forwarding_messages(conn, substream).await?;

Ok(())
}
Expand All @@ -178,16 +178,14 @@ impl OutboundMessaging {
target: LOG_TARGET,
"Dial was cancelled for peer '{}'. This is probably because of connection tie-breaking. \
Retrying...",
self.peer_node_id.short_str(),
self.peer_node_id,
);
continue;
},
Err(err) => {
debug!(
target: LOG_TARGET,
"MessagingProtocol failed to dial peer '{}' because '{:?}'",
self.peer_node_id.short_str(),
err
"MessagingProtocol failed to dial peer '{}' because '{:?}'", self.peer_node_id, err
);

break Err(MessagingProtocolError::PeerDialFailed(err));
Expand All @@ -199,7 +197,9 @@ impl OutboundMessaging {
.await
}

async fn try_establish(&mut self) -> Result<NegotiatedSubstream<Substream>, MessagingProtocolError> {
async fn try_establish(
&mut self,
) -> Result<(PeerConnection, NegotiatedSubstream<Substream>), MessagingProtocolError> {
let span = span!(
Level::DEBUG,
"establish_connection",
Expand All @@ -208,32 +208,30 @@ impl OutboundMessaging {
async move {
debug!(
target: LOG_TARGET,
"Attempting to establish messaging protocol connection to peer `{}`",
self.peer_node_id.short_str()
"Attempting to establish messaging protocol connection to peer `{}`", self.peer_node_id
);
let start = Instant::now();
let conn = self.try_dial_peer().await?;
let mut conn = self.try_dial_peer().await?;
debug!(
target: LOG_TARGET,
"Connection succeeded for peer `{}` in {:.0?}",
self.peer_node_id.short_str(),
self.peer_node_id,
start.elapsed()
);
let substream = self.try_open_substream(conn).await?;
let substream = self.try_open_substream(&mut conn).await?;
debug!(
target: LOG_TARGET,
"Substream established for peer `{}`",
self.peer_node_id.short_str(),
"Substream established for peer `{}`", self.peer_node_id,
);
Ok(substream)
Ok((conn, substream))
}
.instrument(span)
.await
}

async fn try_open_substream(
&mut self,
mut conn: PeerConnection,
conn: &mut PeerConnection,
) -> Result<NegotiatedSubstream<Substream>, MessagingProtocolError> {
let span = span!(
Level::DEBUG,
Expand All @@ -247,7 +245,7 @@ impl OutboundMessaging {
debug!(
target: LOG_TARGET,
"MessagingProtocol failed to open a substream to peer '{}' because '{}'",
self.peer_node_id.short_str(),
self.peer_node_id,
err
);
Err(err.into())
Expand All @@ -260,44 +258,43 @@ impl OutboundMessaging {

async fn start_forwarding_messages(
self,
conn: PeerConnection,
substream: NegotiatedSubstream<Substream>,
) -> Result<(), MessagingProtocolError> {
let Self {
mut messages_rx,
inactivity_timeout,
peer_node_id,
..
} = self;
let span = span!(
Level::DEBUG,
"start_forwarding_messages",
node_id = self.peer_node_id.to_string().as_str()
node_id = peer_node_id.to_string().as_str()
);
let _enter = span.enter();
debug!(
target: LOG_TARGET,
"Starting direct message forwarding for peer `{}`",
self.peer_node_id.short_str()
"Starting direct message forwarding for peer `{}`", peer_node_id
);
let substream = substream.stream;

let framed = MessagingProtocol::framed(substream);

let Self {
request_rx,
inactivity_timeout,
..
} = self;
let framed = MessagingProtocol::framed(substream.stream);

// Convert unbounded channel to a stream
let stream = futures::stream::unfold(request_rx, |mut rx| async move {
let stream = futures::stream::unfold(&mut messages_rx, |rx| async move {
let v = rx.recv().await;
v.map(|v| (v, rx))
});

let stream = match inactivity_timeout {
let outbound_stream = match inactivity_timeout {
Some(timeout) => Either::Left(
tokio_stream::StreamExt::timeout(stream, timeout).map_err(|_| MessagingProtocolError::Inactivity),
),
None => Either::Right(stream.map(Ok)),
};

let outbound_count = metrics::outbound_message_count(&self.peer_node_id);
let stream = stream.map(|msg| {
let outbound_count = metrics::outbound_message_count(&peer_node_id);
let stream = outbound_stream.map(|msg| {
outbound_count.inc();
msg.map(|mut out_msg| {
event!(Level::DEBUG, "Message buffered for sending {}", out_msg);
Expand All @@ -306,21 +303,56 @@ impl OutboundMessaging {
})
});

// Stop the stream as soon as the disconnection occurs, this allows the outbound stream to terminate as soon as
// the connection terminates rather than detecting the disconnect on the next message send.
let stream = stream.take_until(async move {
let on_disconnect = conn.on_disconnect();
let peer_node_id = conn.peer_node_id().clone();
// We drop the conn handle here BEFORE awaiting a disconnect to ensure that the outbound messaging isn't
// holding onto the handle keeping the connection alive
drop(conn);
on_disconnect.await;
debug!(
target: LOG_TARGET,
"Peer connection closed. Ending outbound messaging stream for peer {}.", peer_node_id
)
});

super::forward::Forward::new(stream, framed.sink_map_err(Into::into)).await?;

// Close so that the protocol handler does not resend to this session
messages_rx.close();
// The stream ended, perhaps due to a disconnect, but there could be more messages left on the queue. Collect
// any messages and queue them up for retry. If we cannot reconnect to the peer, the queued messages will be
// dropped.
let mut retried_messages_count = 0;
while let Some(msg) = messages_rx.recv().await {
if self.retry_queue_tx.send(msg).is_err() {
// The messaging protocol has shut down, so let's exit too
break;
}
retried_messages_count += 1;
}

if retried_messages_count > 0 {
debug!(
target: LOG_TARGET,
"{} pending message(s) were still queued after disconnect. Retrying them.", retried_messages_count
);
}

debug!(
target: LOG_TARGET,
"Direct message forwarding successfully completed for peer `{}`.",
self.peer_node_id.short_str()
"Direct message forwarding successfully completed for peer `{}`.", peer_node_id
);
Ok(())
}

async fn fail_all_pending_messages(&mut self, reason: SendFailReason) {
// Close the request channel so that we can read all the remaining messages and flush them
// to a failed event
self.request_rx.close();
while let Some(mut out_msg) = self.request_rx.recv().await {
self.messages_rx.close();
while let Some(mut out_msg) = self.messages_rx.recv().await {
out_msg.reply_fail(reason);
let _ = self
.messaging_events_tx
Expand Down
Loading

0 comments on commit 576a00c

Please sign in to comment.