Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use tokio::time::timeout to timeout fetch state and send encrypted #889

Merged
merged 10 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig};
use crate::gcp::GcpService;
use crate::protocol::{MpcSignProtocol, SignQueue};
use crate::storage::triple_storage::LockTripleNodeStorageBox;
use crate::{indexer, storage, web};
use crate::{http_client, indexer, mesh, storage, web};
use clap::Parser;
use local_ip_address::local_ip;
use near_account_id::AccountId;
Expand Down Expand Up @@ -63,6 +63,10 @@ pub enum Cli {
/// referer header for mainnet whitelist
#[arg(long, env("MPC_CLIENT_HEADER_REFERER"), default_value(None))]
client_header_referer: Option<String>,
#[clap(flatten)]
mesh_options: mesh::Options,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do partners need to update their Terraform setup?

#[clap(flatten)]
message_options: http_client::Options,
},
}

Expand All @@ -83,6 +87,8 @@ impl Cli {
storage_options,
override_config,
client_header_referer,
mesh_options,
message_options,
} => {
let mut args = vec![
"start".to_string(),
Expand Down Expand Up @@ -120,6 +126,8 @@ impl Cli {

args.extend(indexer_options.into_str_args());
args.extend(storage_options.into_str_args());
args.extend(mesh_options.into_str_args());
args.extend(message_options.into_str_args());
args
}
}
Expand Down Expand Up @@ -176,6 +184,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
storage_options,
override_config,
client_header_referer,
mesh_options,
message_options,
} => {
let sign_queue = Arc::new(RwLock::new(SignQueue::new()));
let rt = tokio::runtime::Builder::new_multi_thread()
Expand Down Expand Up @@ -237,6 +247,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
sign_sk,
},
}),
mesh_options,
message_options,
);

