From bac3c903a0109dbdc120cc3074cfb7769f42051f Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Wed, 27 Mar 2024 11:48:07 -0700 Subject: [PATCH 1/3] Added metrics for protocol loop iter latency --- node/src/metrics.rs | 40 ++++++++++++++++++++++++++++++++++++++++ node/src/protocol/mod.rs | 17 +++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/node/src/metrics.rs b/node/src/metrics.rs index b86a5ab9c..b64b80686 100644 --- a/node/src/metrics.rs +++ b/node/src/metrics.rs @@ -248,6 +248,46 @@ pub(crate) static NUM_SIGN_SUCCESS_30S: Lazy = Lazy::new(|| { .unwrap() }); +pub(crate) static PROTOCOL_LATENCY_ITER_TOTAL: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "multichain_protocol_iter_total", + "Latency of multichain protocol iter, start of protocol till end of iteration", + &["node_account_id"], + Some(exponential_buckets(0.001, 3.0, 20).unwrap()), + ) + .unwrap() +}); + +pub(crate) static PROTOCOL_LATENCY_ITER_CRYPTO: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "multichain_protocol_iter_crypto", + "Latency of multichain protocol iter, start of crypto iter till end", + &["node_account_id"], + Some(exponential_buckets(0.001, 2.0, 20).unwrap()), + ) + .unwrap() +}); + +pub(crate) static PROTOCOL_LATENCY_ITER_CONSENSUS: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "multichain_protocol_iter_consensus", + "Latency of multichain protocol iter, start of consensus iter till end", + &["node_account_id"], + Some(exponential_buckets(0.001, 2.0, 20).unwrap()), + ) + .unwrap() +}); + +pub(crate) static PROTOCOL_LATENCY_ITER_MESSAGE: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "multichain_protocol_iter_message", + "Latency of multichain protocol iter, start of message iter till end", + &["node_account_id"], + Some(exponential_buckets(0.001, 2.0, 20).unwrap()), + ) + .unwrap() +}); + pub fn try_create_int_gauge_vec(name: &str, help: &str, labels: &[&str]) -> Result { check_metric_multichain_prefix(name)?; let opts = Opts::new(name, help); 
diff --git a/node/src/protocol/mod.rs b/node/src/protocol/mod.rs index 9dad29072..16217097d 100644 --- a/node/src/protocol/mod.rs +++ b/node/src/protocol/mod.rs @@ -229,6 +229,7 @@ impl MpcSignProtocol { let mut last_state_update = Instant::now(); let mut last_pinged = Instant::now(); loop { + let protocol_time = Instant::now(); tracing::debug!("trying to advance mpc recovery protocol"); loop { let msg_result = self.receiver.try_recv(); @@ -285,6 +286,7 @@ impl MpcSignProtocol { guard.clone() }; + let crypto_time = Instant::now(); let mut state = match state.progress(&mut self).await { Ok(state) => state, Err(err) => { @@ -293,7 +295,11 @@ impl MpcSignProtocol { continue; } }; + crate::metrics::PROTOCOL_LATENCY_ITER_CRYPTO + .with_label_values(&[&my_account_id]) + .observe(crypto_time.elapsed().as_secs_f64()); + let consensus_time = Instant::now(); if let Some(contract_state) = contract_state { state = match state.advance(&mut self, contract_state).await { Ok(state) => state, @@ -304,12 +310,19 @@ impl MpcSignProtocol { } }; } + crate::metrics::PROTOCOL_LATENCY_ITER_CONSENSUS + .with_label_values(&[&my_account_id]) + .observe(consensus_time.elapsed().as_secs_f64()); + let message_time = Instant::now(); if let Err(err) = state.handle(&self, &mut queue).await { tracing::info!("protocol unable to handle messages: {err:?}"); tokio::time::sleep(Duration::from_millis(100)).await; continue; } + crate::metrics::PROTOCOL_LATENCY_ITER_MESSAGE + .with_label_values(&[&my_account_id]) + .observe(message_time.elapsed().as_secs_f64()); let sleep_ms = match state { NodeState::Generating(_) => 500, @@ -325,6 +338,10 @@ impl MpcSignProtocol { let mut guard = self.state.write().await; *guard = state; drop(guard); + + crate::metrics::PROTOCOL_LATENCY_ITER_TOTAL + .with_label_values(&[&my_account_id]) + .observe(protocol_time.elapsed().as_secs_f64()); tokio::time::sleep(Duration::from_millis(sleep_ms)).await; } } From 272bcdcecb0399d2d8e6fefe580eecfc46503a78 Mon Sep 17 00:00:00 2001 
From: Phuong Nguyen Date: Wed, 27 Mar 2024 17:17:35 -0700 Subject: [PATCH 2/3] Added message compacting and new /msg_multi endpoint --- node/src/http_client.rs | 97 +++++++++++++++++++++++++++++------- node/src/protocol/message.rs | 4 +- node/src/web/mod.rs | 31 +++++++++++- 3 files changed, 110 insertions(+), 22 deletions(-) diff --git a/node/src/http_client.rs b/node/src/http_client.rs index 8dccb036a..2ccc9aa47 100644 --- a/node/src/http_client.rs +++ b/node/src/http_client.rs @@ -2,7 +2,7 @@ use crate::protocol::contract::primitives::{ParticipantInfo, Participants}; use crate::protocol::message::SignedMessage; use crate::protocol::MpcMessage; use cait_sith::protocol::Participant; -use mpc_keys::hpke; +use mpc_keys::hpke::Ciphered; use reqwest::{Client, IntoUrl}; use std::collections::{HashMap, HashSet, VecDeque}; use std::str::Utf8Error; @@ -33,26 +33,21 @@ pub enum SendError { ParticipantNotAlive(String), } -async fn send_encrypted( +async fn send_encrypted_multi( from: Participant, - cipher_pk: &hpke::PublicKey, - sign_sk: &near_crypto::SecretKey, client: &Client, url: U, - message: &MpcMessage, + message: Vec, ) -> Result<(), SendError> { - let encrypted = SignedMessage::encrypt(message, from, sign_sk, cipher_pk) - .map_err(|err| SendError::EncryptionError(err.to_string()))?; - let _span = tracing::info_span!("message_request"); let mut url = url.into_url()?; - url.set_path("msg"); + url.set_path("msg_multi"); tracing::debug!(?from, to = %url, "making http request: sending encrypted message"); let action = || async { let response = client .post(url.clone()) .header("content-type", "application/json") - .json(&encrypted) + .json(&message) .send() .await .map_err(SendError::ReqwestClientError)?; @@ -111,6 +106,10 @@ impl MessageQueue { let mut failed = VecDeque::new(); let mut errors = Vec::new(); let mut participant_counter = HashMap::new(); + + let outer = Instant::now(); + let uncompacted = self.deque.len(); + let mut encrypted = HashMap::new(); while let 
Some((info, msg, instant)) = self.deque.pop_front() { if !participants.contains_key(&Participant::from(info.id)) { if instant.elapsed() > message_type_to_timeout(&msg) { @@ -124,21 +123,56 @@ impl MessageQueue { failed.push_back((info, msg, instant)); continue; } + if instant.elapsed() > message_type_to_timeout(&msg) { + errors.push(SendError::Timeout(format!( + "message has timed out: {info:?}" + ))); + continue; + } - if let Err(err) = - send_encrypted(from, &info.cipher_pk, sign_sk, client, &info.url, &msg).await - { - if instant.elapsed() > message_type_to_timeout(&msg) { - errors.push(SendError::Timeout(format!( - "message has timed out: {err:?}" - ))); + let encrypted_msg = match SignedMessage::encrypt(&msg, from, sign_sk, &info.cipher_pk) { + Ok(encrypted) => encrypted, + Err(err) => { + errors.push(SendError::EncryptionError(err.to_string())); continue; } + }; - failed.push_back((info, msg, instant)); - errors.push(err); + let (encrypted, msgs) = encrypted + .entry(info.id) + .or_insert_with(|| (Vec::new(), Vec::new())); + + encrypted.push(encrypted_msg); + msgs.push((info, msg, instant)); + } + + let mut compacted = 0; + for (id, (encrypted, msgs)) in encrypted { + let partitioned = partition_ciphered(encrypted); + compacted += partitioned.len(); + + for encrypted_partition in partitioned { + let info = participants.get(&Participant::from(id)).unwrap(); + if let Err(err) = + send_encrypted_multi(from, client, &info.url, encrypted_partition).await + { + for (info, msg, instant) in msgs { + failed.push_back((info, msg, instant)); + } + errors.push(err); + break; + } } } + + if uncompacted > 0 { + tracing::debug!( + uncompacted, + compacted, + "{from:?} sent messages in {:?};", + outer.elapsed() + ); + } // only add the participant count if it hasn't been seen before. 
let counts = format!("{participant_counter:?}"); if !participant_counter.is_empty() && self.seen_counts.insert(counts.clone()) { @@ -153,6 +187,31 @@ impl MessageQueue { } } +fn partition_ciphered(encrypted: Vec) -> Vec> { + let mut result: Vec> = Vec::new(); + let mut current_partition: Vec = Vec::new(); + let mut current_size: usize = 0; + + for ciphered in encrypted { + let bytesize = ciphered.text.len(); + if current_size + bytesize > 256 * 1024 { + // If adding this ciphertext would exceed the 256 KiB partition limit, start a new partition + result.push(current_partition); + current_partition = Vec::new(); + current_size = 0; + } + current_partition.push(ciphered); + current_size += bytesize; + } + + if !current_partition.is_empty() { + // Add the last partition + result.push(current_partition); + } + + result +} + const fn message_type_to_timeout(msg: &MpcMessage) -> Duration { match msg { MpcMessage::Generating(_) => MESSAGE_TIMEOUT, diff --git a/node/src/protocol/message.rs b/node/src/protocol/message.rs index c0707a317..f5a1465b6 100644 --- a/node/src/protocol/message.rs +++ b/node/src/protocol/message.rs @@ -376,12 +376,12 @@ where T: Serialize, { pub fn encrypt( - msg: T, + msg: &T, from: Participant, sign_sk: &near_crypto::SecretKey, cipher_pk: &hpke::PublicKey, ) -> Result { - let msg = serde_json::to_vec(&msg)?; + let msg = serde_json::to_vec(msg)?; let sig = sign_sk.sign(&msg); let msg = SignedMessage { msg, sig, from }; let msg = serde_json::to_vec(&msg)?; diff --git a/node/src/web/mod.rs b/node/src/web/mod.rs index 2b0f5c08e..a798d46bf 100644 --- a/node/src/web/mod.rs +++ b/node/src/web/mod.rs @@ -45,6 +45,7 @@ pub async fn run( }), ) .route("/msg", post(msg)) + .route("/msg_multi", post(msg_multi)) .route("/state", get(state)) .route("/metrics", get(metrics)) .layer(Extension(Arc::new(axum_state))); @@ -86,7 +87,35 @@ async fn msg( Ok(()) } -#[derive(Serialize, Deserialize)] +#[tracing::instrument(level = "debug", skip_all)] +async fn msg_multi( + Extension(state): 
Extension>, + WithRejection(Json(encrypted), _): WithRejection>, Error>, +) -> Result<()> { + for encrypted in encrypted.into_iter() { + let message = match SignedMessage::decrypt( + &state.cipher_sk, + &state.protocol_state, + encrypted, + ) + .await + { + Ok(msg) => msg, + Err(err) => { + tracing::error!(?err, "failed to decrypt or verify an encrypted message"); + return Err(err.into()); + } + }; + + if let Err(err) = state.sender.send(message).await { + tracing::error!(?err, "failed to forward an encrypted protocol message"); + return Err(err.into()); + } + } + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type")] #[serde(rename_all = "snake_case")] pub enum StateView { From 57ce65eb75dcadd51d8ed35b1c67791d1bfcbfab Mon Sep 17 00:00:00 2001 From: Phuong N Date: Thu, 28 Mar 2024 04:02:16 +0000 Subject: [PATCH 3/3] Added send encrypted latency metric --- node/src/http_client.rs | 5 +++++ node/src/metrics.rs | 10 ++++++++++ 2 files changed, 15 insertions(+) diff --git a/node/src/http_client.rs b/node/src/http_client.rs index 2ccc9aa47..c6291aac5 100644 --- a/node/src/http_client.rs +++ b/node/src/http_client.rs @@ -153,6 +153,7 @@ impl MessageQueue { for encrypted_partition in partitioned { let info = participants.get(&Participant::from(id)).unwrap(); + let start = Instant::now(); if let Err(err) = send_encrypted_multi(from, client, &info.url, encrypted_partition).await { @@ -162,6 +163,10 @@ impl MessageQueue { errors.push(err); break; } + + crate::metrics::SEND_ENCRYPTED_LATENCY + .with_label_values(&[info.account_id.as_ref()]) + .observe(start.elapsed().as_millis() as f64); } } diff --git a/node/src/metrics.rs b/node/src/metrics.rs index b64b80686..fb045afe2 100644 --- a/node/src/metrics.rs +++ b/node/src/metrics.rs @@ -248,6 +248,16 @@ pub(crate) static NUM_SIGN_SUCCESS_30S: Lazy = Lazy::new(|| { .unwrap() }); +pub(crate) static SEND_ENCRYPTED_LATENCY: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "multichain_send_encrypted_ms", + 
"Latency of send encrypted.", + &["node_account_id"], + Some(exponential_buckets(0.5, 1.5, 20).unwrap()), + ) + .unwrap() +}); + pub(crate) static PROTOCOL_LATENCY_ITER_TOTAL: Lazy = Lazy::new(|| { try_create_histogram_vec( "multichain_protocol_iter_total",