Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Optimize] Adjust blocking tasks in the primary #3121

Merged
merged 1 commit into from
Mar 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ impl<N: Network> Primary<N> {
let BatchSignature { batch_id, signature } = batch_signature;

// Retrieve the signer.
let signer = spawn_blocking!(Ok(signature.to_address()))?;
let signer = signature.to_address();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this operation was found to be fast enough that it does not need to be offloaded with spawn_blocking


// Ensure the batch signature is signed by the validator.
if self.gateway.resolver().get_address(peer_ip).map_or(true, |address| address != signer) {
Expand All @@ -643,15 +643,16 @@ impl<N: Network> Primary<N> {
bail!("Invalid peer - received a batch signature from myself ({signer})");
}

let proposal = {
let self_ = self.clone();
let Some(proposal) = spawn_blocking!({
// Acquire the write lock.
let mut proposed_batch = self.proposed_batch.write();
let mut proposed_batch = self_.proposed_batch.write();
// Add the signature to the batch, and determine if the batch is ready to be certified.
match proposed_batch.as_mut() {
Some(proposal) => {
// Ensure the batch ID matches the currently proposed batch ID.
if proposal.batch_id() != batch_id {
match self.storage.contains_batch(batch_id) {
match self_.storage.contains_batch(batch_id) {
true => bail!("This batch was already certified"),
false => bail!(
"Unknown batch ID '{batch_id}', expected '{}' for round {}",
Expand All @@ -661,9 +662,9 @@ impl<N: Network> Primary<N> {
}
}
// Retrieve the committee lookback for the round.
let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
// Retrieve the address of the validator.
let Some(signer) = self.gateway.resolver().get_address(peer_ip) else {
let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
bail!("Signature is from a disconnected validator");
};
// Add the signature to the batch.
Expand All @@ -672,17 +673,20 @@ impl<N: Network> Primary<N> {
// Check if the batch is ready to be certified.
if !proposal.is_quorum_threshold_reached(&committee_lookback) {
// If the batch is not ready to be certified, return early.
return Ok(());
return Ok(None);
}
}
// There is no proposed batch, so return early.
None => return Ok(()),
None => return Ok(None),
};
// Retrieve the batch proposal, clearing the proposed batch.
match proposed_batch.take() {
Some(proposal) => proposal,
None => return Ok(()),
Some(proposal) => Ok(Some(proposal)),
None => Ok(None),
}
})?
else {
return Ok(());
};

/* Proceeding to certify the batch. */
Expand Down Expand Up @@ -793,7 +797,8 @@ impl<N: Network> Primary<N> {
tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

// Retrieve the block locators.
let block_locators = match self_.sync.get_block_locators() {
let self__ = self_.clone();
let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
Ok(block_locators) => block_locators,
Err(e) => {
warn!("Failed to retrieve block locators - {e}");
Expand Down Expand Up @@ -1114,7 +1119,7 @@ impl<N: Network> Primary<N> {
/// Stores the certified batch and broadcasts it to all validators.
async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
// Create the batch certificate and transmissions.
let (certificate, transmissions) = proposal.to_certificate(committee)?;
let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this is done via block_in_place instead of spawn_blocking in order to not have to clone the Proposal, which could get expensive enough to defeat the purpose

// Convert the transmissions into a HashMap.
// Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
Expand Down Expand Up @@ -1840,7 +1845,7 @@ mod tests {
);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: the multi-threaded runtime flavor is a requirement for using block_in_place, which panics when called from a current-thread runtime

async fn test_batch_signature_from_peer() {
let mut rng = TestRng::default();
let (primary, accounts) = primary_without_handlers(&mut rng).await;
Expand Down Expand Up @@ -1875,7 +1880,7 @@ mod tests {
assert_eq!(primary.current_round(), round + 1);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_signature_from_peer_in_round() {
let round = 5;
let mut rng = TestRng::default();
Expand Down