diff --git a/node/src/http_client.rs b/node/src/http_client.rs index b0da9f1aa..10574a362 100644 --- a/node/src/http_client.rs +++ b/node/src/http_client.rs @@ -1,8 +1,10 @@ use crate::protocol::message::SignedMessage; use crate::protocol::MpcMessage; +use crate::protocol::ParticipantInfo; use cait_sith::protocol::Participant; use mpc_keys::hpke; use reqwest::{Client, IntoUrl}; +use std::collections::VecDeque; use std::str::Utf8Error; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; @@ -23,17 +25,17 @@ pub enum SendError { EncryptionError(String), } -pub async fn send_encrypted( - participant: Participant, +async fn send_encrypted( + from: Participant, cipher_pk: &hpke::PublicKey, sign_sk: &near_crypto::SecretKey, client: &Client, url: U, - message: MpcMessage, + message: &MpcMessage, ) -> Result<(), SendError> { - let encrypted = SignedMessage::encrypt(message, participant, sign_sk, cipher_pk) + let encrypted = SignedMessage::encrypt(message, from, sign_sk, cipher_pk) .map_err(|err| SendError::EncryptionError(err.to_string()))?; - tracing::debug!(?participant, ciphertext = ?encrypted.text, "sending encrypted"); + tracing::debug!(?from, ciphertext = ?encrypted.text, "sending encrypted"); let _span = tracing::info_span!("message_request"); let mut url = url.into_url()?; @@ -103,6 +105,50 @@ pub async fn join(client: &Client, url: U, me: &Participant) -> Resu Retry::spawn(retry_strategy, action).await } +// TODO: add in retry logic either in struct or at call site. +// TODO: add check for participant list to see if the messages to be sent are still valid. +#[derive(Default)] +pub struct MessageQueue { + deque: VecDeque<(ParticipantInfo, MpcMessage)>, +} + +impl MessageQueue { + pub fn len(&self) -> usize { + self.deque.len() + } + + pub fn is_empty(&self) -> bool { + self.deque.is_empty() + } + + pub fn push(&mut self, info: ParticipantInfo, msg: MpcMessage) { + self.deque.push_back((info, msg)); + } + + pub async fn send_encrypted( + &mut self, + from: Participant, + sign_sk: &near_crypto::SecretKey, + client: &Client, + ) -> Result<(), SendError> { + while let Some((info, msg)) = self.deque.front() { + let result = + send_encrypted(from, &info.cipher_pk, sign_sk, client, &info.url, msg).await; + + match result { + Ok(()) => { + let _ = self.deque.pop_front(); + } + Err(err) => { + return Err(err); + } + } + } + + Ok(()) + } +} + #[cfg(test)] mod tests { use crate::protocol::message::GeneratingMessage; diff --git a/node/src/protocol/consensus.rs b/node/src/protocol/consensus.rs index 01d309911..33aac1b48 100644 --- a/node/src/protocol/consensus.rs +++ b/node/src/protocol/consensus.rs @@ -33,6 +33,7 @@ pub trait ConsensusCtx { fn sign_queue(&self) -> Arc>; fn cipher_pk(&self) -> &hpke::PublicKey; fn sign_pk(&self) -> near_crypto::PublicKey; + fn sign_sk(&self) -> &near_crypto::SecretKey; } #[derive(thiserror::Error, Debug)] @@ -126,6 +127,7 @@ impl ConsensusProtocol for StartedState { epoch, ), )), + messages: Default::default(), })) } else { Ok(NodeState::Joining(JoiningState { public_key })) @@ -170,6 +172,7 @@ impl ConsensusProtocol for StartedState { participants, threshold: contract_state.threshold, protocol: Arc::new(RwLock::new(protocol)), + messages: Default::default(), })) } else { tracing::info!("we are not a part of the initial participant set, waiting for key generation to complete"); @@ -314,6 +317,7 @@ impl ConsensusProtocol for WaitingForConsensusState { self.public_key, self.epoch, ))), + messages: self.messages, })) } }, @@ -626,5 +630,6 @@ fn start_resharing( threshold: contract_state.threshold, public_key: contract_state.public_key, protocol: Arc::new(RwLock::new(protocol)), + messages: Default::default(), })) } diff --git a/node/src/protocol/cryptography.rs b/node/src/protocol/cryptography.rs index 930ef4ed7..2863a0c93 100644 --- a/node/src/protocol/cryptography.rs +++ b/node/src/protocol/cryptography.rs @@ -1,13 +1,14 @@ use std::sync::PoisonError; use super::state::{GeneratingState, NodeState, ResharingState, RunningState}; -use crate::http_client::{self, SendError}; +use crate::http_client::SendError; use crate::protocol::message::{GeneratingMessage, ResharingMessage}; use crate::protocol::state::WaitingForConsensusState; use crate::protocol::MpcMessage; use async_trait::async_trait; use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError}; use k256::elliptic_curve::group::GroupEncoding; +use mpc_keys::hpke; use near_crypto::InMemorySigner; use near_primitives::types::AccountId; @@ -17,6 +18,7 @@ pub trait CryptographicCtx { fn rpc_client(&self) -> &near_fetch::Client; fn signer(&self) -> &InMemorySigner; fn mpc_contract_id(&self) -> &AccountId; + fn cipher_pk(&self) -> &hpke::PublicKey; fn sign_sk(&self) -> &near_crypto::SecretKey; } @@ -71,56 +73,69 @@ impl CryptographicProtocol for GeneratingState { Action::Wait => { drop(protocol); tracing::debug!("waiting"); + if let Err(err) = self + .messages + .write() + .await + .send_encrypted(ctx.me(), ctx.sign_sk(), ctx.http_client()) + .await + { + tracing::warn!(?err, participants = ?self.participants, "generating: failed to send encrypted message"); + } + return Ok(NodeState::Generating(self)); } Action::SendMany(m) => { tracing::debug!("sending a message to many participants"); + let mut messages = self.messages.write().await; for (p, info) in &self.participants { if p == &ctx.me() { // Skip yourself, cait-sith never sends messages to oneself continue; } - http_client::send_encrypted( - ctx.me(), - &info.cipher_pk, - ctx.sign_sk(), - ctx.http_client(), - info.url.clone(), + messages.push( + info.clone(), MpcMessage::Generating(GeneratingMessage { from: ctx.me(), data: m.clone(), }), - ) - .await?; + ); } } Action::SendPrivate(to, m) => { tracing::debug!("sending a private message to {to:?}"); let info = self.fetch_participant(&to)?; - http_client::send_encrypted( - ctx.me(), - &info.cipher_pk, - ctx.sign_sk(), - ctx.http_client(), - info.url.clone(), + self.messages.write().await.push( + info.clone(), MpcMessage::Generating(GeneratingMessage { from: ctx.me(), data: m.clone(), }), - ) - .await? + ); } Action::Return(r) => { tracing::info!( public_key = hex::encode(r.public_key.to_bytes()), "successfully completed key generation" ); + // Send any leftover messages + if let Err(err) = self + .messages + .write() + .await + .send_encrypted(ctx.me(), ctx.sign_sk(), ctx.http_client()) + .await + { + tracing::warn!(?err, participants = ?self.participants, "generating: failed to send encrypted message"); + } + return Ok(NodeState::WaitingForConsensus(WaitingForConsensusState { epoch: 0, participants: self.participants, threshold: self.threshold, private_share: r.private_share, public_key: r.public_key, + messages: self.messages, })); } } @@ -128,6 +143,27 @@ impl CryptographicProtocol for GeneratingState { } } +#[async_trait] +impl CryptographicProtocol for WaitingForConsensusState { + async fn progress( + mut self, + ctx: C, + ) -> Result { + if let Err(err) = self + .messages + .write() + .await + .send_encrypted(ctx.me(), ctx.sign_sk(), ctx.http_client()) + .await + { + tracing::warn!(?err, participants = ?self.participants, "waiting: failed to send encrypted message"); + } + + // Wait for ConsensusProtocol step to advance state + Ok(NodeState::WaitingForConsensus(self)) + } +} + #[async_trait] impl CryptographicProtocol for ResharingState { async fn progress( @@ -142,59 +178,72 @@ impl CryptographicProtocol for ResharingState { Action::Wait => { drop(protocol); tracing::debug!("waiting"); + if let Err(err) = self + .messages + .write() + .await + .send_encrypted(ctx.me(), ctx.sign_sk(), ctx.http_client()) + .await + { + tracing::warn!(?err, new = ?self.new_participants, old = ?self.old_participants, "resharing(wait): failed to send encrypted message"); + } + return Ok(NodeState::Resharing(self)); } Action::SendMany(m) => { tracing::debug!("sending a message to all participants"); + let mut messages = self.messages.write().await; for (p, info) in &self.new_participants { if p == &ctx.me() { // Skip yourself, cait-sith never sends messages to oneself continue; } - http_client::send_encrypted( - ctx.me(), - &info.cipher_pk, - ctx.sign_sk(), - ctx.http_client(), - info.url.clone(), + + messages.push( + info.clone(), MpcMessage::Resharing(ResharingMessage { epoch: self.old_epoch, from: ctx.me(), data: m.clone(), }), ) - .await?; } } Action::SendPrivate(to, m) => { tracing::debug!("sending a private message to {to:?}"); match self.new_participants.get(&to) { - Some(info) => { - http_client::send_encrypted( - ctx.me(), - &info.cipher_pk, - ctx.sign_sk(), - ctx.http_client(), - info.url.clone(), - MpcMessage::Resharing(ResharingMessage { - epoch: self.old_epoch, - from: ctx.me(), - data: m.clone(), - }), - ) - .await?; - } + Some(info) => self.messages.write().await.push( + info.clone(), + MpcMessage::Resharing(ResharingMessage { + epoch: self.old_epoch, + from: ctx.me(), + data: m.clone(), + }), + ), None => return Err(CryptographicError::UnknownParticipant(to)), } } Action::Return(private_share) => { tracing::debug!("successfully completed key reshare"); + + // Send any leftover messages. + if let Err(err) = self + .messages + .write() + .await + .send_encrypted(ctx.me(), ctx.sign_sk(), ctx.http_client()) + .await + { + tracing::warn!(?err, new = ?self.new_participants, old = ?self.old_participants, "resharing(return): failed to send encrypted message"); + } + return Ok(NodeState::WaitingForConsensus(WaitingForConsensusState { epoch: self.old_epoch + 1, participants: self.new_participants, threshold: self.threshold, private_share, public_key: self.public_key, + messages: self.messages, })); } } @@ -208,21 +257,22 @@ impl CryptographicProtocol for RunningState { mut self, ctx: C, ) -> Result { + let mut messages = self.messages.write().await; + // Try sending any leftover messages donated to RunningState. + if let Err(err) = messages + .send_encrypted(ctx.me(), ctx.sign_sk(), ctx.http_client()) + .await + { + tracing::warn!(?err, participants = ?self.participants, "running(pre): failed to send encrypted message"); + } + let mut triple_manager = self.triple_manager.write().await; if triple_manager.my_len() < 2 && triple_manager.potential_len() < 10 { triple_manager.generate()?; } for (p, msg) in triple_manager.poke()? { let info = self.fetch_participant(&p)?; - http_client::send_encrypted( - ctx.me(), - &info.cipher_pk, - ctx.sign_sk(), - ctx.http_client(), - info.url.clone(), - MpcMessage::Triple(msg), - ) - .await?; + messages.push(info.clone(), MpcMessage::Triple(msg)); } let mut presignature_manager = self.presignature_manager.write().await; @@ -244,15 +294,7 @@ impl CryptographicProtocol for RunningState { drop(triple_manager); for (p, msg) in presignature_manager.poke()? { let info = self.fetch_participant(&p)?; - http_client::send_encrypted( - ctx.me(), - &info.cipher_pk, - ctx.sign_sk(), - ctx.http_client(), - info.url.clone(), - MpcMessage::Presignature(msg), - ) - .await?; + messages.push(info.clone(), MpcMessage::Presignature(msg)); } let mut sign_queue = self.sign_queue.write().await; @@ -279,20 +321,19 @@ impl CryptographicProtocol for RunningState { drop(presignature_manager); for (p, msg) in signature_manager.poke()? { let info = self.participants.get(&p).unwrap(); - http_client::send_encrypted( - ctx.me(), - &info.cipher_pk, - ctx.sign_sk(), - ctx.http_client(), - info.url.clone(), - MpcMessage::Signature(msg), - ) - .await?; + messages.push(info.clone(), MpcMessage::Signature(msg)); } signature_manager .publish(ctx.rpc_client(), ctx.signer(), ctx.mpc_contract_id()) .await?; drop(signature_manager); + if let Err(err) = messages + .send_encrypted(ctx.me(), ctx.sign_sk(), ctx.http_client()) + .await + { + tracing::warn!(?err, participants = ?self.participants, "running(post): failed to send encrypted message"); + } + drop(messages); Ok(NodeState::Running(self)) } @@ -308,6 +349,7 @@ impl CryptographicProtocol for NodeState { NodeState::Generating(state) => state.progress(ctx).await, NodeState::Resharing(state) => state.progress(ctx).await, NodeState::Running(state) => state.progress(ctx).await, + NodeState::WaitingForConsensus(state) => state.progress(ctx).await, _ => Ok(self), } } diff --git a/node/src/protocol/mod.rs b/node/src/protocol/mod.rs index 7e26d1519..56977c4e5 100644 --- a/node/src/protocol/mod.rs +++ b/node/src/protocol/mod.rs @@ -1,10 +1,10 @@ -mod consensus; mod contract; mod cryptography; mod presignature; mod signature; mod triple; +pub mod consensus; pub mod message; pub mod state; @@ -82,6 +82,10 @@ impl ConsensusCtx for &Ctx { fn sign_pk(&self) -> near_crypto::PublicKey { self.sign_sk.public_key() } + + fn sign_sk(&self) -> &near_crypto::SecretKey { + &self.sign_sk + } } impl CryptographicCtx for &Ctx { @@ -105,6 +109,10 @@ impl CryptographicCtx for &Ctx { &self.mpc_contract_id } + fn cipher_pk(&self) -> &hpke::PublicKey { + &self.cipher_pk + } + fn sign_sk(&self) -> &near_crypto::SecretKey { &self.sign_sk } diff --git a/node/src/protocol/state.rs b/node/src/protocol/state.rs index 2dac5d8a2..493a75403 100644 --- a/node/src/protocol/state.rs +++ b/node/src/protocol/state.rs @@ -3,6 +3,7 @@ use super::presignature::PresignatureManager; use super::signature::SignatureManager; use super::triple::TripleManager; use super::SignQueue; +use crate::http_client::MessageQueue; use crate::protocol::ParticipantInfo; use crate::types::{KeygenProtocol, PrivateKeyShare, PublicKey, ReshareProtocol}; use cait_sith::protocol::Participant; @@ -25,6 +26,7 @@ pub struct GeneratingState { pub participants: BTreeMap, pub threshold: usize, pub protocol: KeygenProtocol, + pub messages: Arc>, } impl GeneratingState { @@ -43,6 +45,7 @@ pub struct WaitingForConsensusState { pub threshold: usize, pub private_share: PrivateKeyShare, pub public_key: PublicKey, + pub messages: Arc>, } impl WaitingForConsensusState { @@ -65,6 +68,7 @@ pub struct RunningState { pub triple_manager: Arc>, pub presignature_manager: Arc>, pub signature_manager: Arc>, + pub messages: Arc>, } impl RunningState { @@ -84,6 +88,7 @@ pub struct ResharingState { pub threshold: usize, pub public_key: PublicKey, pub protocol: ReshareProtocol, + pub messages: Arc>, } impl ResharingState {