Skip to content

Commit

Permalink
runtime: parallelize Stakes::new using rayon (solana-labs#450)
Browse files Browse the repository at this point in the history
* runtime: Stakes::new: avoid extra loops over stakes.stake_delegations

* runtime: parallelize Stakes::new with rayon

* Address review comments

* Address more comments
  • Loading branch information
alessandrod authored Apr 19, 2024
1 parent c2936eb commit deec1f4
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 37 deletions.
6 changes: 4 additions & 2 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1587,10 +1587,11 @@ impl Bank {
// from Stakes<Delegation> by reading the full account state from
// accounts-db. Note that it is crucial that these accounts are loaded
// at the right slot and match precisely with serialized Delegations.
//
// Note that we are disabling the read cache while we populate the stakes cache.
// The stakes accounts will not be expected to be loaded again.
// If we populated the read cache with these loads, we would soon have to evict them anyway.
let stakes = Stakes::new(&fields.stakes, |pubkey| {
let (stakes, stakes_time) = measure!(Stakes::new(&fields.stakes, |pubkey| {
let (account, _slot) = bank_rc
.accounts
.load_with_fixed_root_do_not_populate_read_cache(&ancestors, pubkey)?;
Expand All @@ -1599,7 +1600,8 @@ impl Bank {
.expect(
"Stakes cache is inconsistent with accounts-db. This can indicate \
a corrupted snapshot or bugs in cached accounts or accounts-db.",
);
));
info!("Loading Stakes took: {stakes_time}");
let stakes_accounts_load_duration = now.elapsed();
let mut bank = Self {
skipped_rewrites: Mutex::default(),
Expand Down
83 changes: 48 additions & 35 deletions runtime/src/stakes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use {
},
solana_vote::vote_account::{VoteAccount, VoteAccounts},
std::{
collections::{HashMap, HashSet},
collections::HashMap,
ops::Add,
sync::{Arc, RwLock, RwLockReadGuard},
},
Expand Down Expand Up @@ -227,22 +227,53 @@ impl Stakes<StakeAccount> {
/// cached.
pub(crate) fn new<F>(stakes: &Stakes<Delegation>, get_account: F) -> Result<Self, Error>
where
F: Fn(&Pubkey) -> Option<AccountSharedData>,
F: Fn(&Pubkey) -> Option<AccountSharedData> + Sync,
{
let stake_delegations = stakes.stake_delegations.iter().map(|(pubkey, delegation)| {
let Some(stake_account) = get_account(pubkey) else {
return Err(Error::StakeAccountNotFound(*pubkey));
};
let stake_account = StakeAccount::try_from(stake_account)?;
// Sanity check that the delegation is consistent with what is
// stored in the account.
if stake_account.delegation() == *delegation {
Ok((*pubkey, stake_account))
} else {
Err(Error::InvalidDelegation(*pubkey))
}
});
let stake_delegations = stakes
.stake_delegations
.iter()
// im::HashMap doesn't support rayon so we manually build a temporary vector. Note this is
// what std HashMap::par_iter() does internally too.
.collect::<Vec<_>>()
.into_par_iter()
// We use fold/reduce to aggregate the results, which does a bit more work than calling
// collect()/collect_vec_list() and then im::HashMap::from_iter(collected.into_iter()),
// but it does it in background threads, so effectively it's faster.
.try_fold(ImHashMap::new, |mut map, (pubkey, delegation)| {
let Some(stake_account) = get_account(pubkey) else {
return Err(Error::StakeAccountNotFound(*pubkey));
};

// Assert that all valid vote-accounts referenced in stake delegations are already
// contained in `stakes.vote_accounts`.
let voter_pubkey = &delegation.voter_pubkey;
if stakes.vote_accounts.get(voter_pubkey).is_none() {
if let Some(account) = get_account(voter_pubkey) {
if VoteStateVersions::is_correct_size_and_initialized(account.data())
&& VoteAccount::try_from(account.clone()).is_ok()
{
error!("vote account not cached: {voter_pubkey}, {account:?}");
return Err(Error::VoteAccountNotCached(*voter_pubkey));
}
}
}

let stake_account = StakeAccount::try_from(stake_account)?;
// Sanity check that the delegation is consistent with what is
// stored in the account.
if stake_account.delegation() == *delegation {
map.insert(*pubkey, stake_account);
Ok(map)
} else {
Err(Error::InvalidDelegation(*pubkey))
}
})
.try_reduce(ImHashMap::new, |a, b| Ok(a.union(b)))?;

// Assert that cached vote accounts are consistent with accounts-db.
//
// This currently includes ~5500 accounts; parallelizing this loop brings only
// minor (sub-2s) improvements.
for (pubkey, vote_account) in stakes.vote_accounts.iter() {
let Some(account) = get_account(pubkey) else {
return Err(Error::VoteAccountNotFound(*pubkey));
Expand All @@ -253,28 +284,10 @@ impl Stakes<StakeAccount> {
return Err(Error::VoteAccountMismatch(*pubkey));
}
}
// Assert that all valid vote-accounts referenced in
// stake delegations are already cached.
let voter_pubkeys: HashSet<Pubkey> = stakes
.stake_delegations
.values()
.map(|delegation| delegation.voter_pubkey)
.filter(|voter_pubkey| stakes.vote_accounts.get(voter_pubkey).is_none())
.collect();
for pubkey in voter_pubkeys {
let Some(account) = get_account(&pubkey) else {
continue;
};
if VoteStateVersions::is_correct_size_and_initialized(account.data())
&& VoteAccount::try_from(account.clone()).is_ok()
{
error!("vote account not cached: {pubkey}, {account:?}");
return Err(Error::VoteAccountNotCached(pubkey));
}
}

Ok(Self {
vote_accounts: stakes.vote_accounts.clone(),
stake_delegations: stake_delegations.collect::<Result<_, _>>()?,
stake_delegations,
unused: stakes.unused,
epoch: stakes.epoch,
stake_history: stakes.stake_history.clone(),
Expand Down

0 comments on commit deec1f4

Please sign in to comment.