-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce minimalistic ttl eviction for Recycler/PinnedVec #15139
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
#![feature(test)] | ||
|
||
extern crate test; | ||
|
||
use solana_perf::recycler::Recycler; | ||
use test::Bencher; | ||
|
||
#[bench] | ||
fn bench_recycler(bencher: &mut Bencher) { | ||
solana_logger::setup(); | ||
|
||
let recycler = Recycler::default(); | ||
|
||
for _ in 0..1000 { | ||
let mut p = solana_perf::packet::Packets::with_capacity(128); | ||
p.packets.resize(128, solana_sdk::packet::Packet::default()); | ||
recycler.recycle_for_test(p.packets); | ||
} | ||
|
||
bencher.iter(move || { | ||
recycler.recycle_for_test(recycler.allocate("me")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In summary, sadly this roughly doubled the fast path of the recycle->allocate iteration, from around 70ns to 140ns. But I think this shouldn't affect the overall cluster perf. (I'm running gcp system-performance-test to be sure.) In detail, simply changing to VecDeque adds around 10ns, and calling Instant::now() twice adds around 30ns * 2 (once in recycle and once in allocate).
I think these numbers are reasonable. VecDeque is basically Vec and it only incurs some small additional offset calculation, so the order should be 10ns-20ns. As for Instant::now, I think this is a decent value as well, considering it's basically a vdso call, recalling my vague perf number for it. xD Also, this was tested on my laptop and I know there is jitter. If you're curious, I can run a more rigorous experiment in a cleaner environment. :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok not bad, thanks. |
||
}); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,9 @@ | ||
use log::*; | ||
use rand::{thread_rng, Rng}; | ||
use std::sync::atomic::AtomicBool; | ||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||
use std::sync::{Arc, Mutex, Weak}; | ||
use std::time::Instant; | ||
use std::{collections::VecDeque, sync::atomic::AtomicBool}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hehe, rustfmt became lazy. ;) |
||
|
||
#[derive(Debug, Default)] | ||
struct RecyclerStats { | ||
|
@@ -18,7 +20,7 @@ pub struct Recycler<T> { | |
|
||
#[derive(Debug)] | ||
pub struct RecyclerX<T> { | ||
gc: Mutex<Vec<T>>, | ||
gc: Mutex<VecDeque<(Instant, T)>>, | ||
stats: RecyclerStats, | ||
id: usize, | ||
} | ||
|
@@ -28,7 +30,7 @@ impl<T: Default> Default for RecyclerX<T> { | |
let id = thread_rng().gen_range(0, 1000); | ||
trace!("new recycler..{}", id); | ||
RecyclerX { | ||
gc: Mutex::new(vec![]), | ||
gc: Mutex::new(VecDeque::new()), | ||
stats: RecyclerStats::default(), | ||
id, | ||
} | ||
|
@@ -41,6 +43,9 @@ pub trait Reset { | |
fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>) | ||
where | ||
Self: std::marker::Sized; | ||
fn unset_recycler(&mut self) | ||
where | ||
Self: std::marker::Sized; | ||
} | ||
|
||
lazy_static! { | ||
|
@@ -55,6 +60,9 @@ fn warm_recyclers() -> bool { | |
WARM_RECYCLERS.load(Ordering::Relaxed) | ||
} | ||
|
||
// Inventory size threshold for TTL eviction: `allocate` only considers
// expiring the oldest entry when the pool holds MORE than this many items.
pub const MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION: usize = 10; | ||
// Age, in whole seconds, at or beyond which the oldest pooled entry is
// treated as expired by `allocate`.
pub const EXPIRATION_TTL_SECONDS: u64 = 3600; | ||
|
||
impl<T: Default + Reset + Sized> Recycler<T> { | ||
pub fn warmed(num: usize, size_hint: usize) -> Self { | ||
let new = Self::default(); | ||
|
@@ -74,12 +82,26 @@ impl<T: Default + Reset + Sized> Recycler<T> { | |
} | ||
|
||
pub fn allocate(&self, name: &'static str) -> T { | ||
let new = self | ||
.recycler | ||
.gc | ||
.lock() | ||
.expect("recycler lock in pb fn allocate") | ||
.pop(); | ||
let new = { | ||
let mut gc = self.recycler.gc.lock().unwrap(); | ||
|
||
// Don't expire when inventory is rather small. At least, | ||
// we shouldn't expire (= drop) the last inventory item; we can just | ||
// return it from here. Also add some buffer. Thus, we only | ||
// expire when there are more than 10 items. | ||
if gc.len() > MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION { | ||
if let Some((oldest_time, _old_item)) = gc.front() { | ||
if oldest_time.elapsed().as_secs() >= EXPIRATION_TTL_SECONDS { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. like this |
||
let (_, mut expired) = gc.pop_front().unwrap(); | ||
// unref-ing recycler here is crucial to prevent | ||
// the expired item from being recycled again via Drop, | ||
// leading to deadlock! | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my test validator was bitten by this. T_T rerunning it.... |
||
expired.unset_recycler(); | ||
} | ||
} | ||
} | ||
gc.pop_back().map(|(_added_time, item)| item) | ||
}; | ||
|
||
if let Some(mut x) = new { | ||
self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed); | ||
|
@@ -101,13 +123,17 @@ impl<T: Default + Reset + Sized> Recycler<T> { | |
t.set_recycler(Arc::downgrade(&self.recycler)); | ||
t | ||
} | ||
|
||
// Pushes `x` into the pool by forwarding it to the inner recycler's
// `recycle`. Used by the tests and the benchmark to seed inventory directly.
pub fn recycle_for_test(&self, x: T) { | ||
self.recycler.recycle(x); | ||
} | ||
} | ||
|
||
impl<T: Default + Reset> RecyclerX<T> { | ||
pub fn recycle(&self, x: T) { | ||
let len = { | ||
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); | ||
gc.push(x); | ||
gc.push_back((Instant::now(), x)); | ||
gc.len() | ||
}; | ||
|
||
|
@@ -137,13 +163,16 @@ impl<T: Default + Reset> RecyclerX<T> { | |
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use std::time::Duration; | ||
|
||
const RESET_VALUE: u64 = 10; | ||
impl Reset for u64 { | ||
fn reset(&mut self) { | ||
*self = 10; | ||
*self += RESET_VALUE; | ||
} | ||
fn warm(&mut self, _size_hint: usize) {} | ||
fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {} | ||
fn unset_recycler(&mut self) {} | ||
} | ||
|
||
#[test] | ||
|
@@ -153,10 +182,63 @@ mod tests { | |
assert_eq!(y, 0); | ||
y = 20; | ||
let recycler2 = recycler.clone(); | ||
recycler2.recycler.recycle(y); | ||
recycler2.recycle_for_test(y); | ||
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1); | ||
let z = recycler.allocate("test_recycler2"); | ||
assert_eq!(z, 10); | ||
assert_eq!(z, 20 + RESET_VALUE); | ||
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); | ||
} | ||
|
||
// Exercises TTL eviction in `allocate`: an expired front entry is dropped only
// once the inventory exceeds MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION.
// Expected values include `+ RESET_VALUE`, matching the `*self += RESET_VALUE`
// line in this module's `Reset` impl for u64.
#[test] | ||
fn test_recycler_ttl() { | ||
let recycler = Recycler::default(); | ||
recycler.recycle_for_test(42); | ||
recycler.recycle_for_test(43); | ||
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 2); | ||
|
||
// meddle with the timestamp to make the first element expire | ||
recycler.recycler.gc.lock().unwrap().front_mut().unwrap().0 = | ||
Instant::now() - Duration::from_secs(EXPIRATION_TTL_SECONDS + 1); | ||
|
||
// only 2 entries are pooled, so the expired front (42) is kept and the
// newest entry (43) is handed out
let y: u64 = recycler.allocate("test_recycler1"); | ||
assert_eq!(y, 43 + RESET_VALUE); | ||
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1); | ||
|
||
// create enough inventory to trigger expiration | ||
for i in 44..44 + MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION as u64 { | ||
recycler.recycle_for_test(i); | ||
} | ||
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 11); | ||
|
||
// allocate with expiration causes len to be reduced by 2 | ||
// (the expired front entry is dropped and the newest entry is returned)
let y: u64 = recycler.allocate("test_recycler1"); | ||
assert_eq!(y, 53 + RESET_VALUE); | ||
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 9); | ||
|
||
// allocate without expiration causes len to be reduced by 1 | ||
let y: u64 = recycler.allocate("test_recycler1"); | ||
assert_eq!(y, 52 + RESET_VALUE); | ||
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 8); | ||
} | ||
|
||
// Regression test: when `allocate` drops an expired entry, that entry must not
// be routed back into the recycler (the comment in `allocate` says that would
// lead to a deadlock).
#[test] | ||
fn test_recycler_no_deadlock() { | ||
solana_logger::setup(); | ||
|
||
let recycler = | ||
Recycler::<crate::cuda_runtime::PinnedVec<solana_sdk::packet::Packet>>::default(); | ||
|
||
// create a bunch of packets and drop at once to force enough inventory | ||
// (the inclusive range yields MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION + 1
// items, enough to pass the `gc.len() > MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION`
// check in `allocate`)
let packets = (0..=MAX_INVENTORY_COUNT_WITHOUT_EXPIRATION) | ||
.into_iter() | ||
.map(|_| recycler.allocate("me")) | ||
.collect::<Vec<_>>(); | ||
drop(packets); | ||
|
||
// backdate the oldest entry by 7200s, beyond EXPIRATION_TTL_SECONDS (3600)
recycler.recycler.gc.lock().unwrap().front_mut().unwrap().0 = | ||
Instant::now() - Duration::from_secs(7200); | ||
|
||
// this drops the first inventory item but shouldn't cause recycling of it! | ||
recycler.recycle_for_test(recycler.allocate("me")); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this