adds a shrink policy to the recycler without an allocation limit
solana-labs#15320
added an allocation limit to the recycler, which has been the source of a
number of bugs. For example, the code below panics by simply cloning packets:

    const RECYCLER_LIMIT: usize = 8;
    let recycler = PacketsRecycler::new_with_limit("", RECYCLER_LIMIT as u32);
    let packets = Packets::new_with_recycler(recycler.clone(), 1).unwrap();
    for _ in 0..RECYCLER_LIMIT {
        let _ = packets.clone();
    }
    Packets::new_with_recycler(recycler.clone(), 1);

The implementation also fails to account for instances where objects are
consumed. Having the allocation limit in the recycler also seems out of place,
as higher-level code has better context to impose allocation limits (e.g. by
using bounded channels to rate-limit), whereas the recycler would be simpler
and more efficient if it just did the recycling.
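
As an illustration of that higher-level alternative, here is a minimal
sketch, not from this commit, of rate-limiting with a bounded channel; the
channel capacity, buffer size, and counts are made up:

    use std::sync::mpsc::sync_channel;
    use std::thread;

    fn main() {
        // The channel's capacity, not the recycler, bounds how many buffers
        // may be in flight: `send` blocks while 8 buffers are unconsumed.
        let (sender, receiver) = sync_channel::<Vec<u8>>(8);
        let producer = thread::spawn(move || {
            for i in 0..64u8 {
                sender.send(vec![i; 1232]).unwrap();
            }
        });
        // Draining buffers releases capacity back to the producer.
        assert_eq!(receiver.iter().count(), 64);
        producer.join().unwrap();
    }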

This commit:
* Reverts solana-labs#15320
* Adds a shrink policy to the recycler without an allocation limit.
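
For intuition on the shrink policy: the diff below maintains size_factor ~=
RECYCLER_SHRINK_WINDOW * EMA(gc.len()) in pure integer arithmetic, and shrinks
gc back to RECYCLER_SHRINK_SIZE once both the average and the current length
exceed that size by 20%. A standalone sketch of the same arithmetic (constants
copied from the diff; the steady-state length of 2048 is made up, and a 64-bit
usize is assumed):

    const RECYCLER_SHRINK_SIZE: usize = 1024;
    const RECYCLER_SHRINK_WINDOW: usize = 16384; // W

    fn main() {
        // Per allocation: size_factor <- size_factor * (W - 1) / W + gc_len,
        // so size_factor converges toward W * gc_len, i.e. W * EMA(gc.len()).
        let mut size_factor: usize = 0;
        let gc_len: usize = 2048; // pretend gc steadily holds 2048 objects
        for _ in 0..10 * RECYCLER_SHRINK_WINDOW {
            size_factor = size_factor * (RECYCLER_SHRINK_WINDOW - 1)
                / RECYCLER_SHRINK_WINDOW
                + gc_len;
        }
        assert!(size_factor / RECYCLER_SHRINK_WINDOW >= 2040); // EMA ~ 2048
        // The *12 vs *10 comparison encodes "at least 1.2x the shrink size".
        const SHRINK_THRESHOLD: usize = RECYCLER_SHRINK_SIZE * 12;
        let shrink = size_factor * 10 >= SHRINK_THRESHOLD * RECYCLER_SHRINK_WINDOW
            && gc_len * 10 >= SHRINK_THRESHOLD;
        assert!(shrink); // 2048 > 1.2 * 1024: gc would be drained to 1024
    }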
behzadnouri committed Apr 9, 2021
1 parent ae47761 commit 7bd8b45
Showing 1 changed file with 49 additions and 13 deletions.

perf/src/recycler.rs
@@ -3,6 +3,18 @@ use std::sync::atomic::AtomicBool;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex, Weak};
 
+// A temporary burst in the workload can cause a large number of allocations,
+// after which they will be recycled and still reside in memory. If the number
+// of recycled objects stays above the limit below for long, they are deemed
+// redundant since they are not getting reused. The recycler will then shrink
+// by releasing objects above this threshold. This limit aims to maintain a
+// cushion against *normal* variations in the workload while bounding the
+// number of redundant garbage-collected objects after temporary bursts.
+const RECYCLER_SHRINK_SIZE: usize = 1024;
+// Lookback window for averaging the number of garbage-collected objects, in
+// terms of number of allocations.
+const RECYCLER_SHRINK_WINDOW: usize = 16384;
+
 #[derive(Debug, Default)]
 struct RecyclerStats {
     total: AtomicUsize,
@@ -21,16 +33,19 @@ pub struct RecyclerX<T> {
     gc: Mutex<Vec<T>>,
     stats: RecyclerStats,
     id: usize,
+    // Shrink window times the exponential moving average of gc.len().
+    size_factor: AtomicUsize,
 }
 
 impl<T: Default> Default for RecyclerX<T> {
     fn default() -> RecyclerX<T> {
         let id = thread_rng().gen_range(0, 1000);
         trace!("new recycler..{}", id);
         RecyclerX {
-            gc: Mutex::new(vec![]),
+            gc: Mutex::default(),
             stats: RecyclerStats::default(),
             id,
+            size_factor: AtomicUsize::default(),
         }
     }
 }
@@ -74,19 +89,26 @@ impl<T: Default + Reset + Sized> Recycler<T> {
     }
 
     pub fn allocate(&self, name: &'static str) -> T {
-        let new = self
-            .recycler
-            .gc
-            .lock()
-            .expect("recycler lock in pb fn allocate")
-            .pop();
-
-        if let Some(mut x) = new {
-            self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
-            x.reset();
-            return x;
+        {
+            const RECYCLER_SHRINK_WINDOW_SUB_ONE: usize = RECYCLER_SHRINK_WINDOW - 1;
+            let mut gc = self.recycler.gc.lock().unwrap();
+            // Update the exponential moving average of gc.len().
+            self.recycler.size_factor.store(
+                self.recycler
+                    .size_factor
+                    .load(Ordering::Acquire)
+                    .saturating_mul(RECYCLER_SHRINK_WINDOW_SUB_ONE)
+                    .checked_div(RECYCLER_SHRINK_WINDOW)
+                    .unwrap()
+                    .saturating_add(gc.len()),
+                Ordering::Release,
+            );
+            if let Some(mut x) = gc.pop() {
+                self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
+                x.reset();
+                return x;
+            }
         }
-
         let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
         trace!(
             "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
@@ -108,6 +130,20 @@ impl<T: Default + Reset> RecyclerX<T> {
         let len = {
             let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
             gc.push(x);
+            // Allow 20% overshoot to amortize the computation cost of shrinking.
+            const SHRINK_THRESHOLD: usize = RECYCLER_SHRINK_SIZE.saturating_mul(12);
+            if self.size_factor.load(Ordering::Acquire).saturating_mul(10)
+                >= SHRINK_THRESHOLD.saturating_mul(RECYCLER_SHRINK_WINDOW)
+                && gc.len().saturating_mul(10) >= SHRINK_THRESHOLD
+            {
+                const SIZE_FACTOR_AFTER_SHRINK: usize =
+                    RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW;
+                for mut x in gc.drain(RECYCLER_SHRINK_SIZE..) {
+                    x.set_recycler(Weak::default());
+                }
+                self.size_factor
+                    .store(SIZE_FACTOR_AFTER_SHRINK, Ordering::Release);
+            }
             gc.len()
         };
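
A note on the drain in the last hunk: set_recycler(Weak::default()) detaches
each released object from the recycler, so that when such an object is later
dropped its weak backref no longer upgrades and the memory is freed rather
than pushed back onto gc. A minimal sketch of that drop-time dispatch, with
simplified, hypothetical types in place of the crate's actual Reset/RecyclerX
machinery:

    use std::sync::{Arc, Mutex, Weak};

    struct Pool {
        gc: Mutex<Vec<Vec<u8>>>,
    }

    struct Buffer {
        data: Vec<u8>,
        // Weak::default() after a shrink; upgrade() then yields None.
        pool: Weak<Pool>,
    }

    impl Drop for Buffer {
        fn drop(&mut self) {
            // Live backref: recycle the payload. Dead backref: free it.
            if let Some(pool) = self.pool.upgrade() {
                pool.gc.lock().unwrap().push(std::mem::take(&mut self.data));
            }
        }
    }

    fn main() {
        let pool = Arc::new(Pool { gc: Mutex::new(Vec::new()) });
        drop(Buffer { data: vec![0; 16], pool: Arc::downgrade(&pool) }); // recycled
        drop(Buffer { data: vec![0; 16], pool: Weak::default() }); // freed
        assert_eq!(pool.gc.lock().unwrap().len(), 1);
    }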
