diff --git a/ledger/src/snapshot_utils.rs b/ledger/src/snapshot_utils.rs
index fa09d108db185c..0db69bf90c47aa 100644
--- a/ledger/src/snapshot_utils.rs
+++ b/ledger/src/snapshot_utils.rs
@@ -151,6 +151,7 @@ where
 }
 
 pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank) -> Result<()> {
+    trace!("ryoqun saving snapshot");
     bank.purge_zero_lamport_accounts();
     let slot = bank.slot();
     // snapshot_path/slot
@@ -211,6 +212,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
     snapshot_path: &PathBuf,
     snapshot_tar: P,
 ) -> Result<Bank> {
+    trace!("ryoqun loading snapshot");
     // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()`
     let unpack_dir = tempfile::tempdir_in(snapshot_path)?;
     untar_snapshot_in(&snapshot_tar, &unpack_dir)?;
@@ -224,6 +226,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
         unpacked_accounts_dir,
     )?;
 
+    bank.purge_zero_lamport_accounts();
     if !bank.verify_snapshot_bank() {
         panic!("Snapshot bank failed to verify");
     }
diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs
index 40c462ed447bbb..d6715a365d4bab 100644
--- a/runtime/src/accounts.rs
+++ b/runtime/src/accounts.rs
@@ -331,6 +331,10 @@ impl Accounts {
         self.accounts_db.verify_hash_internal_state(slot, ancestors)
     }
 