rt.block_on(async {
Expand Down
52 changes: 43 additions & 9 deletions chain-signatures/node/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ use std::time::{Duration, Instant};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

// Command-line / environment tunables for sending encrypted messages to other nodes.
//
// NOTE: the struct-level description is deliberately a plain `//` comment. clap's derive
// macros turn `///` doc comments into help/about text, and this struct is flattened into
// the parent command, so only the per-field doc comment below is meant to reach `--help`.
#[derive(Debug, Clone, clap::Parser)]
#[group(id = "message_options")]
pub struct Options {
    /// Timeout in milliseconds for each HTTP request that sends encrypted messages
    #[clap(long, env("MPC_MESSAGE_TIMEOUT"), default_value = "1000")]
    pub timeout: u64,
}

impl Options {
    /// Serializes these options back into the command-line arguments that would
    /// reproduce them: the `--timeout` flag followed by its value.
    pub fn into_str_args(self) -> Vec<String> {
        let Self { timeout } = self;
        Vec::from([String::from("--timeout"), timeout.to_string()])
    }
}

#[derive(Debug, thiserror::Error)]
pub enum SendError {
#[error("http request was unsuccessful: {0}")]
Expand All @@ -36,19 +49,25 @@ async fn send_encrypted<U: IntoUrl>(
client: &Client,
url: U,
message: Vec<Ciphered>,
request_timeout: Duration,
) -> Result<(), SendError> {
let _span = tracing::info_span!("message_request");
let mut url = url.into_url()?;
url.set_path("msg");
tracing::debug!(?from, to = %url, "making http request: sending encrypted message");
let action = || async {
let response = client
.post(url.clone())
.header("content-type", "application/json")
.json(&message)
.send()
.await
.map_err(SendError::ReqwestClientError)?;
let response = tokio::time::timeout(
request_timeout,
client
.post(url.clone())
.header("content-type", "application/json")
.json(&message)
.send(),
)
.await
.map_err(|_| SendError::Timeout(format!("send encrypted from {from:?} to {url}")))?
.map_err(SendError::ReqwestClientError)?;

let status = response.status();
let response_bytes = response
.bytes()
Expand All @@ -75,13 +94,21 @@ async fn send_encrypted<U: IntoUrl>(

// TODO: add in retry logic either in struct or at call site.
// TODO: add check for participant list to see if the messages to be sent are still valid.
#[derive(Default)]
/// Outgoing message queue together with the options used when sending.
pub struct MessageQueue {
// Pending entries: the destination participant, the message, and an `Instant`
// (presumably the time the entry was queued; confirm against the push site).
deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>,
// Set of string keys; NOTE(review): looks like a de-duplication record for
// already-seen messages, but the code that fills it is not visible here.
seen_counts: HashSet<String>,
// Send options; `timeout` (milliseconds) becomes the per-request timeout passed
// to `send_encrypted`.
message_options: Options,
}

impl MessageQueue {
/// Creates an empty queue that will send using the given `options`.
pub fn new(options: Options) -> Self {
    let deque = VecDeque::new();
    let seen_counts = HashSet::new();
    Self {
        deque,
        seen_counts,
        message_options: options,
    }
}

/// Returns the number of entries currently held in the queue.
pub fn len(&self) -> usize {
self.deque.len()
}
Expand Down Expand Up @@ -147,7 +174,14 @@ impl MessageQueue {
crate::metrics::NUM_SEND_ENCRYPTED_TOTAL
.with_label_values(&[account_id.as_str()])
.inc();
if let Err(err) = send_encrypted(from, client, &info.url, encrypted_partition).await
if let Err(err) = send_encrypted(
from,
client,
&info.url,
encrypted_partition,
Duration::from_millis(self.message_options.timeout),
)
.await
{
crate::metrics::NUM_SEND_ENCRYPTED_FAILURE
.with_label_values(&[account_id.as_str()])
Expand Down
121 changes: 74 additions & 47 deletions chain-signatures/node/src/mesh/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use tokio::sync::RwLock;
use url::Url;

use crate::protocol::contract::primitives::Participants;
use crate::protocol::ParticipantInfo;
use crate::protocol::ProtocolState;
use crate::web::StateView;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);

// TODO: this is a basic connection pool and does not do most of the work yet. This is
// mostly here just to facilitate offline node handling for now.
// TODO/NOTE: we can use libp2p to facilitate most the of low level TCP connection work.
Expand All @@ -25,12 +24,43 @@ pub struct Pool {
current_active: RwLock<Option<(Participants, Instant)>>,
// Potentially active participants that we can use to establish a connection in the next epoch.
potential_active: RwLock<Option<(Participants, Instant)>>,
fetch_participant_timeout: Duration,
refresh_active_timeout: Duration,
}

/// Reasons why fetching a participant's `/state` endpoint can fail
/// (produced by `Pool::fetch_participant_state`).
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum FetchParticipantError {
/// The request did not finish within the pool's `fetch_participant_timeout`.
#[error("request timed out")]
Timeout,
/// A response arrived but its body could not be decoded as a `StateView`.
#[error("Response cannot be converted to JSON")]
JsonConversion,
/// The participant's URL could not be parsed or joined with `/state`.
#[error("Invalid URL")]
InvalidUrl,
/// The HTTP request itself failed; carries the underlying error rendered as text.
#[error("Network error: {0}")]
NetworkError(String),
}

impl Pool {
/// Creates an empty pool with a fresh HTTP client.
///
/// `fetch_participant_timeout` bounds each participant state fetch, and
/// `refresh_active_timeout` is how long a cached active-participant set is reused
/// before the pool pings again. Both values are logged at creation.
pub fn new(fetch_participant_timeout: Duration, refresh_active_timeout: Duration) -> Self {
    tracing::info!(
        ?fetch_participant_timeout,
        ?refresh_active_timeout,
        "creating a new pool"
    );

    // Nothing is known about any participant yet: empty sets, empty status map,
    // and no cached "active" snapshots.
    let http = reqwest::Client::new();
    let connections = RwLock::new(Participants::default());
    let potential_connections = RwLock::new(Participants::default());
    let status = RwLock::new(HashMap::default());
    let current_active = RwLock::new(None);
    let potential_active = RwLock::new(None);

    Self {
        http,
        connections,
        potential_connections,
        status,
        current_active,
        potential_active,
        fetch_participant_timeout,
        refresh_active_timeout,
    }
}
pub async fn ping(&self) -> Participants {
if let Some((ref active, timestamp)) = *self.current_active.read().await {
if timestamp.elapsed() < DEFAULT_TIMEOUT {
if timestamp.elapsed() < self.refresh_active_timeout {
return active.clone();
}
}
Expand All @@ -40,35 +70,15 @@ impl Pool {
let mut status = self.status.write().await;
let mut participants = Participants::default();
for (participant, info) in connections.iter() {
let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else {
tracing::error!(
"Pool.ping url is invalid participant {:?} url {} /state",
participant,
info.url
);
continue;
};

let Ok(resp) = self.http.get(url.clone()).send().await else {
tracing::warn!(
"Pool.ping resp err participant {:?} url {}",
participant,
url
);
continue;
};

let Ok(state): Result<StateView, _> = resp.json().await else {
tracing::warn!(
"Pool.ping state view err participant {:?} url {}",
participant,
url
);
continue;
};

status.insert(*participant, state);
participants.insert(participant, info.clone());
match self.fetch_participant_state(info).await {
Ok(state) => {
status.insert(*participant, state);
participants.insert(participant, info.clone());
}
Err(e) => {
tracing::warn!("Fetch state for participant {participant:?} with url {} has failed with error {e}.", info.url);
}
}
}
drop(status);

Expand All @@ -79,7 +89,7 @@ impl Pool {

pub async fn ping_potential(&self) -> Participants {
if let Some((ref active, timestamp)) = *self.potential_active.read().await {
if timestamp.elapsed() < DEFAULT_TIMEOUT {
if timestamp.elapsed() < self.refresh_active_timeout {
return active.clone();
}
}
Expand All @@ -89,20 +99,15 @@ impl Pool {
let mut status = self.status.write().await;
let mut participants = Participants::default();
for (participant, info) in connections.iter() {
let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else {
continue;
};

let Ok(resp) = self.http.get(url).send().await else {
continue;
};

let Ok(state): Result<StateView, _> = resp.json().await else {
continue;
};

status.insert(*participant, state);
participants.insert(participant, info.clone());
match self.fetch_participant_state(info).await {
Ok(state) => {
status.insert(*participant, state);
participants.insert(participant, info.clone());
}
Err(e) => {
tracing::warn!("Fetch state for participant {participant:?} with url {} has failed with error {e}.", info.url);
}
}
}
drop(status);

Expand Down Expand Up @@ -159,4 +164,26 @@ impl Pool {
_ => false,
})
}

/// Fetches the `StateView` served at `<participant url>/state`.
///
/// The whole exchange (sending the request, receiving the response, and decoding
/// the JSON body) is bounded by `self.fetch_participant_timeout`. Previously only
/// the `send()` step was covered, so a peer that returned headers and then stalled
/// while streaming the body could block the caller indefinitely.
///
/// The HTTP status code is not inspected; a response whose body does not decode as
/// a `StateView` is reported as `JsonConversion`.
///
/// # Errors
/// * `InvalidUrl` - the participant URL cannot be parsed or joined with `/state`.
/// * `NetworkError` - the HTTP request failed; carries the error text.
/// * `JsonConversion` - the response body is not a valid `StateView`.
/// * `Timeout` - the request and decode did not finish within the timeout.
async fn fetch_participant_state(
    &self,
    participant_info: &ParticipantInfo,
) -> Result<StateView, FetchParticipantError> {
    let url = Url::parse(&participant_info.url)
        .and_then(|base| base.join("/state"))
        .map_err(|_| FetchParticipantError::InvalidUrl)?;

    // Request + body decode as one future so a single timeout covers both steps.
    let fetch = async {
        let resp = self
            .http
            .get(url)
            .send()
            .await
            .map_err(|e| FetchParticipantError::NetworkError(e.to_string()))?;
        resp.json::<StateView>()
            .await
            .map_err(|_| FetchParticipantError::JsonConversion)
    };

    match tokio::time::timeout(self.fetch_participant_timeout, fetch).await {
        Ok(outcome) => outcome,
        Err(_elapsed) => Err(FetchParticipantError::Timeout),
    }
}
}
38 changes: 37 additions & 1 deletion chain-signatures/node/src/mesh/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,34 @@
use std::time::Duration;

use crate::protocol::contract::primitives::Participants;
use crate::protocol::ProtocolState;

pub mod connection;

#[derive(Default)]
// Command-line / environment tunables for the mesh connection pool.
//
// NOTE: the struct-level description is deliberately a plain `//` comment. clap's derive
// macros turn `///` doc comments into help/about text, and this struct is flattened into
// the parent command, so only the per-field doc comments below are meant to reach `--help`.
#[derive(Debug, Clone, clap::Parser)]
#[group(id = "mesh_options")]
pub struct Options {
    /// Timeout in milliseconds for fetching the state of a single participant
    #[clap(
        long,
        env("MPC_MESH_FETCH_PARTICIPANT_TIMEOUT"),
        default_value = "1000"
    )]
    pub fetch_participant_timeout: u64,
    /// How long, in milliseconds, a cached set of active participants is reused before pinging again
    #[clap(long, env("MPC_MESH_REFRESH_ACTIVE_TIMEOUT"), default_value = "1000")]
    pub refresh_active_timeout: u64,
}

impl Options {
    /// Serializes these options back into command-line arguments: each flag is
    /// followed by its value, fetch timeout first and refresh timeout second.
    pub fn into_str_args(self) -> Vec<String> {
        let Self {
            fetch_participant_timeout,
            refresh_active_timeout,
        } = self;

        [
            ("--fetch-participant-timeout", fetch_participant_timeout),
            ("--refresh-active-timeout", refresh_active_timeout),
        ]
        .into_iter()
        .flat_map(|(flag, millis)| [flag.to_string(), millis.to_string()])
        .collect()
    }
}

pub struct Mesh {
/// Pool of connections to participants. Used to check who is alive in the network.
pub connections: connection::Pool,
Expand All @@ -17,6 +42,17 @@ pub struct Mesh {
}

impl Mesh {
pub fn new(options: Options) -> Self {
Self {
connections: connection::Pool::new(
Duration::from_millis(options.fetch_participant_timeout),
Duration::from_millis(options.refresh_active_timeout),
),
active_participants: Participants::default(),
active_potential_participants: Participants::default(),
}
}

/// Participants that are active at the beginning of each protocol loop.
pub fn active_participants(&self) -> &Participants {
&self.active_participants
Expand Down
Loading
Loading