From 88bffbf18c356fc9093d28bc06aeb24ce2061399 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Fri, 11 Oct 2024 15:33:59 -0700 Subject: [PATCH 01/10] increase timeout --- chain-signatures/node/src/http_client.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chain-signatures/node/src/http_client.rs b/chain-signatures/node/src/http_client.rs index 40a80a5a..b11e1f7f 100644 --- a/chain-signatures/node/src/http_client.rs +++ b/chain-signatures/node/src/http_client.rs @@ -46,6 +46,10 @@ async fn send_encrypted( .post(url.clone()) .header("content-type", "application/json") .json(&message) +<<<<<<< HEAD +======= + .timeout(Duration::from_millis(400)) +>>>>>>> b340f1a0 (increase timeout) .send() .await .map_err(SendError::ReqwestClientError)?; From 3dc8cc3ce5c512e247d44259c824c91f79d5c052 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Fri, 11 Oct 2024 16:09:28 -0700 Subject: [PATCH 02/10] revert timeout, change presig timeout --- chain-signatures/contract/src/config/impls.rs | 2 +- chain-signatures/node/src/http_client.rs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/chain-signatures/contract/src/config/impls.rs b/chain-signatures/contract/src/config/impls.rs index 36c7d5b5..53dcb0a3 100644 --- a/chain-signatures/contract/src/config/impls.rs +++ b/chain-signatures/contract/src/config/impls.rs @@ -57,7 +57,7 @@ impl Default for PresignatureConfig { Self { min_presignatures: 512, max_presignatures: 512 * MAX_EXPECTED_PARTICIPANTS * NETWORK_MULTIPLIER, - generation_timeout: secs_to_ms(45), + generation_timeout: secs_to_ms(90), other: Default::default(), } diff --git a/chain-signatures/node/src/http_client.rs b/chain-signatures/node/src/http_client.rs index b11e1f7f..68c7acf3 100644 --- a/chain-signatures/node/src/http_client.rs +++ b/chain-signatures/node/src/http_client.rs @@ -47,9 +47,13 @@ async fn send_encrypted( .header("content-type", "application/json") .json(&message) <<<<<<< HEAD +<<<<<<< HEAD ======= .timeout(Duration::from_millis(400)) >>>>>>> b340f1a0 (increase timeout) +======= + .timeout(Duration::from_millis(200)) +>>>>>>> 26a2f6a4 (revert timeout, change presig timeout) .send() .await .map_err(SendError::ReqwestClientError)?; From fbdfc2e822996cb341a66b651726781ea7929397 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Fri, 11 Oct 2024 18:59:48 -0700 Subject: [PATCH 03/10] update timeout --- chain-signatures/node/src/mesh/connection.rs | 49 ++++++++++++-------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index ef6029ac..be4aca75 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -49,26 +49,37 @@ impl Pool { continue; }; - let Ok(resp) = self.http.get(url.clone()).send().await else { - tracing::warn!( - "Pool.ping resp err participant {:?} url {}", - participant, - url - ); - continue; - }; - - let Ok(state): Result = resp.json().await else { - tracing::warn!( - "Pool.ping state view err participant {:?} url {}", - participant, - url - ); - continue; + let work = self.http.get(url.clone()).send(); + match tokio::time::timeout(Duration::from_millis(200), work).await { + Ok(result) => match result { + Ok(resp) => { + let Ok(state): Result = resp.json().await else { + tracing::warn!( + "Pool.ping state view err participant {:?} url {}", + participant, + url + ); + continue; + }; + + status.insert(*participant, state); + participants.insert(participant, info.clone()); + }, + Err(e) => { + tracing::warn!("Network error: {:?}", e); + continue; + } + }, + Err(_) => { + tracing::warn!( + "Pool.ping resp err participant {:?} url {}, timed out", + participant, + url + ); + continue; + } }; - - status.insert(*participant, state); - participants.insert(participant, info.clone()); + } drop(status); From b794d1303704ae19f28340b39f16c6eb8b24314c Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Fri, 11 Oct 2024 19:12:05 -0700 Subject: [PATCH 04/10] 500 ms --- chain-signatures/node/src/mesh/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index be4aca75..635c0dc7 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -50,7 +50,7 @@ impl Pool { }; let work = self.http.get(url.clone()).send(); - match tokio::time::timeout(Duration::from_millis(200), work).await { + match tokio::time::timeout(Duration::from_millis(500), work).await { Ok(result) => match result { Ok(resp) => { let Ok(state): Result = resp.json().await else { From ee1f8ae6992789c6e48fb9cea0d34409ed8c6a34 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Fri, 11 Oct 2024 19:29:28 -0700 Subject: [PATCH 05/10] use 1s --- chain-signatures/node/src/mesh/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 635c0dc7..9a317ec5 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -50,7 +50,7 @@ impl Pool { }; let work = self.http.get(url.clone()).send(); - match tokio::time::timeout(Duration::from_millis(500), work).await { + match tokio::time::timeout(Duration::from_millis(1000), work).await { Ok(result) => match result { Ok(resp) => { let Ok(state): Result = resp.json().await else { From 029880069e92dcf8b1091bc62e07cb6860d80552 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Fri, 11 Oct 2024 20:00:35 -0700 Subject: [PATCH 06/10] use 5s --- chain-signatures/node/src/mesh/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 9a317ec5..25d1697c 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -50,7 +50,7 @@ impl Pool { }; let work = self.http.get(url.clone()).send(); - match tokio::time::timeout(Duration::from_millis(1000), work).await { + match tokio::time::timeout(Duration::from_millis(5000), work).await { Ok(result) => match result { Ok(resp) => { let Ok(state): Result = resp.json().await else { From 956612de12c71bf6b11840b1c9474888efe7aa6e Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Fri, 11 Oct 2024 20:21:11 -0700 Subject: [PATCH 07/10] back to 1s --- chain-signatures/node/src/mesh/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 25d1697c..9a317ec5 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -50,7 +50,7 @@ impl Pool { }; let work = self.http.get(url.clone()).send(); - match tokio::time::timeout(Duration::from_millis(5000), work).await { + match tokio::time::timeout(Duration::from_millis(1000), work).await { Ok(result) => match result { Ok(resp) => { let Ok(state): Result = resp.json().await else { From cca9c1ad700ca795603dc420ba0c2e6f7a25f140 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Tue, 15 Oct 2024 14:17:02 -0700 Subject: [PATCH 08/10] refactor: all param in env variables --- chain-signatures/node/src/cli.rs | 14 +- chain-signatures/node/src/http_client.rs | 48 ++++++- chain-signatures/node/src/mesh/connection.rs | 130 ++++++++++-------- chain-signatures/node/src/mesh/mod.rs | 38 ++++- .../node/src/protocol/consensus.rs | 16 ++- chain-signatures/node/src/protocol/mod.rs | 12 +- .../chain-signatures/src/containers.rs | 2 + integration-tests/chain-signatures/src/lib.rs | 14 ++ .../chain-signatures/src/local.rs | 4 + 9 files changed, 212 insertions(+), 66 deletions(-) diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 15122674..25bbf744 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -2,7 +2,7 @@ use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig}; use crate::gcp::GcpService; use crate::protocol::{MpcSignProtocol, SignQueue}; use crate::storage::triple_storage::LockTripleNodeStorageBox; -use crate::{indexer, storage, web}; +use crate::{http_client, indexer, mesh, storage, web}; use clap::Parser; use local_ip_address::local_ip; use near_account_id::AccountId; @@ -63,6 +63,10 @@ pub enum Cli { /// referer header for mainnet whitelist #[arg(long, env("MPC_CLIENT_HEADER_REFERER"), default_value(None))] client_header_referer: Option, + #[clap(flatten)] + mesh_options: mesh::Options, + #[clap(flatten)] + message_options: http_client::Options, }, } @@ -83,6 +87,8 @@ impl Cli { storage_options, override_config, client_header_referer, + mesh_options, + message_options, } => { let mut args = vec![ "start".to_string(), @@ -120,6 +126,8 @@ impl Cli { args.extend(indexer_options.into_str_args()); args.extend(storage_options.into_str_args()); + args.extend(mesh_options.into_str_args()); + args.extend(message_options.into_str_args()); args } } @@ -176,6 +184,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { storage_options, override_config, client_header_referer, + mesh_options, + message_options, } => { let sign_queue = Arc::new(RwLock::new(SignQueue::new())); let rt = tokio::runtime::Builder::new_multi_thread() @@ -237,6 +247,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { sign_sk, }, }), + mesh_options, + message_options, ); rt.block_on(async { diff --git a/chain-signatures/node/src/http_client.rs b/chain-signatures/node/src/http_client.rs index 68c7acf3..8cd10f6d 100644 --- a/chain-signatures/node/src/http_client.rs +++ b/chain-signatures/node/src/http_client.rs @@ -11,6 +11,19 @@ use std::time::{Duration, Instant}; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; +#[derive(Debug, Clone, clap::Parser)] +#[group(id = "message_options")] +pub struct Options { + #[clap(long, env("MPC_MESSAGE_TIMEOUT"), default_value = "1000")] + pub timeout: u64, +} + +impl Options { + pub fn into_str_args(self) -> Vec { + vec!["--timeout".to_string(), self.timeout.to_string()] + } +} + #[derive(Debug, thiserror::Error)] pub enum SendError { #[error("http request was unsuccessful: {0}")] @@ -36,12 +49,14 @@ async fn send_encrypted( client: &Client, url: U, message: Vec, + request_timeout: Duration, ) -> Result<(), SendError> { let _span = tracing::info_span!("message_request"); let mut url = url.into_url()?; url.set_path("msg"); tracing::debug!(?from, to = %url, "making http request: sending encrypted message"); let action = || async { +<<<<<<< HEAD let response = client .post(url.clone()) .header("content-type", "application/json") @@ -57,6 +72,20 @@ async fn send_encrypted( .send() .await .map_err(SendError::ReqwestClientError)?; +======= + let response = tokio::time::timeout( + request_timeout, + client + .post(url.clone()) + .header("content-type", "application/json") + .json(&message) + .send(), + ) + .await + .map_err(|_| SendError::Timeout(format!("send encrypted from {from:?} to {url}")))? + .map_err(SendError::ReqwestClientError)?; + +>>>>>>> 611df5c0 (refactor: all param in env variables) let status = response.status(); let response_bytes = response .bytes() @@ -83,13 +112,21 @@ async fn send_encrypted( // TODO: add in retry logic either in struct or at call site. // TODO: add check for participant list to see if the messages to be sent are still valid. -#[derive(Default)] pub struct MessageQueue { deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>, seen_counts: HashSet, + message_options: Options, } impl MessageQueue { + pub fn new(options: Options) -> Self { + Self { + deque: VecDeque::default(), + seen_counts: HashSet::default(), + message_options: options, + } + } + pub fn len(&self) -> usize { self.deque.len() } @@ -155,7 +192,14 @@ impl MessageQueue { crate::metrics::NUM_SEND_ENCRYPTED_TOTAL .with_label_values(&[account_id.as_str()]) .inc(); - if let Err(err) = send_encrypted(from, client, &info.url, encrypted_partition).await + if let Err(err) = send_encrypted( + from, + client, + &info.url, + encrypted_partition, + Duration::from_millis(self.message_options.timeout), + ) + .await { crate::metrics::NUM_SEND_ENCRYPTED_FAILURE .with_label_values(&[account_id.as_str()]) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 9a317ec5..18041fe1 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -6,11 +6,10 @@ use tokio::sync::RwLock; use url::Url; use crate::protocol::contract::primitives::Participants; +use crate::protocol::ParticipantInfo; use crate::protocol::ProtocolState; use crate::web::StateView; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1); - // TODO: this is a basic connection pool and does not do most of the work yet. This is // mostly here just to facilitate offline node handling for now. // TODO/NOTE: we can use libp2p to facilitate most the of low level TCP connection work. @@ -25,12 +24,43 @@ pub struct Pool { current_active: RwLock>, // Potentially active participants that we can use to establish a connection in the next epoch. potential_active: RwLock>, + fetch_participant_timeout: Duration, + refresh_active_timeout: Duration, +} + +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum FetchParticipantError { + #[error("request timed out")] + Timeout, + #[error("Response cannot be converted to JSON")] + JsonConversion, + #[error("Invalid URL")] + InvalidUrl, + #[error("Network error: {0}")] + NetworkError(String), } impl Pool { + pub fn new(fetch_participant_timeout: Duration, refresh_active_timeout: Duration) -> Self { + tracing::info!( + ?fetch_participant_timeout, + ?refresh_active_timeout, + "creating a new pool" + ); + Self { + http: reqwest::Client::new(), + connections: RwLock::new(Participants::default()), + potential_connections: RwLock::new(Participants::default()), + status: RwLock::new(HashMap::default()), + current_active: RwLock::new(Option::default()), + potential_active: RwLock::new(Option::default()), + fetch_participant_timeout, + refresh_active_timeout, + } + } pub async fn ping(&self) -> Participants { if let Some((ref active, timestamp)) = *self.current_active.read().await { - if timestamp.elapsed() < DEFAULT_TIMEOUT { + if timestamp.elapsed() < self.refresh_active_timeout { return active.clone(); } } @@ -40,46 +70,15 @@ impl Pool { let mut status = self.status.write().await; let mut participants = Participants::default(); for (participant, info) in connections.iter() { - let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else { - tracing::error!( - "Pool.ping url is invalid participant {:?} url {} /state", - participant, - info.url - ); - continue; - }; - - let work = self.http.get(url.clone()).send(); - match tokio::time::timeout(Duration::from_millis(1000), work).await { - Ok(result) => match result { - Ok(resp) => { - let Ok(state): Result = resp.json().await else { - tracing::warn!( - "Pool.ping state view err participant {:?} url {}", - participant, - url - ); - continue; - }; - - status.insert(*participant, state); - participants.insert(participant, info.clone()); - }, - Err(e) => { - tracing::warn!("Network error: {:?}", e); - continue; - } - }, - Err(_) => { - tracing::warn!( - "Pool.ping resp err participant {:?} url {}, timed out", - participant, - url - ); - continue; + match self.fetch_participant_state(info).await { + Ok(state) => { + status.insert(*participant, state); + participants.insert(participant, info.clone()); + } + Err(e) => { + tracing::warn!("Fetch state for participant {participant:?} with url {} has failed with error {e}.", info.url); } - }; - + } } drop(status); @@ -90,7 +89,7 @@ impl Pool { pub async fn ping_potential(&self) -> Participants { if let Some((ref active, timestamp)) = *self.potential_active.read().await { - if timestamp.elapsed() < DEFAULT_TIMEOUT { + if timestamp.elapsed() < self.refresh_active_timeout { return active.clone(); } } @@ -100,20 +99,15 @@ impl Pool { let mut status = self.status.write().await; let mut participants = Participants::default(); for (participant, info) in connections.iter() { - let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else { - continue; - }; - - let Ok(resp) = self.http.get(url).send().await else { - continue; - }; - - let Ok(state): Result = resp.json().await else { - continue; - }; - - status.insert(*participant, state); - participants.insert(participant, info.clone()); + match self.fetch_participant_state(info).await { + Ok(state) => { + status.insert(*participant, state); + participants.insert(participant, info.clone()); + } + Err(e) => { + tracing::warn!("Fetch state for participant {participant:?} with url {} has failed with error {e}.", info.url); + } + } } drop(status); @@ -170,4 +164,26 @@ impl Pool { _ => false, }) } + + async fn fetch_participant_state( + &self, + participant_info: &ParticipantInfo, + ) -> Result { + let Ok(Ok(url)) = Url::parse(&participant_info.url).map(|url| url.join("/state")) else { + return Err(FetchParticipantError::InvalidUrl); + }; + match tokio::time::timeout( + self.fetch_participant_timeout, + self.http.get(url.clone()).send(), + ) + .await + { + Ok(Ok(resp)) => match resp.json::().await { + Ok(state) => Ok(state), + Err(_) => Err(FetchParticipantError::JsonConversion), + }, + Ok(Err(e)) => Err(FetchParticipantError::NetworkError(e.to_string())), + Err(_) => Err(FetchParticipantError::Timeout), + } + } } diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index 56b7b9ac..7dcedbcc 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -1,9 +1,34 @@ +use std::time::Duration; + use crate::protocol::contract::primitives::Participants; use crate::protocol::ProtocolState; pub mod connection; -#[derive(Default)] +#[derive(Debug, Clone, clap::Parser)] +#[group(id = "mesh_options")] +pub struct Options { + #[clap( + long, + env("MPC_MESH_FETCH_PARTICIPANT_TIMEOUT"), + default_value = "1000" + )] + pub fetch_participant_timeout: u64, + #[clap(long, env("MPC_MESH_REFRESH_ACTIVE_TIMEOUT"), default_value = "1000")] + pub refresh_active_timeout: u64, +} + +impl Options { + pub fn into_str_args(self) -> Vec { + vec![ + "--fetch-participant-timeout".to_string(), + self.fetch_participant_timeout.to_string(), + "--refresh-active-timeout".to_string(), + self.refresh_active_timeout.to_string(), + ] + } +} + pub struct Mesh { /// Pool of connections to participants. Used to check who is alive in the network. pub connections: connection::Pool, @@ -17,6 +42,17 @@ pub struct Mesh { } impl Mesh { + pub fn new(options: Options) -> Self { + Self { + connections: connection::Pool::new( + Duration::from_millis(options.fetch_participant_timeout), + Duration::from_millis(options.refresh_active_timeout), + ), + active_participants: Participants::default(), + active_potential_participants: Participants::default(), + } + } + /// Participants that are active at the beginning of each protocol loop. pub fn active_participants(&self) -> &Participants { &self.active_participants diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index 6d3917af..af03d18a 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -6,18 +6,19 @@ use super::state::{ use super::{Config, SignQueue}; use crate::gcp::error::DatastoreStorageError; use crate::gcp::error::SecretStorageError; +use crate::http_client::MessageQueue; use crate::protocol::contract::primitives::Participants; use crate::protocol::monitor::StuckMonitor; use crate::protocol::presignature::PresignatureManager; use crate::protocol::signature::SignatureManager; use crate::protocol::state::{GeneratingState, ResharingState}; use crate::protocol::triple::TripleManager; -use crate::rpc_client; use crate::storage::secret_storage::SecretNodeStorageBox; use crate::storage::triple_storage::LockTripleNodeStorageBox; use crate::storage::triple_storage::TripleData; use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare}; use crate::util::AffinePointExt; +use crate::{http_client, rpc_client}; use std::cmp::Ordering; use std::sync::Arc; @@ -42,6 +43,7 @@ pub trait ConsensusCtx { fn secret_storage(&self) -> &SecretNodeStorageBox; fn triple_storage(&self) -> LockTripleNodeStorageBox; fn cfg(&self) -> &Config; + fn message_options(&self) -> http_client::Options; } #[derive(thiserror::Error, Debug)] @@ -170,7 +172,9 @@ impl ConsensusProtocol for StartedState { ctx.my_account_id(), ), )), - messages: Default::default(), + messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.message_options().clone(), + ))), })) } None => Ok(NodeState::Joining(JoiningState { @@ -224,7 +228,9 @@ impl ConsensusProtocol for StartedState { participants, threshold: contract_state.threshold, protocol, - messages: Default::default(), + messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.message_options().clone(), + ))), })) } None => { @@ -760,6 +766,8 @@ async fn start_resharing( threshold: contract_state.threshold, public_key: contract_state.public_key, protocol, - messages: Default::default(), + messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.message_options().clone(), + ))), })) } diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index e8f3b00d..7c87e9b4 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -23,6 +23,8 @@ use self::consensus::ConsensusCtx; use self::cryptography::CryptographicCtx; use self::message::MessageCtx; use crate::config::Config; +use crate::http_client; +use crate::mesh; use crate::mesh::Mesh; use crate::protocol::consensus::ConsensusProtocol; use crate::protocol::cryptography::CryptographicProtocol; @@ -54,6 +56,7 @@ struct Ctx { triple_storage: LockTripleNodeStorageBox, cfg: Config, mesh: Mesh, + message_options: http_client::Options, } impl ConsensusCtx for &mut MpcSignProtocol { @@ -96,6 +99,10 @@ impl ConsensusCtx for &mut MpcSignProtocol { fn triple_storage(&self) -> LockTripleNodeStorageBox { self.ctx.triple_storage.clone() } + + fn message_options(&self) -> http_client::Options { + self.ctx.message_options.clone() + } } #[async_trait::async_trait] @@ -167,6 +174,8 @@ impl MpcSignProtocol { secret_storage: SecretNodeStorageBox, triple_storage: LockTripleNodeStorageBox, cfg: Config, + mesh_options: mesh::Options, + message_options: http_client::Options, ) -> (Self, Arc>) { let my_address = my_address.into_url().unwrap(); let rpc_url = rpc_client.rpc_addr(); @@ -192,7 +201,8 @@ impl MpcSignProtocol { secret_storage, triple_storage, cfg, - mesh: Mesh::default(), + mesh: Mesh::new(mesh_options), + message_options, }; let protocol = MpcSignProtocol { ctx, diff --git a/integration-tests/chain-signatures/src/containers.rs b/integration-tests/chain-signatures/src/containers.rs index 97eec7fd..b6b69d17 100644 --- a/integration-tests/chain-signatures/src/containers.rs +++ b/integration-tests/chain-signatures/src/containers.rs @@ -117,6 +117,8 @@ impl<'a> Node<'a> { config.cfg.protocol.clone(), )?)), client_header_referer: None, + mesh_options: ctx.mesh_options.clone(), + message_options: ctx.message_options.clone(), } .into_str_args(); let image: GenericImage = GenericImage::new("near/mpc-node", "latest") diff --git a/integration-tests/chain-signatures/src/lib.rs b/integration-tests/chain-signatures/src/lib.rs index dbdba4d3..0771a0ca 100644 --- a/integration-tests/chain-signatures/src/lib.rs +++ b/integration-tests/chain-signatures/src/lib.rs @@ -15,6 +15,8 @@ use futures::StreamExt; use mpc_contract::config::{PresignatureConfig, ProtocolConfig, TripleConfig}; use mpc_contract::primitives::CandidateInfo; use mpc_node::gcp::GcpService; +use mpc_node::http_client; +use mpc_node::mesh; use mpc_node::storage; use mpc_node::storage::triple_storage::TripleNodeStorageBox; use near_crypto::KeyFile; @@ -206,6 +208,8 @@ pub struct Context<'a> { pub mpc_contract: Contract, pub datastore: crate::containers::Datastore<'a>, pub storage_options: storage::Options, + pub mesh_options: mesh::Options, + pub message_options: http_client::Options, } pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> { @@ -240,6 +244,14 @@ pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> gcp_datastore_url: Some(datastore.local_address.clone()), sk_share_local_path: Some(sk_share_local_path), }; + + let mesh_options = mpc_node::mesh::Options { + fetch_participant_timeout: 1000, + refresh_active_timeout: 1000, + }; + + let message_options = http_client::Options { timeout: 1000 }; + Ok(Context { docker_client, docker_network: docker_network.to_string(), @@ -250,6 +262,8 @@ pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> mpc_contract, datastore, storage_options, + mesh_options, + message_options, }) } diff --git a/integration-tests/chain-signatures/src/local.rs b/integration-tests/chain-signatures/src/local.rs index a3051b6b..923ccb15 100644 --- a/integration-tests/chain-signatures/src/local.rs +++ b/integration-tests/chain-signatures/src/local.rs @@ -74,6 +74,8 @@ impl Node { cfg.protocol.clone(), )?)), client_header_referer: None, + mesh_options: ctx.mesh_options.clone(), + message_options: ctx.message_options.clone(), }; let cmd = executable(ctx.release, crate::execute::PACKAGE_MULTICHAIN) @@ -165,6 +167,8 @@ impl Node { config.cfg.protocol.clone(), )?)), client_header_referer: None, + mesh_options: ctx.mesh_options.clone(), + message_options: ctx.message_options.clone(), }; let mpc_node_id = format!("multichain/{}", config.account.id()); From 8adaa71bd962bbec17710120e48bd053116eec68 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Tue, 15 Oct 2024 14:18:06 -0700 Subject: [PATCH 09/10] revert contract change --- chain-signatures/contract/src/config/impls.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain-signatures/contract/src/config/impls.rs b/chain-signatures/contract/src/config/impls.rs index 53dcb0a3..36c7d5b5 100644 --- a/chain-signatures/contract/src/config/impls.rs +++ b/chain-signatures/contract/src/config/impls.rs @@ -57,7 +57,7 @@ impl Default for PresignatureConfig { Self { min_presignatures: 512, max_presignatures: 512 * MAX_EXPECTED_PARTICIPANTS * NETWORK_MULTIPLIER, - generation_timeout: secs_to_ms(90), + generation_timeout: secs_to_ms(45), other: Default::default(), } From 58d7056ca1db7ef20ceeee14e2f95a3f9d31e4d6 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Wed, 16 Oct 2024 11:59:00 -0700 Subject: [PATCH 10/10] minor --- chain-signatures/node/src/http_client.rs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/chain-signatures/node/src/http_client.rs b/chain-signatures/node/src/http_client.rs index 8cd10f6d..eaf5c37c 100644 --- a/chain-signatures/node/src/http_client.rs +++ b/chain-signatures/node/src/http_client.rs @@ -56,23 +56,6 @@ async fn send_encrypted( url.set_path("msg"); tracing::debug!(?from, to = %url, "making http request: sending encrypted message"); let action = || async { -<<<<<<< HEAD - let response = client - .post(url.clone()) - .header("content-type", "application/json") - .json(&message) -<<<<<<< HEAD -<<<<<<< HEAD -======= - .timeout(Duration::from_millis(400)) ->>>>>>> b340f1a0 (increase timeout) -======= - .timeout(Duration::from_millis(200)) ->>>>>>> 26a2f6a4 (revert timeout, change presig timeout) - .send() - .await - .map_err(SendError::ReqwestClientError)?; -======= let response = tokio::time::timeout( request_timeout, client @@ -85,7 +68,6 @@ async fn send_encrypted( .map_err(|_| SendError::Timeout(format!("send encrypted from {from:?} to {url}")))? .map_err(SendError::ReqwestClientError)?; ->>>>>>> 611df5c0 (refactor: all param in env variables) let status = response.status(); let response_bytes = response .bytes()