Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent messages from taking too long to be sent #902

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
67 changes: 37 additions & 30 deletions chain-signatures/node/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::time::{Duration, Instant};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

use near_account_id::AccountId;

#[derive(Debug, Clone, clap::Parser)]
#[group(id = "message_options")]
pub struct Options {
Expand Down Expand Up @@ -46,7 +48,7 @@ pub enum SendError {

pub async fn send_encrypted<U: IntoUrl>(
from: Participant,
client: &Client,
client: Client,
url: U,
message: Vec<Ciphered>,
request_timeout: Duration,
Expand Down Expand Up @@ -97,14 +99,16 @@ pub async fn send_encrypted<U: IntoUrl>(
pub struct MessageQueue {
deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>,
seen_counts: HashSet<String>,
account_id: AccountId,
message_options: Options,
}

impl MessageQueue {
pub fn new(options: Options) -> Self {
pub fn new(id: &AccountId, options: Options) -> Self {
Self {
deque: VecDeque::default(),
seen_counts: HashSet::default(),
deque: VecDeque::new(),
seen_counts: HashSet::new(),
account_id: id.clone(),
message_options: options,
}
}
Expand Down Expand Up @@ -162,54 +166,57 @@ impl MessageQueue {
encrypted.push((encrypted_msg, (info, msg, instant)));
}

let mut compacted = 0;
let mut tasks = tokio::task::JoinSet::new();
for (id, encrypted) in encrypted {
for partition in partition_ciphered_256kb(encrypted) {
let (encrypted_partition, msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip();
let (encrypted_partition, _msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip();
// guaranteed to unwrap due to our previous loop check:
let info = participants.get(&Participant::from(id)).unwrap();
let id = Participant::from(id);
let info = participants.get(&id).unwrap();
let account_id = &info.account_id;

let start = Instant::now();
crate::metrics::NUM_SEND_ENCRYPTED_TOTAL
.with_label_values(&[account_id.as_str()])
.inc();
if let Err(err) = send_encrypted(

tasks.spawn(send_encrypted(
from,
client,
&info.url,
client.clone(),
info.url.clone(),
encrypted_partition,
Duration::from_millis(self.message_options.timeout),
)
.await
{
crate::metrics::NUM_SEND_ENCRYPTED_FAILURE
ChaoticTempest marked this conversation as resolved.
Show resolved Hide resolved
.with_label_values(&[account_id.as_str()])
.inc();
crate::metrics::FAILED_SEND_ENCRYPTED_LATENCY
.with_label_values(&[account_id.as_str()])
.observe(start.elapsed().as_millis() as f64);

// since we failed, put back all the messages related to this
failed.extend(msgs);
));
}
}

let mut compacted = 0;
while let Some(result) = tasks.join_next().await {
match result {
Ok(result) => {
let Err(err) = result else {
compacted += 1;
continue;
};
errors.push(err);
} else {
compacted += msgs.len();
crate::metrics::SEND_ENCRYPTED_LATENCY
.with_label_values(&[account_id.as_str()])
.observe(start.elapsed().as_millis() as f64);
}
Err(err) => {
tracing::error!(?err, "message queue task failure");
}
}
}

let elapsed = outer.elapsed();
if uncompacted > 0 {
tracing::info!(
uncompacted,
compacted,
"{from:?} sent messages in {:?};",
outer.elapsed()
"{from:?} sent messages in {elapsed:?}",
);
}
crate::metrics::SEND_ENCRYPTED_LATENCY
.with_label_values(&[self.account_id.as_str()])
.observe(elapsed.as_millis() as f64);

// only add the participant count if it hasn't been seen before.
let counts = format!("{participant_counter:?}");
if !participant_counter.is_empty() && self.seen_counts.insert(counts.clone()) {
Expand Down
2 changes: 1 addition & 1 deletion chain-signatures/node/src/mesh/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl Pool {
let empty_msg: Vec<Ciphered> = Vec::new();
crate::http_client::send_encrypted(
*participant,
&self.http,
self.http.clone(),
participant_info.url.clone(),
empty_msg,
self.fetch_participant_timeout,
Expand Down
19 changes: 0 additions & 19 deletions chain-signatures/node/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,6 @@ pub(crate) static PROTOCOL_LATENCY_ITER_MESSAGE: Lazy<HistogramVec> = Lazy::new(
.unwrap()
});

pub(crate) static NUM_SEND_ENCRYPTED_FAILURE: Lazy<CounterVec> = Lazy::new(|| {
try_create_counter_vec(
"multichain_send_encrypted_failure",
"number of successful send encrypted",
&["node_account_id"],
)
.unwrap()
});

pub(crate) static NUM_SEND_ENCRYPTED_TOTAL: Lazy<CounterVec> = Lazy::new(|| {
try_create_counter_vec(
"multichain_send_encrypted_total",
Expand All @@ -325,16 +316,6 @@ pub(crate) static NUM_SEND_ENCRYPTED_TOTAL: Lazy<CounterVec> = Lazy::new(|| {
.unwrap()
});

pub(crate) static FAILED_SEND_ENCRYPTED_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"multichain_failed_send_encrypted_ms",
"Latency of failed send encrypted.",
&["node_account_id"],
Some(exponential_buckets(0.5, 1.5, 20).unwrap()),
)
.unwrap()
});

pub(crate) static NUM_TOTAL_HISTORICAL_SIGNATURE_GENERATORS: Lazy<CounterVec> = Lazy::new(|| {
try_create_counter_vec(
"multichain_num_total_historical_signature_generators",
Expand Down
3 changes: 3 additions & 0 deletions chain-signatures/node/src/protocol/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl ConsensusProtocol for StartedState {
),
)),
messages: Arc::new(RwLock::new(MessageQueue::new(
ctx.my_account_id(),
ctx.message_options().clone(),
))),
}))
Expand Down Expand Up @@ -229,6 +230,7 @@ impl ConsensusProtocol for StartedState {
threshold: contract_state.threshold,
protocol,
messages: Arc::new(RwLock::new(MessageQueue::new(
ctx.my_account_id(),
ctx.message_options().clone(),
))),
}))
Expand Down Expand Up @@ -767,6 +769,7 @@ async fn start_resharing<C: ConsensusCtx>(
public_key: contract_state.public_key,
protocol,
messages: Arc::new(RwLock::new(MessageQueue::new(
ctx.my_account_id(),
ctx.message_options().clone(),
))),
}))
Expand Down
7 changes: 5 additions & 2 deletions chain-signatures/node/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,17 @@ impl MpcSignProtocol {
crate::metrics::PROTOCOL_ITER_CNT
.with_label_values(&[my_account_id.as_str()])
.inc();

let msg_time = Instant::now();
let mut msg_count = 0;
loop {
let msg_result = self.receiver.try_recv();
match msg_result {
Ok(msg) => {
tracing::debug!("received a new message");
msg_count += 1;
ChaoticTempest marked this conversation as resolved.
Show resolved Hide resolved
queue.push(msg);
}
Err(TryRecvError::Empty) => {
tracing::debug!("no new messages received");
break;
}
Err(TryRecvError::Disconnected) => {
Expand All @@ -266,6 +268,7 @@ impl MpcSignProtocol {
}
}
}
tracing::debug!("received {msg_count} messages in {:?}", msg_time.elapsed());
ChaoticTempest marked this conversation as resolved.
Show resolved Hide resolved
ChaoticTempest marked this conversation as resolved.
Show resolved Hide resolved

let contract_state = if last_state_update.elapsed() > Duration::from_secs(1) {
let contract_state = match rpc_client::fetch_mpc_contract_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub async fn running_mpc<'a>(
}
);
is_running
.retry(&ExponentialBuilder::default().with_max_times(6))
.retry(&ExponentialBuilder::default().with_max_times(7))
.await
.with_context(|| err_msg)
}
Expand Down
18 changes: 8 additions & 10 deletions integration-tests/chain-signatures/tests/cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn test_multichain_reshare() -> anyhow::Result<()> {
actions::single_signature_production(&ctx, &state).await?;

tracing::info!("!!! Add participant 3");
assert!(ctx.add_participant(None).await.is_ok());
ctx.add_participant(None).await.unwrap();
let state = wait_for::running_mpc(&ctx, None).await?;
wait_for::has_at_least_triples(&ctx, 2).await?;
wait_for::has_at_least_presignatures(&ctx, 2).await?;
Expand All @@ -39,14 +39,12 @@ async fn test_multichain_reshare() -> anyhow::Result<()> {
state.participants.keys().nth(2).unwrap().clone().as_ref(),
)
.unwrap();
assert!(ctx.remove_participant(Some(&account_2)).await.is_ok());
ctx.remove_participant(Some(&account_2)).await.unwrap();
let account_0 = near_workspaces::types::AccountId::from_str(
state.participants.keys().next().unwrap().clone().as_ref(),
)
.unwrap();
let node_cfg_0 = ctx.remove_participant(Some(&account_0)).await;
assert!(node_cfg_0.is_ok());
let node_cfg_0 = node_cfg_0.unwrap();
let node_cfg_0 = ctx.remove_participant(Some(&account_0)).await.unwrap();
let state = wait_for::running_mpc(&ctx, None).await?;
wait_for::has_at_least_triples(&ctx, 2).await?;
wait_for::has_at_least_presignatures(&ctx, 2).await?;
Expand All @@ -56,14 +54,14 @@ async fn test_multichain_reshare() -> anyhow::Result<()> {
assert!(ctx.remove_participant(None).await.is_err());

tracing::info!("!!! Add participant 5");
assert!(ctx.add_participant(None).await.is_ok());
ctx.add_participant(None).await.unwrap();
let state = wait_for::running_mpc(&ctx, None).await?;
wait_for::has_at_least_triples(&ctx, 2).await?;
wait_for::has_at_least_presignatures(&ctx, 2).await?;
actions::single_signature_production(&ctx, &state).await?;

tracing::info!("!!! Add back participant 0");
assert!(ctx.add_participant(Some(node_cfg_0)).await.is_ok());
ctx.add_participant(Some(node_cfg_0)).await.unwrap();
let state = wait_for::running_mpc(&ctx, None).await?;
wait_for::has_at_least_triples(&ctx, 2).await?;
wait_for::has_at_least_presignatures(&ctx, 2).await?;
Expand Down Expand Up @@ -331,19 +329,19 @@ async fn test_multichain_reshare_with_lake_congestion() -> anyhow::Result<()> {
add_latency(&ctx.nodes.proxy_name_for_node(1), true, 1.0, 1_000, 100).await?;
// remove node2, node0 and node1 should still reach concensus
// this fails if the latency above is too long (10s)
assert!(ctx.remove_participant(None).await.is_ok());
ctx.remove_participant(None).await.unwrap();
let state = wait_for::running_mpc(&ctx, Some(0)).await?;
assert!(state.participants.len() == 2);
// Going below T should error out
assert!(ctx.remove_participant(None).await.is_err());
let state = wait_for::running_mpc(&ctx, Some(0)).await?;
assert!(state.participants.len() == 2);
assert!(ctx.add_participant(None).await.is_ok());
ctx.add_participant(None).await.unwrap();
// add latency to node2->rpc
add_latency(&ctx.nodes.proxy_name_for_node(2), true, 1.0, 1_000, 100).await?;
let state = wait_for::running_mpc(&ctx, Some(0)).await?;
assert!(state.participants.len() == 3);
assert!(ctx.remove_participant(None).await.is_ok());
ctx.remove_participant(None).await.unwrap();
let state = wait_for::running_mpc(&ctx, Some(0)).await?;
assert!(state.participants.len() == 2);
// make sure signing works after reshare
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/chain-signatures/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl MultichainTestContext<'_> {

self.nodes.start_node(&self.cfg, &node_account).await?;
// Wait for new node to add itself as a candidate
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(20)).await;

// T number of participants should vote
let participants = self.participant_accounts().await?;
Expand Down
Loading