Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore triple/presignature generation condition and optimize triple persistence #466

Merged
merged 16 commits into from
Feb 27, 2024
4 changes: 2 additions & 2 deletions integration-tests/src/env/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ impl<'a> Datastore<'a> {
tracing::info!("Running datastore container...");
let image = GenericImage::new(
"gcr.io/google.com/cloudsdktool/google-cloud-cli",
"436.0.0-emulators",
"464.0.0-emulators",
)
.with_wait_for(WaitFor::message_on_stderr("Dev App Server is now running."))
.with_exposed_port(Self::CONTAINER_PORT)
Expand All @@ -477,6 +477,7 @@ impl<'a> Datastore<'a> {
"--host-port".to_string(),
format!("0.0.0.0:{}", Self::CONTAINER_PORT),
"--no-store-on-disk".to_string(),
"--consistency=1.0".to_string(),
],
)
.into();
Expand Down Expand Up @@ -719,7 +720,6 @@ impl SignerNode<'_> {
container,
address: full_address,
local_address: format!("http://127.0.0.1:{host_port}"),

env: ctx.env.clone(),
node_id,
sk_share: sk_share.clone(),
Expand Down
41 changes: 32 additions & 9 deletions integration-tests/tests/multichain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ pub mod actions;

use crate::with_multichain_nodes;
use actions::wait_for;
use mpc_recovery_integration_tests::env::containers::DockerClient;
use mpc_recovery_node::test_utils;
use test_log::test;

#[test(tokio::test)]
Expand Down Expand Up @@ -38,15 +40,6 @@ async fn test_triples_and_presignatures() -> anyhow::Result<()> {
assert_eq!(state_0.participants.len(), 3);
wait_for::has_at_least_triples(&ctx, 2).await?;
wait_for::has_at_least_presignatures(&ctx, 2).await?;
// TODO: add test that checks #triples in datastore
// for account_id in state_0.participants.keys() {
// let triple_storage = ctx.nodes.triple_storage(account_id.to_string()).await?;
// // This errs out with
// // Err(GcpError(BadRequest(Object {"error": Object {"code": Number(400), "message": String("Payload isn't valid for request."), "status": String("INVALID_ARGUMENT")}})))
// let _load_res = triple_storage.load().await;
// //print!("result is: {:?}", load_res);
// //assert_eq!(load_res.len(), 6);
// }
Ok(())
})
})
Expand All @@ -66,3 +59,33 @@ async fn test_signature() -> anyhow::Result<()> {
})
.await
}

#[test(tokio::test)]
async fn test_triples_persistence_for_generation() -> anyhow::Result<()> {
let docker_client = DockerClient::default();
let gcp_project_id = "test-triple-persistence";
let docker_network = "test-triple-persistence";
docker_client.create_network(docker_network).await?;
let datastore =
crate::env::containers::Datastore::run(&docker_client, docker_network, gcp_project_id)
.await?;
let datastore_url = datastore.local_address.clone();
// verifies that @triple generation, the datastore triples are in sync with local generated triples
test_utils::test_triple_generation(Some(datastore_url.clone())).await;
Ok(())
}

#[test(tokio::test)]
async fn test_triples_persistence_for_deletion() -> anyhow::Result<()> {
let docker_client = DockerClient::default();
let gcp_project_id = "test-triple-persistence";
let docker_network = "test-triple-persistence";
docker_client.create_network(docker_network).await?;
let datastore =
crate::env::containers::Datastore::run(&docker_client, docker_network, gcp_project_id)
.await?;
let datastore_url = datastore.local_address.clone();
// verifies that @triple deletion, the datastore is working as expected
test_utils::test_triple_deletion(Some(datastore_url)).await;
Ok(())
}
2 changes: 2 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,7 @@ near-sdk = "5.0.0-alpha.1"
mpc-contract = { path = "../contract" }
mpc-keys = { path = "../keys" }

itertools = "0.12.0"

[dev-dependencies]
itertools = "0.12.0"
6 changes: 6 additions & 0 deletions node/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub struct DatastoreService {
datastore: Datastore<HttpsConnector<HttpConnector>>,
project_id: String,
env: String,
is_emulator: bool,
}

pub type DatastoreResult<T> = std::result::Result<T, error::DatastoreStorageError>;
Expand All @@ -86,6 +87,10 @@ pub trait KeyKind {
}

