Skip to content

Commit

Permalink
rework towards single code path
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed May 19, 2021
1 parent 1c999e3 commit 5ea5beb
Showing 1 changed file with 177 additions and 38 deletions.
215 changes: 177 additions & 38 deletions runtime/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,18 @@ impl<T: 'static + Clone + IsCached> WriteAccountMapEntry<T> {
&self.borrow_owned_entry_contents().ref_count
}

// create an entry that is equivalent to this process:
// 1. new empty (refcount=0, slot_list={})
// 2. update(slot, account_info)
// This code is called when the first entry [ie. (slot,account_info)] for a pubkey is inserted into the index.
pub fn new_entry_after_update(slot: Slot, account_info: &T) -> AccountMapEntry<T> {
let ref_count = if account_info.is_cached() { 0 } else { 1 };
Arc::new(AccountMapEntryInner {
ref_count: AtomicU64::new(ref_count),
slot_list: RwLock::new(vec![(slot, account_info.clone())]),
})
}

// Try to update an item in the slot list the given `slot` If an item for the slot
// already exists in the list, remove the older item, add it to `reclaims`, and insert
// the new item.
Expand Down Expand Up @@ -842,15 +854,13 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
.map(WriteAccountMapEntry::from_account_map_entry)
}

fn new_entry() -> AccountMapEntry<T> {
Arc::new(AccountMapEntryInner {
ref_count: AtomicU64::new(0),
slot_list: RwLock::new(SlotList::with_capacity(1)),
})
}

fn insert_new_entry_if_missing(&self, pubkey: &Pubkey) -> (WriteAccountMapEntry<T>, bool) {
let new_entry = Self::new_entry();
fn insert_new_entry_if_missing(
&self,
pubkey: &Pubkey,
slot: Slot,
info: &T,
) -> Option<WriteAccountMapEntry<T>> {
let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, info);
let mut w_account_maps = self.get_account_maps_write_lock();
self.insert_new_entry_if_missing_with_lock(pubkey, &mut w_account_maps, new_entry)
}
Expand All @@ -860,29 +870,29 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
pubkey: &Pubkey,
w_account_maps: &mut ReadWriteLockMapType<T>,
new_entry: AccountMapEntry<T>,
) -> (WriteAccountMapEntry<T>, bool) {
) -> Option<WriteAccountMapEntry<T>> {
let mut is_newly_inserted = false;
let account_entry = w_account_maps.entry(*pubkey).or_insert_with(|| {
is_newly_inserted = true;
new_entry
});
let w_account_entry = WriteAccountMapEntry::from_account_map_entry(account_entry.clone());
(w_account_entry, is_newly_inserted)
if is_newly_inserted {
None
} else {
Some(WriteAccountMapEntry::from_account_map_entry(
account_entry.clone(),
))
}
}

fn get_account_write_entry_else_create(
&self,
pubkey: &Pubkey,
) -> (WriteAccountMapEntry<T>, bool) {
let mut w_account_entry = self.get_account_write_entry(pubkey);
let mut is_newly_inserted = false;
if w_account_entry.is_none() {
let entry_is_new = self.insert_new_entry_if_missing(pubkey);
w_account_entry = Some(entry_is_new.0);
is_newly_inserted = entry_is_new.1;
}

(w_account_entry.unwrap(), is_newly_inserted)
slot: Slot,
info: &T,
) -> Option<WriteAccountMapEntry<T>> {
let w_account_entry = self.get_account_write_entry(pubkey);
w_account_entry.or_else(|| self.insert_new_entry_if_missing(pubkey, slot, info))
}

pub fn handle_dead_keys(
Expand Down Expand Up @@ -1178,11 +1188,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
let mut is_newly_inserted = false;
let account_entry = w_account_maps.entry(*pubkey).or_insert_with(|| {
is_newly_inserted = true;
// this value is equivalent to what update() below would have created
Arc::new(AccountMapEntryInner {
ref_count: AtomicU64::new(if account_info.is_cached() { 0 } else { 1 }),
slot_list: RwLock::new(vec![(slot, account_info.clone())]),
})
WriteAccountMapEntry::new_entry_after_update(slot, &account_info)
});
if account_info.is_zero_lamport() {
self.zero_lamport_pubkeys.insert(*pubkey);
Expand All @@ -1208,8 +1214,8 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
reclaims: &mut SlotList<T>,
) -> bool {
let is_newly_inserted = {
let (mut w_account_entry, is_newly_inserted) =
self.get_account_write_entry_else_create(pubkey);
let w_account_entry =
self.get_account_write_entry_else_create(pubkey, slot, &account_info);
// We don't atomically update both primary index and secondary index together.
// This certainly creates small time window with inconsistent state across the two indexes.
// However, this is acceptable because:
Expand All @@ -1224,8 +1230,12 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
if account_info.is_zero_lamport() {
self.zero_lamport_pubkeys.insert(*pubkey);
}
w_account_entry.update(slot, account_info, reclaims);
is_newly_inserted
if let Some(mut w_account_entry) = w_account_entry {
w_account_entry.update(slot, account_info, reclaims);
true
} else {
false
}
};
self.update_secondary_indexes(pubkey, account_owner, account_data, account_indexes);
is_newly_inserted
Expand Down Expand Up @@ -2279,26 +2289,155 @@ pub mod tests {
assert_eq!(num, 1);
}

#[test]
fn test_new_entry() {
let slot = 0;
// account_info type that IS cached
let account_info = AccountInfoTest::default();

let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, &account_info);
assert_eq!(new_entry.ref_count.load(Ordering::Relaxed), 0);
assert_eq!(new_entry.slot_list.read().unwrap().capacity(), 1);
assert_eq!(
new_entry.slot_list.read().unwrap().to_vec(),
vec![(slot, account_info)]
);

// account_info type that is NOT cached
let account_info = true;

let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, &account_info);
assert_eq!(new_entry.ref_count.load(Ordering::Relaxed), 1);
assert_eq!(new_entry.slot_list.read().unwrap().capacity(), 1);
assert_eq!(
new_entry.slot_list.read().unwrap().to_vec(),
vec![(slot, account_info)]
);
}

