diff --git a/src/lib.rs b/src/lib.rs index 48b5044cb8..0ee20b18dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,8 +63,10 @@ unused_results, clippy::needless_borrow )] -// FIXME: find a way to not need this. -#![type_length_limit = "2259754"] +// FIXME: it seems the code in `Comm::send_message_to_targets` is triggering type-length limit +// reached error for some reason. This is a quick workaround, but we should probably look into it +// closely and find a proper fix (or establish that this is already a proper fix). +#![type_length_limit = "2268004"] #[macro_use] extern crate log; diff --git a/src/node/mod.rs b/src/node/mod.rs index cc8df81d8c..8ac0775faf 100644 --- a/src/node/mod.rs +++ b/src/node/mod.rs @@ -12,11 +12,10 @@ mod stage; #[cfg(all(test, feature = "mock"))] mod tests; +pub use self::event_stream::EventStream; #[cfg(feature = "mock")] pub use self::stage::{BOOTSTRAP_TIMEOUT, JOIN_TIMEOUT}; -pub use event_stream::EventStream; - use self::{executor::Executor, stage::Stage}; use crate::{ error::{Error, Result}, @@ -83,16 +82,13 @@ impl Node { let mut rng = MainRng::default(); let full_id = config.full_id.unwrap_or_else(|| FullId::gen(&mut rng)); let node_name = *full_id.public_id().name(); - let transport_config = config.transport_config; - let network_params = config.network_params; - let is_genesis = config.first; - let (stage, incoming_conns, timer_rx, events_rx) = if is_genesis { + let (stage, incoming_conns, timer_rx, events_rx) = if config.first { info!("{} Starting a new network as the seed node.", node_name); - Stage::first_node(transport_config, full_id, network_params).await? + Stage::first_node(config.transport_config, full_id, config.network_params).await? } else { info!("{} Bootstrapping a new node.", node_name); - Stage::bootstrap(transport_config, full_id, network_params).await? + Stage::bootstrap(config.transport_config, full_id, config.network_params).await? }; let stage = Arc::new(Mutex::new(stage)); diff --git a/src/node/stage/comm.rs b/src/node/stage/comm.rs index 707ff143b8..a1a4bee008 100644 --- a/src/node/stage/comm.rs +++ b/src/node/stage/comm.rs @@ -13,7 +13,7 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, }; use lru_time_cache::LruCache; -use qp2p::{Config, Connection, Endpoint, IncomingConnections, QuicP2p}; +use qp2p::{Connection, Endpoint, IncomingConnections, QuicP2p}; use std::{net::SocketAddr, slice, sync::Arc, time::Duration}; use tokio::time; @@ -22,9 +22,35 @@ const CONNECTIONS_CACHE_SIZE: usize = 1024; /// Maximal number of resend attempts to the same target. pub const RESEND_MAX_ATTEMPTS: u8 = 3; -/// Delay before attempting to resend a previously failed message. +/// Default delay before attempting to resend a previously failed message. pub const RESEND_DELAY: Duration = Duration::from_secs(10); +/// Configuration for the communication component. +pub struct Config { + /// Config for the underlying network transport. + pub transport_config: qp2p::Config, + /// Delay before attempting to resend a message that previously failed to send. + pub resend_delay: Duration, +} + +impl Default for Config { + fn default() -> Self { + Self { + transport_config: Default::default(), + resend_delay: RESEND_DELAY, + } + } +} + +impl From for Config { + fn from(transport_config: qp2p::Config) -> Self { + Self { + transport_config, + ..Default::default() + } + } +} + // Communication component of the node to interact with other nodes. #[derive(Clone)] pub(crate) struct Comm { @@ -32,8 +58,9 @@ pub(crate) struct Comm { } impl Comm { - pub async fn new(transport_config: Config) -> Result { - let quic_p2p = QuicP2p::with_config(Some(transport_config), Default::default(), true)?; + pub async fn new(config: Config) -> Result { + let quic_p2p = + QuicP2p::with_config(Some(config.transport_config), Default::default(), true)?; // Don't bootstrap, just create an endpoint where to listen to // the incoming messages from other nodes. @@ -45,12 +72,14 @@ impl Comm { _quic_p2p: quic_p2p, endpoint, node_conns, + resend_delay: config.resend_delay, }), }) } - pub async fn from_bootstrapping(transport_config: Config) -> Result<(Self, SocketAddr)> { - let mut quic_p2p = QuicP2p::with_config(Some(transport_config), Default::default(), true)?; + pub async fn from_bootstrapping(config: Config) -> Result<(Self, SocketAddr)> { + let mut quic_p2p = + QuicP2p::with_config(Some(config.transport_config), Default::default(), true)?; // Bootstrap to the network returning the connection to a node. let (endpoint, conn) = quic_p2p.bootstrap().await?; @@ -66,6 +95,7 @@ impl Comm { _quic_p2p: quic_p2p, endpoint, node_conns, + resend_delay: config.resend_delay, }), }, addr, @@ -166,6 +196,7 @@ struct Inner { _quic_p2p: QuicP2p, endpoint: Endpoint, node_conns: Mutex>>, + resend_delay: Duration, } impl Inner { @@ -199,7 +230,7 @@ impl Inner { delay: bool, ) -> (SocketAddr, Result<()>) { if delay { - time::delay_for(RESEND_DELAY).await; + time::delay_for(self.resend_delay).await; } let result = self.send(&recipient, msg).await; diff --git a/src/node/stage/mod.rs b/src/node/stage/mod.rs index 473813d53a..d215e45d87 100644 --- a/src/node/stage/mod.rs +++ b/src/node/stage/mod.rs @@ -97,7 +97,7 @@ impl Stage { mpsc::UnboundedReceiver, mpsc::UnboundedReceiver, )> { - let comm = Comm::new(transport_config).await?; + let comm = Comm::new(transport_config.into()).await?; let connection_info = comm.our_connection_info()?; let p2p_node = P2pNode::new(*full_id.public_id(), connection_info); @@ -154,7 +154,7 @@ impl Stage { mpsc::UnboundedReceiver, mpsc::UnboundedReceiver, )> { - let (comm, addr) = Comm::from_bootstrapping(transport_config).await?; + let (comm, addr) = Comm::from_bootstrapping(transport_config.into()).await?; let (events_tx, events_rx) = mpsc::unbounded_channel(); let node_info = NodeInfo {