diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 33a3e47f974c34..6c5ca6e9803053 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -46,7 +46,7 @@ impl FetchStage { exit: &Arc, sender: &PacketSender, poh_recorder: &Arc>, - allocated_packet_limit: Option, + allocated_packet_limit: Option, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect(); @@ -101,7 +101,7 @@ impl FetchStage { exit: &Arc, sender: &PacketSender, poh_recorder: &Arc>, - limit: Option, + limit: Option, ) -> Self { let recycler: PacketsRecycler = Recycler::warmed(1000, 1024, limit); diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 09cad166994e05..22a6e0fe5091d7 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -167,7 +167,7 @@ impl ShredFetchStage { sender: &PacketSender, bank_forks: Option>>, exit: &Arc, - limit: Option, + limit: Option, ) -> Self { let recycler: PacketsRecycler = Recycler::warmed(100, 1024, limit); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 1d3b826fed5155..05f7a8ac3e38b1 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -68,7 +68,7 @@ impl Tpu { &exit, &packet_sender, &poh_recorder, - Some((TOTAL_BUFFERED_PACKETS + PACKETS_PER_BATCH - 1) / PACKETS_PER_BATCH), + Some(((TOTAL_BUFFERED_PACKETS + PACKETS_PER_BATCH - 1) / PACKETS_PER_BATCH) as u32), ); let (verified_sender, verified_receiver) = unbounded(); diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index dc9b84a0395ca2..a8f9b01acf6b77 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -76,6 +76,9 @@ impl Reset for PinnedVec { fn set_recycler(&mut self, recycler: Weak>) { self.recycler = Some(recycler); } + fn unset_recycler(&mut self) { + self.recycler = None; + } } impl Default for PinnedVec { diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs index 5e94ed92bd6d01..126350fb709510 100644 --- a/perf/src/recycler.rs +++ b/perf/src/recycler.rs @@ -1,8 +1,14 @@ use rand::{thread_rng, Rng}; +use solana_sdk::timing::timestamp; use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; +pub const DEFAULT_MINIMUM_OBJECT_COUNT: u32 = 1000; +pub const DEFAULT_SHRINK_RATIO: f64 = 0.80; +pub const DEFAULT_MAX_ABOVE_SHRINK_RATIO_COUNT: u32 = 10; +pub const DEFAULT_CHECK_SHRINK_INTERVAL_MS: u32 = 10000; + #[derive(Debug, Default)] struct RecyclerStats { total: AtomicUsize, @@ -12,37 +18,74 @@ struct RecyclerStats { } #[derive(Clone, Default)] -pub struct Recycler { +pub struct Recycler { recycler: Arc>, } #[derive(Debug)] -pub struct RecyclerX { - gc: Mutex>, +pub struct ObjectPool { + object_pool: Vec, + shrink_ratio: f64, + minimum_object_count: u32, + above_shrink_ratio_count: u32, + max_above_shrink_ratio_count: u32, + check_shrink_interval_ms: u32, + last_shrink_check_ts: u64, + pub total_allocated_count: u32, + limit: Option, +} +impl Default for ObjectPool { + fn default() -> Self { + ObjectPool { + object_pool: vec![], + shrink_ratio: DEFAULT_SHRINK_RATIO, + minimum_object_count: DEFAULT_MINIMUM_OBJECT_COUNT, + above_shrink_ratio_count: 0, + max_above_shrink_ratio_count: DEFAULT_MAX_ABOVE_SHRINK_RATIO_COUNT, + check_shrink_interval_ms: DEFAULT_CHECK_SHRINK_INTERVAL_MS, + last_shrink_check_ts: timestamp(), + total_allocated_count: 0, + limit: None, + } + } +} + +impl ObjectPool { + fn new(limit: Option) -> Self { + Self { + limit, + ..Self::default() + } + } + + fn len(&self) -> usize { + self.object_pool.len() + } +} + +#[derive(Debug)] +pub struct RecyclerX { + gc: Mutex>, stats: RecyclerStats, id: usize, - outstanding_len: AtomicUsize, - limit: Option, } -impl Default for RecyclerX { +impl Default for RecyclerX { fn default() -> RecyclerX { let id = thread_rng().gen_range(0, 1000); trace!("new recycler..{}", id); RecyclerX { - gc: Mutex::new(vec![]), + gc: Mutex::new(ObjectPool::default()), stats: RecyclerStats::default(), id, - limit: None, - outstanding_len: AtomicUsize::default(), } } } -impl RecyclerX { - fn new(limit: Option) -> Self { +impl RecyclerX { + fn new(limit: Option) -> Self { RecyclerX { - limit, + gc: Mutex::new(ObjectPool::new(limit)), ..Self::default() } } @@ -54,6 +97,9 @@ pub trait Reset { fn set_recycler(&mut self, recycler: Weak>) where Self: std::marker::Sized; + fn unset_recycler(&mut self) + where + Self: std::marker::Sized; } lazy_static! { @@ -69,8 +115,8 @@ fn warm_recyclers() -> bool { } impl Recycler { - pub fn warmed(num: usize, size_hint: usize, limit: Option) -> Self { - assert!(num <= limit.unwrap_or(std::usize::MAX)); + pub fn warmed(num: u32, size_hint: usize, limit: Option) -> Self { + assert!(num <= limit.unwrap_or(std::u32::MAX)); let new = Self { recycler: Arc::new(RecyclerX::new(limit)), }; @@ -90,38 +136,95 @@ impl Recycler { } pub fn allocate(&self, name: &'static str) -> Option { - let new = self - .recycler - .gc - .lock() - .expect("recycler lock in pb fn allocate") - .pop(); - - if let Some(mut x) = new { - self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed); - x.reset(); - return Some(x); - } + let (mut allocated_object, did_reuse, should_allocate_new) = { + let mut object_pool = self + .recycler + .gc + .lock() + .expect("recycler lock in pb fn allocate"); - let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed); - info!( - "RECYCLER: allocating new: total {} {:?} id: {} reuse: {} max_gc: {}", - total, - name, - self.recycler.id, - self.recycler.stats.reuse.load(Ordering::Relaxed), - self.recycler.stats.max_gc.load(Ordering::Relaxed), - ); + let now = timestamp(); + if now.saturating_sub(object_pool.last_shrink_check_ts) + > object_pool.check_shrink_interval_ms as u64 + { + object_pool.last_shrink_check_ts = now; + let shrink_threshold_count = (object_pool.shrink_ratio + * object_pool.total_allocated_count as f64) + .ceil() as u32; + + // If more than the shrink threshold of all allocated objects are sitting doing nothing, + // increment the `above_shrink_ratio_count`. + if object_pool.len() > object_pool.minimum_object_count as usize + && object_pool.len() > shrink_threshold_count as usize + { + object_pool.above_shrink_ratio_count += 1; + } else { + object_pool.above_shrink_ratio_count = 0; + } + + if object_pool.above_shrink_ratio_count as usize + >= object_pool.max_above_shrink_ratio_count as usize + { + // Do the shrink + let total_alocated_count_shrink_target = + std::cmp::min(object_pool.minimum_object_count, shrink_threshold_count); + let target_num_to_shrink = + object_pool.total_allocated_count - total_alocated_count_shrink_target; + for _ in 0..target_num_to_shrink { + if let Some(mut expired_object) = object_pool.object_pool.pop() { + expired_object.unset_recycler(); + // May not be able to shrink exactly `target_num_to_shrink` objects sinc + // in the case of new allocations, `total_allocated_count` is incremented + // before the object is allocated (see `should_allocate_new` logic below). + // This race allows a difference of up to the number of threads allocating + // with this recycler. + object_pool.total_allocated_count -= 1; + } + } - let mut t = T::default(); - let should_allocate = self - .recycler - .limit - .map(|limit| self.recycler.outstanding_len.load(Ordering::SeqCst) < limit) - .unwrap_or(true); - if should_allocate { + object_pool.above_shrink_ratio_count = 0; + datapoint_info!( + "recycler_shrink", + ( + "total_alocated_count_shrink_target", + total_alocated_count_shrink_target as i64, + i64 + ), + ("target_num_to_shrink", target_num_to_shrink as i64, i64), + ( + "total_allocated_count", + object_pool.total_allocated_count as i64, + i64 + ), + ); + } + } + + let reused_object = object_pool.object_pool.pop(); + if reused_object.is_some() { + (reused_object, true, false) + } else if let Some(limit) = object_pool.limit { + let should_allocate_new = object_pool.total_allocated_count < limit; + if should_allocate_new { + object_pool.total_allocated_count += 1; + } + (None, false, should_allocate_new) + } else { + (None, false, true) + } + }; + + if did_reuse { + if let Some(reused) = allocated_object.as_mut() { + self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed); + reused.reset(); + return allocated_object; + } + } + + if should_allocate_new { + let mut t = T::default(); t.set_recycler(Arc::downgrade(&self.recycler)); - self.recycler.outstanding_len.fetch_add(1, Ordering::SeqCst); Some(t) } else { None @@ -133,7 +236,7 @@ impl RecyclerX { pub fn recycle(&self, x: T) { let len = { let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); - gc.push(x); + gc.object_pool.push(x); gc.len() }; @@ -150,20 +253,13 @@ impl RecyclerX { let total = self.stats.total.load(Ordering::Relaxed); let reuse = self.stats.reuse.load(Ordering::Relaxed); let freed = self.stats.total.fetch_add(1, Ordering::Relaxed); - if self.gc.lock().unwrap().len() % 1000 == 0 { - datapoint_info!( - "recycler", - ("gc_len", len as i64, i64), - ( - "outstanding_len", - self.outstanding_len.load(Ordering::Relaxed) as i64, - i64 - ), - ("total", total as i64, i64), - ("freed", freed as i64, i64), - ("reuse", reuse as i64, i64), - ); - } + datapoint_debug!( + "recycler", + ("gc_len", len as i64, i64), + ("total", total as i64, i64), + ("freed", freed as i64, i64), + ("reuse", reuse as i64, i64), + ); } } @@ -180,6 +276,7 @@ mod tests { } fn warm(&mut self, _size_hint: usize) {} fn set_recycler(&mut self, _recycler: Weak>) {} + fn unset_recycler(&mut self) {} } #[test]