fn test_new_entry_code_paths_helper<
T: 'static + Clone + IsCached + ZeroLamport + std::cmp::PartialEq + std::fmt::Debug,
>(
account_infos: [T; 2],
is_cached: bool,
upsert: bool,
) {
let slot0 = 0;
let slot1 = 1;
let key = Keypair::new().pubkey();

let index = AccountsIndex::<T>::default();
let mut gc = Vec::new();

if upsert {
// insert first entry for pubkey. This will use new_entry_after_update and not call update.
index.upsert(
slot0,
&key,
&Pubkey::default(),
&[],
&AccountSecondaryIndexes::default(),
account_infos[0].clone(),
&mut gc,
);
} else {
let mut lock = index.get_account_maps_write_lock();
index.insert_new_if_missing_into_primary_index(
slot0,
&key,
account_infos[0].clone(),
&mut gc,
&mut lock,
);
}
assert!(gc.is_empty());

// verify the added entry matches expected
{
let entry = index.get_account_read_entry(&key).unwrap();
assert_eq!(
entry.ref_count().load(Ordering::Relaxed),
if is_cached { 0 } else { 1 }
);
let expected = vec![(slot0, account_infos[0].clone())];
assert_eq!(entry.slot_list().to_vec(), expected);
let new_entry = WriteAccountMapEntry::new_entry_after_update(slot0, &account_infos[0]);
assert_eq!(
entry.slot_list().to_vec(),
new_entry.slot_list.read().unwrap().to_vec(),
);
}

// insert second entry for pubkey. This will use update and NOT use new_entry_after_update.
if upsert {
index.upsert(
slot1,
&key,
&Pubkey::default(),
&[],
&AccountSecondaryIndexes::default(),
account_infos[1].clone(),
&mut gc,
);
} else {
let mut lock = index.get_account_maps_write_lock();
index.insert_new_if_missing_into_primary_index(
slot1,
&key,
account_infos[1].clone(),
&mut gc,
&mut lock,
);
}
assert!(gc.is_empty());

{
let entry = index.get_account_read_entry(&key).unwrap();
assert_eq!(
entry.ref_count().load(Ordering::Relaxed),
if is_cached { 0 } else { 2 }
);
assert_eq!(
entry.slot_list().to_vec(),
vec![
(slot0, account_infos[0].clone()),
(slot1, account_infos[1].clone())
]
);

let new_entry = WriteAccountMapEntry::new_entry_after_update(slot1, &account_infos[1]);
assert_eq!(entry.slot_list()[1], new_entry.slot_list.read().unwrap()[0],);
}
}

#[test]
fn test_new_entry_and_update_code_paths() {
for is_upsert in &[false, true] {
// account_info type that IS cached
test_new_entry_code_paths_helper([1.0, 2.0], true, *is_upsert);

// account_info type that is NOT cached
test_new_entry_code_paths_helper([true, false], false, *is_upsert);
}
}

#[test]
fn test_insert_with_lock_no_ancestors() {
let key = Keypair::new();
let index = AccountsIndex::<bool>::default();
let mut gc = Vec::new();
let slot = 0;
let account_info = true;

let new_entry = AccountsIndex::new_entry();
assert_eq!(new_entry.ref_count.load(Ordering::Relaxed), 0);
assert!(new_entry.slot_list.read().unwrap().is_empty());
assert_eq!(new_entry.slot_list.read().unwrap().capacity(), 1);
let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, &account_info);
let mut w_account_maps = index.get_account_maps_write_lock();
let (mut write, insert) = index.insert_new_entry_if_missing_with_lock(
let write = index.insert_new_entry_if_missing_with_lock(
&key.pubkey(),
&mut w_account_maps,
new_entry,
);
assert!(insert);
let mut write = write.unwrap();
drop(w_account_maps);
let account_info = true;
write.update(slot, account_info, &mut gc);
assert!(gc.is_empty());
drop(write);
Expand Down

0 comments on commit 5ea5beb

Please sign in to comment.