+    pub fn verify_account_balances(&self, ancestors: &HashMap<Slot, usize>) -> bool {
+        self.accounts_db.verify_account_balances(ancestors)
+    }
+
     pub fn load_by_program(
         &self,
         ancestors: &HashMap<Slot, usize>,
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 4351c039fd4863..fc9e6acda0a151 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -249,6 +249,22 @@ impl AccountStorageEntry {
         self.accounts.flush()
     }
 
+    pub fn all_existing_accounts(&self) -> Vec<StoredAccount> {
+        self.accounts.accounts(0)
+    }
+
+    pub fn restore_account_count(&self) {
+        let mut count_and_status = self.count_and_status.write().unwrap();
+        let new_count = self.all_existing_accounts().len();
+        trace!(
+            "ryoqun: restored storage: from {:?} {:?} to {}",
+            self,
+            *count_and_status,
+            new_count
+        );
+        *count_and_status = (new_count, count_and_status.1);
+    }
+
     fn add_account(&self) {
         let mut count_and_status = self.count_and_status.write().unwrap();
         *count_and_status = (count_and_status.0 + 1, count_and_status.1);
@@ -289,7 +305,11 @@ impl AccountStorageEntry {
         if count > 0 {
             *count_and_status = (count - 1, status);
         } else {
-            warn!("count value 0 for slot {}", self.slot_id);
+            // promoted this to critical because an inconsistent ref count directly translates into a corrupted ledger
+            panic!(
+                "ryoqun count value 0 for slot {}, {:?}",
+                self.slot_id, status
+            );
         }
         count_and_status.0
     }
@@ -566,6 +586,13 @@ impl AccountsDB {
         let mut purges = Vec::new();
         accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| {
             if account_info.lamports == 0 && accounts_index.is_root(slot) {
+                trace!(
+                    "ryoqun purging....{} {} {:?} {}",
+                    pubkey,
+                    slot,
+                    account_info,
+                    accounts_index.is_root(slot)
+                );
                 purges.push(*pubkey);
             }
         });
@@ -575,10 +602,16 @@ impl AccountsDB {
         for purge in &purges {
             reclaims.extend(accounts_index.purge(purge));
         }
+        trace!("ryoqun: zla reclaims {:?}", reclaims);
         let last_root = accounts_index.last_root;
         drop(accounts_index);
         let mut dead_slots = self.remove_dead_accounts(reclaims);
+        trace!("ryoqun: zla dead slots 2 {:?}", dead_slots);
         self.cleanup_dead_slots(&mut dead_slots, last_root);
+        trace!("ryoqun: zla dead slots 2 {:?}", dead_slots);
+        for slot in dead_slots {
+            self.purge_slot(slot);
+        }
     }
 
     pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Slot, usize>, scan_func: F) -> A
@@ -632,8 +665,8 @@ impl AccountsDB {
         storage_maps
             .into_par_iter()
             .map(|storage| {
-                let accounts = storage.accounts.accounts(0);
                 let mut retval = B::default();
+                let accounts = storage.all_existing_accounts();
                 accounts.iter().for_each(|stored_account| {
                     scan_func(stored_account, storage.id, &mut retval)
                 });
@@ -754,15 +787,6 @@ impl AccountsDB {
         }
     }
 
-    pub fn hash_stored_account(slot: Slot, account: &StoredAccount) -> Hash {
-        Self::hash_account_data(
-            slot,
-            account.account_meta.lamports,
-            account.data,
-            &account.meta.pubkey,
-        )
-    }
-
     pub fn hash_account(slot: Slot, account: &Account, pubkey: &Pubkey) -> Hash {
         Self::hash_account_data(slot, account.lamports, &account.data, pubkey)
     }
@@ -870,7 +894,7 @@ impl AccountsDB {
                 oldest_slot = *slot;
             }
         }
-        info!("accounts_db: total_stores: {} newest_slot: {} oldest_slot: {} max_slot: {} (num={}) min_slot: {} (num={})",
+        trace!("ryoqun accounts_db: total_stores: {} newest_slot: {} oldest_slot: {} max_slot: {} (num={}) min_slot: {} (num={})",
               total_count, newest_slot, oldest_slot, max_slot, max, min_slot, min);
         datapoint_info!("accounts_db-stores", ("total_count", total_count, i64));
     }
@@ -900,6 +924,16 @@ impl AccountsDB {
         }
     }
 
+    pub fn verify_account_balances(&self, ancestors: &HashMap<Slot, usize>) -> bool {
+        !self.scan_accounts(&ancestors, |collector: &mut bool, option| {
+            if let Some((_, account, _)) = option {
+                if account.lamports == 0 {
+                    *collector = true;
+                }
+            }
+        })
+    }
+
     pub fn xor_in_hash_state(&self, slot_id: Slot, hash: BankHash) {
         let mut slot_hashes = self.slot_hashes.write().unwrap();
         let slot_hash_state = slot_hashes.entry(slot_id).or_insert_with(BankHash::default);
@@ -1086,11 +1120,27 @@ impl AccountsDB {
     }
 
     fn generate_index(&self) {
+        let mut dead_slots = HashSet::new();
+
         let storage = self.storage.read().unwrap();
         let mut slots: Vec<Slot> = storage.0.keys().cloned().collect();
         slots.sort();
         let mut accounts_index = self.accounts_index.write().unwrap();
         for slot_id in slots.iter() {
+            let storage_maps: Vec<Arc<AccountStorageEntry>> = self
+                .storage
+                .read()
+                .unwrap()
+                .0
+                .get(&slot_id)
+                .unwrap_or(&HashMap::new())
+                .values()
+                .cloned()
+                .collect();
+            for storage in storage_maps.into_iter() {
+                storage.restore_account_count();
+            }
+
             let mut accumulator: Vec<HashMap<Pubkey, (u64, AccountInfo)>> = self
                 .scan_account_storage(
                     *slot_id,
@@ -1114,10 +1164,53 @@ impl AccountsDB {
                 AccountsDB::merge(&mut account_maps, &maps);
             }
             if !account_maps.is_empty() {
+                let storage_maps: Vec<Arc<AccountStorageEntry>> = self
+                    .storage
+                    .read()
+                    .unwrap()
+                    .0
+                    .get(&slot_id)
+                    .unwrap_or(&HashMap::new())
+                    .values()
+                    .cloned()
+                    .collect();
+                for (pubkey, (version, _account_info)) in account_maps.iter() {
+                    storage_maps.iter().for_each(|storage| {
+                        storage.all_existing_accounts().iter().for_each(|a| {
+                            if a.meta.pubkey == *pubkey
+                                && *version != a.meta.write_version
+                                && storage.remove_account() == 0
+                            {
+                                dead_slots.insert(*slot_id);
+                            }
+                        })
+                    });
+                }
                 accounts_index.roots.insert(*slot_id);
-                let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
+                trace!("ryoqun account_maps: {:?}", account_maps.len());
+                let mut reclaims: Vec<(u64, AccountInfo)> = vec![];
                 for (pubkey, (_, account_info)) in account_maps.iter() {
-                    accounts_index.insert(*slot_id, pubkey, account_info.clone(), &mut _reclaims);
+                    trace!(
+                        "ryoqun slot: {}, account_info: {:?}",
+                        *slot_id,
+                        account_info
+                    );
+                    accounts_index.insert(*slot_id, pubkey, account_info.clone(), &mut reclaims);
+                }
+
+                trace!("ryoqun reclaims: {:?}", reclaims);
+                for (slot_id, account_info) in reclaims {
+                    if let Some(slot_storage) = storage.0.get(&slot_id) {
+                        if let Some(store) = slot_storage.get(&account_info.id) {
+                            assert_eq!(
+                                slot_id, store.slot_id,
+                                "AccountDB::accounts_index corrupted. Storage should only point to one slot"
+                            );
+                            if store.remove_account() == 0 {
+                                dead_slots.insert(slot_id);
+                            }
+                        }
+                    }
                 }
             }
         }
@@ -1170,6 +1263,142 @@ pub mod tests {
         assert_eq!(db.load_slow(&ancestors, &key), Some((account0, 0)));
     }
 
+    #[test]
+    fn test_accountsdb_generate_index() {
+        solana_logger::setup();
+        let db = AccountsDB::new(None);
+        //db.scan_accounts(ancestors: &HashMap<Slot, usize>, scan_func: F)
+        //db.generate_index()
+        let key0 = Pubkey::new_rand();
+        let account0 = Account::new(1, 0, &Account::default().owner);
+        let key1 = Pubkey::new_rand();
+        let account1 = Account::new(2, 0, &Account::default().owner);
+        let key2 = Pubkey::new_rand();
+        let account2 = Account::new(3, 0, &Account::default().owner);
+        let slot_a = 12;
+        db.add_root(slot_a);
+
+        let slot_b = slot_a + 1;
+        db.add_root(slot_b);
+
+        db.store(slot_a, &[(&key0, &account0)]);
+        db.store(slot_a, &[(&key1, &account1)]);
+
+        {
+            let stores = db.storage.read().unwrap();
+            let slot_a_stores = &stores.0.get(&slot_a).unwrap();
+            assert_eq!(
+                slot_a_stores
+                    .values()
+                    .map(|v| v.count())
+                    .collect::<Vec<_>>(),
+                vec![2]
+            );
+            assert_eq!(
+                slot_a_stores
+                    .values()
+                    .map(|v| v.all_existing_accounts().len())
+                    .collect::<Vec<_>>(),
+                vec![2]
+            );
+        }
+
+        db.store(slot_b, &[(&key0, &account0), (&key2, &account2)]);
+
+        {
+            let stores = db.storage.read().unwrap();
+            let slot_a_stores = &stores.0.get(&slot_a).unwrap();
+            assert_eq!(
+                slot_a_stores
+                    .values()
+                    .map(|v| v.count())
+                    .collect::<Vec<_>>(),
+                vec![1]
+            );
+            assert_eq!(
+                slot_a_stores
+                    .values()
+                    .map(|v| v.all_existing_accounts().len())
+                    .collect::<Vec<_>>(),
+                vec![2]
+            );
+            let slot_b_stores = &stores.0.get(&slot_b).unwrap();
+            assert_eq!(
+                slot_b_stores
+                    .values()
+                    .map(|v| v.count())
+                    .collect::<Vec<_>>(),
+                vec![2]
+            );
+            assert_eq!(
+                slot_b_stores
+                    .values()
+                    .map(|v| v.all_existing_accounts().len())
+                    .collect::<Vec<_>>(),
+                vec![2]
+            );
+        }
+
+        let ancestors = vec![(slot_a, 1), (slot_b, 1)].into_iter().collect();
+        //assert!(db.verify_hash_internal_state(slot_b, &ancestors));
+
+        let key3 = Pubkey::new_rand();
+        let lamport_0 = 0;
+        let account3 = Account::new(lamport_0, 0, &Account::default().owner);
+        assert!(db.verify_account_balances(&ancestors));
+        db.store(slot_b, &[(&key3, &account3)]);
+
+        let mut db2 = AccountsDB::new(None);
+        db2.storage = RwLock::new(db.storage.read().unwrap().clone());
+        db2.generate_index();
+        db2.purge_zero_lamport_accounts(&ancestors);
+        assert!(db2.verify_account_balances(&ancestors));
+        //assert_eq!(*db2.accounts_index.read().unwrap().account_maps.map(|m| m.read().unwrap()), *db.accounts_index.read().unwrap());
+
+        let index = db2.accounts_index.read().unwrap();
+
+        {
+            let (list, _index) = index.get(&key0, &ancestors).unwrap();
+            assert_eq!((&*list).first().map(|item| item.0), Some(slot_b));
+            let (list, _index) = index.get(&key1, &ancestors).unwrap();
+            assert_eq!((&*list).first().map(|item| item.0), Some(slot_a));
+            let (list, _index) = index.get(&key2, &ancestors).unwrap();
+            assert_eq!((&*list).first().map(|item| item.0), Some(slot_b));
+
+            let stores = db2.storage.read().unwrap();
+            let slot_a_stores = &stores.0.get(&slot_a).unwrap();
+            assert_eq!(
+                slot_a_stores
+                    .values()
+                    .map(|v| v.count())
+                    .collect::<Vec<_>>(),
+                vec![1]
+            );
+            assert_eq!(
+                slot_a_stores
+                    .values()
+                    .map(|v| v.all_existing_accounts().len())
+                    .collect::<Vec<_>>(),
+                vec![2]
+            );
+            let slot_b_stores = &stores.0.get(&slot_b).unwrap();
+            assert_eq!(
+                slot_b_stores
+                    .values()
+                    .map(|v| v.count())
+                    .collect::<Vec<_>>(),
+                vec![2]
+            );
+            assert_eq!(
+                slot_b_stores
+                    .values()
+                    .map(|v| v.all_existing_accounts().len())
+                    .collect::<Vec<_>>(),
+                vec![2 + 1]
+            );
+        }
+    }
+
     #[test]
     fn test_accountsdb_latest_ancestor() {
         solana_logger::setup();
@@ -1396,6 +1625,7 @@ pub mod tests {
         let slot_storage = storage.0.get(&slot).unwrap();
         let mut total_count: usize = 0;
         for store in slot_storage.values() {
+            trace!("ryoqun check_storage: {:?}", store);
             assert_eq!(store.status(), AccountStorageStatus::Available);
             total_count += store.count();
         }
@@ -1735,6 +1965,95 @@ pub mod tests {
         assert!(check_storage(&daccounts, 2, 31));
     }
 
+    fn assert_load_account(
+        accounts: &AccountsDB,
+        slot: Slot,
+        pubkey: Pubkey,
+        expected_lamports: u64,
+    ) {
+        let ancestors = vec![(slot, 0)].into_iter().collect();
+        let owner = Account::default().owner;
+        let (account, slot) = accounts
+            .load_slow(&ancestors, &pubkey)
+            .unwrap_or((Account::new(0, 0, &owner), slot));
+        assert_eq!((account.lamports, slot), (expected_lamports, slot));
+    }
+
+    fn reconstruct_accounts_db_via_serialization(accounts: AccountsDB, slot: Slot) -> AccountsDB {
+        let mut writer = Cursor::new(vec![]);
+        serialize_into(&mut writer, &AccountsDBSerialize::new(&accounts, slot)).unwrap();
+
+        let buf = writer.into_inner();
+        let mut reader = BufReader::new(&buf[..]);
+        let daccounts = AccountsDB::new(None);
+
+        let local_paths = {
+            let paths = daccounts.paths.read().unwrap();
+            AccountsDB::format_paths(paths.to_vec())
+        };
+
+        let copied_accounts = TempDir::new().unwrap();
+        // Simulate obtaining a copy of the AppendVecs from a tarball
+        copy_append_vecs(&accounts, copied_accounts.path()).unwrap();
+        daccounts
+            .accounts_from_stream(&mut reader, local_paths, copied_accounts.path())
+            .unwrap();
+
+        daccounts
+    }
+
+    fn purge_zero_lamport_accounts(accounts: &AccountsDB, slot: Slot) {
+        let ancestors = vec![(slot as Slot, 0)].into_iter().collect();
+        accounts.purge_zero_lamport_accounts(&ancestors);
+    }
+
+    #[test]
+    fn test_accounts_db_serialize_zero_and_free() {
+        solana_logger::setup();
+
+        let some_lamport = 223;
+        let zero_lamport = 0;
+        let no_data = 0;
+        let owner = Account::default().owner;
+
+        let account = Account::new(some_lamport, no_data, &owner);
+        let pubkey = Pubkey::new_rand();
+        let pubkey2 = Pubkey::new_rand();
+        let zero_lamport_account = Account::new(zero_lamport, no_data, &owner);
+
+        let filler_account = Account::new(some_lamport, no_data, &owner);
+        let filler_account_pubkey = Pubkey::new_rand();
+
+        let accounts = AccountsDB::new_single();
+
+        let mut current_slot = 1;
+        accounts.store(current_slot, &[(&pubkey, &account)]);
+        //accounts.store(current_slot, &[(&pubkey2, &account)]); // <= Enabling this line makes even my PR fail...
+ error!("#1: {:#?}", accounts.get_storage_entries()); // #1 + accounts.add_root(current_slot); + + current_slot += 1; + accounts.store(current_slot, &[(&pubkey, &zero_lamport_account)]); + error!("#2: {:#?}", accounts.get_storage_entries()); // #2 + for _ in 0..33000 { + accounts.store(current_slot, &[(&filler_account_pubkey, &filler_account)]); + } + error!("#3: {:#?}", accounts.get_storage_entries()); // #3 + accounts.add_root(current_slot); + + error!("doesn't fail:"); + assert_load_account(&accounts, current_slot, pubkey, zero_lamport); // #4 + + purge_zero_lamport_accounts(&accounts, current_slot); // #5 + error!("#6: {:#?}", accounts.get_storage_entries()); // #6 + + let accounts = reconstruct_accounts_db_via_serialization(accounts, current_slot); + + error!("#7: {:#?}", accounts.get_storage_entries()); // #7 + error!("does fail due to a reconstruction bug:"); + assert_load_account(&accounts, current_slot, pubkey, zero_lamport); + } + #[test] #[ignore] fn test_store_account_stress() { diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 0939f3581b057a..292e99d963e772 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -91,7 +91,7 @@ impl AccountsIndex { let _slot_vec = self .account_maps .entry(*pubkey) - .or_insert_with(|| RwLock::new(Vec::with_capacity(32))); + .or_insert_with(|| RwLock::new(Vec::with_capacity(32))); // magic number; use the lockout consant? self.update(slot, pubkey, account_info, reclaims); } diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index fa26e60363f198..78ac0e4bff67d3 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -451,6 +451,21 @@ pub mod tests { } } + #[test] + fn test_append_vec_accounts() { + let path = get_append_vec_path("test_append"); + + let av = AppendVec::new(&path.path, true, 1024 * 1024); + assert_eq!(av.accounts(0).len(), 0); + + let account = create_test_account(0); + av.append_account_test(&account).unwrap(); + assert_eq!(av.accounts(0).len(), 1); + + av.reset(); + assert_eq!(av.accounts(0).len(), 0); + } + #[test] fn test_append_vec_one() { let path = get_append_vec_path("test_append"); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 19036dd6d462dc..d7bc89ce5bee26 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1442,24 +1442,10 @@ impl Bank { /// A snapshot bank should be purged of 0 lamport accounts which are not part of the hash /// calculation and could shield other real accounts. pub fn verify_snapshot_bank(&self) -> bool { - self.purge_zero_lamport_accounts(); self.rc .accounts .verify_hash_internal_state(self.slot(), &self.ancestors) - && !self.has_accounts_with_zero_lamports() - } - - fn has_accounts_with_zero_lamports(&self) -> bool { - self.rc.accounts.accounts_db.scan_accounts( - &self.ancestors, - |collector: &mut bool, option| { - if let Some((_, account, _)) = option { - if account.lamports == 0 { - *collector = true; - } - } - }, - ) + && self.rc.accounts.verify_account_balances(&self.ancestors) } /// Return the number of hashes per tick @@ -2317,7 +2303,7 @@ mod tests { } fn assert_no_zero_balance_accounts(bank: &Arc) { - assert!(!bank.has_accounts_with_zero_lamports()); + assert!(bank.rc.accounts.verify_account_balances(&bank.ancestors)); } // Test that purging 0 lamports accounts works.