Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Perf] Avoid fetching already seen Certificates #3439

Open
wants to merge 2 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions node/bft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ features = [ "thread-pool" ]
version = "2.1"
features = [ "serde", "rayon" ]

[dependencies.lru]
version = "0.12.1"

[dependencies.metrics]
package = "snarkos-node-metrics"
path = "../metrics"
Expand Down
70 changes: 68 additions & 2 deletions node/bft/src/helpers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ use snarkvm::{
};

use indexmap::{IndexMap, IndexSet, map::Entry};
use lru::LruCache;
use parking_lot::RwLock;
use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
sync::{
Arc,
atomic::{AtomicU32, AtomicU64, Ordering},
Expand Down Expand Up @@ -81,6 +83,8 @@ pub struct StorageInner<N: Network> {
/* Once per batch */
/// The map of `round` to a list of `(certificate ID, batch ID, author)` entries.
rounds: RwLock<IndexMap<u64, IndexSet<(Field<N>, Field<N>, Address<N>)>>>,
/// A cache of `certificate ID` to unprocessed `certificate`.
unprocessed_certificates: RwLock<LruCache<Field<N>, BatchCertificate<N>>>,
/// The map of `certificate ID` to `certificate`.
certificates: RwLock<IndexMap<Field<N>, BatchCertificate<N>>>,
/// The map of `batch ID` to `round`.
Expand All @@ -100,6 +104,8 @@ impl<N: Network> Storage<N> {
let committee = ledger.current_committee().expect("Ledger is missing a committee.");
// Retrieve the current round.
let current_round = committee.starting_round().max(1);
// Set the unprocessed certificates cache size.
let unprocessed_cache_size = NonZeroUsize::new((N::MAX_CERTIFICATES * 2) as usize).unwrap();

// Return the storage.
let storage = Self(Arc::new(StorageInner {
Expand All @@ -109,6 +115,7 @@ impl<N: Network> Storage<N> {
gc_round: Default::default(),
max_gc_rounds,
rounds: Default::default(),
unprocessed_certificates: RwLock::new(LruCache::new(unprocessed_cache_size)),
certificates: Default::default(),
batch_ids: Default::default(),
transmissions,
Expand Down Expand Up @@ -219,9 +226,9 @@ impl<N: Network> Storage<N> {
// Remove the GC round(s) from storage.
for gc_round in current_gc_round..=next_gc_round {
// Iterate over the certificates for the GC round.
for certificate in self.get_certificates_for_round(gc_round).iter() {
for id in self.get_certificate_ids_for_round(gc_round).into_iter() {
// Remove the certificate from storage.
self.remove_certificate(certificate.id());
self.remove_certificate(id);
}
}
// Update the GC round.
Expand All @@ -248,6 +255,12 @@ impl<N: Network> Storage<N> {
self.rounds.read().get(&round).map_or(false, |set| set.iter().any(|(_, _, a)| a == &author))
}

/// Returns `true` if the storage contains the specified `certificate ID`.
pub fn contains_unprocessed_certificate(&self, certificate_id: Field<N>) -> bool {
// Check if the certificate ID exists in storage.
self.unprocessed_certificates.read().contains(&certificate_id)
}

/// Returns `true` if the storage contains the specified `batch ID`.
pub fn contains_batch(&self, batch_id: Field<N>) -> bool {
// Check if the batch ID exists in storage.
Expand Down Expand Up @@ -293,6 +306,13 @@ impl<N: Network> Storage<N> {
self.certificates.read().get(&certificate_id).cloned()
}

/// Returns the unprocessed certificate for the given `certificate ID`.
/// If the certificate ID does not exist in storage, `None` is returned.
pub fn get_unprocessed_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
// Get the unprocessed certificate.
self.unprocessed_certificates.read().peek(&certificate_id).cloned()
}

/// Returns the certificate for the given `round` and `author`.
/// If the round does not exist in storage, `None` is returned.
/// If the author for the round does not exist in storage, `None` is returned.
Expand Down Expand Up @@ -324,6 +344,36 @@ impl<N: Network> Storage<N> {
}
}

/// Returns the certificate IDs for the given `round`.
/// If the round does not exist in storage, `None` is returned.
pub fn get_certificate_ids_for_round(&self, round: u64) -> IndexSet<Field<N>> {
// The genesis round does not have batch certificates.
if round == 0 {
return Default::default();
}
// Retrieve the certificates.
if let Some(entries) = self.rounds.read().get(&round) {
entries.iter().map(|(certificate_id, _, _)| *certificate_id).collect()
} else {
Default::default()
}
}

/// Returns the certificate authors for the given `round`.
/// If the round does not exist in storage, `None` is returned.
pub fn get_certificate_authors_for_round(&self, round: u64) -> HashSet<Address<N>> {
// The genesis round does not have batch certificates.
if round == 0 {
return Default::default();
}
// Retrieve the certificates.
if let Some(entries) = self.rounds.read().get(&round) {
entries.iter().map(|(_, _, author)| *author).collect()
} else {
Default::default()
}
}

/// Returns the certificates that have not yet been included in the ledger.
/// Note that the order of this set is by round and then insertion.
pub(crate) fn get_pending_certificates(&self) -> IndexSet<BatchCertificate<N>> {
Expand Down Expand Up @@ -574,6 +624,8 @@ impl<N: Network> Storage<N> {
let transmission_ids = certificate.transmission_ids().clone();
// Insert the certificate.
self.certificates.write().insert(certificate_id, certificate);
// Remove the unprocessed certificate.
self.unprocessed_certificates.write().pop(&certificate_id);
// Insert the batch ID.
self.batch_ids.write().insert(batch_id, round);
// Insert the certificate ID for each of the transmissions into storage.
Expand All @@ -585,6 +637,18 @@ impl<N: Network> Storage<N> {
);
}

/// Inserts the given unprocessed `certificate` into storage.
///
/// This is a temporary storage, which is cleared again when calling `insert_certificate_atomic`.
pub fn insert_unprocessed_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
// Ensure the certificate round is above the GC round.
ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
// Insert the certificate.
self.unprocessed_certificates.write().put(certificate.id(), certificate);

Ok(())
}

/// Removes the given `certificate ID` from storage.
///
/// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
Expand Down Expand Up @@ -622,6 +686,8 @@ impl<N: Network> Storage<N> {
}
// Remove the certificate.
self.certificates.write().swap_remove(&certificate_id);
// Remove the unprocessed certificate.
self.unprocessed_certificates.write().pop(&certificate_id);
// Remove the batch ID.
self.batch_ids.write().swap_remove(&batch_id);
// Remove the transmission entries in the certificate from storage.
Expand Down
64 changes: 34 additions & 30 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -916,9 +916,18 @@ impl<N: Network> Primary<N> {
peer_ip: SocketAddr,
certificate: BatchCertificate<N>,
) -> Result<()> {
// Ensure the batch certificate is from an authorized validator.
if !self.gateway.is_authorized_validator_ip(peer_ip) {
// Proceed to disconnect the validator.
self.gateway.disconnect(peer_ip);
bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
}
// Ensure storage does not already contain the certificate.
if self.storage.contains_certificate(certificate.id()) {
return Ok(());
// Otherwise, ensure ephemeral storage contains the certificate.
} else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
self.storage.insert_unprocessed_certificate(certificate.clone())?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the certificate not removed from the unprocessed_certificates cache when this function is completed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is removed from the cache if the certificate is inserted into storage, so even sooner than waiting for the function to complete.

You might wonder whether a malicious validator could fill up the cache, and they could. The cache helps in the optimistic case to speed up processing.

}

// Retrieve the batch certificate author.
Expand All @@ -928,12 +937,6 @@ impl<N: Network> Primary<N> {
// Retrieve the batch certificate committee ID.
let committee_id = certificate.committee_id();

// Ensure the batch certificate is from an authorized validator.
if !self.gateway.is_authorized_validator_ip(peer_ip) {
// Proceed to disconnect the validator.
self.gateway.disconnect(peer_ip);
bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
}
// Ensure the batch certificate is not from the current primary.
if self.gateway.account().address() == author {
bail!("Received a batch certificate for myself ({author})");
Expand All @@ -947,10 +950,8 @@ impl<N: Network> Primary<N> {

// Retrieve the committee lookback.
let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
// Retrieve the certificates.
let certificates = self.storage.get_certificates_for_round(certificate_round);
// Construct a set over the authors.
let authors = certificates.iter().map(BatchCertificate::author).collect();
// Retrieve the certificate authors.
let authors = self.storage.get_certificate_authors_for_round(certificate_round);
// Check if the certificates have reached the quorum threshold.
let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

Expand Down Expand Up @@ -1027,13 +1028,13 @@ impl<N: Network> Primary<N> {
if current_round == 0 {
break;
}
// Retrieve the certificates.
let certificates = self_.storage.get_certificates_for_round(current_round);
// Retrieve the primary certificate.
certificate =
certificates.into_iter().find(|certificate| certificate.author() == primary_address);
// Retrieve the primary certificates.
if let Some(primary_certificate) =
self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
{
certificate = Some(primary_certificate);
// If the primary certificate was not found, decrement the round.
if certificate.is_none() {
} else {
current_round = current_round.saturating_sub(1);
}
}
Expand Down Expand Up @@ -1210,17 +1211,16 @@ impl<N: Network> Primary<N> {
let next_round = self_.current_round().saturating_add(1);
// Determine if the quorum threshold is reached for the current round.
let is_quorum_threshold_reached = {
// Retrieve the certificates for the next round.
let certificates = self_.storage.get_certificates_for_round(next_round);
// Retrieve the certificate authors for the next round.
let authors = self_.storage.get_certificate_authors_for_round(next_round);
// If there are no certificates, then skip this check.
if certificates.is_empty() {
if authors.is_empty() {
continue;
}
let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
warn!("Failed to retrieve the committee lookback for round {next_round}");
continue;
};
let authors = certificates.iter().map(BatchCertificate::author).collect();
committee_lookback.is_quorum_threshold_reached(&authors)
};
// Attempt to increment to the next round if the quorum threshold is reached.
Expand Down Expand Up @@ -1541,8 +1541,7 @@ impl<N: Network> Primary<N> {

// Determine if quorum threshold is reached on the batch round.
let is_quorum_threshold_reached = {
let certificates = self.storage.get_certificates_for_round(batch_round);
let authors = certificates.iter().map(BatchCertificate::author).collect();
let authors = self.storage.get_certificate_authors_for_round(batch_round);
let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
committee_lookback.is_quorum_threshold_reached(&authors)
};
Expand Down Expand Up @@ -1668,14 +1667,23 @@ impl<N: Network> Primary<N> {
) -> Result<HashSet<BatchCertificate<N>>> {
// Initialize a list for the missing certificates.
let mut fetch_certificates = FuturesUnordered::new();
// Initialize a set for the missing certificates.
let mut missing_certificates = HashSet::default();
// Iterate through the certificate IDs.
for certificate_id in certificate_ids {
// Check if the certificate already exists in the ledger.
if self.ledger.contains_certificate(certificate_id)? {
continue;
}
// If we do not have the certificate, request it.
if !self.storage.contains_certificate(*certificate_id) {
// Check if the certificate already exists in storage.
if self.storage.contains_certificate(*certificate_id) {
continue;
}
// If we have not fully processed the certificate yet, store it.
if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: we can additionally resolve any outstanding certificate requests for this particular certificate.

missing_certificates.insert(certificate);
} else {
// If we do not have the certificate, request it.
trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
// TODO (howardwu): Limit the number of open requests we send to a peer.
// Send an certificate request to the peer.
Expand All @@ -1692,8 +1700,6 @@ impl<N: Network> Primary<N> {
),
}

// Initialize a set for the missing certificates.
let mut missing_certificates = HashSet::with_capacity(fetch_certificates.len());
// Wait for all of the missing certificates to be fetched.
while let Some(result) = fetch_certificates.next().await {
// Insert the missing certificate into the set.
Expand Down Expand Up @@ -2058,8 +2064,7 @@ mod tests {
store_certificate_chain(&primary, &accounts, round, &mut rng);

// Get transmissions from previous certificates.
let previous_certificate_ids: IndexSet<_> =
primary.storage.get_certificates_for_round(prev_round).iter().map(|cert| cert.id()).collect();
let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

// Track the number of transmissions in the previous round.
let mut num_transmissions_in_previous_round = 0;
Expand Down Expand Up @@ -2602,8 +2607,7 @@ mod tests {
store_certificate_chain(&primary, &accounts, round, &mut rng);

// Get transmissions from previous certificates.
let previous_certificate_ids: IndexSet<_> =
primary.storage.get_certificates_for_round(prev_round).iter().map(|cert| cert.id()).collect();
let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

// Generate a solution and a transaction.
let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
Expand Down