Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stop infinite process of presignature message missing triples #560

Merged
merged 5 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ cait-sith = { git = "https://github.com/LIT-Protocol/cait-sith.git", features =
"k256",
], rev = "8ad2316"}
clap = { version = "4.2", features = ["derive", "env"] }
chrono = "0.4.24"
google-datastore1 = "5"
google-secretmanager1 = "5"
hex = "0.4.3"
Expand Down
28 changes: 27 additions & 1 deletion node/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::triple::TripleId;
use crate::gcp::error::SecretStorageError;
use crate::http_client::SendError;
use crate::mesh::Mesh;
use crate::util;

use async_trait::async_trait;
use cait_sith::protocol::{InitializationError, MessageData, Participant, ProtocolError};
Expand Down Expand Up @@ -43,6 +44,8 @@ pub struct TripleMessage {
pub epoch: u64,
pub from: Participant,
pub data: MessageData,
// UNIX timestamp as seconds since the epoch
pub timestamp: u64,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add this field to all message types. Let's make it a standard. We want to delete all types of messages from the queue when they expire. Message trait may be a good idea here.

}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
Expand All @@ -53,6 +56,8 @@ pub struct PresignatureMessage {
pub epoch: u64,
pub from: Participant,
pub data: MessageData,
// UNIX timestamp as seconds since the epoch
pub timestamp: u64,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
Expand All @@ -66,6 +71,8 @@ pub struct SignatureMessage {
pub epoch: u64,
pub from: Participant,
pub data: MessageData,
// UNIX timestamp as seconds since the epoch
pub timestamp: u64,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -251,6 +258,14 @@ impl MessageHandler for RunningState {
for (id, queue) in queue.presignature_bins.entry(self.epoch).or_default() {
let mut leftover_messages = Vec::new();
while let Some(message) = queue.pop_front() {
// Skip message if it already timed out
if util::is_elapsed_longer_than_timeout(
message.timestamp,
crate::types::PROTOCOL_PRESIG_TIMEOUT,
) {
continue;
}

match presignature_manager
.get_or_generate(
participants,
Expand All @@ -267,8 +282,12 @@ impl MessageHandler for RunningState {
Err(presignature::GenerationError::AlreadyGenerated) => {
tracing::info!(id, "presignature already generated, nothing left to do")
}
Err(presignature::GenerationError::TripleIsGenerating(_)) => {
// Store the message until triple gets generated
leftover_messages.push(message)
}
Err(presignature::GenerationError::TripleIsMissing(_)) => {
// Store the message until we are ready to process it
// Store the message until triple is ready
leftover_messages.push(message)
}
Err(presignature::GenerationError::CaitSithInitializationError(error)) => {
Expand All @@ -293,6 +312,13 @@ impl MessageHandler for RunningState {
for (receipt_id, queue) in queue.signature_bins.entry(self.epoch).or_default() {
let mut leftover_messages = Vec::new();
while let Some(message) = queue.pop_front() {
// Skip message if it already timed out
if util::is_elapsed_longer_than_timeout(
message.timestamp,
crate::types::PROTOCOL_SIGNATURE_TIMEOUT,
) {
continue;
}
tracing::info!(
presignature_id = message.presignature_id,
"new signature message"
Expand Down
41 changes: 30 additions & 11 deletions node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::types::{PresignatureProtocol, PublicKey, SecretKeyShare};
use crate::util::AffinePointExt;
use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError};
use cait_sith::{KeygenOutput, PresignArguments, PresignOutput};
use chrono::Utc;
use k256::Secp256k1;
use near_lake_primitives::AccountId;
use std::collections::hash_map::Entry;
Expand Down Expand Up @@ -86,6 +87,8 @@ pub enum GenerationError {
CaitSithInitializationError(#[from] InitializationError),
#[error("datastore storage error: {0}")]
DatastoreStorageError(#[from] DatastoreStorageError),
#[error("triple {0} is generating")]
TripleIsGenerating(TripleId),
}

/// Abstracts how triples are generated by providing a way to request a new triple that will be
Expand All @@ -102,7 +105,6 @@ pub struct PresignatureManager {
/// The set of presignatures that were already taken. This will be maintained for at most
/// presignature timeout period just so messages are cycled through the system.
taken: HashMap<PresignatureId, Instant>,

me: Participant,
threshold: usize,
epoch: u64,
Expand Down Expand Up @@ -311,16 +313,31 @@ impl PresignatureManager {
tracing::info!(id, "joining protocol to generate a new presignature");
let (triple0, triple1) = match triple_manager.take_two(triple0, triple1).await {
Ok(result) => result,
Err(error) => {
tracing::warn!(
?error,
id,
triple0,
triple1,
"could not initiate non-introduced presignature: triple might not have completed for this node yet"
);
return Err(error);
}
Err(error) => match error {
GenerationError::TripleIsGenerating(_) => {
tracing::warn!(
?error,
id,
triple0,
triple1,
"could not initiate non-introduced presignature: one triple is generating"
);
return Err(error);
}
GenerationError::TripleIsMissing(_) => {
tracing::warn!(
?error,
id,
triple0,
triple1,
"could not initiate non-introduced presignature: one triple is missing"
);
return Err(error);
}
_ => {
return Err(error);
}
},
};
let generator = Self::generate_internal(
participants,
Expand Down Expand Up @@ -396,6 +413,7 @@ impl PresignatureManager {
epoch: self.epoch,
from: self.me,
data: data.clone(),
timestamp: Utc::now().timestamp() as u64
},
))
}
Expand All @@ -409,6 +427,7 @@ impl PresignatureManager {
epoch: self.epoch,
from: self.me,
data,
timestamp: Utc::now().timestamp() as u64
},
)),
Action::Return(output) => {
Expand Down
3 changes: 3 additions & 0 deletions node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::types::{PublicKey, SignatureProtocol};
use crate::util::{AffinePointExt, ScalarExt};
use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError};
use cait_sith::{FullSignature, PresignOutput};
use chrono::Utc;
use k256::{Scalar, Secp256k1};
use near_crypto::Signer;
use near_fetch::signer::ExposeAccountId;
Expand Down Expand Up @@ -386,6 +387,7 @@ impl SignatureManager {
epoch: self.epoch,
from: self.me,
data: data.clone(),
timestamp: Utc::now().timestamp() as u64
},
))
}
Expand All @@ -402,6 +404,7 @@ impl SignatureManager {
epoch: self.epoch,
from: self.me,
data,
timestamp: Utc::now().timestamp() as u64
},
)),
Action::Return(output) => {
Expand Down
15 changes: 13 additions & 2 deletions node/src/protocol/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::types::TripleProtocol;
use crate::util::AffinePointExt;
use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError};
use cait_sith::triples::{TripleGenerationOutput, TriplePub, TripleShare};
use chrono::Utc;
use highway::{HighwayHash, HighwayHasher};
use k256::elliptic_curve::group::GroupEncoding;
use k256::Secp256k1;
Expand Down Expand Up @@ -245,9 +246,17 @@ impl TripleManager {
id1: TripleId,
) -> Result<(Triple, Triple), GenerationError> {
if !self.triples.contains_key(&id0) {
Err(GenerationError::TripleIsMissing(id0))
if self.generators.contains_key(&id0) {
Err(GenerationError::TripleIsGenerating(id0))
} else {
Err(GenerationError::TripleIsMissing(id0))
}
} else if !self.triples.contains_key(&id1) {
Err(GenerationError::TripleIsMissing(id1))
if self.generators.contains_key(&id1) {
Err(GenerationError::TripleIsGenerating(id1))
} else {
Err(GenerationError::TripleIsMissing(id1))
}
} else {
self.delete_triple_from_storage(id0).await?;
self.delete_triple_from_storage(id1).await?;
Expand Down Expand Up @@ -405,6 +414,7 @@ impl TripleManager {
epoch: self.epoch,
from: self.me,
data: data.clone(),
timestamp: Utc::now().timestamp() as u64,
},
))
}
Expand All @@ -416,6 +426,7 @@ impl TripleManager {
epoch: self.epoch,
from: self.me,
data,
timestamp: Utc::now().timestamp() as u64,
},
)),
Action::Return(output) => {
Expand Down
14 changes: 14 additions & 0 deletions node/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::types::PublicKey;
use chrono::{DateTime, LocalResult, TimeZone, Utc};
use k256::elliptic_curve::scalar::FromUintUnchecked;
use k256::elliptic_curve::sec1::{FromEncodedPoint, ToEncodedPoint};
use k256::{AffinePoint, EncodedPoint, Scalar, U256};
Expand Down Expand Up @@ -83,3 +84,16 @@ pub fn get_triple_timeout() -> Duration {
.unwrap_or_default()
.unwrap_or(crate::types::PROTOCOL_TRIPLE_TIMEOUT)
}

pub fn is_elapsed_longer_than_timeout(timestamp_sec: u64, timeout: Duration) -> bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've used simpler pattern before:

if timestamp.elapsed() < DEFAULT_TIMEOUT {...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that elapsed works with Instant, which records the relative timestamp in the system, not the absolute timestamp. Since we are sending the timestamp as a field in the messages, it needs to be comprehensible in all nodes' system, thus using u64 timestamp to represent the time.

if let LocalResult::Single(msg_timestamp) = Utc.timestamp_opt(timestamp_sec as i64, 0) {
let now_datetime: DateTime<Utc> = Utc::now();
// Calculate the difference in seconds
let elapsed_duration = now_datetime.signed_duration_since(msg_timestamp);
let timeout = chrono::Duration::seconds(timeout.as_secs() as i64)
+ chrono::Duration::nanoseconds(timeout.subsec_nanos() as i64);
elapsed_duration > timeout
} else {
false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we return false in case of failure, such message will never be deleted from the queue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the case where it's not Single only happens in the following

Returns MappedLocalTime::None on out-of-range number of seconds and/or invalid nanosecond, otherwise always returns MappedLocalTime::Single.

So we should be fine for the most part since our timestamp should never be out of range since we created it with Utc::now().timestamp() in the first place, but maybe we should add some guardrails to the type itself later when we get the chance, just in case someone ends up setting timestamp to be something else not from the chrono crate

}
}
Loading