diff --git a/chain-signatures/node/src/http_client.rs b/chain-signatures/node/src/http_client.rs index 4620a805f..8fa11eb70 100644 --- a/chain-signatures/node/src/http_client.rs +++ b/chain-signatures/node/src/http_client.rs @@ -11,6 +11,8 @@ use std::time::{Duration, Instant}; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; +use near_account_id::AccountId; + #[derive(Debug, Clone, clap::Parser)] #[group(id = "message_options")] pub struct Options { @@ -46,7 +48,7 @@ pub enum SendError { pub async fn send_encrypted( from: Participant, - client: &Client, + client: Client, url: U, message: Vec, request_timeout: Duration, @@ -97,14 +99,16 @@ pub async fn send_encrypted( pub struct MessageQueue { deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>, seen_counts: HashSet, + account_id: AccountId, message_options: Options, } impl MessageQueue { - pub fn new(options: Options) -> Self { + pub fn new(id: &AccountId, options: Options) -> Self { Self { - deque: VecDeque::default(), - seen_counts: HashSet::default(), + deque: VecDeque::new(), + seen_counts: HashSet::new(), + account_id: id.clone(), message_options: options, } } @@ -162,54 +166,57 @@ impl MessageQueue { encrypted.push((encrypted_msg, (info, msg, instant))); } - let mut compacted = 0; + let mut tasks = tokio::task::JoinSet::new(); for (id, encrypted) in encrypted { for partition in partition_ciphered_256kb(encrypted) { - let (encrypted_partition, msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip(); + let (encrypted_partition, _msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip(); // guaranteed to unwrap due to our previous loop check: - let info = participants.get(&Participant::from(id)).unwrap(); + let id = Participant::from(id); + let info = participants.get(&id).unwrap(); let account_id = &info.account_id; - let start = Instant::now(); crate::metrics::NUM_SEND_ENCRYPTED_TOTAL .with_label_values(&[account_id.as_str()]) .inc(); - if let Err(err) = send_encrypted( + + tasks.spawn(send_encrypted( from, - client, - &info.url, + client.clone(), + info.url.clone(), encrypted_partition, Duration::from_millis(self.message_options.timeout), - ) - .await - { - crate::metrics::NUM_SEND_ENCRYPTED_FAILURE - .with_label_values(&[account_id.as_str()]) - .inc(); - crate::metrics::FAILED_SEND_ENCRYPTED_LATENCY - .with_label_values(&[account_id.as_str()]) - .observe(start.elapsed().as_millis() as f64); - - // since we failed, put back all the messages related to this - failed.extend(msgs); + )); + } + } + + let mut compacted = 0; + while let Some(result) = tasks.join_next().await { + match result { + Ok(result) => { + let Err(err) = result else { + compacted += 1; + continue; + }; errors.push(err); - } else { - compacted += msgs.len(); - crate::metrics::SEND_ENCRYPTED_LATENCY - .with_label_values(&[account_id.as_str()]) - .observe(start.elapsed().as_millis() as f64); + } + Err(err) => { + tracing::error!(?err, "message queue task failure"); } } } + let elapsed = outer.elapsed(); if uncompacted > 0 { tracing::info!( uncompacted, compacted, - "{from:?} sent messages in {:?};", - outer.elapsed() + "{from:?} sent messages in {elapsed:?}", ); } + crate::metrics::SEND_ENCRYPTED_LATENCY + .with_label_values(&[self.account_id.as_str()]) + .observe(elapsed.as_millis() as f64); + // only add the participant count if it hasn't been seen before. let counts = format!("{participant_counter:?}"); if !participant_counter.is_empty() && self.seen_counts.insert(counts.clone()) { diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 310f43638..c446ddcc5 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -206,7 +206,7 @@ impl Pool { let empty_msg: Vec = Vec::new(); crate::http_client::send_encrypted( *participant, - &self.http, + self.http.clone(), participant_info.url.clone(), empty_msg, self.fetch_participant_timeout, diff --git a/chain-signatures/node/src/metrics.rs b/chain-signatures/node/src/metrics.rs index d03fae289..7b4b2ae65 100644 --- a/chain-signatures/node/src/metrics.rs +++ b/chain-signatures/node/src/metrics.rs @@ -307,15 +307,6 @@ pub(crate) static PROTOCOL_LATENCY_ITER_MESSAGE: Lazy = Lazy::new( .unwrap() }); -pub(crate) static NUM_SEND_ENCRYPTED_FAILURE: Lazy = Lazy::new(|| { - try_create_counter_vec( - "multichain_send_encrypted_failure", - "number of successful send encrypted", - &["node_account_id"], - ) - .unwrap() -}); - pub(crate) static NUM_SEND_ENCRYPTED_TOTAL: Lazy = Lazy::new(|| { try_create_counter_vec( "multichain_send_encrypted_total", @@ -325,16 +316,6 @@ pub(crate) static NUM_SEND_ENCRYPTED_TOTAL: Lazy = Lazy::new(|| { .unwrap() }); -pub(crate) static FAILED_SEND_ENCRYPTED_LATENCY: Lazy = Lazy::new(|| { - try_create_histogram_vec( - "multichain_failed_send_encrypted_ms", - "Latency of failed send encrypted.", - &["node_account_id"], - Some(exponential_buckets(0.5, 1.5, 20).unwrap()), - ) - .unwrap() -}); - pub(crate) static NUM_TOTAL_HISTORICAL_SIGNATURE_GENERATORS: Lazy = Lazy::new(|| { try_create_counter_vec( "multichain_num_total_historical_signature_generators", diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index af03d18a5..aa14951b5 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -173,6 +173,7 @@ impl ConsensusProtocol for StartedState { ), )), messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.my_account_id(), ctx.message_options().clone(), ))), })) @@ -229,6 +230,7 @@ impl ConsensusProtocol for StartedState { threshold: contract_state.threshold, protocol, messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.my_account_id(), ctx.message_options().clone(), ))), })) @@ -767,6 +769,7 @@ async fn start_resharing( public_key: contract_state.public_key, protocol, messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.my_account_id(), ctx.message_options().clone(), ))), })) diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 7c87e9b4f..d6f4186af 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -249,15 +249,17 @@ impl MpcSignProtocol { crate::metrics::PROTOCOL_ITER_CNT .with_label_values(&[my_account_id.as_str()]) .inc(); + + let msg_time = Instant::now(); + let mut msg_count = 0; loop { let msg_result = self.receiver.try_recv(); match msg_result { Ok(msg) => { - tracing::debug!("received a new message"); + msg_count += 1; queue.push(msg); } Err(TryRecvError::Empty) => { - tracing::debug!("no new messages received"); break; } Err(TryRecvError::Disconnected) => { @@ -266,6 +268,7 @@ impl MpcSignProtocol { } } } + tracing::debug!("received {msg_count} messages in {:?}", msg_time.elapsed()); let contract_state = if last_state_update.elapsed() > Duration::from_secs(1) { let contract_state = match rpc_client::fetch_mpc_contract_state( diff --git a/integration-tests/chain-signatures/tests/actions/wait_for.rs b/integration-tests/chain-signatures/tests/actions/wait_for.rs index ac2e00693..3c774cdc2 100644 --- a/integration-tests/chain-signatures/tests/actions/wait_for.rs +++ b/integration-tests/chain-signatures/tests/actions/wait_for.rs @@ -51,7 +51,7 @@ pub async fn running_mpc<'a>( } ); is_running - .retry(&ExponentialBuilder::default().with_max_times(6)) + .retry(&ExponentialBuilder::default().with_max_times(7)) .await .with_context(|| err_msg) } diff --git a/integration-tests/chain-signatures/tests/cases/mod.rs b/integration-tests/chain-signatures/tests/cases/mod.rs index a166d7d23..27665c53f 100644 --- a/integration-tests/chain-signatures/tests/cases/mod.rs +++ b/integration-tests/chain-signatures/tests/cases/mod.rs @@ -28,7 +28,7 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { actions::single_signature_production(&ctx, &state).await?; tracing::info!("!!! Add participant 3"); - assert!(ctx.add_participant(None).await.is_ok()); + ctx.add_participant(None).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; @@ -39,14 +39,12 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { state.participants.keys().nth(2).unwrap().clone().as_ref(), ) .unwrap(); - assert!(ctx.remove_participant(Some(&account_2)).await.is_ok()); + ctx.remove_participant(Some(&account_2)).await.unwrap(); let account_0 = near_workspaces::types::AccountId::from_str( state.participants.keys().next().unwrap().clone().as_ref(), ) .unwrap(); - let node_cfg_0 = ctx.remove_participant(Some(&account_0)).await; - assert!(node_cfg_0.is_ok()); - let node_cfg_0 = node_cfg_0.unwrap(); + let node_cfg_0 = ctx.remove_participant(Some(&account_0)).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; @@ -56,14 +54,14 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { assert!(ctx.remove_participant(None).await.is_err()); tracing::info!("!!! Add participant 5"); - assert!(ctx.add_participant(None).await.is_ok()); + ctx.add_participant(None).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; actions::single_signature_production(&ctx, &state).await?; tracing::info!("!!! Add back participant 0"); - assert!(ctx.add_participant(Some(node_cfg_0)).await.is_ok()); + ctx.add_participant(Some(node_cfg_0)).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; @@ -331,19 +329,19 @@ async fn test_multichain_reshare_with_lake_congestion() -> anyhow::Result<()> { add_latency(&ctx.nodes.proxy_name_for_node(1), true, 1.0, 1_000, 100).await?; // remove node2, node0 and node1 should still reach concensus // this fails if the latency above is too long (10s) - assert!(ctx.remove_participant(None).await.is_ok()); + ctx.remove_participant(None).await.unwrap(); let state = wait_for::running_mpc(&ctx, Some(0)).await?; assert!(state.participants.len() == 2); // Going below T should error out assert!(ctx.remove_participant(None).await.is_err()); let state = wait_for::running_mpc(&ctx, Some(0)).await?; assert!(state.participants.len() == 2); - assert!(ctx.add_participant(None).await.is_ok()); + ctx.add_participant(None).await.unwrap(); // add latency to node2->rpc add_latency(&ctx.nodes.proxy_name_for_node(2), true, 1.0, 1_000, 100).await?; let state = wait_for::running_mpc(&ctx, Some(0)).await?; assert!(state.participants.len() == 3); - assert!(ctx.remove_participant(None).await.is_ok()); + ctx.remove_participant(None).await.unwrap(); let state = wait_for::running_mpc(&ctx, Some(0)).await?; assert!(state.participants.len() == 2); // make sure signing works after reshare diff --git a/integration-tests/chain-signatures/tests/lib.rs b/integration-tests/chain-signatures/tests/lib.rs index 74fab5526..d234926de 100644 --- a/integration-tests/chain-signatures/tests/lib.rs +++ b/integration-tests/chain-signatures/tests/lib.rs @@ -61,7 +61,7 @@ impl MultichainTestContext<'_> { self.nodes.start_node(&self.cfg, &node_account).await?; // Wait for new node to add itself as a candidate - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(20)).await; // T number of participants should vote let participants = self.participant_accounts().await?;