adds a shrink policy to the recycler without an allocation limit
solana-labs#15320
added an allocation limit to the recycler, which has been the source of a
number of bugs. For example, the code below panics by simply cloning packets:

    const RECYCLER_LIMIT: usize = 8;
    let recycler = PacketsRecycler::new_with_limit("", RECYCLER_LIMIT as u32);
    let packets = Packets::new_with_recycler(recycler.clone(), 1).unwrap();
    for _ in 0..RECYCLER_LIMIT {
        let _ = packets.clone();
    }
    Packets::new_with_recycler(recycler.clone(), 1);

The implementation also fails to account for instances where objects are
consumed. Having the allocation limit in the recycler also seems out of place,
as higher-level code has better context to impose allocation limits (e.g. by
using bounded channels to rate-limit, as sketched below), whereas the recycler
would be simpler and more efficient if it just did the recycling.
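
As a purely illustrative aside (not part of this commit), a bounded channel from
the standard library can cap how many batches are in flight, putting the limit in
the pipeline instead of the recycler; the names and sizes below are hypothetical:

    use std::sync::mpsc::sync_channel;
    use std::thread;

    // Hypothetical cap: at most 8 batches may be pending at once.
    const MAX_IN_FLIGHT: usize = 8;

    fn main() {
        let (sender, receiver) = sync_channel::<Vec<u8>>(MAX_IN_FLIGHT);
        let consumer = thread::spawn(move || {
            // Draining the channel frees capacity for the producer.
            while let Ok(batch) = receiver.recv() {
                drop(batch); // "process" the batch
            }
        });
        for i in 0..64u8 {
            // send() blocks once MAX_IN_FLIGHT batches are pending, so a
            // workload burst back-pressures the producer instead of
            // panicking inside an allocator.
            sender.send(vec![i; 1024]).unwrap();
        }
        drop(sender);
        consumer.join().unwrap();
    }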

This commit:
* Reverts solana-labs#15320
* Adds a shrink policy to the recycler without an allocation limit.
behzadnouri committed Apr 14, 2021
1 parent aa4300f commit b3c5e6c
Showing 1 changed file with 82 additions and 13 deletions.
95 changes: 82 additions & 13 deletions perf/src/recycler.rs
@@ -3,6 +3,18 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};

// A temporary burst in the workload can cause a large number of allocations,
// after which they will be recycled and still reside in memory. If the number
// of recycled objects stays above the limit below for long, they will be deemed
// redundant since they are not getting reused. The recycler will then shrink
// by releasing objects above this threshold. This limit aims to maintain a
// cushion against *normal* variations in the workload while bounding the
// number of redundant garbage collected objects after temporary bursts.
const RECYCLER_SHRINK_SIZE: usize = 1024;
// Lookback window for averaging the number of garbage collected objects, in
// terms of the number of allocations.
const RECYCLER_SHRINK_WINDOW: usize = 16384;

#[derive(Debug, Default)]
struct RecyclerStats {
total: AtomicUsize,
@@ -21,16 +33,19 @@ pub struct RecyclerX<T> {
gc: Mutex<Vec<T>>,
stats: RecyclerStats,
id: usize,
// Shrink window times the exponential moving average of gc.len().
size_factor: AtomicUsize,
}

impl<T: Default> Default for RecyclerX<T> {
fn default() -> RecyclerX<T> {
let id = thread_rng().gen_range(0, 1000);
trace!("new recycler..{}", id);
RecyclerX {
gc: Mutex::new(vec![]),
gc: Mutex::default(),
stats: RecyclerStats::default(),
id,
size_factor: AtomicUsize::default(),
}
}
}
@@ -74,19 +89,37 @@ impl<T: Default + Reset + Sized> Recycler<T> {
}

pub fn allocate(&self, name: &'static str) -> T {
let new = self
.recycler
.gc
.lock()
.expect("recycler lock in pb fn allocate")
.pop();

if let Some(mut x) = new {
self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
x.reset();
return x;
{
const RECYCLER_SHRINK_WINDOW_HALF: usize = RECYCLER_SHRINK_WINDOW / 2;
const RECYCLER_SHRINK_WINDOW_SUB_ONE: usize = RECYCLER_SHRINK_WINDOW - 1;
let mut gc = self.recycler.gc.lock().unwrap();
// Update the exponential moving average of gc.len().
// The update equation is:
// a <- a * (n - 1) / n + x / n
// To avoid floating point math, define b = n a:
// b <- b * (n - 1) / n + x
// To make the remaining division round (instead of truncate),
// add n/2 to the numerator.
// Effectively b (size_factor here) is an exponential moving
// estimate of the "sum" of x (gc.len()) over the window as opposed
// to the "average".
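// Worked example with a small window n = 4: if b is 100 and gc.len() is 6,
// the update gives b <- (100 * 3 + 2) / 4 + 6 = 75 + 6 = 81. If gc.len()
// stays constant at some value c, b converges to n * c, i.e. size_factor
// settles near RECYCLER_SHRINK_WINDOW * gc.len().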
self.recycler.size_factor.store(
self.recycler
.size_factor
.load(Ordering::Acquire)
.saturating_mul(RECYCLER_SHRINK_WINDOW_SUB_ONE)
.saturating_add(RECYCLER_SHRINK_WINDOW_HALF)
.checked_div(RECYCLER_SHRINK_WINDOW)
.unwrap()
.saturating_add(gc.len()),
Ordering::Release,
);
if let Some(mut x) = gc.pop() {
self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
x.reset();
return x;
}
}

let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
trace!(
"allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
@@ -108,6 +141,16 @@ impl<T: Default + Reset> RecyclerX<T> {
let len = {
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
gc.push(x);
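// At steady state size_factor approximates RECYCLER_SHRINK_WINDOW * gc.len(),
// so the threshold below is reached only once the moving estimate of gc.len()
// has stayed at or above RECYCLER_SHRINK_SIZE; the surplus objects are then
// detached from the recycler and dropped.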
const SIZE_FACTOR_AFTER_SHRINK: usize = RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW;
if gc.len() > RECYCLER_SHRINK_SIZE
&& self.size_factor.load(Ordering::Acquire) >= SIZE_FACTOR_AFTER_SHRINK
{
for mut x in gc.drain(RECYCLER_SHRINK_SIZE..) {
x.set_recycler(Weak::default());
}
self.size_factor
.store(SIZE_FACTOR_AFTER_SHRINK, Ordering::Release);
}
gc.len()
};

@@ -137,6 +180,8 @@ impl<T: Default + Reset> RecyclerX<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::packet::PacketsRecycler;
use std::iter::repeat_with;

impl Reset for u64 {
fn reset(&mut self) {
@@ -159,4 +204,28 @@
assert_eq!(z, 10);
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
}

#[test]
fn test_recycler_shrink() {
let mut rng = rand::thread_rng();
let recycler = PacketsRecycler::default();
// Allocate a burst of packets.
const NUM_PACKETS: usize = RECYCLER_SHRINK_SIZE * 2;
{
let _packets: Vec<_> = repeat_with(|| recycler.allocate(""))
.take(NUM_PACKETS)
.collect();
}
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), NUM_PACKETS);
// Process a normal load of packets for a while.
for _ in 0..RECYCLER_SHRINK_WINDOW / 16 {
let count = rng.gen_range(1, 128);
let _packets: Vec<_> = repeat_with(|| recycler.allocate("")).take(count).collect();
}
// Assert that the gc size has shrunk.
assert_eq!(
recycler.recycler.gc.lock().unwrap().len(),
RECYCLER_SHRINK_SIZE
);
}
}
