Skip to content

Commit

Permalink
fix: protocol too fast and spamming messages (#522)
Browse files Browse the repository at this point in the history
* Limit spamming of logs

* Fix protocol loop such that active participants are more easily picked up

* Make offline test more stable
  • Loading branch information
ChaoticTempest authored Mar 26, 2024
1 parent d6673c5 commit 5919b6f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 33 deletions.
17 changes: 14 additions & 3 deletions integration-tests/tests/multichain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,25 @@ async fn test_signature_offline_node() -> anyhow::Result<()> {
Box::pin(async move {
let state_0 = wait_for::running_mpc(&ctx, 0).await?;
assert_eq!(state_0.participants.len(), 3);
wait_for::has_at_least_triples(&ctx, 2).await?;
wait_for::has_at_least_triples(&ctx, 4).await?;

// Kill the node then have presignature and signature generation only use the active set of nodes
// to start generating presignatures and signatures.
ctx.nodes.kill_node(2).await?;

wait_for::has_at_least_presignatures(&ctx, 2).await?;
actions::single_signature_production(&ctx, &state_0).await?;
// This could potentially fail and time out the first time if the participant set that gets picked up
// still includes the offline node. This is expected behavior for now when a user submits a request
// after a node has gone offline but before the system has detected it.
let presig_res = wait_for::has_at_least_presignatures(&ctx, 2).await;
let sig_res = actions::single_signature_production(&ctx, &state_0).await;

// Try again if the first attempt failed. This second portion should not be needed once the NEP
// for resumable MPC comes in.
if presig_res.is_err() || sig_res.is_err() {
// Retry if the first attempt failed.
wait_for::has_at_least_presignatures(&ctx, 2).await?;
actions::single_signature_production(&ctx, &state_0).await?;
}

Ok(())
})
Expand Down
23 changes: 15 additions & 8 deletions node/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::protocol::MpcMessage;
use cait_sith::protocol::Participant;
use mpc_keys::hpke;
use reqwest::{Client, IntoUrl};
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::str::Utf8Error;
use std::time::{Duration, Instant};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
Expand All @@ -27,6 +27,10 @@ pub enum SendError {
MalformedResponse(Utf8Error),
#[error("encryption error: {0}")]
EncryptionError(String),
#[error("http request timeout: {0}")]
Timeout(String),
#[error("participant is not alive: {0}")]
ParticipantNotAlive(String),
}

async fn send_encrypted<U: IntoUrl>(
Expand Down Expand Up @@ -82,6 +86,7 @@ async fn send_encrypted<U: IntoUrl>(
/// Outbound queue of MPC messages awaiting delivery to other participants.
///
/// Derives `Default`, so a fresh queue starts with no pending messages and no
/// remembered "not responding" summaries.
#[derive(Default)]
pub struct MessageQueue {
/// Pending messages, drained from the front when sending. Each entry is the
/// destination participant, the message itself, and an `Instant` whose elapsed
/// time is compared against the per-message-type timeout to decide whether the
/// message is dropped with a timeout error instead of being kept for retry.
/// NOTE(review): the `Instant` is presumably the time the message was first
/// queued -- confirm against the push site.
deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>,
/// Debug-formatted per-participant "could not send" counters that have already
/// been reported. A "participants not responding" error is only emitted when its
/// formatted counter map is newly inserted here, which keeps identical summaries
/// from being logged repeatedly.
/// NOTE(review): nothing visible here ever removes entries, so this set may grow
/// for the lifetime of the queue -- confirm whether it is cleared elsewhere.
seen_counts: HashSet<String>,
}

impl MessageQueue {
Expand All @@ -106,16 +111,16 @@ impl MessageQueue {
) -> Vec<SendError> {
let mut failed = VecDeque::new();
let mut errors = Vec::new();
let mut cannot_send_errors = HashMap::new();
let mut participant_counter = HashMap::new();
while let Some((info, msg, instant)) = self.deque.pop_front() {
if !participants.contains_key(&Participant::from(info.id)) {
if instant.elapsed() > message_type_to_timeout(&msg) {
errors.push(SendError::Unsuccessful(format!(
errors.push(SendError::Timeout(format!(
"message has timed out on offline node: {info:?}",
)));
continue;
}
let counter = cannot_send_errors.entry(info.id).or_insert(0);
let counter = participant_counter.entry(info.id).or_insert(0);
*counter += 1;
failed.push_back((info, msg, instant));
continue;
Expand All @@ -125,7 +130,7 @@ impl MessageQueue {
send_encrypted(from, &info.cipher_pk, sign_sk, client, &info.url, &msg).await
{
if instant.elapsed() > message_type_to_timeout(&msg) {
errors.push(SendError::Unsuccessful(format!(
errors.push(SendError::Timeout(format!(
"message has timed out: {err:?}"
)));
continue;
Expand All @@ -135,9 +140,11 @@ impl MessageQueue {
errors.push(err);
}
}
if !cannot_send_errors.is_empty() {
errors.push(SendError::Unsuccessful(format!(
"cannot send message due to participants not responding: {cannot_send_errors:?}",
// only add the participant count if it hasn't been seen before.
let counts = format!("{participant_counter:?}");
if !participant_counter.is_empty() && self.seen_counts.insert(counts.clone()) {
errors.push(SendError::ParticipantNotAlive(format!(
"participants not responding: {counts:?}",
)));
}

Expand Down
5 changes: 5 additions & 0 deletions node/src/mesh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ impl Mesh {
self.connections
.establish_participants(contract_state)
.await;
self.ping().await;
}

/// Ping the known connections so that we can see who is currently alive.
///
/// Overwrites `active_participants` with the result of `connections.ping()` and
/// then `active_potential_participants` with the result of
/// `connections.ping_potential()`. The two calls are awaited one after the other,
/// not concurrently.
pub async fn ping(&mut self) {
self.active_participants = self.connections.ping().await;
self.active_potential_participants = self.connections.ping_potential().await;
}
Expand Down
12 changes: 6 additions & 6 deletions node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl CryptographicProtocol for GeneratingState {
if !failures.is_empty() {
tracing::warn!(
active = ?ctx.mesh().active_participants().keys_vec(),
"generating(wait): failed to send encrypted message; {failures:#?}"
"generating(wait): failed to send encrypted message; {failures:?}"
);
}

Expand Down Expand Up @@ -167,7 +167,7 @@ impl CryptographicProtocol for GeneratingState {
if !failures.is_empty() {
tracing::warn!(
active = ?ctx.mesh().active_participants().keys_vec(),
"generating(return): failed to send encrypted message; {failures:#?}"
"generating(return): failed to send encrypted message; {failures:?}"
);
}
return Ok(NodeState::WaitingForConsensus(WaitingForConsensusState {
Expand Down Expand Up @@ -204,7 +204,7 @@ impl CryptographicProtocol for WaitingForConsensusState {
if !failures.is_empty() {
tracing::warn!(
active = ?ctx.mesh().active_participants().keys_vec(),
"waitingForConsensus: failed to send encrypted message; {failures:#?}"
"waitingForConsensus: failed to send encrypted message; {failures:?}"
);
}

Expand Down Expand Up @@ -254,7 +254,7 @@ impl CryptographicProtocol for ResharingState {
active = ?active.keys_vec(),
new = ?self.new_participants,
old = ?self.old_participants,
"resharing(wait): failed to send encrypted message; {failures:#?}",
"resharing(wait): failed to send encrypted message; {failures:?}",
);
}

Expand Down Expand Up @@ -308,7 +308,7 @@ impl CryptographicProtocol for ResharingState {
active = ?active.keys_vec(),
new = ?self.new_participants,
old = ?self.old_participants,
"resharing(return): failed to send encrypted message; {failures:#?}",
"resharing(return): failed to send encrypted message; {failures:?}",
);
}

Expand Down Expand Up @@ -480,7 +480,7 @@ impl CryptographicProtocol for RunningState {
if !failures.is_empty() {
tracing::warn!(
active = ?active.keys_vec(),
"running(post): failed to send encrypted message; {failures:#?}"
"running: failed to send encrypted message; {failures:?}"
);
}
drop(messages);
Expand Down
56 changes: 40 additions & 16 deletions node/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ impl MpcSignProtocol {
.set(1);
let mut queue = MpcMessageQueue::default();
let mut last_state_update = Instant::now();
let mut last_pinged = Instant::now();
loop {
tracing::debug!("trying to advance mpc recovery protocol");
loop {
Expand All @@ -241,21 +242,7 @@ impl MpcSignProtocol {
}
}

let state = {
let guard = self.state.read().await;
guard.clone()
};

let mut state = match state.progress(&mut self).await {
Ok(state) => state,
Err(err) => {
tracing::info!("protocol unable to progress: {err:?}");
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
};

if last_state_update.elapsed() > Duration::from_secs(1) {
let contract_state = if last_state_update.elapsed() > Duration::from_secs(1) {
let contract_state = match rpc_client::fetch_mpc_contract_state(
&self.ctx.rpc_client,
&self.ctx.mpc_contract_id,
Expand All @@ -276,6 +263,32 @@ impl MpcSignProtocol {
// receiving messages.
self.ctx.mesh.establish_participants(&contract_state).await;

last_state_update = Instant::now();
Some(contract_state)
} else {
None
};

if last_pinged.elapsed() > Duration::from_millis(300) {
self.ctx.mesh.ping().await;
last_pinged = Instant::now();
}

let state = {
let guard = self.state.read().await;
guard.clone()
};

let mut state = match state.progress(&mut self).await {
Ok(state) => state,
Err(err) => {
tracing::info!("protocol unable to progress: {err:?}");
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
};

if let Some(contract_state) = contract_state {
state = match state.advance(&mut self, contract_state).await {
Ok(state) => state,
Err(err) => {
Expand All @@ -284,7 +297,6 @@ impl MpcSignProtocol {
continue;
}
};
last_state_update = Instant::now();
}

if let Err(err) = state.handle(&self, &mut queue).await {
Expand All @@ -293,9 +305,21 @@ impl MpcSignProtocol {
continue;
}

let sleep_ms = match state {
NodeState::Generating(_) => 500,
NodeState::Resharing(_) => 500,
NodeState::Running(_) => 100,

NodeState::Starting => 1000,
NodeState::Started(_) => 1000,
NodeState::WaitingForConsensus(_) => 1000,
NodeState::Joining(_) => 1000,
};

let mut guard = self.state.write().await;
*guard = state;
drop(guard);
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
}
}
}
Expand Down

0 comments on commit 5919b6f

Please sign in to comment.