Add shrink logic
carllin committed Feb 15, 2021
1 parent 3d23691 commit 1379454
Showing 5 changed files with 162 additions and 62 deletions.
core/src/fetch_stage.rs: 4 changes (2 additions, 2 deletions)
@@ -46,7 +46,7 @@ impl FetchStage {
exit: &Arc<AtomicBool>,
sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
allocated_packet_limit: Option<usize>,
allocated_packet_limit: Option<u32>,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
@@ -101,7 +101,7 @@ impl FetchStage {
exit: &Arc<AtomicBool>,
sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
limit: Option<usize>,
limit: Option<u32>,
) -> Self {
let recycler: PacketsRecycler = Recycler::warmed(1000, 1024, limit);

core/src/shred_fetch_stage.rs: 2 changes (1 addition, 1 deletion)
@@ -167,7 +167,7 @@ impl ShredFetchStage {
sender: &PacketSender,
bank_forks: Option<Arc<RwLock<BankForks>>>,
exit: &Arc<AtomicBool>,
limit: Option<usize>,
limit: Option<u32>,
) -> Self {
let recycler: PacketsRecycler = Recycler::warmed(100, 1024, limit);

core/src/tpu.rs: 2 changes (1 addition, 1 deletion)
@@ -68,7 +68,7 @@ impl Tpu {
&exit,
&packet_sender,
&poh_recorder,
Some((TOTAL_BUFFERED_PACKETS + PACKETS_PER_BATCH - 1) / PACKETS_PER_BATCH),
Some(((TOTAL_BUFFERED_PACKETS + PACKETS_PER_BATCH - 1) / PACKETS_PER_BATCH) as u32),
);
let (verified_sender, verified_receiver) = unbounded();

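The limit passed to FetchStage above is a ceiling division: it converts a packet budget into the number of `PACKETS_PER_BATCH`-sized batches the recycler may hand out. A small sketch of the arithmetic follows; the constant values here are illustrative only and are not taken from this diff.

```rust
// Ceiling division used to derive the batch limit from a packet budget.
// Example constant values, assumed for illustration.
const PACKETS_PER_BATCH: usize = 128;
const TOTAL_BUFFERED_PACKETS: usize = 500_000;

fn batch_limit(total_packets: usize, packets_per_batch: usize) -> u32 {
    // Adding `packets_per_batch - 1` before dividing rounds up, so a
    // partially filled final batch still counts toward the limit.
    ((total_packets + packets_per_batch - 1) / packets_per_batch) as u32
}

fn main() {
    // 500_000 / 128 = 3906.25, so the limit rounds up to 3907 batches.
    assert_eq!(batch_limit(TOTAL_BUFFERED_PACKETS, PACKETS_PER_BATCH), 3907);
    // An exact multiple does not round up.
    assert_eq!(batch_limit(4 * PACKETS_PER_BATCH, PACKETS_PER_BATCH), 4);
}
```
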
perf/src/cuda_runtime.rs: 3 changes (3 additions, 0 deletions)
@@ -76,6 +76,9 @@ impl<T: Default + Clone + Sized> Reset for PinnedVec<T> {
fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>) {
self.recycler = Some(recycler);
}
fn unset_recycler(&mut self) {
self.recycler = None;
}
}

impl<T: Clone + Default + Sized> Default for PinnedVec<T> {
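With the pool now able to shrink, an object dropped from the pool needs a way to give up its back-reference to the recycler first, which is what the new `unset_recycler` hook provides. Below is a minimal sketch of a type implementing the extended `Reset` trait; `MyBuffer` and the `RecyclerHandle` stand-in are invented for illustration (the real trait takes `Weak<RecyclerX<Self>>` and lives in perf/src/recycler.rs).

```rust
use std::sync::Weak;

struct RecyclerHandle; // stand-in for RecyclerX<MyBuffer> in this sketch

trait Reset {
    fn reset(&mut self);
    fn warm(&mut self, size_hint: usize);
    fn set_recycler(&mut self, recycler: Weak<RecyclerHandle>);
    fn unset_recycler(&mut self);
}

#[derive(Default)]
struct MyBuffer {
    data: Vec<u8>,
    recycler: Option<Weak<RecyclerHandle>>,
}

impl Reset for MyBuffer {
    fn reset(&mut self) {
        self.data.clear();
    }
    fn warm(&mut self, size_hint: usize) {
        self.data.reserve(size_hint);
    }
    fn set_recycler(&mut self, recycler: Weak<RecyclerHandle>) {
        self.recycler = Some(recycler);
    }
    // Called when the pool shrinks: dropping the weak back-reference means
    // this object no longer tries to return itself to the recycler.
    fn unset_recycler(&mut self) {
        self.recycler = None;
    }
}

fn main() {
    let mut buf = MyBuffer::default();
    buf.warm(1024);
    buf.reset();
    buf.unset_recycler(); // after this, dropping `buf` does not touch any pool
}
```
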
perf/src/recycler.rs: 213 changes (155 additions, 58 deletions)
@@ -1,8 +1,14 @@
use rand::{thread_rng, Rng};
use solana_sdk::timing::timestamp;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};

pub const DEFAULT_MINIMUM_OBJECT_COUNT: u32 = 1000;
pub const DEFAULT_SHRINK_RATIO: f64 = 0.80;
pub const DEFAULT_MAX_ABOVE_SHRINK_RATIO_COUNT: u32 = 10;
pub const DEFAULT_CHECK_SHRINK_INTERVAL_MS: u32 = 10000;

#[derive(Debug, Default)]
struct RecyclerStats {
total: AtomicUsize,
@@ -12,37 +18,74 @@ struct RecyclerStats {
}

#[derive(Clone, Default)]
pub struct Recycler<T> {
pub struct Recycler<T: Reset> {
recycler: Arc<RecyclerX<T>>,
}

#[derive(Debug)]
pub struct RecyclerX<T> {
gc: Mutex<Vec<T>>,
pub struct ObjectPool<T: Reset> {
object_pool: Vec<T>,
shrink_ratio: f64,
minimum_object_count: u32,
above_shrink_ratio_count: u32,
max_above_shrink_ratio_count: u32,
check_shrink_interval_ms: u32,
last_shrink_check_ts: u64,
pub total_allocated_count: u32,
limit: Option<u32>,
}
impl<T: Default + Reset> Default for ObjectPool<T> {
fn default() -> Self {
ObjectPool {
object_pool: vec![],
shrink_ratio: DEFAULT_SHRINK_RATIO,
minimum_object_count: DEFAULT_MINIMUM_OBJECT_COUNT,
above_shrink_ratio_count: 0,
max_above_shrink_ratio_count: DEFAULT_MAX_ABOVE_SHRINK_RATIO_COUNT,
check_shrink_interval_ms: DEFAULT_CHECK_SHRINK_INTERVAL_MS,
last_shrink_check_ts: timestamp(),
total_allocated_count: 0,
limit: None,
}
}
}

impl<T: Default + Reset> ObjectPool<T> {
fn new(limit: Option<u32>) -> Self {
Self {
limit,
..Self::default()
}
}

fn len(&self) -> usize {
self.object_pool.len()
}
}

#[derive(Debug)]
pub struct RecyclerX<T: Reset> {
gc: Mutex<ObjectPool<T>>,
stats: RecyclerStats,
id: usize,
outstanding_len: AtomicUsize,
limit: Option<usize>,
}

impl<T: Default> Default for RecyclerX<T> {
impl<T: Default + Reset> Default for RecyclerX<T> {
fn default() -> RecyclerX<T> {
let id = thread_rng().gen_range(0, 1000);
trace!("new recycler..{}", id);
RecyclerX {
gc: Mutex::new(vec![]),
gc: Mutex::new(ObjectPool::default()),
stats: RecyclerStats::default(),
id,
limit: None,
outstanding_len: AtomicUsize::default(),
}
}
}

impl<T: Default> RecyclerX<T> {
fn new(limit: Option<usize>) -> Self {
impl<T: Default + Reset> RecyclerX<T> {
fn new(limit: Option<u32>) -> Self {
RecyclerX {
limit,
gc: Mutex::new(ObjectPool::new(limit)),
..Self::default()
}
}
@@ -54,6 +97,9 @@ pub trait Reset {
fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>)
where
Self: std::marker::Sized;
fn unset_recycler(&mut self)
where
Self: std::marker::Sized;
}

lazy_static! {
@@ -69,8 +115,8 @@ fn warm_recyclers() -> bool {
}

impl<T: Default + Reset + Sized> Recycler<T> {
pub fn warmed(num: usize, size_hint: usize, limit: Option<usize>) -> Self {
assert!(num <= limit.unwrap_or(std::usize::MAX));
pub fn warmed(num: u32, size_hint: usize, limit: Option<u32>) -> Self {
assert!(num <= limit.unwrap_or(std::u32::MAX));
let new = Self {
recycler: Arc::new(RecyclerX::new(limit)),
};
@@ -90,38 +136,95 @@ impl<T: Default + Reset + Sized> Recycler<T> {
}

pub fn allocate(&self, name: &'static str) -> Option<T> {
let new = self
.recycler
.gc
.lock()
.expect("recycler lock in pb fn allocate")
.pop();

if let Some(mut x) = new {
self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
x.reset();
return Some(x);
}
let (mut allocated_object, did_reuse, should_allocate_new) = {
let mut object_pool = self
.recycler
.gc
.lock()
.expect("recycler lock in pb fn allocate");

let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
info!(
"RECYCLER: allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
total,
name,
self.recycler.id,
self.recycler.stats.reuse.load(Ordering::Relaxed),
self.recycler.stats.max_gc.load(Ordering::Relaxed),
);
let now = timestamp();
if now.saturating_sub(object_pool.last_shrink_check_ts)
> object_pool.check_shrink_interval_ms as u64
{
object_pool.last_shrink_check_ts = now;
let shrink_threshold_count = (object_pool.shrink_ratio
* object_pool.total_allocated_count as f64)
.ceil() as u32;

// If more objects than the shrink threshold are sitting idle in the pool,
// increment the `above_shrink_ratio_count`.
if object_pool.len() > object_pool.minimum_object_count as usize
&& object_pool.len() > shrink_threshold_count as usize
{
object_pool.above_shrink_ratio_count += 1;
} else {
object_pool.above_shrink_ratio_count = 0;
}

if object_pool.above_shrink_ratio_count as usize
>= object_pool.max_above_shrink_ratio_count as usize
{
// Do the shrink
let total_alocated_count_shrink_target =
std::cmp::min(object_pool.minimum_object_count, shrink_threshold_count);
let target_num_to_shrink =
object_pool.total_allocated_count - total_alocated_count_shrink_target;
for _ in 0..target_num_to_shrink {
if let Some(mut expired_object) = object_pool.object_pool.pop() {
expired_object.unset_recycler();
// May not be able to shrink exactly `target_num_to_shrink` objects since
// in the case of new allocations, `total_allocated_count` is incremented
// before the object is allocated (see `should_allocate_new` logic below).
// This race allows a difference of up to the number of threads allocating
// with this recycler.
object_pool.total_allocated_count -= 1;
}
}

let mut t = T::default();
let should_allocate = self
.recycler
.limit
.map(|limit| self.recycler.outstanding_len.load(Ordering::SeqCst) < limit)
.unwrap_or(true);
if should_allocate {
object_pool.above_shrink_ratio_count = 0;
datapoint_info!(
"recycler_shrink",
(
"total_alocated_count_shrink_target",
total_alocated_count_shrink_target as i64,
i64
),
("target_num_to_shrink", target_num_to_shrink as i64, i64),
(
"total_allocated_count",
object_pool.total_allocated_count as i64,
i64
),
);
}
}

let reused_object = object_pool.object_pool.pop();
if reused_object.is_some() {
(reused_object, true, false)
} else if let Some(limit) = object_pool.limit {
let should_allocate_new = object_pool.total_allocated_count < limit;
if should_allocate_new {
object_pool.total_allocated_count += 1;
}
(None, false, should_allocate_new)
} else {
(None, false, true)
}
};

if did_reuse {
if let Some(reused) = allocated_object.as_mut() {
self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
reused.reset();
return allocated_object;
}
}

if should_allocate_new {
let mut t = T::default();
t.set_recycler(Arc::downgrade(&self.recycler));
self.recycler.outstanding_len.fetch_add(1, Ordering::SeqCst);
Some(t)
} else {
None
@@ -133,7 +236,7 @@ impl<T: Default + Reset> RecyclerX<T> {
pub fn recycle(&self, x: T) {
let len = {
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
gc.push(x);
gc.object_pool.push(x);
gc.len()
};

@@ -150,20 +253,13 @@ impl<T: Default + Reset> RecyclerX<T> {
let total = self.stats.total.load(Ordering::Relaxed);
let reuse = self.stats.reuse.load(Ordering::Relaxed);
let freed = self.stats.total.fetch_add(1, Ordering::Relaxed);
if self.gc.lock().unwrap().len() % 1000 == 0 {
datapoint_info!(
"recycler",
("gc_len", len as i64, i64),
(
"outstanding_len",
self.outstanding_len.load(Ordering::Relaxed) as i64,
i64
),
("total", total as i64, i64),
("freed", freed as i64, i64),
("reuse", reuse as i64, i64),
);
}
datapoint_debug!(
"recycler",
("gc_len", len as i64, i64),
("total", total as i64, i64),
("freed", freed as i64, i64),
("reuse", reuse as i64, i64),
);
}
}

@@ -180,6 +276,7 @@ mod tests {
}
fn warm(&mut self, _size_hint: usize) {}
fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {}
fn unset_recycler(&mut self) {}
}

#[test]
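To make the new shrink behavior concrete, here is a simplified, single-threaded sketch of the decision `allocate` now performs, using this commit's default constants. The `PoolState` struct and `maybe_shrink` helper are invented for illustration; the real code additionally gates the check on `check_shrink_interval_ms` elapsing, holds the `gc` mutex, decrements `total_allocated_count` once per popped object, and calls `unset_recycler` on everything it drops.

```rust
// Simplified shrink decision, mirroring the defaults added in this commit.
const MINIMUM_OBJECT_COUNT: u32 = 1000;
const SHRINK_RATIO: f64 = 0.80;
const MAX_ABOVE_SHRINK_RATIO_COUNT: u32 = 10;

struct PoolState {
    idle_in_pool: u32,          // objects currently sitting unused in the pool
    total_allocated_count: u32, // objects handed out over the pool's lifetime
    above_shrink_ratio_count: u32,
}

/// Returns how many idle objects should be dropped on this check, if any.
fn maybe_shrink(state: &mut PoolState) -> u32 {
    let shrink_threshold_count =
        (SHRINK_RATIO * state.total_allocated_count as f64).ceil() as u32;

    // Count consecutive checks in which "too many" objects sit idle.
    if state.idle_in_pool > MINIMUM_OBJECT_COUNT && state.idle_in_pool > shrink_threshold_count {
        state.above_shrink_ratio_count += 1;
    } else {
        state.above_shrink_ratio_count = 0;
        return 0;
    }

    if state.above_shrink_ratio_count >= MAX_ABOVE_SHRINK_RATIO_COUNT {
        // Shrink back toward the smaller of the minimum pool size and the
        // current threshold, as the commit does.
        let shrink_target = MINIMUM_OBJECT_COUNT.min(shrink_threshold_count);
        let num_to_shrink = state.total_allocated_count - shrink_target;
        state.total_allocated_count -= num_to_shrink;
        state.idle_in_pool -= num_to_shrink.min(state.idle_in_pool);
        state.above_shrink_ratio_count = 0;
        num_to_shrink
    } else {
        0
    }
}

fn main() {
    let mut state = PoolState {
        idle_in_pool: 1700,
        total_allocated_count: 2000,
        above_shrink_ratio_count: 0,
    };
    // Nine checks in a row only build up the streak; the tenth shrinks.
    let shrunk: u32 = (0..10).map(|_| maybe_shrink(&mut state)).sum();
    assert_eq!(shrunk, 1000); // 2000 - min(1000, ceil(0.8 * 2000)) = 1000
    assert_eq!(state.total_allocated_count, 1000);
}
```

With 2000 objects allocated, the shrink threshold is ceil(0.8 * 2000) = 1600; once the pool holds more than 1600 idle objects for 10 consecutive checks (each at least `check_shrink_interval_ms` apart in the real code), it shrinks back to min(1000, 1600) = 1000 total allocated objects.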
