Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ttl eviction for RecycleStore #15513

Merged
merged 1 commit into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion runtime/src/accounts_background_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
time::{Duration, Instant},
};

const INTERVAL_MS: u64 = 100;
Expand All @@ -30,6 +30,13 @@ const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize =
SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize);
const CLEAN_INTERVAL_BLOCKS: u64 = 100;

// This value is chosen to spread the dropping cost over 3 expiration checks
// RecycleStores are fully populated almost all of its lifetime. So, otherwise
// this would drop MAX_RECYCLE_STORES mmaps at once in the worst case...
// (Anyway, the dropping part is outside the AccountsDb::recycle_stores lock
// and dropped in this AccountsBackgroundServe, so this shouldn't matter much)
const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATION_TTL_SECONDS / 3;

pub type SnapshotRequestSender = Sender<SnapshotRequest>;
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
pub type DroppedSlotsSender = Sender<Slot>;
Expand Down Expand Up @@ -286,6 +293,7 @@ impl AccountsBackgroundService {
let mut last_cleaned_block_height = 0;
let mut removed_slots_count = 0;
let mut total_remove_slots_time = 0;
let mut last_expiration_check_time = Instant::now();
let t_background = Builder::new()
.name("solana-accounts-background".to_string())
.spawn(move || loop {
Expand All @@ -304,6 +312,8 @@ impl AccountsBackgroundService {
&mut total_remove_slots_time,
);

Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);

// Check to see if there were any requests for snapshotting banks
// < the current root bank `bank` above.

Expand Down Expand Up @@ -397,6 +407,16 @@ impl AccountsBackgroundService {
*removed_slots_count = 0;
}
}

fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) {
let now = Instant::now();
if now.duration_since(*last_expiration_check_time).as_secs()
> RECYCLE_STORE_EXPIRATION_INTERVAL_SECS
{
bank.expire_old_recycle_stores();
*last_expiration_check_time = now;
}
}
}

