Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat(AE): flesh out remaining Anti-Entropy flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoga07 authored and dirvine committed May 13, 2021
1 parent ddd0682 commit b28c422
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub(crate) struct Message {
/// Source authority.
/// Messages do not need to sign this field as it is all verifiable (i.e. if the sig validates
/// agains the public key and we know the pub key then we are good. If the proof is not recognised we
/// ask for a longer chain that can be recognised). Therefor we don't need to sign this field.
/// ask for a longer chain that can be recognised). Therefore we don't need to sign this field.
src: SrcAuthority,
/// Destination location.
dst: DstLocation,
Expand Down
7 changes: 1 addition & 6 deletions src/messages/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub(crate) enum Variant {
/// `SectionAuthorityProvider` and `SectionChain` of the sender's section, with the proof chain.
src_info: (Proven<SectionAuthorityProvider>, SectionChain),
/// Message
msg: Box<Message>,
msg: Option<Box<Message>>,
// Nonce that is derived from the incoming message that triggered sending this
// message. It's purpose is to make sure that `OtherSection`s that are identical
// but triggered by different messages are not filtered out.
Expand Down Expand Up @@ -119,10 +119,6 @@ pub(crate) enum Variant {
key: bls::PublicKey,
msg: Box<Message>,
},
/// Message sent by dst to indicate that sender is lagging on knowledge and shares it.
// SrcOutdated(Variant),
/// Message sent by dst to indicate that the Dst is ahead on knowledge and shares it.
DstAhead(SectionChain),
/// Message sent by dst to indicate that Dst is outdated in knowledge.
/// A follow-up reply will be sent by src with SectionKnowledge.
// DstOutdated,
Expand Down Expand Up @@ -253,7 +249,6 @@ impl Debug for Variant {
.field("difficulty", difficulty)
.finish(),
Self::ConnectivityComplaint(name) => write!(f, "ConnectivityComplaint({:?})", name),
Self::DstAhead(_) => write!(f, "DstAhead"),
Self::SrcAhead { .. } => write!(f, "SrcAhead"),
}
}
Expand Down
1 change: 0 additions & 1 deletion src/routing/core/messaging/handling/decisions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl Core {
| Variant::DkgFailureObservation { .. }
| Variant::DkgFailureAgreement { .. }
| Variant::SrcAhead { .. }
| Variant::DstAhead(_)
| Variant::ResourceChallenge { .. } => {}
}

Expand Down
37 changes: 28 additions & 9 deletions src/routing/core/messaging/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ impl Core {
match self.decide_message_status(&msg)? {
MessageStatus::Useful => {
trace!("Useful message from {:?}: {:?}", sender, msg);
commands.extend(self.check_for_entropy(&msg, dest_info.clone())?);
commands.extend(self.handle_useful_message(sender, msg, dest_info).await?);
let (entropy_commands, can_be_executed) =
self.check_for_entropy(&msg, dest_info.clone(), sender)?;
commands.extend(entropy_commands);
if can_be_executed {
commands.extend(self.handle_useful_message(sender, msg, dest_info).await?);
}
}
MessageStatus::Untrusted => {
debug!("Untrusted message from {:?}: {:?} ", sender, msg);
Expand Down Expand Up @@ -294,11 +298,15 @@ impl Core {
Variant::SectionKnowledge { src_info, msg } => {
let src_info = src_info.clone();
self.update_section_knowledge(src_info.0, src_info.1);
Ok(vec![Command::HandleMessage {
sender,
message: *msg.clone(),
dest_info,
}])
if let Some(bounced_msg) = msg {
Ok(vec![Command::HandleMessage {
sender,
message: *bounced_msg.clone(),
dest_info,
}])
} else {
Ok(vec![])
}
}
Variant::Sync { section, network } => {
self.handle_sync(section.clone(), network.clone())
Expand Down Expand Up @@ -343,7 +351,7 @@ impl Core {
msg.src().src_location().to_dst(),
)?])
}
Variant::DstAhead(_chain) => Ok(vec![]),

Variant::DkgStart {
dkg_key,
elders_info,
Expand Down Expand Up @@ -418,7 +426,7 @@ impl Core {
let section_auth = self.section.proven_authority_provider();
let variant = Variant::SectionKnowledge {
src_info: (section_auth.clone(), truncated_key),
msg,
msg: Some(msg),
};

let msg = Message::single_src(self.node(), dst_location, variant, None)?;
Expand Down Expand Up @@ -545,6 +553,17 @@ impl Core {
self.update_state(snapshot)
}

pub fn handle_lagging_messages_on_sync(&mut self) -> Result<Vec<Command>> {
let mut commands = vec![];
let latest_key = *self.section_chain().last_key();
if let Some(lagged_commands) = self.lagging_messages.src_ahead.remove(&latest_key) {
// We now have the latest key, execute the messages that received when we were lagging.
commands.extend(lagged_commands);
}

Ok(commands)
}

pub(crate) fn handle_join_request(
&mut self,
peer: Peer,
Expand Down
37 changes: 28 additions & 9 deletions src/routing/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use bls_signature_aggregator::SignatureAggregator;
use itertools::Itertools;
use resource_proof::ResourceProof;
use sn_messaging::DestInfo;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::net::SocketAddr;
use tokio::sync::mpsc;
use xor_name::{Prefix, XorName};

Expand All @@ -56,6 +58,13 @@ pub(crate) struct Core {
resource_proof: ResourceProof,
end_users: EndUserRegistry,
connectivity_complaints: ConnectivityComplaints,
lagging_messages: LaggingMessages,
}

#[derive(Default)]
pub(crate) struct LaggingMessages {
// Execute upon sync
pub src_ahead: BTreeMap<bls::PublicKey, Vec<Command>>,
}

impl Core {
Expand Down Expand Up @@ -84,31 +93,40 @@ impl Core {
resource_proof: ResourceProof::new(RESOURCE_PROOF_DATA_SIZE, RESOURCE_PROOF_DIFFICULTY),
end_users: EndUserRegistry::new(),
connectivity_complaints: ConnectivityComplaints::new(),
lagging_messages: LaggingMessages::default(),
}
}

////////////////////////////////////////////////////////////////////////////
// Miscellaneous
////////////////////////////////////////////////////////////////////////////

fn check_for_entropy(&mut self, msg: &Message, dest_info: DestInfo) -> Result<Vec<Command>> {
fn check_for_entropy(
&mut self,
msg: &Message,
dest_info: DestInfo,
sender: Option<SocketAddr>,
) -> Result<(Vec<Command>, bool)> {
if !self.is_elder() {
return Ok(vec![]);
return Ok((vec![], false));
}

let actions =
lazy_messaging::process(&self.node, &self.section, &self.network, msg, dest_info)?;
let (actions, can_be_executed) = lazy_messaging::process(
&self.node,
&self.section,
&self.network,
&mut self.lagging_messages,
msg,
dest_info,
sender,
)?;
let mut commands = vec![];

for msg in actions.send {
commands.extend(self.relay_message(&msg)?);
}

if let Some(proposal) = actions.propose {
commands.extend(self.propose(proposal)?);
}

Ok(commands)
Ok((commands, can_be_executed))
}

pub(crate) fn state_snapshot(&self) -> StateSnapshot {
Expand Down Expand Up @@ -184,6 +202,7 @@ impl Core {

if new.is_elder || old.is_elder {
commands.extend(self.send_sync(self.section.clone(), self.network.clone())?);
commands.extend(self.handle_lagging_messages_on_sync()?);
}

let current: BTreeSet<_> = self
Expand Down
Loading

0 comments on commit b28c422

Please sign in to comment.