Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Added queue for sending encrypted messages #395

Merged
merged 3 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions node/src/http_client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::protocol::message::SignedMessage;
use crate::protocol::MpcMessage;
use crate::protocol::ParticipantInfo;
use cait_sith::protocol::Participant;
use mpc_keys::hpke;
use reqwest::{Client, IntoUrl};
use std::collections::VecDeque;
use std::str::Utf8Error;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
Expand All @@ -23,17 +25,17 @@ pub enum SendError {
EncryptionError(String),
}

pub async fn send_encrypted<U: IntoUrl>(
participant: Participant,
async fn send_encrypted<U: IntoUrl>(
from: Participant,
cipher_pk: &hpke::PublicKey,
sign_sk: &near_crypto::SecretKey,
client: &Client,
url: U,
message: MpcMessage,
message: &MpcMessage,
) -> Result<(), SendError> {
let encrypted = SignedMessage::encrypt(message, participant, sign_sk, cipher_pk)
let encrypted = SignedMessage::encrypt(message, from, sign_sk, cipher_pk)
.map_err(|err| SendError::EncryptionError(err.to_string()))?;
tracing::debug!(?participant, ciphertext = ?encrypted.text, "sending encrypted");
tracing::debug!(?from, ciphertext = ?encrypted.text, "sending encrypted");

let _span = tracing::info_span!("message_request");
let mut url = url.into_url().unwrap();
Expand Down Expand Up @@ -103,6 +105,51 @@ pub async fn join<U: IntoUrl>(client: &Client, url: U, me: &Participant) -> Resu
Retry::spawn(retry_strategy, action).await
}

// TODO: add in retry logic either in struct or at call site.
// TODO: add check for participant list to see if the messages to be sent are still valid.
#[derive(Default)]
pub struct MessageQueue {
deque: VecDeque<(ParticipantInfo, MpcMessage)>,
}

impl MessageQueue {
pub fn len(&self) -> usize {
self.deque.len()
}

pub fn is_empty(&self) -> bool {
self.deque.is_empty()
}

pub fn push(&mut self, info: ParticipantInfo, msg: MpcMessage) {
self.deque.push_back((info, msg));
}

pub async fn send_encrypted(
&mut self,
from: Participant,
sign_sk: &near_crypto::SecretKey,
client: &Client,
) -> Result<(), SendError> {
while let Some((info, msg)) = self.deque.front() {
let result =
send_encrypted(from, &info.cipher_pk, sign_sk, client, &info.url, msg).await;

match result {
Ok(()) => {
let _ = self.deque.pop_front();
}
Err(err) => {
tracing::warn!(?err, "failed to send encrypted message");
return Err(err);
}
}
}

Ok(())
}
}

#[cfg(test)]
mod tests {
use crate::protocol::message::GeneratingMessage;
Expand Down
5 changes: 5 additions & 0 deletions node/src/protocol/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub trait ConsensusCtx {
fn sign_queue(&self) -> Arc<RwLock<SignQueue>>;
fn cipher_pk(&self) -> &hpke::PublicKey;
fn sign_pk(&self) -> near_crypto::PublicKey;
fn sign_sk(&self) -> &near_crypto::SecretKey;
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -126,6 +127,7 @@ impl ConsensusProtocol for StartedState {
epoch,
),
)),
messages: Default::default(),
}))
} else {
Ok(NodeState::Joining(JoiningState { public_key }))
Expand Down Expand Up @@ -170,6 +172,7 @@ impl ConsensusProtocol for StartedState {
participants,
threshold: contract_state.threshold,
protocol: Arc::new(RwLock::new(protocol)),
messages: Default::default(),
}))
} else {
tracing::info!("we are not a part of the initial participant set, waiting for key generation to complete");
Expand Down Expand Up @@ -314,6 +317,7 @@ impl ConsensusProtocol for WaitingForConsensusState {
self.public_key,
self.epoch,
))),
messages: self.messages,
}))
}
},
Expand Down Expand Up @@ -626,5 +630,6 @@ fn start_resharing<C: ConsensusCtx>(
threshold: contract_state.threshold,
public_key: contract_state.public_key,
protocol: Arc::new(RwLock::new(protocol)),
messages: Default::default(),
}))
}
Loading
Loading