Skip to content

Commit

Permalink
Do periodic inbound cleaning for rooted slots (#8436)
Browse files Browse the repository at this point in the history
* Do periodic inbound compaction for rooted slots

* Add comment

* nits

* Consider not_compacted_roots in cleanup_dead_slot

* Renames in AccountsIndex

* Rename to reflect expansion of removed accounts

* Fix a comment

* rename

* Parallelize clean over AccountsIndex

* Some niceties

* Reduce locks and real chunked parallelism

* Measure each step for sampling opportunities

* Just noticed par iter is maybe lazy

* Replace storage scan with optimized index scan

* Various clean-ups

* Clear uncleared_roots even if no updates

(cherry picked from commit d861033)

# Conflicts:
#	ledger/src/snapshot_utils.rs
#	runtime/src/accounts_db.rs
#	runtime/src/accounts_index.rs
#	runtime/src/bank.rs
  • Loading branch information
ryoqun authored and mergify-bot committed Mar 3, 2020
1 parent 1d2c3fc commit 4c12331
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 36 deletions.
5 changes: 5 additions & 0 deletions ledger/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,12 @@ pub fn add_snapshot<P: AsRef<Path>>(
bank: &Bank,
snapshot_storages: &[SnapshotStorage],
) -> Result<SlotSnapshotPaths> {
<<<<<<< HEAD
bank.purge_zero_lamport_accounts();
=======
bank.clean_accounts();
bank.update_accounts_hash();
>>>>>>> d86103383... Do periodic inbound cleaning for rooted slots (#8436)
let slot = bank.slot();
// snapshot_path/slot
let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot);
Expand Down
268 changes: 255 additions & 13 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,18 +611,67 @@ impl AccountsDB {
false
}

// Purge zero lamport accounts for garbage collection purposes
// Reclaim older states of rooted non-zero lamport accounts as a general
// AccountsDB bloat mitigation and preprocess for better zero-lamport purging.
fn clean_old_rooted_accounts(&self, purges_in_root: Vec<Pubkey>) {
// This number isn't carefully chosen; just guessed randomly such that
// the hot loop will be the order of ~Xms.
const INDEX_CLEAN_BULK_COUNT: usize = 4096;

let mut measure = Measure::start("clean_old_root-ms");
let reclaim_vecs =
purges_in_root
.par_chunks(INDEX_CLEAN_BULK_COUNT)
.map(|pubkeys: &[Pubkey]| {
let mut reclaims = Vec::new();
let accounts_index = self.accounts_index.read().unwrap();
for pubkey in pubkeys {
accounts_index.clean_rooted_entries(&pubkey, &mut reclaims);
}
reclaims
});
let reclaims: Vec<_> = reclaim_vecs.flatten().collect();
measure.stop();
inc_new_counter_info!("clean-old-root-par-clean-ms", measure.as_ms() as usize);

let mut measure = Measure::start("clean_old_root-ms");
self.handle_reclaims(&reclaims);
measure.stop();
inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize);
}

fn clear_uncleaned_roots(&self) {
let mut accounts_index = self.accounts_index.write().unwrap();
accounts_index.uncleaned_roots.clear();
drop(accounts_index);
}

// Purge zero lamport accounts and older rooted account states as garbage
// collection
// Only remove those accounts where the entire rooted history of the account
// can be purged because there are no live append vecs in the ancestors
pub fn purge_zero_lamport_accounts(&self, ancestors: &HashMap<u64, usize>) {
pub fn clean_accounts(&self) {
self.report_store_stats();
let mut purges = HashMap::new();
let mut purges_in_root = Vec::new();
let no_ancestors = HashMap::new();
let accounts_index = self.accounts_index.read().unwrap();
accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| {
if account_info.lamports == 0 && accounts_index.is_root(slot) {

accounts_index.scan_accounts(&no_ancestors, |pubkey, (account_info, slot)| {
if account_info.lamports == 0 {
purges.insert(*pubkey, accounts_index.would_purge(pubkey));
} else if accounts_index.uncleaned_roots.contains(&slot) {
purges_in_root.push(*pubkey);
}
});
drop(accounts_index);

if !purges_in_root.is_empty() {
self.clean_old_rooted_accounts(purges_in_root);
}
self.clear_uncleaned_roots();

let accounts_index = self.accounts_index.read().unwrap();

// Calculate store counts as if everything was purged
// Then purge if we can
Expand Down Expand Up @@ -700,9 +749,15 @@ impl AccountsDB {
let mut dead_slots = self.remove_dead_accounts(reclaims);
dead_accounts.stop();

<<<<<<< HEAD
let mut cleanup_dead_slots = Measure::start("reclaims::purge_slots");
self.cleanup_dead_slots(&mut dead_slots, last_root);
cleanup_dead_slots.stop();
=======
let mut clean_dead_slots = Measure::start("reclaims::purge_slots");
self.clean_dead_slots(&mut dead_slots);
clean_dead_slots.stop();
>>>>>>> d86103383... Do periodic inbound cleaning for rooted slots (#8436)

let mut purge_slots = Measure::start("reclaims::purge_slots");
for slot in dead_slots {
Expand Down Expand Up @@ -1165,14 +1220,22 @@ impl AccountsDB {
dead_slots
}

<<<<<<< HEAD
fn cleanup_dead_slots(&self, dead_slots: &mut HashSet<Slot>, last_root: u64) {
// a slot is not totally dead until it is older than the root
dead_slots.retain(|slot| *slot < last_root);
=======
fn clean_dead_slots(&self, dead_slots: &mut HashSet<Slot>) {
if self.dont_cleanup_dead_slots.load(Ordering::Relaxed) {
return;
}

>>>>>>> d86103383... Do periodic inbound cleaning for rooted slots (#8436)
if !dead_slots.is_empty() {
{
let mut index = self.accounts_index.write().unwrap();
for slot in dead_slots.iter() {
index.cleanup_dead_slot(*slot);
index.clean_dead_slot(*slot);
}
}
{
Expand Down Expand Up @@ -1802,7 +1865,7 @@ pub mod tests {
//slot is still there, since gc is lazy
assert!(accounts.storage.read().unwrap().0[&0].get(&id).is_some());

//store causes cleanup
//store causes clean
accounts.store(1, &[(&pubkey, &account)]);

//slot is gone
Expand All @@ -1813,6 +1876,149 @@ pub mod tests {
assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1)));
}

impl AccountsDB {
fn store_count_for_slot(&self, slot: Slot) -> usize {
let storage = self.storage.read().unwrap();

let slot_storage = storage.0.get(&slot);
if let Some(slot_storage) = slot_storage {
slot_storage.values().nth(0).unwrap().count()
} else {
0
}
}

fn uncleaned_root_count(&self) -> usize {
self.accounts_index.read().unwrap().uncleaned_roots.len()
}
}

#[test]
fn test_clean_old_with_normal_account() {
solana_logger::setup();

let accounts = AccountsDB::new(Vec::new());
let pubkey = Pubkey::new_rand();
let account = Account::new(1, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey, &account)]);
accounts.store(1, &[(&pubkey, &account)]);

// simulate slots are rooted after while
accounts.add_root(0);
accounts.add_root(1);

//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 1);
assert_eq!(accounts.store_count_for_slot(1), 1);

accounts.clean_accounts();

//now old state is cleaned up
assert_eq!(accounts.store_count_for_slot(0), 0);
assert_eq!(accounts.store_count_for_slot(1), 1);
}

#[test]
fn test_clean_old_with_zero_lamport_account() {
solana_logger::setup();

let accounts = AccountsDB::new(Vec::new());
let pubkey1 = Pubkey::new_rand();
let pubkey2 = Pubkey::new_rand();
let normal_account = Account::new(1, 0, &Account::default().owner);
let zero_account = Account::new(0, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey1, &normal_account)]);
accounts.store(1, &[(&pubkey1, &zero_account)]);
accounts.store(0, &[(&pubkey2, &normal_account)]);
accounts.store(1, &[(&pubkey2, &normal_account)]);

//simulate slots are rooted after while
accounts.add_root(0);
accounts.add_root(1);

//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 2);
assert_eq!(accounts.store_count_for_slot(1), 2);

accounts.clean_accounts();

//still old state behind zero-lamport account isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 1);
assert_eq!(accounts.store_count_for_slot(1), 2);
}

#[test]
fn test_clean_old_with_both_normal_and_zero_lamport_accounts() {
solana_logger::setup();

let accounts = AccountsDB::new(Vec::new());
let pubkey1 = Pubkey::new_rand();
let pubkey2 = Pubkey::new_rand();
let normal_account = Account::new(1, 0, &Account::default().owner);
let zero_account = Account::new(0, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey1, &normal_account)]);
accounts.store(1, &[(&pubkey1, &zero_account)]);
accounts.store(0, &[(&pubkey2, &normal_account)]);
accounts.store(2, &[(&pubkey2, &normal_account)]);

//simulate slots are rooted after while
accounts.add_root(0);
accounts.add_root(1);
accounts.add_root(2);

//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 2);
assert_eq!(accounts.store_count_for_slot(1), 1);
assert_eq!(accounts.store_count_for_slot(2), 1);

accounts.clean_accounts();

//both zero lamport and normal accounts are cleaned up
assert_eq!(accounts.store_count_for_slot(0), 0);
assert_eq!(accounts.store_count_for_slot(1), 0);
assert_eq!(accounts.store_count_for_slot(2), 1);
}

#[test]
fn test_uncleaned_roots_with_account() {
solana_logger::setup();

let accounts = AccountsDB::new(Vec::new());
let pubkey = Pubkey::new_rand();
let account = Account::new(1, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey, &account)]);
assert_eq!(accounts.uncleaned_root_count(), 0);

// simulate slots are rooted after while
accounts.add_root(0);
assert_eq!(accounts.uncleaned_root_count(), 1);

//now uncleaned roots are cleaned up
accounts.clean_accounts();
assert_eq!(accounts.uncleaned_root_count(), 0);
}

#[test]
fn test_uncleaned_roots_with_no_account() {
solana_logger::setup();

let accounts = AccountsDB::new(Vec::new());

assert_eq!(accounts.uncleaned_root_count(), 0);

// simulate slots are rooted after while
accounts.add_root(0);
assert_eq!(accounts.uncleaned_root_count(), 1);

//now uncleaned roots are cleaned up
accounts.clean_accounts();
assert_eq!(accounts.uncleaned_root_count(), 0);
}

fn print_accounts(label: &'static str, accounts: &AccountsDB) {
print_index(label, accounts);
print_count_and_status(label, accounts);
Expand Down Expand Up @@ -1966,12 +2172,6 @@ pub mod tests {
daccounts
}

fn purge_zero_lamport_accounts(accounts: &AccountsDB, slot: Slot) {
let ancestors = vec![(slot as Slot, 0)].into_iter().collect();
info!("ancestors: {:?}", ancestors);
accounts.purge_zero_lamport_accounts(&ancestors);
}

fn assert_no_stores(accounts: &AccountsDB, slot: Slot) {
let stores = accounts.storage.read().unwrap();
info!("{:?}", stores.0.get(&slot));
Expand Down Expand Up @@ -2014,7 +2214,13 @@ pub mod tests {
current_slot += 1;
accounts.add_root(current_slot);

<<<<<<< HEAD
purge_zero_lamport_accounts(&accounts, current_slot);
=======
print_accounts("pre_purge", &accounts);

accounts.clean_accounts();
>>>>>>> d86103383... Do periodic inbound cleaning for rooted slots (#8436)

print_accounts("post_purge", &accounts);

Expand Down Expand Up @@ -2068,7 +2274,17 @@ pub mod tests {
current_slot += 1;
accounts.add_root(current_slot);

<<<<<<< HEAD
purge_zero_lamport_accounts(&accounts, current_slot);
=======
print_accounts("pre_purge", &accounts);

let ancestors = linear_ancestors(current_slot);
info!("ancestors: {:?}", ancestors);
let hash = accounts.update_accounts_hash(current_slot, &ancestors);

accounts.clean_accounts();
>>>>>>> d86103383... Do periodic inbound cleaning for rooted slots (#8436)

print_accounts("post_purge", &accounts);

Expand Down Expand Up @@ -2134,7 +2350,7 @@ pub mod tests {

print_accounts("accounts", &accounts);

purge_zero_lamport_accounts(&accounts, current_slot);
accounts.clean_accounts();

print_accounts("accounts_post_purge", &accounts);
let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
Expand Down Expand Up @@ -2196,6 +2412,32 @@ pub mod tests {
}

#[test]
<<<<<<< HEAD
=======
fn test_accounts_purge_chained_purge_before_snapshot_restore() {
solana_logger::setup();
with_chained_zero_lamport_accounts(|accounts, current_slot| {
accounts.clean_accounts();
let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
(accounts, false)
});
}

#[test]
fn test_accounts_purge_chained_purge_after_snapshot_restore() {
solana_logger::setup();
with_chained_zero_lamport_accounts(|accounts, current_slot| {
let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
accounts
.dont_cleanup_dead_slots
.store(true, Ordering::Relaxed);
accounts.clean_accounts();
(accounts, true)
});
}

#[test]
>>>>>>> d86103383... Do periodic inbound cleaning for rooted slots (#8436)
#[ignore]
fn test_store_account_stress() {
let slot_id = 42;
Expand Down
Loading

0 comments on commit 4c12331

Please sign in to comment.