Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Added /msg_multi for sending multiple messages & metrics for protocol loop iter latency #538

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 83 additions & 19 deletions node/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::protocol::contract::primitives::{ParticipantInfo, Participants};
use crate::protocol::message::SignedMessage;
use crate::protocol::MpcMessage;
use cait_sith::protocol::Participant;
use mpc_keys::hpke;
use mpc_keys::hpke::Ciphered;
use reqwest::{Client, IntoUrl};
use std::collections::{HashMap, HashSet, VecDeque};
use std::str::Utf8Error;
Expand Down Expand Up @@ -33,26 +33,21 @@ pub enum SendError {
ParticipantNotAlive(String),
}

async fn send_encrypted<U: IntoUrl>(
async fn send_encrypted_multi<U: IntoUrl>(
from: Participant,
cipher_pk: &hpke::PublicKey,
sign_sk: &near_crypto::SecretKey,
client: &Client,
url: U,
message: &MpcMessage,
message: Vec<Ciphered>,
) -> Result<(), SendError> {
let encrypted = SignedMessage::encrypt(message, from, sign_sk, cipher_pk)
.map_err(|err| SendError::EncryptionError(err.to_string()))?;

let _span = tracing::info_span!("message_request");
let mut url = url.into_url()?;
url.set_path("msg");
url.set_path("msg_multi");
tracing::debug!(?from, to = %url, "making http request: sending encrypted message");
let action = || async {
let response = client
.post(url.clone())
.header("content-type", "application/json")
.json(&encrypted)
.json(&message)
.send()
.await
.map_err(SendError::ReqwestClientError)?;
Expand Down Expand Up @@ -111,6 +106,10 @@ impl MessageQueue {
let mut failed = VecDeque::new();
let mut errors = Vec::new();
let mut participant_counter = HashMap::new();

let outer = Instant::now();
let uncompacted = self.deque.len();
let mut encrypted = HashMap::new();
while let Some((info, msg, instant)) = self.deque.pop_front() {
if !participants.contains_key(&Participant::from(info.id)) {
if instant.elapsed() > message_type_to_timeout(&msg) {
Expand All @@ -124,21 +123,61 @@ impl MessageQueue {
failed.push_back((info, msg, instant));
continue;
}
if instant.elapsed() > message_type_to_timeout(&msg) {
errors.push(SendError::Timeout(format!(
"message has timed out: {info:?}"
)));
continue;
}

if let Err(err) =
send_encrypted(from, &info.cipher_pk, sign_sk, client, &info.url, &msg).await
{
if instant.elapsed() > message_type_to_timeout(&msg) {
errors.push(SendError::Timeout(format!(
"message has timed out: {err:?}"
)));
let encrypted_msg = match SignedMessage::encrypt(&msg, from, sign_sk, &info.cipher_pk) {
Ok(encrypted) => encrypted,
Err(err) => {
errors.push(SendError::EncryptionError(err.to_string()));
continue;
}
};

failed.push_back((info, msg, instant));
errors.push(err);
let (encrypted, msgs) = encrypted
.entry(info.id)
.or_insert_with(|| (Vec::new(), Vec::new()));

encrypted.push(encrypted_msg);
msgs.push((info, msg, instant));
}

let mut compacted = 0;
for (id, (encrypted, msgs)) in encrypted {
let partitioned = partition_ciphered(encrypted);
compacted += partitioned.len();

for encrypted_partition in partitioned {
let info = participants.get(&Participant::from(id)).unwrap();
let start = Instant::now();
if let Err(err) =
send_encrypted_multi(from, client, &info.url, encrypted_partition).await
{
for (info, msg, instant) in msgs {
failed.push_back((info, msg, instant));
}
errors.push(err);
break;
}

crate::metrics::SEND_ENCRYPTED_LATENCY
.with_label_values(&[info.account_id.as_ref()])
.observe(start.elapsed().as_millis() as f64);
}
}

if uncompacted > 0 {
tracing::debug!(
uncompacted,
compacted,
"{from:?} sent messages in {:?};",
outer.elapsed()
);
}
// only add the participant count if it hasn't been seen before.
let counts = format!("{participant_counter:?}");
if !participant_counter.is_empty() && self.seen_counts.insert(counts.clone()) {
Expand All @@ -153,6 +192,31 @@ impl MessageQueue {
}
}

fn partition_ciphered(encrypted: Vec<Ciphered>) -> Vec<Vec<Ciphered>> {
let mut result: Vec<Vec<Ciphered>> = Vec::new();
let mut current_partition: Vec<Ciphered> = Vec::new();
let mut current_size: usize = 0;

for ciphered in encrypted {
let bytesize = ciphered.text.len();
if current_size + bytesize > 256 * 1024 {
// If adding this byte vector exceeds 1MB, start a new partition
result.push(current_partition);
current_partition = Vec::new();
current_size = 0;
}
current_partition.push(ciphered);
current_size += bytesize;
}

if !current_partition.is_empty() {
// Add the last partition
result.push(current_partition);
}

result
}

const fn message_type_to_timeout(msg: &MpcMessage) -> Duration {
match msg {
MpcMessage::Generating(_) => MESSAGE_TIMEOUT,
Expand Down
50 changes: 50 additions & 0 deletions node/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,56 @@ pub(crate) static NUM_SIGN_SUCCESS_30S: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});

pub(crate) static SEND_ENCRYPTED_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"multichain_send_encrypted_ms",
"Latency of send encrypted.",
&["node_account_id"],
Some(exponential_buckets(0.5, 1.5, 20).unwrap()),
)
.unwrap()
});

pub(crate) static PROTOCOL_LATENCY_ITER_TOTAL: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"multichain_protocol_iter_total",
"Latency of multichain protocol iter, start of protocol till end of iteration",
&["node_account_id"],
Some(exponential_buckets(0.001, 3.0, 20).unwrap()),
)
.unwrap()
});

pub(crate) static PROTOCOL_LATENCY_ITER_CRYPTO: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"multichain_protocol_iter_crypto",
"Latency of multichain protocol iter, start of crypto iter till end",
&["node_account_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});

pub(crate) static PROTOCOL_LATENCY_ITER_CONSENSUS: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"multichain_protocol_iter_consensus",
"Latency of multichain protocol iter, start of consensus iter till end",
&["node_account_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});

pub(crate) static PROTOCOL_LATENCY_ITER_MESSAGE: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"multichain_protocol_iter_message",
"Latency of multichain protocol iter, start of message iter till end",
&["node_account_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});

pub fn try_create_int_gauge_vec(name: &str, help: &str, labels: &[&str]) -> Result<IntGaugeVec> {
check_metric_multichain_prefix(name)?;
let opts = Opts::new(name, help);
Expand Down
4 changes: 2 additions & 2 deletions node/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,12 @@ where
T: Serialize,
{
pub fn encrypt(
msg: T,
msg: &T,
from: Participant,
sign_sk: &near_crypto::SecretKey,
cipher_pk: &hpke::PublicKey,
) -> Result<Ciphered, CryptographicError> {
let msg = serde_json::to_vec(&msg)?;
let msg = serde_json::to_vec(msg)?;
let sig = sign_sk.sign(&msg);
let msg = SignedMessage { msg, sig, from };
let msg = serde_json::to_vec(&msg)?;
Expand Down
17 changes: 17 additions & 0 deletions node/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ impl MpcSignProtocol {
let mut last_state_update = Instant::now();
let mut last_pinged = Instant::now();
loop {
let protocol_time = Instant::now();
tracing::debug!("trying to advance mpc recovery protocol");
loop {
let msg_result = self.receiver.try_recv();
Expand Down Expand Up @@ -285,6 +286,7 @@ impl MpcSignProtocol {
guard.clone()
};

let crypto_time = Instant::now();
let mut state = match state.progress(&mut self).await {
Ok(state) => state,
Err(err) => {
Expand All @@ -293,7 +295,11 @@ impl MpcSignProtocol {
continue;
}
};
crate::metrics::PROTOCOL_LATENCY_ITER_CRYPTO
.with_label_values(&[&my_account_id])
.observe(crypto_time.elapsed().as_secs_f64());

let consensus_time = Instant::now();
if let Some(contract_state) = contract_state {
state = match state.advance(&mut self, contract_state).await {
Ok(state) => state,
Expand All @@ -304,12 +310,19 @@ impl MpcSignProtocol {
}
};
}
crate::metrics::PROTOCOL_LATENCY_ITER_CONSENSUS
.with_label_values(&[&my_account_id])
.observe(consensus_time.elapsed().as_secs_f64());

let message_time = Instant::now();
if let Err(err) = state.handle(&self, &mut queue).await {
tracing::info!("protocol unable to handle messages: {err:?}");
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
crate::metrics::PROTOCOL_LATENCY_ITER_MESSAGE
.with_label_values(&[&my_account_id])
.observe(message_time.elapsed().as_secs_f64());

let sleep_ms = match state {
NodeState::Generating(_) => 500,
Expand All @@ -325,6 +338,10 @@ impl MpcSignProtocol {
let mut guard = self.state.write().await;
*guard = state;
drop(guard);

crate::metrics::PROTOCOL_LATENCY_ITER_TOTAL
.with_label_values(&[&my_account_id])
.observe(protocol_time.elapsed().as_secs_f64());
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
}
}
Expand Down
31 changes: 30 additions & 1 deletion node/src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub async fn run(
}),
)
.route("/msg", post(msg))
.route("/msg_multi", post(msg_multi))
.route("/state", get(state))
.route("/metrics", get(metrics))
.layer(Extension(Arc::new(axum_state)));
Expand Down Expand Up @@ -86,7 +87,35 @@ async fn msg(
Ok(())
}

#[derive(Serialize, Deserialize)]
#[tracing::instrument(level = "debug", skip_all)]
async fn msg_multi(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we merge msg and msg_multi into one function?

Extension(state): Extension<Arc<AxumState>>,
WithRejection(Json(encrypted), _): WithRejection<Json<Vec<Ciphered>>, Error>,
) -> Result<()> {
for encrypted in encrypted.into_iter() {
let message = match SignedMessage::decrypt(
&state.cipher_sk,
&state.protocol_state,
encrypted,
)
.await
{
Ok(msg) => msg,
Err(err) => {
tracing::error!(?err, "failed to decrypt or verify an encrypted message");
return Err(err.into());
}
};

if let Err(err) = state.sender.send(message).await {
tracing::error!(?err, "failed to forward an encrypted protocol message");
return Err(err.into());
}
}
Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum StateView {
Expand Down
Loading