Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce background stale AppendVec shrink mechanism #9219

Merged
merged 13 commits into from
Apr 6, 2020
1 change: 1 addition & 0 deletions core/src/accounts_cleanup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl AccountsCleanupService {
}
let bank = bank_forks.read().unwrap().working_bank();
bank.clean_dead_slots();
bank.compact_stale_slots();
sleep(Duration::from_millis(100));
})
.unwrap();
Expand Down
4 changes: 4 additions & 0 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,10 @@ fn main() {

println!("Creating a snapshot of slot {}", bank.slot());
bank.squash();
for slot in 0..=(snapshot_slot - 1000) {
ryoqun marked this conversation as resolved.
Show resolved Hide resolved
println!("shrink: {}", slot);
bank.compact_stale_slot(slot);
}

let temp_dir = tempfile::TempDir::new().unwrap_or_else(|err| {
eprintln!("Unable to create temporary directory: {}", err);
Expand Down
100 changes: 95 additions & 5 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ use std::{
fmt,
io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult},
path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
sync::atomic::{AtomicBool, AtomicUsize, AtomicU64, Ordering},
sync::{Arc, RwLock},
};
use tempfile::TempDir;

pub const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024;
const PAGE_SIZE: u64 = 4 * 1024;
pub const DEFAULT_FILE_SIZE: u64 = PAGE_SIZE * 1024;
pub const DEFAULT_NUM_THREADS: u32 = 8;
pub const DEFAULT_NUM_DIRS: u32 = 4;

Expand Down Expand Up @@ -450,6 +451,7 @@ pub struct AccountsDB {

/// distribute the accounts across storage lists
pub next_id: AtomicUsize,
pub next_compact_slot: AtomicU64,

write_version: AtomicUsize,

Expand Down Expand Up @@ -499,6 +501,7 @@ impl Default for AccountsDB {
accounts_index: RwLock::new(AccountsIndex::default()),
storage: RwLock::new(AccountStorage(HashMap::new())),
next_id: AtomicUsize::new(0),
next_compact_slot: AtomicU64::new(0),
write_version: AtomicUsize::new(0),
paths: vec![],
temp_paths: None,
Expand Down Expand Up @@ -871,6 +874,7 @@ impl AccountsDB {
self.purge_slots(&dead_slots);
purge_slots.stop();


debug!(
"process_dead_slots({}): {} {}",
dead_slots.len(),
Expand All @@ -879,6 +883,79 @@ impl AccountsDB {
);
}

pub fn compact_stale_slot(&self, next_compact_slot: Slot) {
let mut stored_accounts = vec![];
{
let storage = self.storage.read().unwrap();
// older than recent_root stores must be root!
if let Some(stores) = storage.0.get(&next_compact_slot) {
// bail out early if the total alive account in count_and_status is big.
for store in stores.values() {
let mut start = 0;
while let Some((account, next)) = store.accounts.get_account(start) {
// inherit write version?
stored_accounts.push((account.meta.pubkey, account.clone_account(), *account.hash, next - start, (store.id, account.offset)));
start = next;
}
}
}
}
error!("found {} stored accounts!", stored_accounts.len());
let no_ancestors = HashMap::new();
let alive_accounts: Vec<_> = {
let accounts_index = self.accounts_index.read().unwrap();
stored_accounts.iter().filter(|(pubkey, account, _hash, _storage_size, (store_id, offset))| {
if let Some((list, _)) = accounts_index.get(pubkey, &no_ancestors) {
// consider offset!!
list.iter().any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset)
} else {
false
}
}).collect()
};
let alive_account_storage_total: u64 = alive_accounts.iter().fold(0, |t, (_, _, _, a, _)| t + *a as u64);
let aligned: u64 = (alive_account_storage_total + (PAGE_SIZE - 1)) & !(PAGE_SIZE - 1);
error!("found {} alive alive_accounts! ({} bytes/{} aligned bytes)", alive_accounts.len(), alive_account_storage_total, aligned);
// more smart predicate to shrink or not; don't do this when already shrank or there is
// little room for shrinking
if aligned > 0 {
let store = self.create_and_insert_store(next_compact_slot, aligned);
let mut accounts = vec![];
let mut hashes = vec![];
for (pubkey, account, hash, _size, _location) in alive_accounts {
accounts.push((pubkey, account));
hashes.push(*hash);
}
let write_version = self.write_version.fetch_add(accounts.len(), Ordering::Relaxed) as u64;
let infos = self.store_accounts_to(next_compact_slot, &accounts, &hashes, store, write_version);
let reclaims = self.update_index(next_compact_slot, infos, &accounts);
trace!("reclaim: {}", reclaims.len());

self.handle_reclaims(&reclaims);
ryoqun marked this conversation as resolved.
Show resolved Hide resolved
}
}

pub fn compact_stale_slots(&self) {
// move this into its own method and directly called from the background service
error!("compact!");
let recent_root = {
let accounts_index = self.accounts_index.read().unwrap();
accounts_index.uncleaned_roots.iter().next().cloned()
};
if let Some(recent_root) = recent_root {
let next_compact_slot = self.next_compact_slot.load(Ordering::Relaxed);
if next_compact_slot < recent_root - 100 {
self.compact_stale_slot(next_compact_slot);

// loop to the first scan from first
// introduce one more level of loop and abort on the certain index look-up count?
self.next_compact_slot.fetch_add(1, Ordering::Relaxed);
error!("next: {}, recent: {}", next_compact_slot, recent_root);
}
}

}

pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Slot, usize>, scan_func: F) -> A
where
F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>) -> (),
Expand Down Expand Up @@ -1145,13 +1222,26 @@ impl AccountsDB {
slot: Slot,
accounts: &[(&Pubkey, &Account)],
hashes: &[Hash],
) -> Vec<AccountInfo> {
let write_version = self.write_version.fetch_add(accounts.len(), Ordering::Relaxed) as u64;
ryoqun marked this conversation as resolved.
Show resolved Hide resolved
let storage = self.find_storage_candidate(slot);
self.store_accounts_to(slot, accounts, hashes, storage, write_version)
}

fn store_accounts_to(
&self,
slot: Slot,
accounts: &[(&Pubkey, &Account)],
hashes: &[Hash],
storage: Arc<AccountStorageEntry>,
write_version: u64,
) -> Vec<AccountInfo> {
let default_account = Account::default();
let mut i = 0;

let with_meta: Vec<(StoredMeta, &Account)> = accounts
.iter()
.map(|(pubkey, account)| {
let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64;
let account = if account.lamports == 0 {
&default_account
} else {
Expand All @@ -1160,16 +1250,16 @@ impl AccountsDB {
let data_len = account.data.len() as u64;

let meta = StoredMeta {
write_version,
write_version: write_version + i,
pubkey: **pubkey,
data_len,
};
i += 1;
(meta, account)
})
.collect();
let mut infos: Vec<AccountInfo> = Vec::with_capacity(with_meta.len());
while infos.len() < with_meta.len() {
let storage = self.find_storage_candidate(slot);
let rvs = storage
.accounts
.append_accounts(&with_meta[infos.len()..], &hashes[infos.len()..]);
Expand Down
8 changes: 8 additions & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2203,6 +2203,14 @@ impl Bank {
pub fn clean_dead_slots(&self) {
self.rc.accounts.accounts_db.process_dead_slots();
}

pub fn compact_stale_slots(&self) {
self.rc.accounts.accounts_db.compact_stale_slots();
}

pub fn compact_stale_slot(&self, slot: Slot) {
self.rc.accounts.accounts_db.compact_stale_slot(slot);
}
}

impl Drop for Bank {
Expand Down