Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: implement message resending
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Sep 29, 2020
1 parent 48d7ce7 commit cc2fcbd
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 103 deletions.
3 changes: 1 addition & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::qp2p::Error as QuicP2pError;
use err_derive::Error;

/// The type returned by the sn_routing message handling methods.
Expand All @@ -23,7 +22,7 @@ pub enum Error {
#[error(display = "Cannot route.")]
CannotRoute,
#[error(display = "Network layer error: {}", _0)]
Network(#[error(source)] QuicP2pError),
Network(#[error(source)] qp2p::Error),
#[error(display = "The node is not in a state to handle the action.")]
InvalidState,
#[error(display = "Bincode error: {}", _0)]
Expand Down
10 changes: 3 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
unused_results,
clippy::needless_borrow
)]
// FIXME: find a way to not need this.
#![type_length_limit = "2174929"]

#[macro_use]
extern crate log;
Expand All @@ -82,9 +84,9 @@ pub use self::{
location::{DstLocation, SrcLocation},
network_params::NetworkParams,
node::{EventStream, Node, NodeConfig},
qp2p::Config as TransportConfig,
section::{SectionProofChain, MIN_AGE},
};
pub use qp2p::Config as TransportConfig;

pub use xor_name::{Prefix, XorName, XOR_NAME_LEN}; // TODO remove pub on API update
/// sn_routing events.
Expand Down Expand Up @@ -152,12 +154,6 @@ const RECOMMENDED_SECTION_SIZE: usize = 60;
/// Number of elders per section.
const ELDER_SIZE: usize = 7;

// Quic-p2p
#[cfg(feature = "mock")]
use mock_qp2p as qp2p;
#[cfg(not(feature = "mock"))]
use qp2p::{self};

#[cfg(test)]
mod tests {
use super::{QUORUM_DENOMINATOR, QUORUM_NUMERATOR};
Expand Down
34 changes: 26 additions & 8 deletions src/node/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ fn spawn_connections_handler(stage: Arc<Mutex<Stage>>, mut incoming_conns: Incom
let _ = tokio::spawn(async move {
while let Some(incoming_msgs) = incoming_conns.next().await {
trace!(
"New connection established by peer {}",
"{}New connection established by peer {}",
stage.lock().await.log_ident(),
incoming_msgs.remote_addr()
);
spawn_messages_handler(stage.clone(), incoming_msgs)
Expand All @@ -57,7 +58,8 @@ fn spawn_messages_handler(stage: Arc<Mutex<Stage>>, mut incoming_msgs: IncomingM
match msg {
QuicP2pMsg::UniStream { bytes, src, .. } => {
trace!(
"New message ({} bytes) received on a uni-stream from: {}",
"{}New message ({} bytes) received on a uni-stream from: {}",
stage.lock().await.log_ident(),
bytes.len(),
src
);
Expand All @@ -73,7 +75,8 @@ fn spawn_messages_handler(stage: Arc<Mutex<Stage>>, mut incoming_msgs: IncomingM
recv,
} => {
trace!(
"New message ({} bytes) received on a bi-stream from: {}",
"{}New message ({} bytes) received on a bi-stream from: {}",
stage.lock().await.log_ident(),
bytes.len(),
src
);
Expand All @@ -92,17 +95,26 @@ fn spawn_messages_handler(stage: Arc<Mutex<Stage>>, mut incoming_msgs: IncomingM
}
}
}

trace!(
"{}Connection to peer {} closed",
stage.lock().await.log_ident(),
incoming_msgs.remote_addr()
);
});
}

fn spawn_node_message_handler(stage: Arc<Mutex<Stage>>, msg_bytes: Bytes, sender: SocketAddr) {
let _ = tokio::spawn(async move {
match Message::from_bytes(&msg_bytes) {
Err(error) => {
debug!("Failed to deserialize message: {:?}", error);
debug!(
"{}Failed to deserialize message: {:?}",
stage.lock().await.log_ident(),
error
);
}
Ok(msg) => {
trace!("try handle message {:?}", msg);
// Process the message according to our stage
if let Err(err) = stage
.lock()
Expand All @@ -111,8 +123,10 @@ fn spawn_node_message_handler(stage: Arc<Mutex<Stage>>, msg_bytes: Bytes, sender
.await
{
error!(
"Error encountered when processing message {:?}: {}",
msg, err
"{}Error encountered when processing message {:?}: {}",
stage.lock().await.log_ident(),
msg,
err
);
}
}
Expand All @@ -124,7 +138,11 @@ fn spawn_timer_handler(stage: Arc<Mutex<Stage>>, mut rx: mpsc::UnboundedReceiver
let _ = tokio::spawn(async move {
while let Some(token) = rx.recv().await {
if let Err(err) = stage.lock().await.process_timeout(token).await {
error!("Error encountered when processing timeout: {}", err);
error!(
"{}Error encountered when processing timeout: {}",
stage.lock().await.log_ident(),
err
);
}
}
});
Expand Down
8 changes: 1 addition & 7 deletions src/node/stage/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ impl Approved {
sender: SocketAddr,
msg: Message,
) -> Result<Option<Bootstrapping>> {
trace!("Got {:?}", msg);
// Filter messages which were already handled
if self.msg_filter.contains_incoming(&msg) {
trace!("not handling message - already handled: {:?}", msg);
Expand All @@ -97,6 +96,7 @@ impl Approved {

match self.decide_message_status(&msg)? {
MessageStatus::Useful => {
trace!("Useful message from {}: {:?}", sender, msg);
self.update_section_knowledge(&msg).await?;
self.handle_useful_message(Some(sender), msg).await
}
Expand Down Expand Up @@ -134,8 +134,6 @@ impl Approved {
// Cast a vote that doesn't need total order, only section consensus.
#[async_recursion]
async fn cast_unordered_vote(&mut self, vote: Vote) -> Result<()> {
trace!("Vote for {:?}", vote);

let key_share = self.section_keys_provider.key_share()?;

trace!(
Expand Down Expand Up @@ -323,10 +321,6 @@ impl Approved {
fn decide_message_status(&self, msg: &Message) -> Result<MessageStatus> {
let our_id = self.node_info.full_id.public_id();

trace!(
"Deciding message status based upon variant: {:?}",
msg.variant()
);
match msg.variant() {
Variant::NeighbourInfo { .. } => {
if !self.is_our_elder(our_id) {
Expand Down
Loading

0 comments on commit cc2fcbd

Please sign in to comment.