fix: track pending as vec (will delete this shortly)
dav1do committed Aug 13, 2024
1 parent 2dd6e6b commit 82e59d7
Showing 2 changed files with 112 additions and 12 deletions.
2 changes: 1 addition & 1 deletion recon/src/protocol.rs
@@ -751,7 +751,7 @@ where
                 let new = ReconItem::new(key, value);
                 if let Some(ok_now) = self.pending.remove_by_needed(&new) {
                     self.event_q.push(new);
-                    self.event_q.push(ok_now);
+                    self.event_q.extend(ok_now);
                 } else if !self.pending.is_tracking(&new) {
                     self.event_q.push(new);
                 }
122 changes: 111 additions & 11 deletions recon/src/recon/pending_cache.rs
@@ -6,7 +6,7 @@ use crate::{Key, PendingItem, ReconItem};
 /// This struct manages tracking items that we attempted to deliver but could not be processed
 /// because we need to discover additional information.
 pub struct PendingCache<K: Key> {
-    by_needed_key: HashMap<Vec<u8>, ReconItem<K>>,
+    by_needed_key: HashMap<Vec<u8>, Vec<ReconItem<K>>>,
     item_keys: std::collections::HashSet<Vec<u8>>,
     max_size: usize,
 }
@@ -26,14 +26,19 @@ impl<K: Key> PendingCache<K> {
     }
 
     fn size(&self) -> usize {
-        debug_assert_eq!(self.by_needed_key.len(), self.item_keys.len());
+        debug_assert_eq!(
+            self.by_needed_key.values().map(|i| i.len()).sum::<usize>(),
+            self.item_keys.len(),
+        );
         self.by_needed_key.len()
     }
 
     /// Stop tracking an item and return it if the key it the item it required shows up
-    pub fn remove_by_needed(&mut self, new_item: &ReconItem<K>) -> Option<ReconItem<K>> {
+    pub fn remove_by_needed(&mut self, new_item: &ReconItem<K>) -> Option<Vec<ReconItem<K>>> {
         if let Some(ok_now) = self.by_needed_key.remove(new_item.key.as_bytes()) {
-            self.item_keys.remove(ok_now.key.as_bytes());
+            ok_now.iter().for_each(|i| {
+                self.item_keys.remove(i.key.as_bytes());
+            });
             Some(ok_now)
         } else {
             None
@@ -50,11 +55,20 @@ impl<K: Key> PendingCache<K> {
     pub fn track_pending(&mut self, items: &mut Vec<PendingItem<K>>) -> (usize, usize) {
         let mut tracked = 0;
         for val in items.drain(0..self.capacity().min(items.len())) {
-            if !self.item_keys.contains(val.item.key.as_bytes()) {
-                self.item_keys.insert(val.item.key.as_bytes().to_vec());
-                self.by_needed_key
-                    .insert(val.required_key.as_bytes().to_vec(), val.item);
-                tracked += 1;
+            match self
+                .by_needed_key
+                .entry(val.required_key.as_bytes().to_vec())
+            {
+                std::collections::hash_map::Entry::Occupied(mut o) => {
+                    self.item_keys.insert(val.item.key.as_bytes().to_vec());
+                    o.get_mut().push(val.item);
+                    tracked += 1;
+                }
+                std::collections::hash_map::Entry::Vacant(v) => {
+                    self.item_keys.insert(val.item.key.as_bytes().to_vec());
+                    v.insert(vec![val.item]);
+                    tracked += 1;
+                }
             }
         }
 

Expand Down Expand Up @@ -97,24 +111,27 @@ mod test {
let expected_items = items.clone();
let expected_cache_size = cache.size() + expected_cached;
let expected_remaining = items.len() - expected_cached;

cache.track_pending(&mut items);

assert_eq!(expected_cache_size, cache.item_keys.len());
assert_eq!(expected_cache_size, cache.by_needed_key.len());
assert_eq!(expected_remaining, items.len(), "{:?}", items);

// we cache the vec as a stack so we need to use the original index from the list
for (i, v) in expected_items.into_iter().take(expected_cached).enumerate() {
assert!(
cache.is_tracking(&v.item),
"not tracking: {:?} {:?}",
v,
cache
);

let req = required_item(i);
let cached = cache.remove_by_needed(&req).unwrap_or_else(|| {
panic!("should have cached {:?} by {:?} cache={:?}", v, req, cache)
});
assert_eq!(v.item, cached);

assert_eq!(vec![v.item], cached);
}

assert_eq!(0, cache.item_keys.len());
@@ -143,4 +160,87 @@ mod test {
         let items = get_items(20);
         cache_and_assert(&mut cache, items, 10);
     }
+
+    #[test]
+    fn pending_caches_drops_too_many_need_same_key() {
+        let expected_cached = 10;
+        let mut cache = PendingCache::new(expected_cached);
+        let mut items = Vec::with_capacity(20);
+
+        fn required_item_dup_keys(i: usize) -> ReconItem<AlphaNumBytes> {
+            let req = if i % 2 == 0 {
+                // all even depends on the same item
+                AlphaNumBytes::from(format!("{}", 1_000))
+            } else {
+                AlphaNumBytes::from(format!("{}", i + REQUIRED_OFFSET))
+            };
+            let req_item = req.as_bytes().to_vec();
+            ReconItem::new(req, req_item)
+        }
+
+        for i in 0..20 {
+            let id = AlphaNumBytes::from(format!("{}", i));
+            let req = required_item_dup_keys(i);
+            let item = ReconItem::new(id, i.to_le_bytes().to_vec());
+            items.push(PendingItem::new(req.key, item));
+        }
+        let expected_cached = 10;
+
+        let expected_items: Vec<_> = items.clone().into_iter().take(expected_cached).collect();
+        let expected_cache_size = cache.size() + expected_cached;
+        let expected_remaining = items.len() - expected_cached;
+
+        cache.track_pending(&mut items);
+
+        assert_eq!(expected_cache_size, cache.item_keys.len());
+        assert_eq!(
+            expected_cache_size,
+            cache.by_needed_key.values().map(|i| i.len()).sum::<usize>()
+        );
+        assert_eq!(expected_remaining, items.len(), "{:?}", items);
+
+        // cached everything we expected
+        for v in expected_items.iter() {
+            assert!(
+                cache.is_tracking(&v.item),
+                "not tracking: {:?} {:?}",
+                v,
+                cache
+            );
+        }
+
+        // odd items are all a vec of one item
+        for (i, v) in expected_items.iter().enumerate() {
+            if i % 2 == 0 {
+                continue;
+            }
+            let req = required_item_dup_keys(i);
+            let cached = cache.remove_by_needed(&req).unwrap_or_else(|| {
+                panic!("should have cached {:?} by {:?} cache={:?}", v, req, cache)
+            });
+
+            assert_eq!(vec![v.item.clone()], cached);
+        }
+
+        // the even items are under one key
+        let req = required_item_dup_keys(0); // anything even
+        let cached = cache
+            .remove_by_needed(&req)
+            .unwrap_or_else(|| panic!("should have cached vec by {:?} cache={:?}", req, cache));
+
+        let even: Vec<_> =
+            expected_items
+                .into_iter()
+                .enumerate()
+                .fold(vec![], |mut even, (i, v)| {
+                    if i % 2 == 0 {
+                        even.push(v.item);
+                    }
+                    even
+                });
+        assert_eq!(even, cached);
+
+        assert_eq!(0, cache.item_keys.len());
+        assert_eq!(0, cache.by_needed_key.len());
+    }
 }
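
The shape of the change, in brief: `by_needed_key` now maps a needed key to a Vec of pending items rather than a single item, so several items can wait on the same key and all of them are released together when that key arrives, with the caller switching from `push` to `extend` on its event queue. Below is a minimal, self-contained sketch of that multimap pattern only; `Item` and `Pending` are hypothetical stand-ins, not this crate's types, and the real cache's capacity handling is omitted.

use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the crate's ReconItem: a key plus a value.
#[derive(Debug, Clone, PartialEq)]
struct Item {
    key: Vec<u8>,
    value: Vec<u8>,
}

/// Minimal multimap-style pending cache: many items may wait on the same needed key.
#[derive(Debug, Default)]
struct Pending {
    by_needed_key: HashMap<Vec<u8>, Vec<Item>>,
    item_keys: HashSet<Vec<u8>>,
}

impl Pending {
    /// Track `item` as waiting on `needed`, appending to any existing waiters for that key.
    fn track(&mut self, needed: &[u8], item: Item) {
        self.item_keys.insert(item.key.clone());
        self.by_needed_key.entry(needed.to_vec()).or_default().push(item);
    }

    /// When `key` arrives, release every item that was waiting on it.
    fn remove_by_needed(&mut self, key: &[u8]) -> Option<Vec<Item>> {
        let ready = self.by_needed_key.remove(key)?;
        for item in &ready {
            self.item_keys.remove(&item.key);
        }
        Some(ready)
    }
}

fn main() {
    let mut pending = Pending::default();
    let a = Item { key: b"a".to_vec(), value: b"1".to_vec() };
    let b = Item { key: b"b".to_vec(), value: b"2".to_vec() };

    // Two items wait on the same needed key, as in the new duplicate-key test.
    pending.track(b"needed", a.clone());
    pending.track(b"needed", b.clone());

    // Once the needed key shows up, the caller extends its queue with the whole
    // batch, mirroring the push -> extend change in protocol.rs.
    let mut event_q: Vec<Item> = Vec::new();
    if let Some(ok_now) = pending.remove_by_needed(b"needed") {
        event_q.extend(ok_now);
    }
    assert_eq!(event_q, vec![a, b]);
    println!("released {} items", event_q.len());
}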
