Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
Merge pull request #2230 from madadam/connection-lost
Browse files Browse the repository at this point in the history
Improve lost peer detection
  • Loading branch information
madadam authored Nov 23, 2020
2 parents 36a469a + 00fad6e commit 9c97008
Show file tree
Hide file tree
Showing 17 changed files with 614 additions and 592 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ hex_fmt = "~0.3.0"
itertools = "~0.9.0"
log = "~0.4.8"
lru_time_cache = "~0.11.0"
qp2p = "~0.8.8"
qp2p = "~0.9.1"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
serde = { version = "1.0.117", features = ["derive"] }
Expand All @@ -38,4 +38,3 @@ env_logger = "~0.7.1"
structopt = "~0.3.17"
proptest = "0.10.1"
rand = { version = "~0.7.3", features = ["small_rng"] }

116 changes: 99 additions & 17 deletions src/consensus/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use hex_fmt::HexFmt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
collections::{HashMap, HashSet, VecDeque},
fmt::{self, Debug, Formatter},
net::SocketAddr,
time::Duration,
Expand All @@ -32,6 +32,8 @@ use xor_name::XorName;
// Interval to progress DKG timed phase
const DKG_PROGRESS_INTERVAL: Duration = Duration::from_secs(30);

// Capacity the backlog of not-yet-processable DKG messages is created with.
const BACKLOG_CAPACITY: usize = 100;

/// Unique identifier of a DKG session.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub struct DkgKey(Digest256);
Expand Down Expand Up @@ -88,13 +90,19 @@ impl Debug for DkgKey {
/// State of the DKG sessions known to this node: an optional `Participant` session, a set of
/// `Observer`s keyed by `DkgKey`, and a backlog of messages received too early.
pub(crate) struct DkgVoter {
// The session we are participating in, if any.
participant: Option<Participant>,
// Observer state, one entry per DKG session key.
observers: HashMap<DkgKey, Observer>,

// Due to the asynchronous nature of the network we might sometimes receive a DKG message before
// we created the corresponding `Participant` session. To avoid losing those messages, we store
// them in this backlog and replay them once we create the session.
backlog: Backlog,
}

impl Default for DkgVoter {
    /// Creates a voter with no participant session, no observers and an empty backlog.
    fn default() -> Self {
        let participant = None;
        let observers = HashMap::new();
        let backlog = Backlog::new();

        Self {
            participant,
            observers,
            backlog,
        }
    }
}
Expand All @@ -114,6 +122,20 @@ impl DkgVoter {
}
}

// Special case: only one participant.
if elders_info.elders.len() == 1 {
let secret_key_set = bls::SecretKeySet::random(0, &mut rand::thread_rng());

return vec![DkgCommand::HandleParticipationResult {
dkg_key,
elders_info,
result: Ok(DkgOutcome {
public_key_set: secret_key_set.public_keys(),
secret_key_share: secret_key_set.secret_key_share(0),
}),
}];
}

