From d6be64f087341f31838d51dfbdfb067ed24895df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Tue, 10 Nov 2020 15:14:54 +0100 Subject: [PATCH] feat: ping peers on connection loss to detect if they went offline --- src/messages/mod.rs | 4 ++++ src/routing/approved.rs | 24 ++++++++++++++++++++++-- src/routing/comm.rs | 2 +- src/routing/command.rs | 5 +++++ src/routing/executor.rs | 22 ++++++++++++++++++---- src/routing/stage.rs | 7 +++++++ src/section/mod.rs | 4 ++-- 7 files changed, 59 insertions(+), 9 deletions(-) diff --git a/src/messages/mod.rs b/src/messages/mod.rs index bd2b6e67bc..451fd0900b 100644 --- a/src/messages/mod.rs +++ b/src/messages/mod.rs @@ -30,6 +30,10 @@ use serde::{Deserialize, Serialize}; use std::fmt::{self, Debug, Formatter}; use xor_name::Prefix; +// Message used to probe a peer connection. +// NOTE: ideally this would be empty, but that is currently treated as error by qp2p. +pub(crate) const PING: &[u8] = &[0]; + /// Message sent over the network. #[derive(Clone, Eq, Serialize, Deserialize)] pub(crate) struct Message { diff --git a/src/routing/approved.rs b/src/routing/approved.rs index 4491ed2bda..b57ee8df6d 100644 --- a/src/routing/approved.rs +++ b/src/routing/approved.rs @@ -19,7 +19,7 @@ use crate::{ message_filter::MessageFilter, messages::{ BootstrapResponse, JoinRequest, Message, MessageHash, MessageStatus, PlainMessage, Variant, - VerifyStatus, + VerifyStatus, PING, }, network::Network, node::Node, @@ -199,8 +199,28 @@ impl Approved { } } + pub fn handle_connection_lost(&self, addr: &SocketAddr) -> Option<Command> { + if !self.is_elder() { + return None; + } + + if let Some(peer) = self.section.find_joined_member_by_addr(addr) { + trace!("Lost connection to {}", peer); + } else { + return None; + } + + // Try to send a "ping" message to probe the peer connection. If it succeeds, the + // connection loss was just temporary. Otherwise the peer is assumed lost and we will vote + // it offline. 
+ Some(Command::send_message_to_target( + addr, + Bytes::from_static(PING), + )) + } + pub fn handle_peer_lost(&self, addr: &SocketAddr) -> Result<Vec<Command>> { - let name = if let Some(peer) = self.section.find_member_from_addr(addr) { + let name = if let Some(peer) = self.section.find_joined_member_by_addr(addr) { debug!("Lost known peer {}", peer); *peer.name() } else { diff --git a/src/routing/comm.rs b/src/routing/comm.rs index ae38e0bd3b..e1b7eaf039 100644 --- a/src/routing/comm.rs +++ b/src/routing/comm.rs @@ -30,7 +30,7 @@ use tokio::{ const CONNECTIONS_CACHE_SIZE: usize = 1024; /// Maximal number of resend attempts to the same target. -pub const RESEND_MAX_ATTEMPTS: u8 = 3; +pub(crate) const RESEND_MAX_ATTEMPTS: u8 = 3; // Communication component of the node to interact with other nodes. pub(crate) struct Comm { diff --git a/src/routing/command.rs b/src/routing/command.rs index 4a6db3def2..cf5008ad45 100644 --- a/src/routing/command.rs +++ b/src/routing/command.rs @@ -36,6 +36,8 @@ pub(crate) enum Command { }, /// Handle a timeout previously scheduled with `ScheduleTimeout`. HandleTimeout(u64), + /// Handle lost connection to a peer. + HandleConnectionLost(SocketAddr), /// Handle peer that's been detected as lost. HandlePeerLost(SocketAddr), /// Handle vote cast either by us or some other peer. 
@@ -110,6 +112,9 @@ impl Debug for Command { .field("message", message) .finish(), Self::HandleTimeout(token) => f.debug_tuple("HandleTimeout").field(token).finish(), + Self::HandleConnectionLost(addr) => { + f.debug_tuple("HandleConnectionLost").field(addr).finish() + } Self::HandlePeerLost(addr) => f.debug_tuple("HandlePeerLost").field(addr).finish(), Self::HandleVote { vote, proof_share } => f .debug_struct("HandleVote") diff --git a/src/routing/executor.rs b/src/routing/executor.rs index 6c7cde764e..7ee0440eb3 100644 --- a/src/routing/executor.rs +++ b/src/routing/executor.rs @@ -10,7 +10,10 @@ use super::{ comm::{ConnectionEvent, IncomingConnections}, Command, Stage, }; -use crate::{event::Event, messages::Message}; +use crate::{ + event::Event, + messages::{Message, PING}, +}; use bytes::Bytes; use std::{net::SocketAddr, sync::Arc}; use tokio::{sync::oneshot, task}; @@ -33,7 +36,7 @@ impl Executor { let _ = task::spawn(async move { tokio::select! { - _ = handle_incoming_messages(stage, incoming_conns) => (), + _ = handle_events(stage, incoming_conns) => (), _ = cancel_rx => (), } }); @@ -44,7 +47,7 @@ impl Executor { } } -async fn handle_incoming_messages(stage: Arc<Stage>, mut incoming_conns: IncomingConnections) { +async fn handle_events(stage: Arc<Stage>, mut incoming_conns: IncomingConnections) { while let Some(event) = incoming_conns.next().await { match event { ConnectionEvent::Received(qp2p::Message::UniStream { bytes, src, .. }) => { @@ -56,6 +59,12 @@ async fn handle_incoming_messages(stage: Arc<Stage>, mut incoming_conns: Incomin // Since it's arriving on a uni-stream we treat it as a Node // message which needs to be processed by us, as well as // potentially reported to the event stream consumer. + + // Ignore pings. 
+ if bytes == PING { + continue; + } + spawn_node_message_handler(stage.clone(), bytes, src); } ConnectionEvent::Received(qp2p::Message::BiStream { @@ -82,7 +91,12 @@ async fn handle_incoming_messages(stage: Arc<Stage>, mut incoming_conns: Incomin stage.send_event(event).await; } - ConnectionEvent::Disconnected(addr) => trace!("Connection lost: {}", addr), + ConnectionEvent::Disconnected(addr) => { + let _ = stage + .clone() + .handle_commands(Command::HandleConnectionLost(addr)) + .await; + } } } } diff --git a/src/routing/stage.rs b/src/routing/stage.rs index 5e11d272ba..6eb4014a5b 100644 --- a/src/routing/stage.rs +++ b/src/routing/stage.rs @@ -76,6 +76,13 @@ impl Stage { Command::HandleConsensus { vote, proof } => { self.state.lock().await.handle_consensus(vote, proof) } + Command::HandleConnectionLost(addr) => Ok(self + .state + .lock() + .await + .handle_connection_lost(&addr) + .into_iter() + .collect()), Command::HandlePeerLost(addr) => self.state.lock().await.handle_peer_lost(&addr), Command::HandleDkgParticipationResult { dkg_key, diff --git a/src/section/mod.rs b/src/section/mod.rs index 0d39822920..28afe3aeb3 100644 --- a/src/section/mod.rs +++ b/src/section/mod.rs @@ -245,9 +245,9 @@ impl Section { .filter(move |peer| !self.is_elder(peer.name())) } - pub fn find_member_from_addr(&self, addr: &SocketAddr) -> Option<&Peer> { + pub fn find_joined_member_by_addr(&self, addr: &SocketAddr) -> Option<&Peer> { self.members - .all() + .joined() .find(|info| info.peer.addr() == addr) .map(|info| &info.peer) }