impl DatastoreService {
pub fn is_emulator(&self) -> bool {
self.is_emulator
}

#[tracing::instrument(level = "debug", skip_all, fields(key = name_key.to_string()))]
pub async fn get<K: ToString, T: FromValue + KeyKind>(
&self,
Expand Down Expand Up @@ -377,6 +382,7 @@ impl GcpService {
datastore,
project_id: project_id_non_empty.clone(),
env: storage_options.env.clone().unwrap(),
is_emulator: storage_options.gcp_datastore_url.is_some(),
volovyks marked this conversation as resolved.
Show resolved Hide resolved
},
secret_manager: SecretManagerService {
secret_manager,
Expand Down
1 change: 1 addition & 0 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod kdf;
pub mod protocol;
pub mod rpc_client;
pub mod storage;
pub mod test_utils;
pub mod types;
pub mod util;
pub mod web;
39 changes: 33 additions & 6 deletions node/src/protocol/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::protocol::state::{GeneratingState, ResharingState};
use crate::protocol::triple::TripleManager;
use crate::storage::secret_storage::SecretNodeStorageBox;
use crate::storage::triple_storage::LockTripleNodeStorageBox;
use crate::storage::triple_storage::TripleData;
use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare};
use crate::util::AffinePointExt;
use crate::{http_client, rpc_client};
Expand Down Expand Up @@ -675,17 +676,13 @@ impl ConsensusProtocol for JoiningState {
impl ConsensusProtocol for NodeState {
async fn advance<C: ConsensusCtx + Send + Sync>(
self,
mut ctx: C,
ctx: C,
contract_state: ProtocolState,
) -> Result<NodeState, ConsensusError> {
match self {
NodeState::Starting => {
let persistent_node_data = ctx.secret_storage().load().await?;
let triple_storage = ctx.triple_storage();
let read_lock = triple_storage.read().await;
let triple_data_result = read_lock.load().await;
drop(read_lock);
let triple_data = triple_data_result.ok().unwrap_or_default();
let triple_data = load_triples(ctx).await?;
Ok(NodeState::Started(StartedState {
persistent_node_data,
triple_data,
Expand All @@ -701,6 +698,36 @@ impl ConsensusProtocol for NodeState {
}
}

async fn load_triples<C: ConsensusCtx + Send + Sync>(
mut ctx: C,
) -> Result<Vec<TripleData>, ConsensusError> {
let triple_storage = ctx.triple_storage();
let read_lock = triple_storage.read().await;
let mut retries = 3;
let mut error = None;
while retries > 0 {
match read_lock.load().await {
Err(DatastoreStorageError::FetchEntitiesError(_)) => {
tracing::info!("There are no triples persisted.");
drop(read_lock);
return Ok(vec![]);
}
Err(e) => {
retries -= 1;
tracing::warn!(?e, "triple load failed.");
error = Some(e);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
Ok(loaded_triples) => {
drop(read_lock);
return Ok(loaded_triples);
}
}
}
drop(read_lock);
Err(ConsensusError::DatastoreStorageError(error.unwrap()))
}

async fn start_resharing<C: ConsensusCtx>(
private_share: Option<SecretKeyShare>,
ctx: C,
Expand Down
4 changes: 2 additions & 2 deletions node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl CryptographicProtocol for RunningState {
}

let mut triple_manager = self.triple_manager.write().await;
if triple_manager.my_len() < 2 {
if triple_manager.my_len() < 2 && triple_manager.potential_len() < 10 {
volovyks marked this conversation as resolved.
Show resolved Hide resolved
triple_manager.generate()?;
}
for (p, msg) in triple_manager.poke().await? {
Expand All @@ -306,7 +306,7 @@ impl CryptographicProtocol for RunningState {
}

let mut presignature_manager = self.presignature_manager.write().await;
if presignature_manager.my_len() < 2 {
if presignature_manager.my_len() < 2 && presignature_manager.potential_len() < 10 {
// To ensure there is no contention between different nodes we are only using triples
// that we proposed. This way in a non-BFT environment we are guaranteed to never try
// to use the same triple as any other node.
Expand Down
4 changes: 4 additions & 0 deletions node/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ impl MessageHandler for RunningState {
Err(presignature::GenerationError::CaitSithInitializationError(error)) => {
return Err(error.into())
}
Err(presignature::GenerationError::DatastoreStorageError(_)) => {
// Store the message until we are ready to process it
leftover_messages.push(message)
}
}
}
if !leftover_messages.is_empty() {
Expand Down
5 changes: 4 additions & 1 deletion node/src/protocol/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod contract;
mod cryptography;
mod presignature;
pub mod presignature;
mod signature;
pub mod triple;

Expand Down Expand Up @@ -230,18 +230,21 @@ impl MpcSignProtocol {
Ok(state) => state,
Err(err) => {
tracing::info!("protocol unable to progress: {err:?}");
tokio::time::sleep(Duration::from_millis(1000)).await;
continue;
}
};
let mut state = match state.advance(&mut self, contract_state).await {
Ok(state) => state,
Err(err) => {
tracing::info!("protocol unable to advance: {err:?}");
tokio::time::sleep(Duration::from_millis(1000)).await;
continue;
}
};
if let Err(err) = state.handle(&self, &mut queue).await {
tracing::info!("protocol unable to handle messages: {err:?}");
tokio::time::sleep(Duration::from_millis(1000)).await;
continue;
}

Expand Down
18 changes: 11 additions & 7 deletions node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::message::PresignatureMessage;
use super::triple::{Triple, TripleId, TripleManager};
use crate::gcp::error::DatastoreStorageError;
use crate::types::{PresignatureProtocol, PublicKey, SecretKeyShare};
use crate::util::AffinePointExt;
use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError};
Expand Down Expand Up @@ -70,6 +71,8 @@ pub enum GenerationError {
TripleIsMissing(TripleId),
#[error("cait-sith initialization error: {0}")]
CaitSithInitializationError(#[from] InitializationError),
#[error("datastore storage error: {0}")]
DatastoreStorageError(#[from] DatastoreStorageError),
}

/// Abstracts how triples are generated by providing a way to request a new triple that will be
Expand Down Expand Up @@ -122,6 +125,11 @@ impl PresignatureManager {
self.presignatures.len() + self.generators.len()
}

/// Returns if there are unspent presignatures available in the manager.
pub fn is_empty(&self) -> bool {
self.len() == 0
}

#[allow(clippy::too_many_arguments)]
fn generate_internal(
participants: &[Participant],
Expand Down Expand Up @@ -199,13 +207,9 @@ impl PresignatureManager {
let (triple0, triple1) =
match triple_manager.take_two(triple0, triple1, false).await {
Ok(result) => result,
Err(missing_triple_id) => {
tracing::warn!(
triple0,
triple1,
"one of the triples is missing, can't join"
);
return Err(GenerationError::TripleIsMissing(missing_triple_id));
Err(error) => {
tracing::warn!(?error, triple0, triple1,);
return Err(error);
}
};
let generator = Self::generate_internal(
Expand Down
Loading
Loading