let threshold = majority(elders_info.elders.len()) - 1;
let participants = elders_info
.elders
Expand All @@ -135,6 +157,13 @@ impl DkgVoter {

let mut commands = session.broadcast(&our_name, dkg_key, message);

commands.extend(
self.backlog
.take(&dkg_key)
.into_iter()
.flat_map(|message| session.process_message(&our_name, dkg_key, message)),
);

if let Some(command) = session.check() {
// Already completed.
commands.push(command)
Expand Down Expand Up @@ -228,9 +257,14 @@ impl DkgVoter {
dkg_key: DkgKey,
message: DkgMessage,
) -> Vec<DkgCommand> {
let session = if let Some(session) = &mut self.participant {
let session = if let Some(session) = self
.participant
.as_mut()
.filter(|session| session.dkg_key == dkg_key)
{
session
} else {
self.backlog.push(dkg_key, message);
return vec![];
};

Expand Down Expand Up @@ -352,10 +386,6 @@ impl Participant {
dkg_key: DkgKey,
message: DkgMessage,
) -> Vec<DkgCommand> {
if self.dkg_key != dkg_key {
return vec![];
}

trace!("process DKG message {:?}", message);
let responses = self
.key_gen
Expand All @@ -374,6 +404,8 @@ impl Participant {
dkg_key: DkgKey,
message: DkgMessage,
) -> Vec<DkgCommand> {
let mut commands = vec![];

let recipients: Vec<_> = self
.elders_info
.as_ref()
Expand All @@ -384,14 +416,15 @@ impl Participant {
.copied()
.collect();

trace!("broadcasting DKG message {:?} to {:?}", message, recipients);
if !recipients.is_empty() {
trace!("broadcasting DKG message {:?} to {:?}", message, recipients);
commands.push(DkgCommand::SendMessage {
recipients,
dkg_key,
message: message.clone(),
});
}

let mut commands = vec![];
commands.push(DkgCommand::SendMessage {
recipients,
dkg_key,
message: message.clone(),
});
commands.extend(self.process_message(our_name, dkg_key, message));
commands
}
Expand Down Expand Up @@ -448,6 +481,40 @@ struct Observer {
accumulator: HashMap<Result<bls::PublicKey, ()>, HashSet<XorName>>,
}

/// Bounded FIFO buffer of DKG messages, each stored together with the `DkgKey` of the session
/// it belongs to.
struct Backlog(VecDeque<(DkgKey, DkgMessage)>);

impl Backlog {
    /// Creates an empty backlog with room for `BACKLOG_CAPACITY` messages.
    fn new() -> Self {
        Self(VecDeque::with_capacity(BACKLOG_CAPACITY))
    }

    /// Appends `message` for the session identified by `dkg_key`.
    ///
    /// If the backlog already holds `BACKLOG_CAPACITY` messages, the oldest one is dropped
    /// first so the backlog never grows beyond that bound.
    fn push(&mut self, dkg_key: DkgKey, message: DkgMessage) {
        // Compare against the constant, not `VecDeque::capacity`: `with_capacity` only
        // guarantees *at least* the requested capacity, so the allocated capacity may be
        // larger and would make the effective limit implementation-dependent.
        if self.0.len() >= BACKLOG_CAPACITY {
            let _ = self.0.pop_front();
        }

        self.0.push_back((dkg_key, message))
    }

    /// Removes and returns every backlogged message whose key equals `dkg_key`, in the order
    /// they were pushed. Messages for other keys stay in the backlog in their original
    /// relative order.
    fn take(&mut self, dkg_key: &DkgKey) -> Vec<DkgMessage> {
        let mut output = Vec::new();

        // Rotate through the queue exactly once: the range bound is evaluated up front, so
        // entries pushed back during the loop are not visited a second time.
        for _ in 0..self.0.len() {
            match self.0.pop_front() {
                Some((key, message)) if &key == dkg_key => output.push(message),
                Some(entry) => self.0.push_back(entry),
                None => break,
            }
        }

        output
    }
}

#[derive(Debug)]
pub(crate) enum DkgCommand {
SendMessage {
recipients: Vec<SocketAddr>,
Expand Down Expand Up @@ -477,10 +544,7 @@ impl DkgCommand {
dkg_key,
message,
} => {
let variant = Variant::DKGMessage {
dkg_key,
message: bincode::serialize(&message)?.into(),
};
let variant = Variant::DKGMessage { dkg_key, message };
let message = Message::single_src(node, DstLocation::Direct, variant, None, None)?;

Ok(Command::send_message_to_targets(
Expand Down Expand Up @@ -540,6 +604,7 @@ mod tests {
section::test_utils::{gen_addr, gen_elders_info},
ELDER_SIZE, MIN_AGE,
};
use assert_matches::assert_matches;
use proptest::prelude::*;
use rand::{rngs::SmallRng, SeedableRng};
use std::iter;
Expand Down Expand Up @@ -646,6 +711,23 @@ mod tests {
.is_none());
}

#[test]
fn single_participant() {
    // A DKG with exactly one participant is expected to complete as soon as it is started.

    let peer = Peer::new(rand::random(), gen_addr(), MIN_AGE);
    let our_name = *peer.name();
    let elders_info = EldersInfo::new(iter::once(peer), Prefix::default());
    let dkg_key = DkgKey::new(&elders_info);

    let mut voter = DkgVoter::default();
    let commands = voter.start_participating(our_name, dkg_key, elders_info);

    // The only command produced must be the successful participation result.
    assert_matches!(
        commands.as_slice(),
        &[DkgCommand::HandleParticipationResult { result: Ok(_), .. }]
    );
}

proptest! {
// Run a DKG session where every participant handles every message sent to them.
// Expect the session to successfully complete without timed transitions.
Expand Down
4 changes: 4 additions & 0 deletions src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug, Formatter};
use xor_name::Prefix;

// Payload of the message used to probe a peer connection: a single zero byte.
// NOTE: ideally this would be empty, but an empty message is currently treated as an error by
// qp2p.
pub(crate) const PING: &[u8] = &[0];

/// Message sent over the network.
#[derive(Clone, Eq, Serialize, Deserialize)]
pub(crate) struct Message {
Expand Down
7 changes: 4 additions & 3 deletions src/messages/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
relocation::{RelocateDetails, RelocatePayload, RelocatePromise},
section::{EldersInfo, Section, SectionProofChain, TrustStatus},
};
use bls_dkg::key_gen::message::Message as DkgMessage;
use bytes::Bytes;
use hex_fmt::HexFmt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -85,8 +86,8 @@ pub(crate) enum Variant {
DKGMessage {
/// The identifier of the DKG session this message is for.
dkg_key: DkgKey,
/// The serialized DKG message.
message: Bytes,
/// The DKG message.
message: DkgMessage,
},
/// Message to notify current elders about the DKG result.
DKGResult {
Expand Down Expand Up @@ -183,7 +184,7 @@ impl Debug for Variant {
Self::DKGMessage { dkg_key, message } => f
.debug_struct("DKGMessage")
.field("dkg_key", &dkg_key)
.field("message_hash", &MessageHash::from_bytes(message))
.field("message", message)
.finish(),
Self::DKGResult { dkg_key, result } => f
.debug_struct("DKGResult")
Expand Down
38 changes: 30 additions & 8 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
message_filter::MessageFilter,
messages::{
BootstrapResponse, JoinRequest, Message, MessageHash, MessageStatus, PlainMessage, Variant,
VerifyStatus,
VerifyStatus, PING,
},
network::Network,
node::Node,
Expand All @@ -34,7 +34,7 @@ use crate::{
},
RECOMMENDED_SECTION_SIZE,
};
use bls_dkg::key_gen::outcome::Outcome as DkgOutcome;
use bls_dkg::key_gen::{message::Message as DkgMessage, outcome::Outcome as DkgOutcome};
use bytes::Bytes;
use itertools::Itertools;
use std::{cmp, net::SocketAddr, slice};
Expand Down Expand Up @@ -199,8 +199,28 @@ impl Approved {
}
}

/// Reacts to a lost connection to `addr`.
///
/// Returns `None` when we are not an elder or when `addr` does not belong to a joined member
/// of our section. Otherwise returns a command that sends a `PING` message to `addr`.
pub fn handle_connection_lost(&self, addr: &SocketAddr) -> Option<Command> {
    if !self.is_elder() {
        return None;
    }

    // Unknown addresses are ignored: `?` bails out with `None` when no joined member matches.
    let peer = self.section.find_joined_member_by_addr(addr)?;
    trace!("Lost connection to {}", peer);

    // Try to send a "ping" message to probe the peer connection. If it succeeds, the
    // connection loss was just temporary. Otherwise the peer is assumed lost and we will vote
    // it offline.
    let ping = Bytes::from_static(PING);
    Some(Command::send_message_to_target(addr, ping))
}

pub fn handle_peer_lost(&self, addr: &SocketAddr) -> Result<Vec<Command>> {
let name = if let Some(peer) = self.section.find_member_from_addr(addr) {
let name = if let Some(peer) = self.section.find_joined_member_by_addr(addr) {
debug!("Lost known peer {}", peer);
*peer.name()
} else {
Expand Down Expand Up @@ -230,6 +250,11 @@ impl Approved {
let key = outcome.public_key_set.public_key();
self.section_keys_provider
.insert_dkg_outcome(&self.node.name(), &elders_info, outcome);

if self.section.chain().has_key(&key) {
self.section_keys_provider.finalise_dkg(&key)
}

Ok(key)
} else {
Err(())
Expand Down Expand Up @@ -1003,10 +1028,9 @@ impl Approved {
fn handle_dkg_message(
&mut self,
dkg_key: DkgKey,
message_bytes: Bytes,
message: DkgMessage,
sender: XorName,
) -> Result<Vec<Command>> {
let message = bincode::deserialize(&message_bytes[..])?;
trace!("handle DKG message {:?} from {}", message, sender);

self.dkg_voter
Expand Down Expand Up @@ -1316,9 +1340,7 @@ impl Approved {
let mut commands = vec![];

if self.section.chain().has_key(&public_key) {
// Our section state is already up to date, so no need to vote. Just finalize the DKG so
// we can start using the new secret key share.
self.section_keys_provider.finalise_dkg(&public_key);
// Our section state is already up to date, so no need to vote.
return Ok(commands);
}

Expand Down
Loading

0 comments on commit 9c97008

Please sign in to comment.