Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Added max_concurrent_generation for limiting triple timeouts #499

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions integration-tests/src/multichain/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ impl<'a> Node<'a> {
storage_options: storage_options.clone(),
min_triples: cfg.triple_cfg.min_triples,
max_triples: cfg.triple_cfg.max_triples,
max_concurrent_introduction: cfg.triple_cfg.max_concurrent_introduction,
max_concurrent_generation: cfg.triple_cfg.max_concurrent_generation,
}
.into_str_args();
let image: GenericImage = GenericImage::new("near/mpc-recovery-node", "latest")
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/src/multichain/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ impl Node {
storage_options: storage_options.clone(),
min_triples: cfg.triple_cfg.min_triples,
max_triples: cfg.triple_cfg.max_triples,
max_concurrent_introduction: cfg.triple_cfg.max_concurrent_introduction,
max_concurrent_generation: cfg.triple_cfg.max_concurrent_generation,
};

let mpc_node_id = format!("multichain/{account_id}", account_id = account_id);
Expand Down
8 changes: 6 additions & 2 deletions integration-tests/src/multichain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ const NETWORK: &str = "mpc_it_network";
/// Settings for spinning up a multichain integration-test environment.
#[derive(Clone)]
pub struct MultichainConfig {
/// Number of MPC nodes to run (the default configuration uses 3).
pub nodes: usize,
/// Value sent as `threshold` in the MPC contract's `init` call.
pub threshold: usize,
/// Triple-generation settings forwarded to every node's start arguments.
pub triple_cfg: TripleConfig,
}

