Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Made failed generators only get added on self proposers #597

Merged
merged 5 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration-tests/tests/multichain/actions/wait_for.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ pub async fn signature_payload_responded(
) -> anyhow::Result<FullSignature<Secp256k1>> {
let is_signature_ready = || async {
let (_, _, _, tx_hash) = crate::multichain::actions::request_sign_non_random(
&ctx,
ctx,
account.clone(),
payload,
payload_hashed,
Expand Down
71 changes: 8 additions & 63 deletions node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,77 +405,22 @@ impl CryptographicProtocol for RunningState {
crate::metrics::SIGN_QUEUE_SIZE
.with_label_values(&[&my_account_id])
.set(sign_queue.len() as i64);

let mut signature_manager = self.signature_manager.write().await;
sign_queue.organize(self.threshold, active, ctx.me().await, &my_account_id);
let my_requests = sign_queue.my_requests(ctx.me().await);
crate::metrics::SIGN_QUEUE_MINE_SIZE
.with_label_values(&[&my_account_id])
.set(my_requests.len() as i64);
let mut failed_presigs = Vec::new();
while presignature_manager.my_len() > 0 {
if let Some((receipt_id, failed_generator)) = signature_manager.take_failed_generator()
{
// only retry the failed signature generator if the proposer of the signature is me
if failed_generator.proposer == signature_manager.me() {
let Some(presignature) = presignature_manager.take_mine() else {
break;
};
let sig_participants = active.intersection(&[&presignature.participants]);
if sig_participants.len() < self.threshold {
tracing::debug!(
participants = ?sig_participants.keys_vec(),
"running: we don't have enough participants to generate a failed signature"
);
failed_presigs.push(presignature);
continue;
}

signature_manager.retry_failed_generation(
receipt_id,
&failed_generator,
presignature,
&sig_participants,
);
}
}

let Some((receipt_id, _)) = my_requests.iter().next() else {
break;
};

let Some(presignature) = presignature_manager.take_mine() else {
break;
};

let receipt_id = *receipt_id;
let sig_participants = active.intersection(&[&presignature.participants]);
if sig_participants.len() < self.threshold {
tracing::debug!(
participants = ?sig_participants.keys_vec(),
"running: we don't have enough participants to generate a signature"
);
failed_presigs.push(presignature);
continue;
}

let my_request = my_requests.remove(&receipt_id).unwrap();
signature_manager.generate(
&sig_participants,
receipt_id,
presignature,
self.public_key,
my_request.msg_hash,
my_request.epsilon,
my_request.delta,
my_request.time_added,
)?;
}
let mut signature_manager = self.signature_manager.write().await;
signature_manager.handle_requests(
self.threshold,
active,
my_requests,
&mut presignature_manager,
)?;
drop(sign_queue);
for presignature in failed_presigs {
presignature_manager.insert_mine(presignature);
}
drop(presignature_manager);

for (p, msg) in signature_manager.poke() {
let info = self.fetch_participant(&p)?;
messages.push(info.clone(), MpcMessage::Signature(msg));
Expand Down
176 changes: 122 additions & 54 deletions node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl SignatureGenerator {

/// Generator for signature thas has failed. Only retains essential information
/// for starting up this failed signature once again.
pub struct FailedGenerator {
pub struct GenerationRequest {
pub proposer: Participant,
pub msg_hash: [u8; 32],
pub epsilon: Scalar,
Expand All @@ -173,7 +173,7 @@ pub struct SignatureManager {
/// Ongoing signature generation protocols.
generators: HashMap<CryptoHash, SignatureGenerator>,
/// Failed signatures awaiting to be retried.
failed_generators: VecDeque<(CryptoHash, FailedGenerator)>,
failed: VecDeque<(CryptoHash, GenerationRequest)>,
/// Set of completed signatures
completed: HashMap<PresignatureId, Instant>,
/// Generated signatures assigned to the current node that are yet to be published.
Expand All @@ -188,7 +188,7 @@ impl SignatureManager {
pub fn new(me: Participant, public_key: PublicKey, epoch: u64) -> Self {
Self {
generators: HashMap::new(),
failed_generators: VecDeque::new(),
failed: VecDeque::new(),
completed: HashMap::new(),
signatures: Vec::new(),
me,
Expand All @@ -198,7 +198,7 @@ impl SignatureManager {
}

pub fn failed_len(&self) -> usize {
self.failed_generators.len()
self.failed.len()
}

pub fn me(&self) -> Participant {
Expand All @@ -210,14 +210,17 @@ impl SignatureManager {
participants: &Participants,
me: Participant,
public_key: PublicKey,
proposer: Participant,
presignature: Presignature,
msg_hash: [u8; 32],
epsilon: Scalar,
delta: Scalar,
sign_request_timestamp: Instant,
req: GenerationRequest,
) -> Result<SignatureGenerator, InitializationError> {
let participants: Vec<_> = participants.keys().cloned().collect();
let participants = participants.keys_vec();
let GenerationRequest {
proposer,
msg_hash,
epsilon,
delta,
sign_request_timestamp,
} = req;
let PresignOutput { big_r, k, sigma } = presignature.output;
// TODO: Check whether it is okay to use invert_vartime instead
let output: PresignOutput<Secp256k1> = PresignOutput {
Expand All @@ -244,32 +247,18 @@ impl SignatureManager {
))
}

pub fn take_failed_generator(&mut self) -> Option<(CryptoHash, FailedGenerator)> {
self.failed_generators.pop_front()
}

pub fn retry_failed_generation(
fn retry_failed_generation(
&mut self,
receipt_id: CryptoHash,
failed_generator: &FailedGenerator,
req: GenerationRequest,
presignature: Presignature,
participants: &Participants,
) -> Option<()> {
tracing::info!(receipt_id = %receipt_id, participants = ?participants.keys().collect::<Vec<_>>(), "restarting failed protocol to generate signature");
let generator = Self::generate_internal(
participants,
self.me,
self.public_key,
failed_generator.proposer,
presignature,
failed_generator.msg_hash,
failed_generator.epsilon,
failed_generator.delta,
failed_generator.sign_request_timestamp,
)
.unwrap();
) -> Result<(), InitializationError> {
tracing::info!(receipt_id = %receipt_id, participants = ?participants.keys_vec(), "restarting failed protocol to generate signature");
let generator =
Self::generate_internal(participants, self.me, self.public_key, presignature, req)?;
self.generators.insert(receipt_id, generator);
Some(())
Ok(())
}

/// Starts a new presignature generation protocol.
Expand All @@ -279,7 +268,6 @@ impl SignatureManager {
participants: &Participants,
receipt_id: CryptoHash,
presignature: Presignature,
public_key: PublicKey,
msg_hash: [u8; 32],
epsilon: Scalar,
delta: Scalar,
Expand All @@ -295,13 +283,15 @@ impl SignatureManager {
let generator = Self::generate_internal(
participants,
self.me,
public_key,
self.me,
self.public_key,
presignature,
msg_hash,
epsilon,
delta,
sign_request_timestamp,
GenerationRequest {
proposer: self.me,
msg_hash,
epsilon,
delta,
sign_request_timestamp,
},
)?;
self.generators.insert(receipt_id, generator);
Ok(())
Expand Down Expand Up @@ -329,20 +319,22 @@ impl SignatureManager {
Entry::Vacant(entry) => {
tracing::info!(%receipt_id, me = ?self.me, presignature_id, "joining protocol to generate a new signature");
let Some(presignature) = presignature_manager.take(presignature_id) else {
tracing::warn!(me = ?self.me, presignature_id, "presignature is missing, can't join");
tracing::warn!(me = ?self.me, presignature_id, "presignature is missing, can't join signature generation protocol");
return Ok(None);
};
tracing::info!(me = ?self.me, presignature_id, "found presignature: ready to start signature generation");
let generator = Self::generate_internal(
participants,
self.me,
self.public_key,
proposer,
presignature,
msg_hash,
epsilon,
delta,
Instant::now(),
GenerationRequest {
proposer,
msg_hash,
epsilon,
delta,
sign_request_timestamp: Instant::now(),
},
)?;
let generator = entry.insert(generator);
Ok(Some(&mut generator.protocol))
Expand All @@ -363,16 +355,20 @@ impl SignatureManager {
Ok(action) => action,
Err(err) => {
tracing::warn!(?err, "signature failed to be produced; pushing request back into failed queue");
self.failed_generators.push_back((
*receipt_id,
FailedGenerator {
proposer: generator.proposer,
msg_hash: generator.msg_hash,
epsilon: generator.epsilon,
delta: generator.delta,
sign_request_timestamp: generator.sign_request_timestamp
},
));
if generator.proposer == self.me {
// only retry the signature generation if it was initially proposed by us. We do not
// want any nodes to be proposing the same signature multiple times.
self.failed.push_back((
*receipt_id,
GenerationRequest {
proposer: generator.proposer,
msg_hash: generator.msg_hash,
epsilon: generator.epsilon,
delta: generator.delta,
sign_request_timestamp: generator.sign_request_timestamp
},
));
}
break false;
}
};
Expand Down Expand Up @@ -439,6 +435,78 @@ impl SignatureManager {
messages
}

pub fn handle_requests(
&mut self,
threshold: usize,
active: &Participants,
my_requests: &mut HashMap<CryptoHash, SignRequest>,
presignature_manager: &mut PresignatureManager,
) -> Result<(), super::CryptographicError> {
let mut failed_presigs = Vec::new();
while let Some(mut presignature) = {
if self.failed.is_empty() && my_requests.is_empty() {
None
} else {
presignature_manager.take_mine()
}
} {
let sig_participants = active.intersection(&[&presignature.participants]);
if sig_participants.len() < threshold {
tracing::debug!(
participants = ?sig_participants.keys_vec(),
"we do not have enough participants to generate a failed signature"
);
failed_presigs.push(presignature);
continue;
}

// NOTE: this prioritizes old requests first then tries to do new ones if there's enough presignatures.
// TODO: we need to decide how to prioritize certain requests over others such as with gas or time of
// when the request made it into the NEAR network.
// issue: https://github.com/near/mpc-recovery/issues/596
if let Some((receipt_id, failed_req)) = self.failed.pop_front() {
self.retry_failed_generation(
receipt_id,
failed_req,
presignature,
&sig_participants,
)?;

if let Some(another_presignature) = presignature_manager.take_mine() {
presignature = another_presignature;
} else {
break;
}
}

let Some(receipt_id) = my_requests.keys().next().cloned() else {
failed_presigs.push(presignature);
continue;
};
let Some(my_request) = my_requests.remove(&receipt_id) else {
failed_presigs.push(presignature);
continue;
};
self.generate(
&sig_participants,
receipt_id,
presignature,
my_request.msg_hash,
my_request.epsilon,
my_request.delta,
my_request.time_added,
)?;
}

// add back the failed presignatures that were incompatible to be made into
// signatures due to failures or lack of participants.
for presignature in failed_presigs {
presignature_manager.insert_mine(presignature);
}

Ok(())
}

pub async fn publish<T: Signer + ExposeAccountId>(
&mut self,
rpc_client: &near_fetch::Client,
Expand Down
Loading