diff --git a/perf/benches/recycler.rs b/perf/benches/recycler.rs new file mode 100644 index 00000000000000..742241514614e5 --- /dev/null +++ b/perf/benches/recycler.rs @@ -0,0 +1,23 @@ +#![feature(test)] + +extern crate test; + +use solana_perf::recycler::Recycler; +use test::Bencher; + +#[bench] +fn bench_recycler(bencher: &mut Bencher) { + solana_logger::setup(); + + let recycler = Recycler::default(); + + for _ in 0..1000 { + let mut p = solana_perf::packet::Packets::with_capacity(128); + p.packets.resize(128, solana_sdk::packet::Packet::default()); + recycler.recycle_for_test(p.packets); + } + + bencher.iter(move || { + recycler.recycle_for_test(recycler.allocate("me")); + }); +} diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index dc9b84a0395ca2..a8f9b01acf6b77 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -76,6 +76,9 @@ impl Reset for PinnedVec { fn set_recycler(&mut self, recycler: Weak>) { self.recycler = Some(recycler); } + fn unset_recycler(&mut self) { + self.recycler = None; + } } impl Default for PinnedVec { diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs index 1d94d8e4e216e9..0213e7b232c95c 100644 --- a/perf/src/recycler.rs +++ b/perf/src/recycler.rs @@ -1,7 +1,9 @@ +use log::*; use rand::{thread_rng, Rng}; -use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; +use std::time::Instant; +use std::{collections::VecDeque, sync::atomic::AtomicBool}; #[derive(Debug, Default)] struct RecyclerStats { @@ -18,7 +20,7 @@ pub struct Recycler { #[derive(Debug)] pub struct RecyclerX { - gc: Mutex>, + gc: Mutex>, stats: RecyclerStats, id: usize, } @@ -28,7 +30,7 @@ impl Default for RecyclerX { let id = thread_rng().gen_range(0, 1000); trace!("new recycler..{}", id); RecyclerX { - gc: Mutex::new(vec![]), + gc: Mutex::new(VecDeque::new()), stats: RecyclerStats::default(), id, } @@ -41,6 +43,9 @@ pub trait Reset { fn set_recycler(&mut self, recycler: Weak>) where Self: std::marker::Sized; + fn unset_recycler(&mut self) + where + Self: std::marker::Sized; } lazy_static! { @@ -55,6 +60,9 @@ fn warm_recyclers() -> bool { WARM_RECYCLERS.load(Ordering::Relaxed) } +pub const MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION: usize = 10; +pub const EXPIRATION_TTL_SECONDS: u64 = 3600; + impl Recycler { pub fn warmed(num: usize, size_hint: usize) -> Self { let new = Self::default(); @@ -74,12 +82,26 @@ impl Recycler { } pub fn allocate(&self, name: &'static str) -> T { - let new = self - .recycler - .gc - .lock() - .expect("recycler lock in pb fn allocate") - .pop(); + let new = { + let mut gc = self.recycler.gc.lock().unwrap(); + + // Don't expire when inventory is rather small. At least, + // we shouldn't expire (= drop) the last inventory; we can just + // return it from here. Also add some buffer. Thus, we don't + // expire if less than 10. + if gc.len() > MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION { + if let Some((oldest_time, _old_item)) = gc.front() { + if oldest_time.elapsed().as_secs() >= EXPIRATION_TTL_SECONDS { + let (_, mut expired) = gc.pop_front().unwrap(); + // unref-ing recycler here is crucial to prevent + // the expired from being recycled again via Drop, + // lading to dead lock! + expired.unset_recycler(); + } + } + } + gc.pop_back().map(|(_added_time, item)| item) + }; if let Some(mut x) = new { self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed); @@ -101,13 +123,17 @@ impl Recycler { t.set_recycler(Arc::downgrade(&self.recycler)); t } + + pub fn recycle_for_test(&self, x: T) { + self.recycler.recycle(x); + } } impl RecyclerX { pub fn recycle(&self, x: T) { let len = { let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); - gc.push(x); + gc.push_back((Instant::now(), x)); gc.len() }; @@ -137,13 +163,16 @@ impl RecyclerX { #[cfg(test)] mod tests { use super::*; + use std::time::Duration; + const RESET_VALUE: u64 = 10; impl Reset for u64 { fn reset(&mut self) { - *self = 10; + *self += RESET_VALUE; } fn warm(&mut self, _size_hint: usize) {} fn set_recycler(&mut self, _recycler: Weak>) {} + fn unset_recycler(&mut self) {} } #[test] @@ -153,10 +182,63 @@ mod tests { assert_eq!(y, 0); y = 20; let recycler2 = recycler.clone(); - recycler2.recycler.recycle(y); + recycler2.recycle_for_test(y); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1); let z = recycler.allocate("test_recycler2"); - assert_eq!(z, 10); + assert_eq!(z, 20 + RESET_VALUE); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); } + + #[test] + fn test_recycler_ttl() { + let recycler = Recycler::default(); + recycler.recycle_for_test(42); + recycler.recycle_for_test(43); + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 2); + + // meddle to make the first element to expire + recycler.recycler.gc.lock().unwrap().front_mut().unwrap().0 = + Instant::now() - Duration::from_secs(EXPIRATION_TTL_SECONDS + 1); + + let y: u64 = recycler.allocate("test_recycler1"); + assert_eq!(y, 43 + RESET_VALUE); + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1); + + // create enough inventory to trigger expiration + for i in 44..44 + MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION as u64 { + recycler.recycle_for_test(i); + } + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 11); + + // allocate with expiration causes len to be reduced by 2 + let y: u64 = recycler.allocate("test_recycler1"); + assert_eq!(y, 53 + RESET_VALUE); + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 9); + + // allocate without expiration causes len to be reduced by 2 + let y: u64 = recycler.allocate("test_recycler1"); + assert_eq!(y, 52 + RESET_VALUE); + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 8); + } + + #[test] + fn test_recycler_no_deadlock() { + solana_logger::setup(); + + let recycler = + Recycler::>::default(); + + // create bunch of packets and drop at once to force enough inventory + let packets = (0..=MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION) + .into_iter() + .map(|_| recycler.allocate("me")) + .collect::>(); + drop(packets); + + recycler.recycler.gc.lock().unwrap().front_mut().unwrap().0 = + Instant::now() - Duration::from_secs(7200); + + // this drops the first inventory item but shouldn't cause recycling of it! + recycler.recycle_for_test(recycler.allocate("me")); + } }