diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs
index 1d94d8e4e216e9..db2d66f2b6b161 100644
--- a/perf/src/recycler.rs
+++ b/perf/src/recycler.rs
@@ -3,6 +3,18 @@ use std::sync::atomic::AtomicBool;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex, Weak};
 
+// A temporary burst in the workload can cause a large number of allocations,
+// after which they will be recycled and still reside in memory. If the number
+// of recycled objects stays above the limit below for long, they are deemed
+// redundant since they are not getting reused. The recycler will then shrink
+// by releasing objects above this threshold. This limit aims to maintain a
+// cushion against *normal* variations in the workload while bounding the
+// number of redundant garbage collected objects after temporary bursts.
+const RECYCLER_SHRINK_SIZE: usize = 1024;
+// Lookback window, in number of allocations, for averaging the number of
+// garbage collected objects.
+const RECYCLER_SHRINK_WINDOW: usize = 16384;
+
 #[derive(Debug, Default)]
 struct RecyclerStats {
     total: AtomicUsize,
@@ -21,6 +33,8 @@ pub struct RecyclerX {
     gc: Mutex<Vec<T>>,
     stats: RecyclerStats,
     id: usize,
+    // Shrink window times the exponential moving average of gc.len().
+    size_factor: AtomicUsize,
 }
 
 impl Default for RecyclerX {
@@ -28,9 +42,10 @@ impl Default for RecyclerX {
         let id = thread_rng().gen_range(0, 1000);
         trace!("new recycler..{}", id);
         RecyclerX {
-            gc: Mutex::new(vec![]),
+            gc: Mutex::default(),
             stats: RecyclerStats::default(),
             id,
+            size_factor: AtomicUsize::default(),
         }
     }
 }
@@ -74,19 +89,28 @@ impl Recycler {
     }
 
     pub fn allocate(&self, name: &'static str) -> T {
-        let new = self
-            .recycler
-            .gc
-            .lock()
-            .expect("recycler lock in pb fn allocate")
-            .pop();
-
-        if let Some(mut x) = new {
-            self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
-            x.reset();
-            return x;
+        {
+            const RECYCLER_SHRINK_WINDOW_HALF: usize = RECYCLER_SHRINK_WINDOW / 2;
+            const RECYCLER_SHRINK_WINDOW_SUB_ONE: usize = RECYCLER_SHRINK_WINDOW - 1;
+            let mut gc = self.recycler.gc.lock().unwrap();
+            // Update the exponential moving average of gc.len().
+            self.recycler.size_factor.store(
+                self.recycler
+                    .size_factor
+                    .load(Ordering::Acquire)
+                    .saturating_mul(RECYCLER_SHRINK_WINDOW_SUB_ONE)
+                    .saturating_add(RECYCLER_SHRINK_WINDOW_HALF)
+                    .checked_div(RECYCLER_SHRINK_WINDOW)
+                    .unwrap()
+                    .saturating_add(gc.len()),
+                Ordering::Release,
+            );
+            if let Some(mut x) = gc.pop() {
+                self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
+                x.reset();
+                return x;
+            }
         }
-
         let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
         trace!(
             "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
@@ -108,6 +132,20 @@ impl RecyclerX {
         let len = {
             let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
             gc.push(x);
+            // Allow 20% overshoot to amortize the computation cost.
+            const SHRINK_THRESHOLD: usize = RECYCLER_SHRINK_SIZE.saturating_mul(12);
+            if self.size_factor.load(Ordering::Acquire).saturating_mul(10)
+                >= SHRINK_THRESHOLD.saturating_mul(RECYCLER_SHRINK_WINDOW)
+                && gc.len().saturating_mul(10) >= SHRINK_THRESHOLD
+            {
+                const SIZE_FACTOR_AFTER_SHRINK: usize =
+                    RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW;
+                for mut x in gc.drain(RECYCLER_SHRINK_SIZE..) {
+                    x.set_recycler(Weak::default());
+                }
+                self.size_factor
+                    .store(SIZE_FACTOR_AFTER_SHRINK, Ordering::Release);
+            }
             gc.len()
         };
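
For reference, here is a minimal standalone sketch (not part of the patch above) of the bookkeeping the new `size_factor` field performs: it holds `RECYCLER_SHRINK_WINDOW` times an exponential moving average of `gc.len()`, updated once per `allocate`, and `recycle` shrinks the gc buffer only when both that average and the current length exceed `RECYCLER_SHRINK_SIZE` by the 20% overshoot factor. The helper names (`update_size_factor`, `should_shrink`) and the `main` walkthrough are illustrative only.

```rust
// Constants mirroring RECYCLER_SHRINK_SIZE and RECYCLER_SHRINK_WINDOW above.
const SHRINK_SIZE: usize = 1024;
const SHRINK_WINDOW: usize = 16384;

// One EMA step: size_factor <- round(size_factor * (W - 1) / W) + gc_len.
// At a steady gc length L this converges toward L * SHRINK_WINDOW.
fn update_size_factor(size_factor: usize, gc_len: usize) -> usize {
    size_factor
        .saturating_mul(SHRINK_WINDOW - 1)
        .saturating_add(SHRINK_WINDOW / 2) // round to nearest on the division
        / SHRINK_WINDOW
        + gc_len
}

// The shrink test: the long-run average and the current length must both
// exceed 1.2 * SHRINK_SIZE (the 12/10 factor is the 20% overshoot allowance).
fn should_shrink(size_factor: usize, gc_len: usize) -> bool {
    let threshold = SHRINK_SIZE * 12;
    size_factor * 10 >= threshold * SHRINK_WINDOW && gc_len * 10 >= threshold
}

fn main() {
    let mut size_factor = 0usize;
    // Feed a sustained gc length of 2048; the average converges after a few
    // multiples of SHRINK_WINDOW allocations.
    for _ in 0..8 * SHRINK_WINDOW {
        size_factor = update_size_factor(size_factor, 2048);
    }
    assert!(should_shrink(size_factor, 2048)); // both conditions hold
    assert!(!should_shrink(size_factor, 512)); // current length too small
    println!("size_factor ~= {}", size_factor); // close to 2048 * SHRINK_WINDOW
}
```

Requiring both conditions means a short-lived spike in `gc.len()` alone does not trigger a shrink, and a high historical average cannot force draining a buffer that has already emptied out.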