From fa5b17a0e1e0b09b2643c252c00628208b2b9c86 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Fri, 29 Mar 2024 14:39:17 -0700 Subject: [PATCH 1/3] Added msg_multi --- node/src/http_client.rs | 128 +++++++++++++++++++++++++---------- node/src/protocol/message.rs | 4 +- node/src/web/mod.rs | 31 ++++++++- 3 files changed, 126 insertions(+), 37 deletions(-) diff --git a/node/src/http_client.rs b/node/src/http_client.rs index 7a64c4e21..3964b7a62 100644 --- a/node/src/http_client.rs +++ b/node/src/http_client.rs @@ -2,7 +2,7 @@ use crate::protocol::contract::primitives::{ParticipantInfo, Participants}; use crate::protocol::message::SignedMessage; use crate::protocol::MpcMessage; use cait_sith::protocol::Participant; -use mpc_keys::hpke; +use mpc_keys::hpke::Ciphered; use reqwest::{Client, IntoUrl}; use std::collections::{HashMap, HashSet, VecDeque}; use std::str::Utf8Error; @@ -33,26 +33,21 @@ pub enum SendError { ParticipantNotAlive(String), } -async fn send_encrypted( +async fn send_encrypted_multi( from: Participant, - cipher_pk: &hpke::PublicKey, - sign_sk: &near_crypto::SecretKey, client: &Client, url: U, - message: &MpcMessage, + message: Vec, ) -> Result<(), SendError> { - let encrypted = SignedMessage::encrypt(message, from, sign_sk, cipher_pk) - .map_err(|err| SendError::EncryptionError(err.to_string()))?; - let _span = tracing::info_span!("message_request"); let mut url = url.into_url()?; - url.set_path("msg"); + url.set_path("msg_multi"); tracing::debug!(?from, to = %url, "making http request: sending encrypted message"); let action = || async { let response = client .post(url.clone()) .header("content-type", "application/json") - .json(&encrypted) + .json(&message) .send() .await .map_err(SendError::ReqwestClientError)?; @@ -111,8 +106,11 @@ impl MessageQueue { let mut failed = VecDeque::new(); let mut errors = Vec::new(); let mut participant_counter = HashMap::new(); + + let outer = Instant::now(); + let uncompacted = self.deque.len(); + let mut encrypted = HashMap::new(); while let Some((info, msg, instant)) = self.deque.pop_front() { - let account_id = info.account_id.clone(); if !participants.contains_key(&Participant::from(info.id)) { if instant.elapsed() > message_type_to_timeout(&msg) { errors.push(SendError::Timeout(format!( @@ -125,35 +123,72 @@ impl MessageQueue { failed.push_back((info, msg, instant)); continue; } + if instant.elapsed() > message_type_to_timeout(&msg) { + errors.push(SendError::Timeout(format!( + "message has timed out: {info:?}" + ))); + continue; + } - let start = Instant::now(); - crate::metrics::NUM_SEND_ENCRYPTED_TOTAL - .with_label_values(&[&account_id.as_ref()]) - .inc(); - if let Err(err) = - send_encrypted(from, &info.cipher_pk, sign_sk, client, &info.url, &msg).await - { - crate::metrics::NUM_SEND_ENCRYPTED_FAILURE - .with_label_values(&[&account_id.as_ref()]) - .inc(); - crate::metrics::FAILED_SEND_ENCRYPTED_LATENCY - .with_label_values(&[&account_id.as_ref()]) - .observe(start.elapsed().as_millis() as f64); - if instant.elapsed() > message_type_to_timeout(&msg) { - errors.push(SendError::Timeout(format!( - "message has timed out: {err:?}" - ))); + let encrypted_msg = match SignedMessage::encrypt(&msg, from, sign_sk, &info.cipher_pk) { + Ok(encrypted) => encrypted, + Err(err) => { + errors.push(SendError::EncryptionError(err.to_string())); continue; } + }; + let (encrypted, msgs) = encrypted + .entry(info.id) + .or_insert_with(|| (Vec::new(), Vec::new())); - failed.push_back((info, msg, instant)); - errors.push(err); - } else { - crate::metrics::SEND_ENCRYPTED_LATENCY - .with_label_values(&[&account_id.as_ref()]) - .observe(start.elapsed().as_millis() as f64); + encrypted.push(encrypted_msg); + msgs.push((info, msg, instant)); + } + + let mut compacted = 0; + for (id, (encrypted, msgs)) in encrypted { + let partitioned = partition_ciphered(encrypted); + compacted += partitioned.len(); + + for encrypted_partition in partitioned { + let info = participants.get(&Participant::from(id)).unwrap(); + let account_id = &info.account_id; + + let start = Instant::now(); + crate::metrics::NUM_SEND_ENCRYPTED_TOTAL + .with_label_values(&[account_id.as_ref()]) + .inc(); + if let Err(err) = + send_encrypted_multi(from, client, &info.url, encrypted_partition).await + { + crate::metrics::NUM_SEND_ENCRYPTED_FAILURE + .with_label_values(&[account_id.as_ref()]) + .inc(); + crate::metrics::FAILED_SEND_ENCRYPTED_LATENCY + .with_label_values(&[account_id.as_ref()]) + .observe(start.elapsed().as_millis() as f64); + + for (info, msg, instant) in msgs { + failed.push_back((info, msg, instant)); + } + errors.push(err); + break; + } else { + crate::metrics::SEND_ENCRYPTED_LATENCY + .with_label_values(&[account_id.as_ref()]) + .observe(start.elapsed().as_millis() as f64); + } } } + + if uncompacted > 0 { + tracing::debug!( + uncompacted, + compacted, + "{from:?} sent messages in {:?};", + outer.elapsed() + ); + } // only add the participant count if it hasn't been seen before. let counts = format!("{participant_counter:?}"); if !participant_counter.is_empty() && self.seen_counts.insert(counts.clone()) { @@ -168,6 +203,31 @@ impl MessageQueue { } } +fn partition_ciphered(encrypted: Vec) -> Vec> { + let mut result: Vec> = Vec::new(); + let mut current_partition: Vec = Vec::new(); + let mut current_size: usize = 0; + + for ciphered in encrypted { + let bytesize = ciphered.text.len(); + if current_size + bytesize > 256 * 1024 { + // If adding this byte vector exceeds 1MB, start a new partition + result.push(current_partition); + current_partition = Vec::new(); + current_size = 0; + } + current_partition.push(ciphered); + current_size += bytesize; + } + + if !current_partition.is_empty() { + // Add the last partition + result.push(current_partition); + } + + result +} + fn message_type_to_timeout(msg: &MpcMessage) -> Duration { match msg { MpcMessage::Generating(_) => MESSAGE_TIMEOUT, diff --git a/node/src/protocol/message.rs b/node/src/protocol/message.rs index c0707a317..f5a1465b6 100644 --- a/node/src/protocol/message.rs +++ b/node/src/protocol/message.rs @@ -376,12 +376,12 @@ where T: Serialize, { pub fn encrypt( - msg: T, + msg: &T, from: Participant, sign_sk: &near_crypto::SecretKey, cipher_pk: &hpke::PublicKey, ) -> Result { - let msg = serde_json::to_vec(&msg)?; + let msg = serde_json::to_vec(msg)?; let sig = sign_sk.sign(&msg); let msg = SignedMessage { msg, sig, from }; let msg = serde_json::to_vec(&msg)?; diff --git a/node/src/web/mod.rs b/node/src/web/mod.rs index 2b0f5c08e..a798d46bf 100644 --- a/node/src/web/mod.rs +++ b/node/src/web/mod.rs @@ -45,6 +45,7 @@ pub async fn run( }), ) .route("/msg", post(msg)) + .route("/msg_multi", post(msg_multi)) .route("/state", get(state)) .route("/metrics", get(metrics)) .layer(Extension(Arc::new(axum_state))); @@ -86,7 +87,35 @@ async fn msg( Ok(()) } -#[derive(Serialize, Deserialize)] +#[tracing::instrument(level = "debug", skip_all)] +async fn msg_multi( + Extension(state): Extension>, + WithRejection(Json(encrypted), _): WithRejection>, Error>, +) -> Result<()> { + for encrypted in encrypted.into_iter() { + let message = match SignedMessage::decrypt( + &state.cipher_sk, + &state.protocol_state, + encrypted, + ) + .await + { + Ok(msg) => msg, + Err(err) => { + tracing::error!(?err, "failed to decrypt or verify an encrypted message"); + return Err(err.into()); + } + }; + + if let Err(err) = state.sender.send(message).await { + tracing::error!(?err, "failed to forward an encrypted protocol message"); + return Err(err.into()); + } + } + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type")] #[serde(rename_all = "snake_case")] pub enum StateView { From 28d22949d45bf37b3c6626612bec7718f75ad611 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Fri, 29 Mar 2024 17:30:09 -0700 Subject: [PATCH 2/3] Made msg_multi just msg --- node/src/http_client.rs | 31 +++++++++++++------------------ node/src/protocol/message.rs | 12 ++++++++++++ node/src/web/mod.rs | 22 ---------------------- 3 files changed, 25 insertions(+), 40 deletions(-) diff --git a/node/src/http_client.rs b/node/src/http_client.rs index 3964b7a62..1a91089ec 100644 --- a/node/src/http_client.rs +++ b/node/src/http_client.rs @@ -33,7 +33,7 @@ pub enum SendError { ParticipantNotAlive(String), } -async fn send_encrypted_multi( +async fn send_encrypted( from: Participant, client: &Client, url: U, @@ -41,7 +41,7 @@ async fn send_encrypted_multi( ) -> Result<(), SendError> { let _span = tracing::info_span!("message_request"); let mut url = url.into_url()?; - url.set_path("msg_multi"); + url.set_path("msg"); tracing::debug!(?from, to = %url, "making http request: sending encrypted message"); let action = || async { let response = client @@ -111,25 +111,20 @@ impl MessageQueue { let uncompacted = self.deque.len(); let mut encrypted = HashMap::new(); while let Some((info, msg, instant)) = self.deque.pop_front() { - if !participants.contains_key(&Participant::from(info.id)) { - if instant.elapsed() > message_type_to_timeout(&msg) { - errors.push(SendError::Timeout(format!( - "message has timed out on offline node: {info:?}", - ))); - continue; - } - let counter = participant_counter.entry(info.id).or_insert(0); - *counter += 1; - failed.push_back((info, msg, instant)); - continue; - } if instant.elapsed() > message_type_to_timeout(&msg) { errors.push(SendError::Timeout(format!( - "message has timed out: {info:?}" + "{} message has timed out: {info:?}", + msg.typename(), ))); continue; } + if !participants.contains_key(&Participant::from(info.id)) { + let counter = participant_counter.entry(info.id).or_insert(0); + *counter += 1; + failed.push_back((info, msg, instant)); + continue; + } let encrypted_msg = match SignedMessage::encrypt(&msg, from, sign_sk, &info.cipher_pk) { Ok(encrypted) => encrypted, Err(err) => { @@ -158,8 +153,7 @@ impl MessageQueue { crate::metrics::NUM_SEND_ENCRYPTED_TOTAL .with_label_values(&[account_id.as_ref()]) .inc(); - if let Err(err) = - send_encrypted_multi(from, client, &info.url, encrypted_partition).await + if let Err(err) = send_encrypted(from, client, &info.url, encrypted_partition).await { crate::metrics::NUM_SEND_ENCRYPTED_FAILURE .with_label_values(&[account_id.as_ref()]) @@ -168,6 +162,7 @@ impl MessageQueue { .with_label_values(&[account_id.as_ref()]) .observe(start.elapsed().as_millis() as f64); + // since we failed, put back all the messages related to this for (info, msg, instant) in msgs { failed.push_back((info, msg, instant)); } @@ -211,7 +206,7 @@ fn partition_ciphered(encrypted: Vec) -> Vec> { for ciphered in encrypted { let bytesize = ciphered.text.len(); if current_size + bytesize > 256 * 1024 { - // If adding this byte vector exceeds 1MB, start a new partition + // If adding this byte vector exceeds 256kb, start a new partition result.push(current_partition); current_partition = Vec::new(); current_size = 0; diff --git a/node/src/protocol/message.rs b/node/src/protocol/message.rs index f5a1465b6..f2ce62911 100644 --- a/node/src/protocol/message.rs +++ b/node/src/protocol/message.rs @@ -77,6 +77,18 @@ pub enum MpcMessage { Signature(SignatureMessage), } +impl MpcMessage { + pub const fn typename(&self) -> &'static str { + match self { + MpcMessage::Generating(_) => "Generating", + MpcMessage::Resharing(_) => "Resharing", + MpcMessage::Triple(_) => "Triple", + MpcMessage::Presignature(_) => "Presignature", + MpcMessage::Signature(_) => "Signature", + } + } +} + #[derive(Default)] pub struct MpcMessageQueue { generating: VecDeque, diff --git a/node/src/web/mod.rs b/node/src/web/mod.rs index a798d46bf..e672f5ceb 100644 --- a/node/src/web/mod.rs +++ b/node/src/web/mod.rs @@ -45,7 +45,6 @@ pub async fn run( }), ) .route("/msg", post(msg)) - .route("/msg_multi", post(msg_multi)) .route("/state", get(state)) .route("/metrics", get(metrics)) .layer(Extension(Arc::new(axum_state))); @@ -68,27 +67,6 @@ pub struct MsgRequest { #[tracing::instrument(level = "debug", skip_all)] async fn msg( - Extension(state): Extension>, - WithRejection(Json(encrypted), _): WithRejection, Error>, -) -> Result<()> { - let message = - match SignedMessage::decrypt(&state.cipher_sk, &state.protocol_state, encrypted).await { - Ok(msg) => msg, - Err(err) => { - tracing::error!(?err, "failed to decrypt or verify an encrypted message"); - return Err(err.into()); - } - }; - - if let Err(err) = state.sender.send(message).await { - tracing::error!(?err, "failed to forward an encrypted protocol message"); - return Err(err.into()); - } - Ok(()) -} - -#[tracing::instrument(level = "debug", skip_all)] -async fn msg_multi( Extension(state): Extension>, WithRejection(Json(encrypted), _): WithRejection>, Error>, ) -> Result<()> { From ceb6105259b2ccb958de69a8060e8fd50f454809 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Fri, 29 Mar 2024 17:59:11 -0700 Subject: [PATCH 3/3] Made partitioned messages more resilient --- node/src/http_client.rs | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/node/src/http_client.rs b/node/src/http_client.rs index 1a91089ec..369ab58e0 100644 --- a/node/src/http_client.rs +++ b/node/src/http_client.rs @@ -132,20 +132,15 @@ impl MessageQueue { continue; } }; - let (encrypted, msgs) = encrypted - .entry(info.id) - .or_insert_with(|| (Vec::new(), Vec::new())); - - encrypted.push(encrypted_msg); - msgs.push((info, msg, instant)); + let encrypted = encrypted.entry(info.id).or_insert_with(Vec::new); + encrypted.push((encrypted_msg, (info, msg, instant))); } let mut compacted = 0; - for (id, (encrypted, msgs)) in encrypted { - let partitioned = partition_ciphered(encrypted); - compacted += partitioned.len(); - - for encrypted_partition in partitioned { + for (id, encrypted) in encrypted { + for partition in partition_ciphered_256kb(encrypted) { + let (encrypted_partition, msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip(); + // guaranteed to unwrap due to our previous loop check: let info = participants.get(&Participant::from(id)).unwrap(); let account_id = &info.account_id; @@ -163,12 +158,10 @@ impl MessageQueue { .observe(start.elapsed().as_millis() as f64); // since we failed, put back all the messages related to this - for (info, msg, instant) in msgs { - failed.push_back((info, msg, instant)); - } + failed.extend(msgs); errors.push(err); - break; } else { + compacted += msgs.len(); crate::metrics::SEND_ENCRYPTED_LATENCY .with_label_values(&[account_id.as_ref()]) .observe(start.elapsed().as_millis() as f64); @@ -198,13 +191,18 @@ impl MessageQueue { } } -fn partition_ciphered(encrypted: Vec) -> Vec> { - let mut result: Vec> = Vec::new(); - let mut current_partition: Vec = Vec::new(); +/// Encrypted message with a reference to the old message. Only the ciphered portion of this +/// type will be sent over the wire, while the original message is kept just in case things +/// go wrong somewhere and the message needs to be requeued to be sent later. +type EncryptedMessage = (Ciphered, (ParticipantInfo, MpcMessage, Instant)); + +fn partition_ciphered_256kb(encrypted: Vec) -> Vec> { + let mut result = Vec::new(); + let mut current_partition = Vec::new(); let mut current_size: usize = 0; for ciphered in encrypted { - let bytesize = ciphered.text.len(); + let bytesize = ciphered.0.text.len(); if current_size + bytesize > 256 * 1024 { // If adding this byte vector exceeds 256kb, start a new partition result.push(current_partition);