From c8971e5259ff6a0d8fffd730f2ee9fe80de5f620 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 23 Feb 2021 16:28:29 +0900 Subject: [PATCH] Introduce ttl eviction for RecycleStore --- runtime/src/accounts_background_service.rs | 22 +++- runtime/src/accounts_db.rs | 140 +++++++++++++++++++-- runtime/src/bank.rs | 4 + 3 files changed, 157 insertions(+), 9 deletions(-) diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index da29bc8764ec78..4091abd6b0318f 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -21,7 +21,7 @@ use std::{ Arc, RwLock, }, thread::{self, sleep, Builder, JoinHandle}, - time::Duration, + time::{Duration, Instant}, }; const INTERVAL_MS: u64 = 100; @@ -30,6 +30,13 @@ const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize = SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize); const CLEAN_INTERVAL_BLOCKS: u64 = 100; +// This value is chosen to spread the dropping cost over 3 expiration checks +// RecycleStores are fully populated almost all of its lifetime. So, otherwise +// this would drop MAX_RECYCLE_STORES mmaps at once in the worst case... +// (Anyway, the dropping part is outside the AccountsDb::recycle_stores lock +// and dropped in this AccountsBackgroundServe, so this shouldn't matter much) +const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATION_TTL_SECONDS / 3; + pub type SnapshotRequestSender = Sender; pub type SnapshotRequestReceiver = Receiver; pub type DroppedSlotsSender = Sender; @@ -286,6 +293,7 @@ impl AccountsBackgroundService { let mut last_cleaned_block_height = 0; let mut removed_slots_count = 0; let mut total_remove_slots_time = 0; + let mut last_expiration_check_time = Instant::now(); let t_background = Builder::new() .name("solana-accounts-background".to_string()) .spawn(move || loop { @@ -304,6 +312,8 @@ impl AccountsBackgroundService { &mut total_remove_slots_time, ); + Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time); + // Check to see if there were any requests for snapshotting banks // < the current root bank `bank` above. @@ -397,6 +407,16 @@ impl AccountsBackgroundService { *removed_slots_count = 0; } } + + fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) { + let now = Instant::now(); + if now.duration_since(*last_expiration_check_time).as_secs() + > RECYCLE_STORE_EXPIRATION_INTERVAL_SECS + { + bank.expire_old_recycle_stores(); + *last_expiration_check_time = now; + } + } } #[cfg(test)] diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 22a57c8261d549..dd23796fdffed1 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -615,27 +615,61 @@ pub struct StoreAccountsTiming { #[derive(Debug, Default)] struct RecycleStores { - entries: Vec>, + entries: Vec<(Instant, Arc)>, total_bytes: u64, } +// 30 min should be enough to be certain there won't be any prospective recycle uses for given +// store entry +// That's because it already processed ~2500 slots and ~25 passes of AccountsBackgroundService +pub const EXPIRATION_TTL_SECONDS: u64 = 1800; + impl RecycleStores { fn add_entry(&mut self, new_entry: Arc) { self.total_bytes += new_entry.total_bytes(); - self.entries.push(new_entry) + self.entries.push((Instant::now(), new_entry)) } - fn iter(&self) -> std::slice::Iter> { + fn iter(&self) -> std::slice::Iter<(Instant, Arc)> { self.entries.iter() } fn add_entries(&mut self, new_entries: Vec>) { self.total_bytes += new_entries.iter().map(|e| e.total_bytes()).sum::(); - self.entries.extend(new_entries); + let now = Instant::now(); + for new_entry in new_entries { + self.entries.push((now, new_entry)); + } + } + + fn expire_old_entries(&mut self) -> Vec> { + let mut expired = vec![]; + let now = Instant::now(); + let mut expired_bytes = 0; + self.entries.retain(|(recycled_time, entry)| { + if now.duration_since(*recycled_time).as_secs() > EXPIRATION_TTL_SECONDS { + if Arc::strong_count(entry) >= 2 { + warn!( + "Expiring still in-use recycled StorageEntry anyway...: id: {} slot: {}", + entry.append_vec_id(), + entry.slot(), + ); + } + expired_bytes += entry.total_bytes(); + expired.push(entry.clone()); + false + } else { + true + } + }); + + self.total_bytes -= expired_bytes; + + expired } fn remove_entry(&mut self, index: usize) -> Arc { - let removed_entry = self.entries.swap_remove(index); + let (_added_time, removed_entry) = self.entries.swap_remove(index); self.total_bytes -= removed_entry.total_bytes(); removed_entry } @@ -2260,7 +2294,7 @@ impl AccountsDb { let mut min = std::u64::MAX; let mut avail = 0; let mut recycle_stores = self.recycle_stores.write().unwrap(); - for (i, store) in recycle_stores.iter().enumerate() { + for (i, (_recycled_time, store)) in recycle_stores.iter().enumerate() { if Arc::strong_count(store) == 1 { max = std::cmp::max(store.accounts.capacity(), max); min = std::cmp::min(store.accounts.capacity(), min); @@ -2996,6 +3030,25 @@ impl AccountsDb { self.accounts_cache.report_size(); } + pub fn expire_old_recycle_stores(&self) { + let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_time"); + let recycle_stores = self.recycle_stores.write().unwrap().expire_old_entries(); + recycle_stores_write_elapsed.stop(); + + let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed"); + drop(recycle_stores); + drop_storage_entries_elapsed.stop(); + + self.clean_accounts_stats + .purge_stats + .drop_storage_entries_elapsed + .fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed); + self.clean_accounts_stats + .purge_stats + .recycle_stores_write_elapsed + .fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed); + } + // `force_flush` flushes all the cached roots `<= requested_flush_root`. It also then // flushes: // 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache, @@ -4495,15 +4548,16 @@ impl AccountsDb { self.print_count_and_status(label); info!("recycle_stores:"); let recycle_stores = self.recycle_stores.read().unwrap(); - for entry in recycle_stores.iter() { + for (recycled_time, entry) in recycle_stores.iter() { info!( - " slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {}", + " slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {} (recycled: {:?})", entry.slot(), entry.append_vec_id(), *entry.count_and_status.read().unwrap(), entry.approx_store_count.load(Ordering::Relaxed), entry.accounts.len(), entry.accounts.capacity(), + recycled_time, ); } } @@ -8936,4 +8990,74 @@ pub mod tests { assert!(slot_stores(&db, 0).is_empty()); assert!(!slot_stores(&db, 1).is_empty()); } + + #[test] + fn test_recycle_stores_expiration() { + solana_logger::setup(); + + let dummy_path = Path::new(""); + let dummy_slot = 12; + let dummy_size = 1000; + + let dummy_id1 = 22; + let entry1 = Arc::new(AccountStorageEntry::new( + &dummy_path, + dummy_slot, + dummy_id1, + dummy_size, + )); + + let dummy_id2 = 44; + let entry2 = Arc::new(AccountStorageEntry::new( + &dummy_path, + dummy_slot, + dummy_id2, + dummy_size, + )); + + let mut recycle_stores = RecycleStores::default(); + recycle_stores.add_entry(entry1); + recycle_stores.add_entry(entry2); + assert_eq!(recycle_stores.entry_count(), 2); + + // no expiration for newly added entries + let expired = recycle_stores.expire_old_entries(); + assert_eq!( + expired + .iter() + .map(|e| e.append_vec_id()) + .collect::>(), + Vec::::new() + ); + assert_eq!( + recycle_stores + .iter() + .map(|(_, e)| e.append_vec_id()) + .collect::>(), + vec![dummy_id1, dummy_id2] + ); + assert_eq!(recycle_stores.entry_count(), 2); + assert_eq!(recycle_stores.total_bytes(), dummy_size * 2); + + // expiration for only too old entries + recycle_stores.entries[0].0 = + Instant::now() - Duration::from_secs(EXPIRATION_TTL_SECONDS + 1); + let expired = recycle_stores.expire_old_entries(); + assert_eq!( + expired + .iter() + .map(|e| e.append_vec_id()) + .collect::>(), + vec![dummy_id1] + ); + assert_eq!( + recycle_stores + .iter() + .map(|(_, e)| e.append_vec_id()) + .collect::>(), + vec![dummy_id2] + ); + assert_eq!(recycle_stores.entry_count(), 1); + assert_eq!(recycle_stores.total_bytes(), dummy_size); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 5dcd58e17a6a84..c8aeccb4f400a6 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -3886,6 +3886,10 @@ impl Bank { .flush_accounts_cache(false, Some(self.slot())) } + pub fn expire_old_recycle_stores(&self) { + self.rc.accounts.accounts_db.expire_old_recycle_stores() + } + fn store_account_and_update_capitalization(&self, pubkey: &Pubkey, new_account: &Account) { if let Some(old_account) = self.get_account(&pubkey) { match new_account.lamports.cmp(&old_account.lamports) {