diff --git a/src/error.rs b/src/error.rs index efef346d26..9803de15b5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,7 +6,6 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::qp2p::Error as QuicP2pError; use err_derive::Error; /// The type returned by the sn_routing message handling methods. @@ -23,7 +22,7 @@ pub enum Error { #[error(display = "Cannot route.")] CannotRoute, #[error(display = "Network layer error: {}", _0)] - Network(#[error(source)] QuicP2pError), + Network(#[error(source)] qp2p::Error), #[error(display = "The node is not in a state to handle the action.")] InvalidState, #[error(display = "Bincode error: {}", _0)] diff --git a/src/lib.rs b/src/lib.rs index a1923cef03..cada5c1215 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,6 +63,8 @@ unused_results, clippy::needless_borrow )] +// FIXME: find a way to not need this. +#![type_length_limit = "2174929"] #[macro_use] extern crate log; @@ -82,9 +84,9 @@ pub use self::{ location::{DstLocation, SrcLocation}, network_params::NetworkParams, node::{EventStream, Node, NodeConfig}, - qp2p::Config as TransportConfig, section::{SectionProofChain, MIN_AGE}, }; +pub use qp2p::Config as TransportConfig; pub use xor_name::{Prefix, XorName, XOR_NAME_LEN}; // TODO remove pub on API update /// sn_routing events. @@ -152,12 +154,6 @@ const RECOMMENDED_SECTION_SIZE: usize = 60; /// Number of elders per section. const ELDER_SIZE: usize = 7; -// Quic-p2p -#[cfg(feature = "mock")] -use mock_qp2p as qp2p; -#[cfg(not(feature = "mock"))] -use qp2p::{self}; - #[cfg(test)] mod tests { use super::{QUORUM_DENOMINATOR, QUORUM_NUMERATOR}; diff --git a/src/node/event_stream.rs b/src/node/event_stream.rs index f5ce10fba6..ffa4eb4f63 100644 --- a/src/node/event_stream.rs +++ b/src/node/event_stream.rs @@ -42,7 +42,8 @@ fn spawn_connections_handler(stage: Arc>, mut incoming_conns: Incom let _ = tokio::spawn(async move { while let Some(incoming_msgs) = incoming_conns.next().await { trace!( - "New connection established by peer {}", + "{}New connection established by peer {}", + stage.lock().await.log_ident(), incoming_msgs.remote_addr() ); spawn_messages_handler(stage.clone(), incoming_msgs) @@ -57,7 +58,8 @@ fn spawn_messages_handler(stage: Arc>, mut incoming_msgs: IncomingM match msg { QuicP2pMsg::UniStream { bytes, src, .. } => { trace!( - "New message ({} bytes) received on a uni-stream from: {}", + "{}New message ({} bytes) received on a uni-stream from: {}", + stage.lock().await.log_ident(), bytes.len(), src ); @@ -73,7 +75,8 @@ fn spawn_messages_handler(stage: Arc>, mut incoming_msgs: IncomingM recv, } => { trace!( - "New message ({} bytes) received on a bi-stream from: {}", + "{}New message ({} bytes) received on a bi-stream from: {}", + stage.lock().await.log_ident(), bytes.len(), src ); @@ -92,6 +95,12 @@ fn spawn_messages_handler(stage: Arc>, mut incoming_msgs: IncomingM } } } + + trace!( + "{}Connection to peer {} closed", + stage.lock().await.log_ident(), + incoming_msgs.remote_addr() + ); }); } @@ -99,10 +108,13 @@ fn spawn_node_message_handler(stage: Arc>, msg_bytes: Bytes, sender let _ = tokio::spawn(async move { match Message::from_bytes(&msg_bytes) { Err(error) => { - debug!("Failed to deserialize message: {:?}", error); + debug!( + "{}Failed to deserialize message: {:?}", + stage.lock().await.log_ident(), + error + ); } Ok(msg) => { - trace!("try handle message {:?}", msg); // Process the message according to our stage if let Err(err) = stage .lock() @@ -111,8 +123,10 @@ fn spawn_node_message_handler(stage: Arc>, msg_bytes: Bytes, sender .await { error!( - "Error encountered when processing message {:?}: {}", - msg, err + "{}Error encountered when processing message {:?}: {}", + stage.lock().await.log_ident(), + msg, + err ); } } @@ -124,7 +138,11 @@ fn spawn_timer_handler(stage: Arc>, mut rx: mpsc::UnboundedReceiver let _ = tokio::spawn(async move { while let Some(token) = rx.recv().await { if let Err(err) = stage.lock().await.process_timeout(token).await { - error!("Error encountered when processing timeout: {}", err); + error!( + "{}Error encountered when processing timeout: {}", + stage.lock().await.log_ident(), + err + ); } } }); diff --git a/src/node/stage/approved.rs b/src/node/stage/approved.rs index fdd1376cfe..d872fbe22e 100644 --- a/src/node/stage/approved.rs +++ b/src/node/stage/approved.rs @@ -88,7 +88,6 @@ impl Approved { sender: SocketAddr, msg: Message, ) -> Result> { - trace!("Got {:?}", msg); // Filter messages which were already handled if self.msg_filter.contains_incoming(&msg) { trace!("not handling message - already handled: {:?}", msg); @@ -97,6 +96,7 @@ impl Approved { match self.decide_message_status(&msg)? { MessageStatus::Useful => { + trace!("Useful message from {}: {:?}", sender, msg); self.update_section_knowledge(&msg).await?; self.handle_useful_message(Some(sender), msg).await } @@ -134,8 +134,6 @@ impl Approved { // Cast a vote that doesn't need total order, only section consensus. #[async_recursion] async fn cast_unordered_vote(&mut self, vote: Vote) -> Result<()> { - trace!("Vote for {:?}", vote); - let key_share = self.section_keys_provider.key_share()?; trace!( @@ -323,10 +321,6 @@ impl Approved { fn decide_message_status(&self, msg: &Message) -> Result { let our_id = self.node_info.full_id.public_id(); - trace!( - "Deciding message status based upon variant: {:?}", - msg.variant() - ); match msg.variant() { Variant::NeighbourInfo { .. } => { if !self.is_our_elder(our_id) { diff --git a/src/node/stage/comm.rs b/src/node/stage/comm.rs index 1ec0dcb9c4..cb61663250 100644 --- a/src/node/stage/comm.rs +++ b/src/node/stage/comm.rs @@ -7,47 +7,51 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ - error::{Error, Result}, + error::Result, id::FullId, location::DstLocation, messages::{Message, Variant}, }; use bytes::Bytes; -use futures::lock::Mutex; -use hex_fmt::HexFmt; +use futures::{ + lock::Mutex, + stream::{FuturesUnordered, StreamExt}, +}; +use itertools::Itertools; use lru_time_cache::LruCache; use qp2p::{Config, Connection, Endpoint, IncomingConnections, QuicP2p}; -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, slice, sync::Arc, time::Duration}; +use tokio::time; // Number of Connections to maintain in the cache const CONNECTIONS_CACHE_SIZE: usize = 1024; +/// Maximal number of resend attempts to the same target. +pub const RESEND_MAX_ATTEMPTS: u8 = 3; +/// Delay before attempting to resend a previously failed message. +pub const RESEND_DELAY: Duration = Duration::from_secs(10); + // Communication component of the node to interact with other nodes. #[derive(Clone)] pub(crate) struct Comm { - quic_p2p: Arc, - endpoint: Arc, - node_conns: Arc>>, + inner: Arc, } impl Comm { pub async fn new(transport_config: Config) -> Result { - let quic_p2p = Arc::new(QuicP2p::with_config( - Some(transport_config), - Default::default(), - true, - )?); + let quic_p2p = QuicP2p::with_config(Some(transport_config), Default::default(), true)?; // Don't bootstrap, just create an endpoint where to listen to // the incoming messages from other nodes. - let endpoint = Arc::new(quic_p2p.new_endpoint()?); - - let node_conns = Arc::new(Mutex::new(LruCache::with_capacity(CONNECTIONS_CACHE_SIZE))); + let endpoint = quic_p2p.new_endpoint()?; + let node_conns = Mutex::new(LruCache::with_capacity(CONNECTIONS_CACHE_SIZE)); Ok(Self { - quic_p2p, - endpoint, - node_conns, + inner: Arc::new(Inner { + _quic_p2p: quic_p2p, + endpoint, + node_conns, + }), }) } @@ -55,21 +59,20 @@ impl Comm { let mut quic_p2p = QuicP2p::with_config(Some(transport_config), Default::default(), true)?; // Bootstrap to the network returning the connection to a node. - let (endpoint, connection) = quic_p2p.bootstrap().await?; - - let quic_p2p = Arc::new(quic_p2p); - let endpoint = Arc::new(endpoint); - - let addr = connection.remote_address(); + let (endpoint, conn) = quic_p2p.bootstrap().await?; + let addr = conn.remote_address(); let mut node_conns = LruCache::with_capacity(CONNECTIONS_CACHE_SIZE); - let _ = node_conns.insert(addr, connection); + let _ = node_conns.insert(addr, Arc::new(conn)); + let node_conns = Mutex::new(node_conns); Ok(( Self { - quic_p2p, - endpoint, - node_conns: Arc::new(Mutex::new(node_conns)), + inner: Arc::new(Inner { + _quic_p2p: quic_p2p, + endpoint, + node_conns, + }), }, addr, )) @@ -77,11 +80,11 @@ impl Comm { /// Starts listening for connections returning a stream where to read them from. pub fn listen(&self) -> Result { - Ok(self.endpoint.listen()?) + Ok(self.inner.endpoint.listen()?) } pub fn our_connection_info(&self) -> Result { - self.endpoint.our_endpoint().map_err(|err| { + self.inner.endpoint.our_endpoint().map_err(|err| { debug!("Failed to retrieve our connection info: {:?}", err); err.into() }) @@ -89,43 +92,51 @@ impl Comm { pub async fn send_message_to_targets( &self, - conn_infos: &[SocketAddr], + recipients: &[SocketAddr], delivery_group_size: usize, msg: Bytes, ) -> Result<()> { - if conn_infos.len() < delivery_group_size { + if recipients.len() < delivery_group_size { warn!( - "Less than delivery_group_size valid targets! delivery_group_size = {}; targets = {:?}; msg = {:10}", + "Less than delivery_group_size valid recipients - delivery_group_size: {}, recipients: {:?}", delivery_group_size, - conn_infos, - HexFmt(&msg) + recipients, ); } - // TODO: retry upon failures. Timeout could perhaps still - // be handlded by user...? - trace!( - "Sending message to {:?}", - &conn_infos[..delivery_group_size.min(conn_infos.len())] - ); + let mut state = SendState::new(recipients, delivery_group_size); + let mut tasks = FuturesUnordered::new(); - // initially only send to delivery_group_size targets - for addr in conn_infos.iter().take(delivery_group_size) { - // NetworkBytes is refcounted and cheap to clone. - self.send_message_to_target(addr, msg.clone()).await?; + loop { + while let Some((addr, failed)) = state.next() { + trace!("Sending message to {}", addr); + tasks.push(self.inner.send_with_delay(addr, msg.clone(), failed)); + } + + if let Some((addr, result)) = tasks.next().await { + if result.is_ok() { + trace!("Sending message to {} succeeded", addr); + state.success(&addr); + } else { + trace!("Sending message to {} failed", addr); + state.failure(&addr); + } + } else { + break; + } } + trace!( + "Sending message finished (failed recipients: [{}])", + state.failed().format(", ") + ); + Ok(()) } pub async fn send_message_to_target(&self, recipient: &SocketAddr, msg: Bytes) -> Result<()> { - trace!("Sending message to target {:?}", recipient); - // Cache the Connection to the node or obtain the already cached one - let mut node_conns = self.node_conns.lock().await; - let conn = node_conns - .entry(*recipient) - .or_insert(self.endpoint.connect_to(recipient).await?); - conn.send_uni(msg).await.map_err(Error::Network) + self.send_message_to_targets(slice::from_ref(recipient), 1, msg) + .await } pub async fn send_direct_message( @@ -139,3 +150,135 @@ impl Comm { .await } } + +struct Inner { + _quic_p2p: QuicP2p, + endpoint: Endpoint, + node_conns: Mutex>>, +} + +impl Inner { + async fn send(&self, recipient: &SocketAddr, msg: Bytes) -> Result<()> { + // Cache the Connection to the node or obtain the already cached one + // Note: not using the entry API to avoid holding the mutex longer than necessary. + let conn = self.node_conns.lock().await.get(recipient).cloned(); + let conn = if let Some(conn) = conn { + conn + } else { + let conn = self.endpoint.connect_to(recipient).await?; + let conn = Arc::new(conn); + let _ = self + .node_conns + .lock() + .await + .insert(*recipient, Arc::clone(&conn)); + + conn + }; + + conn.send_uni(msg).await?; + + Ok(()) + } + + async fn send_with_delay( + &self, + recipient: SocketAddr, + msg: Bytes, + delay: bool, + ) -> (SocketAddr, Result<()>) { + if delay { + time::delay_for(RESEND_DELAY).await; + } + + let result = self.send(&recipient, msg).await; + (recipient, result) + } +} + +struct SendState { + recipients: Vec, + remaining: usize, +} + +struct Recipient { + addr: SocketAddr, + sending: bool, + attempt: u8, +} + +impl SendState { + fn new(recipients: &[SocketAddr], delivery_group_size: usize) -> Self { + Self { + recipients: recipients + .iter() + .map(|addr| Recipient { + addr: *addr, + sending: false, + attempt: 0, + }) + .collect(), + remaining: delivery_group_size, + } + } + + // Returns the next recipient to sent to. + fn next(&mut self) -> Option<(SocketAddr, bool)> { + let active = self + .recipients + .iter() + .filter(|recipient| recipient.sending) + .count(); + + if active >= self.remaining { + return None; + } + + let recipient = self + .recipients + .iter_mut() + .filter(|recipient| !recipient.sending && recipient.attempt < RESEND_MAX_ATTEMPTS) + .min_by_key(|recipient| recipient.attempt)?; + + recipient.attempt += 1; + recipient.sending = true; + + Some((recipient.addr, recipient.attempt > 1)) + } + + // Marks the recipient as failed. + fn failure(&mut self, addr: &SocketAddr) { + if let Some(recipient) = self + .recipients + .iter_mut() + .find(|recipient| recipient.addr == *addr) + { + recipient.sending = false; + } + } + + // Marks the recipient as successful. + fn success(&mut self, addr: &SocketAddr) { + if let Some(index) = self + .recipients + .iter() + .position(|recipient| recipient.addr == *addr) + { + let _ = self.recipients.swap_remove(index); + self.remaining -= 1; + } + } + + // // Did we successfuly send to the required number of recipients? + // fn complete(&self) -> bool { + // self.remaining == 0 + // } + + // Returns all failed recipients. + fn failed(&self) -> impl Iterator { + self.recipients + .iter() + .filter(|recipient| !recipient.sending && recipient.attempt >= RESEND_MAX_ATTEMPTS) + .map(|recipient| &recipient.addr) + } +} diff --git a/src/node/stage/mod.rs b/src/node/stage/mod.rs index 44a79c7a9d..82c1a74bec 100644 --- a/src/node/stage/mod.rs +++ b/src/node/stage/mod.rs @@ -166,10 +166,15 @@ impl Stage { let (timer_tx, timer_rx) = mpsc::unbounded_channel(); let timer = Timer::new(timer_tx); - let state = - Bootstrapping::new(None, vec![addr], comm.clone(), node_info.clone(), timer).await?; - let state = State::Bootstrapping(state); - let (stage, incomming_connections) = Self::new(state, comm)?; + let (stage, incomming_connections) = + log_ident::set(format!("{} ", node_info.name()), async { + let state = + Bootstrapping::new(None, vec![addr], comm.clone(), node_info.clone(), timer) + .await?; + let state = State::Bootstrapping(state); + Self::new(state, comm) + }) + .await?; Ok((stage, incomming_connections, timer_rx, events_rx)) } @@ -228,6 +233,8 @@ impl Stage { /// Process a message accordng to current stage. pub async fn process_message(&mut self, sender: SocketAddr, msg: Message) -> Result<()> { log_ident::set(self.log_ident(), async { + trace!("try handle {:?} from {}", msg, sender); + if !self.in_dst_location(&msg).await? { return Ok(()); } @@ -325,7 +332,7 @@ impl Stage { } } - fn log_ident(&self) -> String { + pub(crate) fn log_ident(&self) -> String { match &self.state { State::Bootstrapping(state) => format!("{}(?) ", state.node_info.name()), State::Joining(state) => format!( diff --git a/tests/bootstrap.rs b/tests/bootstrap.rs index b190435762..3af8b7ea94 100644 --- a/tests/bootstrap.rs +++ b/tests/bootstrap.rs @@ -29,8 +29,8 @@ async fn test_genesis_node() -> Result<()> { assert_eq!(*full_id.public_id(), node.id().await); - expect_next_event!(event_stream, Event::Connected(Connected::First))?; - expect_next_event!(event_stream, Event::PromotedToElder)?; + expect_next_event!(event_stream, Event::Connected(Connected::First)); + expect_next_event!(event_stream, Event::PromotedToElder); assert!(node.is_elder().await); @@ -43,9 +43,9 @@ async fn test_node_bootstrapping() -> Result<()> { // spawn genesis node events listener let genesis_handler = tokio::spawn(async move { - expect_next_event!(event_stream, Event::Connected(Connected::First))?; - expect_next_event!(event_stream, Event::PromotedToElder)?; - expect_next_event!(event_stream, Event::InfantJoined { age: 4, name: _ })?; + expect_next_event!(event_stream, Event::Connected(Connected::First)); + expect_next_event!(event_stream, Event::PromotedToElder); + expect_next_event!(event_stream, Event::InfantJoined { age: 4, name: _ }); // TODO: Should we expect EldersChanged event too ?? // expect_next_event!(event_stream, Event::EldersChanged { .. })?; Ok::<(), Error>(()) @@ -58,7 +58,7 @@ async fn test_node_bootstrapping() -> Result<()> { .create() .await?; - expect_next_event!(event_stream, Event::Connected(Connected::First))?; + expect_next_event!(event_stream, Event::Connected(Connected::First)); // just await for genesis node to finish receiving all events genesis_handler.await??; @@ -110,7 +110,7 @@ async fn test_section_bootstrapping() -> Result<()> { .create() .await?; - expect_next_event!(event_stream, Event::Connected(Connected::First))?; + expect_next_event!(event_stream, Event::Connected(Connected::First)); Ok::(node) }); diff --git a/tests/drop.rs b/tests/drop.rs index d3ea55ab61..7b3717ce7e 100644 --- a/tests/drop.rs +++ b/tests/drop.rs @@ -10,7 +10,8 @@ mod utils; use self::utils::*; use anyhow::{ensure, format_err, Result}; -use sn_routing::{event::Event, NetworkParams}; +use bytes::Bytes; +use sn_routing::{event::Event, DstLocation, NetworkParams, SrcLocation}; use tokio::time; #[tokio::test] @@ -19,6 +20,15 @@ async fn test_node_drop() -> Result<()> { // Drop one node let dropped_name = nodes.remove(1).0.name().await; + + // Send a message to the dropped node. This will cause us detect it as gone. + let src = SrcLocation::Node(nodes[0].0.name().await); + let dst = DstLocation::Node(dropped_name); + nodes[0] + .0 + .send_message(src, dst, Bytes::from_static(b"ping")) + .await?; + let expect_event = async { while let Some(event) = nodes[0].1.next().await { if let Event::MemberLeft { name, .. } = event { diff --git a/tests/messages.rs b/tests/messages.rs index f8740f8ccc..e61c450a6b 100644 --- a/tests/messages.rs +++ b/tests/messages.rs @@ -90,7 +90,7 @@ async fn test_messages_between_nodes() -> Result<()> { .await?; let node2_name = node2.name().await; - expect_next_event!(event_stream, Event::Connected(Connected::First))?; + expect_next_event!(event_stream, Event::Connected(Connected::First)); node2 .send_message( SrcLocation::Node(node2_name), diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs index de367f9ca8..ca36518d1f 100644 --- a/tests/utils/mod.rs +++ b/tests/utils/mod.rs @@ -110,16 +110,9 @@ pub const TIMEOUT: Duration = Duration::from_secs(5); macro_rules! expect_next_event { ($node:expr, $pattern:pat) => { match tokio::time::timeout($crate::utils::TIMEOUT, $node.next()).await { - Ok(Some($pattern)) => Ok(()), - Ok(other) => Err(anyhow::format_err!( - "Expecting {}, got {:?}", - stringify!($pattern), - other - )), - Err(_) => Err(anyhow::format_err!( - "Timeout when expecting {}", - stringify!($pattern) - )), + Ok(Some($pattern)) => {} + Ok(other) => panic!("Expecting {}, got {:?}", stringify!($pattern), other), + Err(_) => panic!("Timeout when expecting {}", stringify!($pattern)), } }; } @@ -137,8 +130,8 @@ pub async fn create_connected_nodes( .network_params(network_params) .create() .await?; - expect_next_event!(event_stream, Event::Connected(Connected::First))?; - expect_next_event!(event_stream, Event::PromotedToElder)?; + expect_next_event!(event_stream, Event::Connected(Connected::First)); + expect_next_event!(event_stream, Event::PromotedToElder); let bootstrap_contact = node.our_connection_info().await?; @@ -152,7 +145,7 @@ pub async fn create_connected_nodes( .create() .await?; - expect_next_event!(event_stream, Event::Connected(Connected::First))?; + expect_next_event!(event_stream, Event::Connected(Connected::First)); Ok::<_, Error>((node, event_stream)) });