impl Default for MultichainConfig {
/// Returns the baseline test configuration: 3 nodes, a threshold of 2,
/// and a small triple stockpile (2 minimum, 10 maximum).
fn default() -> Self {
Self {
nodes: 3,
threshold: 2,
triple_cfg: TripleConfig {
min_triples: 2,
max_triples: 10,
// Limits on how many triple generation protocols may be introduced
// and how many may be running at the same time.
max_concurrent_introduction: 8,
max_concurrent_generation: 24,
},
}
}
}
Expand Down Expand Up @@ -236,7 +240,7 @@ pub async fn docker(cfg: MultichainConfig, docker_client: &DockerClient) -> anyh
ctx.mpc_contract
.call("init")
.args_json(json!({
"threshold": 2,
"threshold": cfg.threshold,
"candidates": candidates
}))
.transact()
Expand Down Expand Up @@ -286,7 +290,7 @@ pub async fn host(cfg: MultichainConfig, docker_client: &DockerClient) -> anyhow
ctx.mpc_contract
.call("init")
.args_json(json!({
"threshold": 2,
"threshold": cfg.threshold,
"candidates": candidates
}))
.transact()
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/tests/multichain/actions/wait_for.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub async fn has_at_least_triples<'a>(
let mut state_views = Vec::new();
for id in 0..ctx.nodes.len() {
let state_view = is_enough_triples(id)
.retry(&ExponentialBuilder::default().with_max_times(8))
.retry(&ExponentialBuilder::default().with_max_times(10))
.await
.with_context(|| format!("mpc node '{id}' failed to generate '{expected_triple_count}' triples before deadline"))?;
state_views.push(state_view);
Expand Down
21 changes: 15 additions & 6 deletions integration-tests/tests/multichain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,31 @@ async fn test_signature_offline_node() -> anyhow::Result<()> {
#[test(tokio::test)]
async fn test_signature_large_stockpile() -> anyhow::Result<()> {
const SIGNATURE_AMOUNT: usize = 10;
const NODES: usize = 3;
const THRESHOLD: usize = 2;
const MIN_TRIPLES: usize = 20;
const MAX_TRIPLES: usize = 2 * NODES * MIN_TRIPLES;

let triple_cfg = TripleConfig {
// This is the min triples required by each node.
min_triples: 20,
// This is the total amount of triples that will be generated by all nodes. This also
// determines how many concurrent triples can be generated.
max_triples: 100,
min_triples: MIN_TRIPLES,
// This is the total amount of triples that will be generated by all nodes.
max_triples: MAX_TRIPLES,
// This is the maximum number of triple generation protocols each node can introduce into the system at the same time.
max_concurrent_introduction: 8,
// This is the maximum amount of triples that can be generated concurrently by the whole system.
max_concurrent_generation: 24,
};
let config = MultichainConfig {
triple_cfg,
..MultichainConfig::default()
nodes: NODES,
threshold: THRESHOLD,
};

with_multichain_nodes(config, |ctx| {
Box::pin(async move {
let state_0 = wait_for::running_mpc(&ctx, 0).await?;
assert_eq!(state_0.participants.len(), 3);
assert_eq!(state_0.participants.len(), NODES);
wait_for::has_at_least_triples(&ctx, triple_cfg.min_triples).await?;
wait_for::has_at_least_presignatures(&ctx, SIGNATURE_AMOUNT).await?;

Expand Down
2 changes: 1 addition & 1 deletion node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ mpc-contract = { path = "../contract" }
mpc-keys = { path = "../keys" }

itertools = "0.12.0"
prometheus = { version = "0.13.3"}
prometheus = { version = "0.13.3" }
once_cell = "1.13.1"

[dev-dependencies]
Expand Down
28 changes: 28 additions & 0 deletions node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,24 @@ pub enum Cli {
/// At maximum, how many triples to stockpile on this node.
#[arg(long, env("MPC_RECOVERY_MAX_TRIPLES"), default_value("10"))]
max_triples: usize,

/// At maximum, how many triple protocols can this current node introduce
/// at the same time. This should be something like `max_concurrent_generation / num_nodes`.
#[arg(
long,
env("MPC_RECOVERY_MAX_CONCURRENT_INTRODUCTION"),
default_value("2")
)]
max_concurrent_introduction: usize,

/// At maximum, how many ongoing protocols for triples to be running
/// at the same time. The rest will be queued up.
#[arg(
long,
env("MPC_RECOVERY_MAX_CONCURRENT_GENERATION"),
default_value("8")
)]
max_concurrent_generation: usize,
},
}

Expand All @@ -77,6 +95,8 @@ impl Cli {
storage_options,
min_triples,
max_triples,
max_concurrent_introduction,
max_concurrent_generation,
} => {
let mut args = vec![
"start".to_string(),
Expand All @@ -98,6 +118,10 @@ impl Cli {
min_triples.to_string(),
"--max-triples".to_string(),
max_triples.to_string(),
"--max-concurrent-introduction".to_string(),
max_concurrent_introduction.to_string(),
"--max-concurrent-generation".to_string(),
max_concurrent_generation.to_string(),
];
if let Some(my_address) = my_address {
args.extend(vec!["--my-address".to_string(), my_address.to_string()]);
Expand Down Expand Up @@ -137,6 +161,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
storage_options,
min_triples,
max_triples,
max_concurrent_introduction,
max_concurrent_generation,
} => {
let sign_queue = Arc::new(RwLock::new(SignQueue::new()));
tokio::runtime::Builder::new_multi_thread()
Expand Down Expand Up @@ -182,6 +208,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
TripleConfig {
min_triples,
max_triples,
max_concurrent_introduction,
max_concurrent_generation,
},
triple_storage,
);
Expand Down
75 changes: 65 additions & 10 deletions node/src/protocol/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use k256::elliptic_curve::group::GroupEncoding;
use k256::Secp256k1;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::Instant;

/// The minimum amount of triples that each node needs to own.
Expand All @@ -39,7 +39,7 @@ pub struct TripleGenerator {
pub id: TripleId,
pub participants: Vec<Participant>,
pub protocol: TripleProtocol,
pub timestamp: Instant,
pub timestamp: Option<Instant>,
}

impl TripleGenerator {
Expand All @@ -48,12 +48,13 @@ impl TripleGenerator {
id,
participants,
protocol,
timestamp: Instant::now(),
timestamp: None,
}
}

pub fn poke(&mut self) -> Result<Action<TripleGenerationOutput<Secp256k1>>, ProtocolError> {
if self.timestamp.elapsed() > crate::types::PROTOCOL_TRIPLE_TIMEOUT {
let timestamp = self.timestamp.get_or_insert_with(Instant::now);
if timestamp.elapsed() > crate::types::PROTOCOL_TRIPLE_TIMEOUT {
tracing::info!(id = self.id, "triple protocol timed out");
return Err(ProtocolError::Other(
anyhow::anyhow!("triple protocol timed out").into(),
Expand All @@ -71,18 +72,32 @@ pub struct TripleConfig {
pub min_triples: usize,
/// Maximum amount of triples that is owned by each node.
pub max_triples: usize,
// TODO: utilize concurrent config for threaded triple generation.
// / Maximum amount of concurrent triple generation that can be done per node.
// pub max_concurrent_triples: usize,
/// Maximum amount of concurrent triple generation that can be introduced by this node.
pub max_concurrent_introduction: usize,
/// Maximum amount of concurrent triple generation that can be done per node.
pub max_concurrent_generation: usize,
}

/// Abstracts how triples are generated by providing a way to request a new triple that will be
/// complete some time in the future and a way to take an already generated triple.
pub struct TripleManager {
/// Completed unspent triples
pub triples: HashMap<TripleId, Triple>,
/// Ongoing triple generation protocols

/// The pool of triple protocols that have yet to be completed.
pub generators: HashMap<TripleId, TripleGenerator>,

/// Triples that are queued to be poked. If these generators sit for too long in
/// the queue, they will be removed due to triple generation timeout.
pub queued: VecDeque<TripleId>,

/// Ongoing triple generation protocols. Once added here, they will not be removed until
/// they are completed or timed out.
pub ongoing: HashSet<TripleId>,

/// The set of triples that were introduced to the system by the current node.
pub introduced: HashSet<TripleId>,

/// List of triple ids generation of which was initiated by the current node.
pub mine: VecDeque<TripleId>,

Expand Down Expand Up @@ -117,6 +132,9 @@ impl TripleManager {
Self {
triples: all_triples,
generators: HashMap::new(),
queued: VecDeque::new(),
ongoing: HashSet::new(),
introduced: HashSet::new(),
mine,
me,
threshold,
Expand Down Expand Up @@ -166,6 +184,8 @@ impl TripleManager {
)?);
self.generators
.insert(id, TripleGenerator::new(id, participants, protocol));
self.queued.push_back(id);
self.introduced.insert(id);
Ok(())
}

Expand All @@ -175,9 +195,23 @@ impl TripleManager {
let TripleConfig {
min_triples,
max_triples,
max_concurrent_introduction,
max_concurrent_generation,
} = self.triple_cfg;
let not_enough_triples =
|| self.my_len() < min_triples && self.potential_len() < max_triples;

let not_enough_triples = || {
// Stopgap to prevent too many triples in the system. This should be around min_triples*nodes*2
// for good measure so that we have enough triples to do presig generation while also maintaining
// the minimum number of triples where a single node can't flood the system.
if self.potential_len() >= max_triples {
return false;
}

// We will always try to generate a new triple if we have less than the minimum
self.my_len() < min_triples
&& self.introduced.len() < max_concurrent_introduction
&& self.generators.len() < max_concurrent_generation
};

if not_enough_triples() {
self.generate(participants)?;
Expand Down Expand Up @@ -290,6 +324,7 @@ impl TripleManager {
self.threshold,
)?);
let generator = e.insert(TripleGenerator::new(id, participants, protocol));
self.queued.push_back(id);
Ok(Some(&mut generator.protocol))
}
Entry::Occupied(e) => Ok(Some(&mut e.into_mut().protocol)),
Expand All @@ -302,16 +337,32 @@ impl TripleManager {
///
/// An empty vector means we cannot progress until we receive a new message.
pub async fn poke(&mut self) -> Result<Vec<(Participant, TripleMessage)>, ProtocolError> {
// Add more protocols to the ongoing pool if there is space.
let to_generate_len = self.triple_cfg.max_concurrent_generation - self.ongoing.len();
if !self.queued.is_empty() && to_generate_len > 0 {
for _ in 0..to_generate_len {
self.queued.pop_front().map(|id| self.ongoing.insert(id));
}
}

let mut messages = Vec::new();
let mut result = Ok(());
let mut triples_to_insert = Vec::new();
self.generators.retain(|id, generator| {
if !self.ongoing.contains(id) {
// If the protocol is not ongoing, we should retain it for the next time
// it is in the ongoing pool.
return true;
}

loop {
let action = match generator.poke() {
Ok(action) => action,
Err(e) => {
result = Err(e);
self.failed_triples.insert(*id, Instant::now());
self.ongoing.remove(id);
self.introduced.remove(id);
tracing::info!("added {} to failed triples", id.clone());
break false;
}
Expand Down Expand Up @@ -386,6 +437,10 @@ impl TripleManager {
self.triples.insert(*id, triple.clone());

triples_to_insert.push(triple.clone());

// Protocol done, remove it from the ongoing pool.
self.ongoing.remove(id);
self.introduced.remove(id);
// Do not retain the protocol
break false;
}
Expand Down
2 changes: 2 additions & 0 deletions node/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const STARTING_EPOCH: u64 = 0;
/// Default triple-generation settings for this file's test helpers.
/// NOTE(review): the code that consumes this constant is outside the visible hunk — confirm usage.
const DEFAULT_TRIPLE_CONFIG: TripleConfig = TripleConfig {
min_triples: 2,
max_triples: 10,
// Concurrency limits added alongside the new `TripleConfig` fields.
max_concurrent_introduction: 4,
max_concurrent_generation: 16,
};

struct TestTripleManagers {
Expand Down
8 changes: 4 additions & 4 deletions node/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use crate::gcp::value::{FromValue, IntoValue, Value};
use crate::gcp::{DatastoreResult, GcpService, KeyKind};
use crate::protocol::contract::ResharingContractState;

/// Default timeout for triple/presig generation protocols. Times out after 3 minutes of being alive.
pub const PROTOCOL_TRIPLE_TIMEOUT: Duration = Duration::from_secs(3 * 60);
/// Default timeout for triple/presig generation protocols. Times out after 10 minutes of being alive.
pub const PROTOCOL_TRIPLE_TIMEOUT: Duration = Duration::from_secs(10 * 60);

/// Default timeout for triple/presig generation protocols. Times out after 1 minutes of being alive since this should be shorted lived.
/// Default timeout for triple/presig generation protocols. Times out after 1 minute of being alive since this should be short-lived.
pub const PROTOCOL_PRESIG_TIMEOUT: Duration = Duration::from_secs(60);

/// Default timeout for signature generation protocol. Times out after 1 minutes of being alive since this should be shorted lived.
/// Default timeout for signature generation protocol. Times out after 1 minute of being alive since this should be short-lived.
pub const PROTOCOL_SIGNATURE_TIMEOUT: Duration = Duration::from_secs(60);

/// Default invalidation time for failed triples. 120 mins
Expand Down
Loading