#[cfg(test)]
Expand Down
140 changes: 132 additions & 8 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,27 +615,61 @@ pub struct StoreAccountsTiming {

#[derive(Debug, Default)]
struct RecycleStores {
entries: Vec<Arc<AccountStorageEntry>>,
entries: Vec<(Instant, Arc<AccountStorageEntry>)>,
total_bytes: u64,
}

// 30 min should be enough to be certain there won't be any prospective recycle uses for given
// store entry
// That's because it already processed ~2500 slots and ~25 passes of AccountsBackgroundService
pub const EXPIRATION_TTL_SECONDS: u64 = 1800;

impl RecycleStores {
fn add_entry(&mut self, new_entry: Arc<AccountStorageEntry>) {
self.total_bytes += new_entry.total_bytes();
self.entries.push(new_entry)
self.entries.push((Instant::now(), new_entry))
}

fn iter(&self) -> std::slice::Iter<Arc<AccountStorageEntry>> {
fn iter(&self) -> std::slice::Iter<(Instant, Arc<AccountStorageEntry>)> {
self.entries.iter()
}

fn add_entries(&mut self, new_entries: Vec<Arc<AccountStorageEntry>>) {
self.total_bytes += new_entries.iter().map(|e| e.total_bytes()).sum::<u64>();
self.entries.extend(new_entries);
let now = Instant::now();
for new_entry in new_entries {
self.entries.push((now, new_entry));
}
}

fn expire_old_entries(&mut self) -> Vec<Arc<AccountStorageEntry>> {
let mut expired = vec![];
let now = Instant::now();
let mut expired_bytes = 0;
self.entries.retain(|(recycled_time, entry)| {
if now.duration_since(*recycled_time).as_secs() > EXPIRATION_TTL_SECONDS {
if Arc::strong_count(entry) >= 2 {
warn!(
"Expiring still in-use recycled StorageEntry anyway...: id: {} slot: {}",
entry.append_vec_id(),
entry.slot(),
);
}
expired_bytes += entry.total_bytes();
expired.push(entry.clone());
false
} else {
true
}
});

self.total_bytes -= expired_bytes;

expired
}

fn remove_entry(&mut self, index: usize) -> Arc<AccountStorageEntry> {
let removed_entry = self.entries.swap_remove(index);
let (_added_time, removed_entry) = self.entries.swap_remove(index);
self.total_bytes -= removed_entry.total_bytes();
removed_entry
}
Expand Down Expand Up @@ -2260,7 +2294,7 @@ impl AccountsDb {
let mut min = std::u64::MAX;
let mut avail = 0;
let mut recycle_stores = self.recycle_stores.write().unwrap();
for (i, store) in recycle_stores.iter().enumerate() {
for (i, (_recycled_time, store)) in recycle_stores.iter().enumerate() {
if Arc::strong_count(store) == 1 {
max = std::cmp::max(store.accounts.capacity(), max);
min = std::cmp::min(store.accounts.capacity(), min);
Expand Down Expand Up @@ -2996,6 +3030,25 @@ impl AccountsDb {
self.accounts_cache.report_size();
}

pub fn expire_old_recycle_stores(&self) {
let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_time");
let recycle_stores = self.recycle_stores.write().unwrap().expire_old_entries();
recycle_stores_write_elapsed.stop();

let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
drop(recycle_stores);
drop_storage_entries_elapsed.stop();

self.clean_accounts_stats
.purge_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
self.clean_accounts_stats
.purge_stats
.recycle_stores_write_elapsed
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
}

// `force_flush` flushes all the cached roots `<= requested_flush_root`. It also then
// flushes:
// 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache,
Expand Down Expand Up @@ -4495,15 +4548,16 @@ impl AccountsDb {
self.print_count_and_status(label);
info!("recycle_stores:");
let recycle_stores = self.recycle_stores.read().unwrap();
for entry in recycle_stores.iter() {
for (recycled_time, entry) in recycle_stores.iter() {
info!(
" slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {}",
" slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {} (recycled: {:?})",
entry.slot(),
entry.append_vec_id(),
*entry.count_and_status.read().unwrap(),
entry.approx_store_count.load(Ordering::Relaxed),
entry.accounts.len(),
entry.accounts.capacity(),
recycled_time,
);
}
}
Expand Down Expand Up @@ -8936,4 +8990,74 @@ pub mod tests {
assert!(slot_stores(&db, 0).is_empty());
assert!(!slot_stores(&db, 1).is_empty());
}

#[test]
fn test_recycle_stores_expiration() {
solana_logger::setup();

let dummy_path = Path::new("");
let dummy_slot = 12;
let dummy_size = 1000;

let dummy_id1 = 22;
let entry1 = Arc::new(AccountStorageEntry::new(
&dummy_path,
dummy_slot,
dummy_id1,
dummy_size,
));

let dummy_id2 = 44;
let entry2 = Arc::new(AccountStorageEntry::new(
&dummy_path,
dummy_slot,
dummy_id2,
dummy_size,
));

let mut recycle_stores = RecycleStores::default();
recycle_stores.add_entry(entry1);
recycle_stores.add_entry(entry2);
assert_eq!(recycle_stores.entry_count(), 2);

// no expiration for newly added entries
let expired = recycle_stores.expire_old_entries();
assert_eq!(
expired
.iter()
.map(|e| e.append_vec_id())
.collect::<Vec<_>>(),
Vec::<AppendVecId>::new()
);
assert_eq!(
recycle_stores
.iter()
.map(|(_, e)| e.append_vec_id())
.collect::<Vec<_>>(),
vec![dummy_id1, dummy_id2]
);
assert_eq!(recycle_stores.entry_count(), 2);
assert_eq!(recycle_stores.total_bytes(), dummy_size * 2);

// expiration for only too old entries
recycle_stores.entries[0].0 =
Instant::now() - Duration::from_secs(EXPIRATION_TTL_SECONDS + 1);
let expired = recycle_stores.expire_old_entries();
assert_eq!(
expired
.iter()
.map(|e| e.append_vec_id())
.collect::<Vec<_>>(),
vec![dummy_id1]
);
assert_eq!(
recycle_stores
.iter()
.map(|(_, e)| e.append_vec_id())
.collect::<Vec<_>>(),
vec![dummy_id2]
);
assert_eq!(recycle_stores.entry_count(), 1);
assert_eq!(recycle_stores.total_bytes(), dummy_size);
}
}
4 changes: 4 additions & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3886,6 +3886,10 @@ impl Bank {
.flush_accounts_cache(false, Some(self.slot()))
}

pub fn expire_old_recycle_stores(&self) {
self.rc.accounts.accounts_db.expire_old_recycle_stores()
}

fn store_account_and_update_capitalization(&self, pubkey: &Pubkey, new_account: &Account) {
if let Some(old_account) = self.get_account(&pubkey) {
match new_account.lamports.cmp(&old_account.lamports) {
Expand Down