diff --git a/Cargo.lock b/Cargo.lock
index 6bcacf21da7946..b1e2ed9c987b32 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4896,6 +4896,7 @@ dependencies = [
  "serde",
  "solana-budget-program",
  "solana-logger 1.6.0",
+ "solana-measure",
  "solana-metrics",
  "solana-rayon-threadlimit",
  "solana-sdk",
diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs
index 5763aa15e642ad..6a668a6aa77b7b 100644
--- a/bench-streamer/src/main.rs
+++ b/bench-streamer/src/main.rs
@@ -75,7 +75,7 @@ fn main() -> Result<()> {
     let mut read_channels = Vec::new();
     let mut read_threads = Vec::new();
-    let recycler = PacketsRecycler::default();
+    let recycler = PacketsRecycler::new_without_limit("bench-streamer-recycler-shrink-stats");
     for _ in 0..num_sockets {
         let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap();
         read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 6268d0bb876d53..74ab21875d4ce4 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -1853,7 +1853,7 @@ impl ClusterInfo {
         let mut last_contact_info_trace = timestamp();
         let mut last_contact_info_save = timestamp();
         let mut entrypoints_processed = false;
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new_without_limit("gossip-recycler-shrink-stats");
         let crds_data = vec![
             CrdsData::Version(Version::new(self.id())),
             CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
@@ -2104,7 +2104,7 @@ impl ClusterInfo {
             .process_pull_requests(callers.cloned(), timestamp());
         let output_size_limit =
             self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
-        let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
+        let mut packets = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
         let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
             let mut rng = rand::thread_rng();
             let check_pull_request =
@@ -2389,8 +2389,7 @@ impl ClusterInfo {
         if packets.is_empty() {
             None
         } else {
-            let packets =
-                Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets);
+            let packets = Packets::new_with_recycler_data(recycler, packets).unwrap();
             Some(packets)
         }
     }
@@ -3019,7 +3018,8 @@ impl ClusterInfo {
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
         let exit = exit.clone();
-        let recycler = PacketsRecycler::default();
+        let recycler =
+            PacketsRecycler::new_without_limit("cluster-info-listen-recycler-shrink-stats");
         Builder::new()
             .name("solana-listen".to_string())
             .spawn(move || {
@@ -3464,7 +3464,7 @@ mod tests {
             .iter()
             .map(|ping| Pong::new(ping, &this_node).unwrap())
             .collect();
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new_without_limit("");
         let packets = cluster_info
             .handle_ping_messages(
                 remote_nodes
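Illustrative (not part of the diff): the string passed to `new_without_limit` names the recycler for the shrink-stats datapoints it reports, so production call sites use descriptive names while tests pass `""`.

```rust
use solana_perf::packet::PacketsRecycler;

fn example() {
    // Production call sites name the recycler so its shrink datapoints are attributable.
    let _gossip = PacketsRecycler::new_without_limit("gossip-recycler-shrink-stats");
    // Tests opt out of meaningful metric names with "".
    let _test_only = PacketsRecycler::new_without_limit("");
}
```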
diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs
index 0966a2b0916711..f2a660e61f69b5 100644
--- a/core/src/fetch_stage.rs
+++ b/core/src/fetch_stage.rs
@@ -29,7 +29,14 @@ impl FetchStage {
     ) -> (Self, PacketReceiver) {
         let (sender, receiver) = channel();
         (
-            Self::new_with_sender(sockets, tpu_forwards_sockets, exit, &sender, &poh_recorder),
+            Self::new_with_sender(
+                sockets,
+                tpu_forwards_sockets,
+                exit,
+                &sender,
+                &poh_recorder,
+                None,
+            ),
             receiver,
         )
     }
@@ -39,6 +46,7 @@ impl FetchStage {
         exit: &Arc<AtomicBool>,
         sender: &PacketSender,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
+        allocated_packet_limit: Option<u32>,
     ) -> Self {
         let tx_sockets = sockets.into_iter().map(Arc::new).collect();
         let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
@@ -48,6 +56,7 @@ impl FetchStage {
             exit,
             &sender,
             &poh_recorder,
+            allocated_packet_limit,
         )
     }
@@ -92,8 +101,10 @@ impl FetchStage {
         exit: &Arc<AtomicBool>,
         sender: &PacketSender,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
+        limit: Option<u32>,
     ) -> Self {
-        let recycler: PacketsRecycler = Recycler::warmed(1000, 1024);
+        let recycler: PacketsRecycler =
+            Recycler::warmed(1000, 1024, limit, "fetch_stage_recycler_shrink");
         let tpu_threads = sockets.into_iter().map(|socket| {
             streamer::receiver(
diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs
index e8b321cd59958f..a4e633b4967861 100644
--- a/core/src/gossip_service.rs
+++ b/core/src/gossip_service.rs
@@ -47,7 +47,7 @@ impl GossipService {
             gossip_socket.clone(),
             &exit,
             request_sender,
-            Recycler::default(),
+            Recycler::new_without_limit("gossip-receiver-recycler-shrink-stats"),
             "gossip_receiver",
         );
         let (response_sender, response_receiver) = channel();
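A minimal sketch (assuming only the signatures shown in this diff) of how the optional cap flows into the warmed recycler; `Recycler::warmed` asserts the warm-up count fits under the cap:

```rust
use solana_perf::{packet::PacketsRecycler, recycler::Recycler};

// Mirrors what `new_multi_socket` does above: warm 1000 batches with a 1024 size
// hint, optionally capping the total number of live batches at `limit`.
fn build_fetch_recycler(limit: Option<u32>) -> PacketsRecycler {
    // Panics if the warm count (1000) exceeds `limit`, per the assert inside `warmed`.
    Recycler::warmed(1000, 1024, limit, "fetch_stage_recycler_shrink")
}
```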
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index 756077a8b5f30a..b93dc0650f10fd 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -279,7 +279,7 @@ impl ServeRepair {
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
         let exit = exit.clone();
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new_without_limit("serve-repair-recycler-shrink-stats");
         Builder::new()
             .name("solana-repair-listen".to_string())
             .spawn(move || {
@@ -498,11 +498,7 @@ impl ServeRepair {
             if let Some(packet) = packet {
                 inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
-                return Some(Packets::new_with_recycler_data(
-                    recycler,
-                    "run_window_request",
-                    vec![packet],
-                ));
+                return Packets::new_with_recycler_data(recycler, vec![packet]);
             }
         }
@@ -538,11 +534,7 @@ impl ServeRepair {
             from_addr,
             nonce,
         )?;
-        return Some(Packets::new_with_recycler_data(
-            recycler,
-            "run_highest_window_request",
-            vec![packet],
-        ));
+        return Packets::new_with_recycler_data(recycler, vec![packet]);
     }
     None
 }
@@ -555,7 +547,7 @@ impl ServeRepair {
         max_responses: usize,
         nonce: Nonce,
     ) -> Option<Packets> {
-        let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
+        let mut res = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
         if let Some(blockstore) = blockstore {
             // Try to find the next "n" parent slots of the input slot
             while let Ok(Some(meta)) = blockstore.meta(slot) {
@@ -609,7 +601,7 @@ mod tests {
     /// test run_window_request responds with the right shred, and do not overrun
     fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new_without_limit("");
         solana_logger::setup();
         let ledger_path = get_tmp_ledger_path!();
         {
@@ -677,7 +669,7 @@ mod tests {
     /// test window requests respond with the right shred, and do not overrun
     fn run_window_request(slot: Slot, nonce: Nonce) {
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new_without_limit("");
         solana_logger::setup();
         let ledger_path = get_tmp_ledger_path!();
         {
@@ -845,7 +837,7 @@ mod tests {
     fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
         solana_logger::setup();
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new_without_limit("");
         let ledger_path = get_tmp_ledger_path!();
         {
             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
@@ -916,7 +908,7 @@ mod tests {
     #[test]
     fn run_orphan_corrupted_shred_size() {
         solana_logger::setup();
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new_without_limit("");
         let ledger_path = get_tmp_ledger_path!();
         {
             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs
index f252ab1e4ba593..785ed06b0f1860 100644
--- a/core/src/serve_repair_service.rs
+++ b/core/src/serve_repair_service.rs
@@ -30,7 +30,7 @@ impl ServeRepairService {
             serve_repair_socket.clone(),
             &exit,
             request_sender,
-            Recycler::default(),
+            Recycler::new_without_limit("serve-repair-receiver-recycler-shrink-stats"),
             "serve_repair_receiver",
         );
         let (response_sender, response_receiver) = channel();
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index 1c5f725bb18b84..01b1f177b34eab 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -167,8 +167,10 @@ impl ShredFetchStage {
         sender: &PacketSender,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
         exit: &Arc<AtomicBool>,
+        limit: Option<u32>,
     ) -> Self {
-        let recycler: PacketsRecycler = Recycler::warmed(100, 1024);
+        let recycler: PacketsRecycler =
+            Recycler::warmed(100, 1024, limit, "shred_fetch_stage_recycler_shrink");
         let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
             sockets,
diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs
index 1c9b2bd39ef801..8228cbeeb92f0b 100644
--- a/core/src/sigverify.rs
+++ b/core/src/sigverify.rs
@@ -23,8 +23,8 @@ impl Default for TransactionSigVerifier {
     fn default() -> Self {
         init();
         Self {
-            recycler: Recycler::warmed(50, 4096),
-            recycler_out: Recycler::warmed(50, 4096),
+            recycler: Recycler::warmed(50, 4096, None, ""),
+            recycler_out: Recycler::warmed(50, 4096, None, ""),
         }
     }
 }
diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs
index c256af65579138..d7e44622c65798 100644
--- a/core/src/sigverify_shreds.rs
+++ b/core/src/sigverify_shreds.rs
@@ -26,7 +26,10 @@ impl ShredSigVerifier {
         Self {
             bank_forks,
             leader_schedule_cache,
-            recycler_cache: RecyclerCache::warmed(),
+            recycler_cache: RecyclerCache::warmed(
+                "shred-sig-verifier-offsets-recycler-shrink-stats",
+                "shred-sig-verifier-buffer-recycler-shrink-stats",
+            ),
         }
     }
     fn read_slots(batches: &[Packets]) -> HashSet<Slot> {
diff --git a/core/src/tpu.rs b/core/src/tpu.rs
index c26a8e18b319ec..e71b9f8d9be89f 100644
--- a/core/src/tpu.rs
+++ b/core/src/tpu.rs
@@ -67,6 +67,9 @@ impl Tpu {
             &exit,
             &packet_sender,
             &poh_recorder,
+            // At 1024 packets per `Packets` batch and roughly MTU-sized (~1k) packets,
+            // 20,000 batches is roughly 20GB
+            Some(20_000),
         );
         let (verified_sender, verified_receiver) = unbounded();
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index e2f41a06d722a7..b21f7d73af911e 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -145,6 +145,7 @@ impl Tvu {
             &fetch_sender,
             Some(bank_forks.clone()),
             &exit,
+            None,
         );
         let (verified_sender, verified_receiver) = unbounded();
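Quick arithmetic check of the 20GB comment in tpu.rs (illustrative, not part of the diff):

```rust
fn main() {
    // 20,000 batches x 1024 packets/batch x ~1KB/packet:
    let bytes: u64 = 20_000 * 1024 * 1024;
    assert_eq!(bytes, 20_971_520_000); // ~20GB (about 19.5 GiB) of pinned packet memory
    println!("{} GiB", bytes / (1024 * 1024 * 1024));
}
```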
diff --git a/ledger/benches/sigverify_shreds.rs b/ledger/benches/sigverify_shreds.rs
index 116e1fd77ae7a4..0fd83dd0376a96 100644
--- a/ledger/benches/sigverify_shreds.rs
+++ b/ledger/benches/sigverify_shreds.rs
@@ -16,7 +16,7 @@ const NUM_PACKETS: usize = 256;
 const NUM_BATCHES: usize = 1;
 #[bench]
 fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
-    let recycler_cache = RecyclerCache::default();
+    let recycler_cache = RecyclerCache::new("", "");
     let mut packets = Packets::default();
     packets.packets.set_pinnable();
diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs
index 1cde091107108e..a77d3eb44e42d7 100644
--- a/ledger/src/entry.rs
+++ b/ledger/src/entry.rs
@@ -225,12 +225,21 @@ pub struct EntryVerificationState {
     device_verification_data: DeviceVerificationData,
 }
 
-#[derive(Default, Clone)]
+#[derive(Clone)]
 pub struct VerifyRecyclers {
     hash_recycler: Recycler<PinnedVec<Hash>>,
     tick_count_recycler: Recycler<PinnedVec<u64>>,
 }
 
+impl Default for VerifyRecyclers {
+    fn default() -> Self {
+        Self {
+            hash_recycler: Recycler::new_without_limit("hash_recycler_shrink_stats"),
+            tick_count_recycler: Recycler::new_without_limit("tick_count_recycler_shrink_stats"),
+        }
+    }
+}
+
 #[derive(PartialEq, Clone, Copy, Debug)]
 pub enum EntryVerificationStatus {
     Failure,
@@ -554,14 +563,12 @@ impl EntrySlice for [Entry] {
             .take(self.len())
             .collect();
 
-        let mut hashes_pinned = recyclers.hash_recycler.allocate("poh_verify_hash");
+        let mut hashes_pinned = recyclers.hash_recycler.allocate().unwrap();
         hashes_pinned.set_pinnable();
         hashes_pinned.resize(hashes.len(), Hash::default());
         hashes_pinned.copy_from_slice(&hashes);
 
-        let mut num_hashes_vec = recyclers
-            .tick_count_recycler
-            .allocate("poh_verify_num_hashes");
+        let mut num_hashes_vec = recyclers.tick_count_recycler.allocate().unwrap();
         num_hashes_vec.reserve_and_pin(cmp::max(1, self.len()));
         for entry in self {
             num_hashes_vec.push(entry.num_hashes.saturating_sub(1));
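Why `Default` is now hand-written (sketch, not part of the diff): after this change `Recycler` only derives `Clone`, since constructing one requires a metric name, so `#[derive(Default)]` on `VerifyRecyclers` would no longer compile.

```rust
use solana_ledger::entry::VerifyRecyclers;

fn example() {
    // Callers are unchanged; the metric names are simply baked into the manual impl.
    let _recyclers = VerifyRecyclers::default();
}
```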
diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs
index effe496a2b9257..6601f07a36d142 100644
--- a/ledger/src/sigverify_shreds.rs
+++ b/ledger/src/sigverify_shreds.rs
@@ -137,7 +137,7 @@ fn slot_key_data_for_gpu<
                 .push(*slot);
         }
     }
-    let mut keyvec = recycler_cache.buffer().allocate("shred_gpu_pubkeys");
+    let mut keyvec = recycler_cache.buffer().allocate().unwrap();
     keyvec.set_pinnable();
 
     let mut slot_to_key_ix = HashMap::new();
@@ -152,7 +152,7 @@ fn slot_key_data_for_gpu<
             slot_to_key_ix.insert(s, i);
         }
     }
-    let mut offsets = recycler_cache.offsets().allocate("shred_offsets");
+    let mut offsets = recycler_cache.offsets().allocate().unwrap();
     offsets.set_pinnable();
     slots.iter().for_each(|packet_slots| {
         packet_slots.iter().for_each(|slot| {
@@ -185,11 +185,11 @@ fn shred_gpu_offsets(
     batches: &[Packets],
     recycler_cache: &RecyclerCache,
 ) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) {
-    let mut signature_offsets = recycler_cache.offsets().allocate("shred_signatures");
+    let mut signature_offsets = recycler_cache.offsets().allocate().unwrap();
     signature_offsets.set_pinnable();
-    let mut msg_start_offsets = recycler_cache.offsets().allocate("shred_msg_starts");
+    let mut msg_start_offsets = recycler_cache.offsets().allocate().unwrap();
     msg_start_offsets.set_pinnable();
-    let mut msg_sizes = recycler_cache.offsets().allocate("shred_msg_sizes");
+    let mut msg_sizes = recycler_cache.offsets().allocate().unwrap();
     msg_sizes.set_pinnable();
     let mut v_sig_lens = vec![];
     for batch in batches.iter() {
@@ -242,7 +242,7 @@ pub fn verify_shreds_gpu(
     trace!("pubkeys_len: {}", pubkeys_len);
     let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) =
         shred_gpu_offsets(pubkeys_len, batches, recycler_cache);
-    let mut out = recycler_cache.buffer().allocate("out_buffer");
+    let mut out = recycler_cache.buffer().allocate().unwrap();
     out.set_pinnable();
     elems.push(
         perf_libs::Elems {
@@ -332,7 +332,7 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) {
 }
 
 pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec<u8> {
-    let mut vec = cache.buffer().allocate("pinned_keypair");
+    let mut vec = cache.buffer().allocate().unwrap();
     let pubkey = keypair.pubkey().to_bytes();
     let secret = keypair.secret().to_bytes();
     let mut hasher = Sha512::default();
@@ -370,17 +370,17 @@ pub fn sign_shreds_gpu(
     let mut num_packets = num_keypair_packets;
 
     //should be zero
-    let mut pubkey_offsets = recycler_cache.offsets().allocate("pubkey offsets");
+    let mut pubkey_offsets = recycler_cache.offsets().allocate().unwrap();
     pubkey_offsets.resize(count, 0);
-    let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets");
+    let mut secret_offsets = recycler_cache.offsets().allocate().unwrap();
     secret_offsets.resize(count, pubkey_size as u32);
 
     trace!("offset: {}", offset);
     let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) =
         shred_gpu_offsets(offset, batches, recycler_cache);
     let total_sigs = signature_offsets.len();
-    let mut signatures_out = recycler_cache.buffer().allocate("ed25519 signatures");
+    let mut signatures_out = recycler_cache.buffer().allocate().unwrap();
     signatures_out.set_pinnable();
     signatures_out.resize(total_sigs * sig_size, 0);
     elems.push(
@@ -560,7 +560,7 @@ pub mod tests {
     fn run_test_sigverify_shreds_gpu(slot: Slot) {
         solana_logger::setup();
 
-        let recycler_cache = RecyclerCache::default();
+        let recycler_cache = RecyclerCache::new("", "");
 
         let mut batch = [Packets::default()];
         let mut shred = Shred::new_from_data(
@@ -624,7 +624,7 @@ pub mod tests {
     fn run_test_sigverify_shreds_sign_gpu(slot: Slot) {
         solana_logger::setup();
 
-        let recycler_cache = RecyclerCache::default();
+        let recycler_cache = RecyclerCache::new("", "");
 
         let mut packets = Packets::default();
         let num_packets = 32;
diff --git a/perf/Cargo.toml b/perf/Cargo.toml
index 5a727a8db59897..df4fe7adbf0d63 100644
--- a/perf/Cargo.toml
+++ b/perf/Cargo.toml
@@ -21,6 +21,7 @@ solana-sdk = { path = "../sdk", version = "1.6.0" }
 solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.6.0" }
 solana-budget-program = { path = "../programs/budget", version = "1.6.0" }
 solana-logger = { path = "../logger", version = "1.6.0" }
+solana-measure = { path = "../measure", version = "1.6.0" }
 solana-metrics = { path = "../metrics", version = "1.6.0" }
 curve25519-dalek = { version = "2" }
diff --git a/perf/benches/recycler.rs b/perf/benches/recycler.rs
new file mode 100644
index 00000000000000..a2a9788a072114
--- /dev/null
+++ b/perf/benches/recycler.rs
@@ -0,0 +1,22 @@
+#![feature(test)]
+
+extern crate test;
+
+use solana_perf::{packet::PacketsRecycler, recycler::Recycler};
+
+use test::Bencher;
+
+#[bench]
+fn bench_recycler(bencher: &mut Bencher) {
+    solana_logger::setup();
+
+    let recycler: PacketsRecycler = Recycler::new_without_limit("me");
+
+    for _ in 0..1000 {
+        recycler.recycle_for_test(recycler.allocate().expect("There is no limit"));
+    }
+
+    bencher.iter(move || {
+        recycler.recycle_for_test(recycler.allocate().expect("There is no limit"));
+    });
+}
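The new bench round-trips a single object through the pool; a minimal sketch of the same pattern follows (the helper name is mine). Running the bench requires nightly because of `#![feature(test)]`.

```rust
use solana_perf::{packet::PacketsRecycler, recycler::Recycler};

fn round_trip(recycler: &PacketsRecycler) {
    // expect() is safe here: this recycler was built without an allocation limit.
    let batch = recycler.allocate().expect("There is no limit");
    // Hand the object straight back so the pool stays at a steady size.
    recycler.recycle_for_test(batch);
}

fn main() {
    let recycler: PacketsRecycler = Recycler::new_without_limit("me");
    round_trip(&recycler);
}
```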
diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs
index f1f41ced9702ae..2611272ef93214 100644
--- a/perf/benches/sigverify.rs
+++ b/perf/benches/sigverify.rs
@@ -15,8 +15,8 @@ fn bench_sigverify(bencher: &mut Bencher) {
     // generate packet vector
     let batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::<Vec<_>>(), 128);
 
-    let recycler = Recycler::default();
-    let recycler_out = Recycler::default();
+    let recycler = Recycler::new_without_limit("");
+    let recycler_out = Recycler::new_without_limit("");
     // verify packets
     bencher.iter(|| {
         let _ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@@ -30,7 +30,7 @@ fn bench_get_offsets(bencher: &mut Bencher) {
     // generate packet vector
     let batches = to_packets_chunked(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 1024);
-    let recycler = Recycler::default();
+    let recycler = Recycler::new_without_limit("");
     // verify packets
     bencher.iter(|| {
         let _ans = sigverify::generate_offsets(&batches, &recycler);
diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs
index dc9b84a0395ca2..a8f9b01acf6b77 100644
--- a/perf/src/cuda_runtime.rs
+++ b/perf/src/cuda_runtime.rs
@@ -76,6 +76,9 @@ impl<T: Clone + Default + Sized> Reset for PinnedVec<T> {
     fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>) {
         self.recycler = Some(recycler);
     }
+    fn unset_recycler(&mut self) {
+        self.recycler = None;
+    }
 }
 
 impl<T: Clone + Default + Sized> Default for PinnedVec<T> {
diff --git a/perf/src/packet.rs b/perf/src/packet.rs
index b29a8201bba2ce..292c3b7ce14dae 100644
--- a/perf/src/packet.rs
+++ b/perf/src/packet.rs
@@ -29,19 +29,21 @@ impl Packets {
         Packets { packets }
     }
 
-    pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self {
-        let mut packets = recycler.allocate(name);
-        packets.reserve_and_pin(size);
-        Packets { packets }
+    pub fn new_with_recycler(recycler: PacketsRecycler, size: usize) -> Option<Self> {
+        let maybe_packets = recycler.allocate();
+        maybe_packets.map(|mut packets| {
+            packets.reserve_and_pin(size);
+            Packets { packets }
+        })
     }
 
     pub fn new_with_recycler_data(
         recycler: &PacketsRecycler,
-        name: &'static str,
         mut packets: Vec<Packet>,
-    ) -> Self {
-        let mut vec = Self::new_with_recycler(recycler.clone(), packets.len(), name);
-        vec.packets.append(&mut packets);
-        vec
+    ) -> Option<Self> {
+        Self::new_with_recycler(recycler.clone(), packets.len()).map(|mut vec| {
+            vec.packets.append(&mut packets);
+            vec
+        })
     }
 
     pub fn set_addr(&mut self, addr: &SocketAddr) {
@@ -77,11 +79,7 @@ pub fn to_packets_with_destination<T: Serialize>(
     recycler: PacketsRecycler,
     dests_and_data: &[(SocketAddr, T)],
 ) -> Packets {
-    let mut out = Packets::new_with_recycler(
-        recycler,
-        dests_and_data.len(),
-        "to_packets_with_destination",
-    );
+    let mut out = Packets::new_with_recycler(recycler, dests_and_data.len()).unwrap();
     out.packets.resize(dests_and_data.len(), Packet::default());
     for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) {
         if !dest_and_data.0.ip().is_unspecified() && dest_and_data.0.port() != 0 {
@@ -139,9 +137,9 @@ mod tests {
     #[test]
     fn test_to_packets_pinning() {
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new_without_limit("");
         for i in 0..2 {
-            let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1, "first one");
+            let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1);
         }
     }
 }
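Both `Packets` constructors are now fallible; a sketch of the two caller patterns this diff uses (unwrap on unlimited recyclers, explicit fallback on limited ones):

```rust
use solana_perf::packet::{Packets, PacketsRecycler};

fn example() {
    // Without a limit, allocate() can't return None, so unwrap() never panics.
    let unlimited = PacketsRecycler::new_without_limit("");
    let _batch = Packets::new_with_recycler(unlimited, 64).unwrap();

    // With a limit, handle None instead of unwrapping.
    let limited = PacketsRecycler::new_with_limit("", 8);
    if let Some(_batch) = Packets::new_with_recycler(limited, 64) {
        // use the recycled, pinned batch
    } else {
        // allocation limit reached: back off or shed the work
    }
}
```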
diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs
index 1d94d8e4e216e9..728fb29162aff5 100644
--- a/perf/src/recycler.rs
+++ b/perf/src/recycler.rs
@@ -1,7 +1,23 @@
 use rand::{thread_rng, Rng};
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, Weak};
+use solana_measure::measure::Measure;
+use std::{
+    sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+        Arc, Mutex, Weak,
+    },
+    time::Instant,
+};
+
+pub const DEFAULT_MINIMUM_OBJECT_COUNT: u32 = 1000;
+pub const DEFAULT_SHRINK_PCT: u32 = 80;
+pub const DEFAULT_MAX_ABOVE_SHRINK_PCT_COUNT: u32 = 10;
+pub const DEFAULT_CHECK_SHRINK_INTERVAL_MS: u32 = 10000;
+
+enum AllocationDecision<T> {
+    Reuse(T),
+    Allocate(u32, usize),
+    AllocationLimitReached,
+}
 
 #[derive(Debug, Default)]
 struct RecyclerStats {
     total: AtomicUsize,
     freed: AtomicUsize,
     reuse: AtomicUsize,
     max_gc: AtomicUsize,
 }
 
-#[derive(Clone, Default)]
-pub struct Recycler<T> {
+#[derive(Debug, Default)]
+struct RecyclerShrinkStats {
+    resulting_size: u32,
+    target_size: u32,
+    ideal_num_to_remove: u32,
+    shrink_elapsed: u64,
+    drop_elapsed: u64,
+}
+
+impl RecyclerShrinkStats {
+    fn report(&self, shrink_metric_name: &'static str) {
+        datapoint_info!(
+            shrink_metric_name,
+            ("target_size", self.target_size as i64, i64),
+            ("resulting_size", self.resulting_size as i64, i64),
+            ("ideal_num_to_remove", self.ideal_num_to_remove as i64, i64),
+            ("recycler_shrink_elapsed", self.shrink_elapsed as i64, i64),
+            ("drop_elapsed", self.drop_elapsed as i64, i64)
+        );
+    }
+}
+
+#[derive(Clone)]
+pub struct Recycler<T: Reset> {
     recycler: Arc<RecyclerX<T>>,
+    shrink_metric_name: &'static str,
 }
+
+impl<T: Default + Reset> Recycler<T> {
+    pub fn new_without_limit(shrink_metric_name: &'static str) -> Self {
+        Self {
+            recycler: Arc::new(RecyclerX::default()),
+            shrink_metric_name,
+        }
+    }
+
+    pub fn new_with_limit(shrink_metric_name: &'static str, limit: u32) -> Self {
+        Self {
+            recycler: Arc::new(RecyclerX::new(Some(limit))),
+            shrink_metric_name,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct ObjectPool<T: Reset> {
+    objects: Vec<T>,
+    shrink_pct: u32,
+    minimum_object_count: u32,
+    above_shrink_pct_count: u32,
+    max_above_shrink_pct_count: u32,
+    check_shrink_interval_ms: u32,
+    last_shrink_check_time: Instant,
+    pub total_allocated_count: u32,
+    limit: Option<u32>,
+}
+impl<T: Reset> Default for ObjectPool<T> {
+    fn default() -> Self {
+        ObjectPool {
+            objects: vec![],
+            shrink_pct: DEFAULT_SHRINK_PCT,
+            minimum_object_count: DEFAULT_MINIMUM_OBJECT_COUNT,
+            above_shrink_pct_count: 0,
+            max_above_shrink_pct_count: DEFAULT_MAX_ABOVE_SHRINK_PCT_COUNT,
+            check_shrink_interval_ms: DEFAULT_CHECK_SHRINK_INTERVAL_MS,
+            last_shrink_check_time: Instant::now(),
+            total_allocated_count: 0,
+            limit: None,
+        }
+    }
+}
+
+impl<T: Reset> ObjectPool<T> {
+    fn new(limit: Option<u32>) -> Self {
+        Self {
+            limit,
+            ..Self::default()
+        }
+    }
+
+    fn len(&self) -> usize {
+        self.objects.len()
+    }
+
+    fn get_shrink_target(shrink_pct: u32, current_size: u32) -> u32 {
+        ((shrink_pct * current_size) + 99) / 100
+    }
+
+    fn shrink_if_necessary(
+        &mut self,
+        recycler_name: &'static str,
+    ) -> Option<(RecyclerShrinkStats, Vec<T>)> {
+        let is_consistent = self.total_allocated_count as usize >= self.len();
+        assert!(
+            is_consistent,
+            "Object pool inconsistent: {} {} {}",
+            self.total_allocated_count,
+            self.len(),
+            recycler_name
+        );
+
+        if self.last_shrink_check_time.elapsed().as_millis()
+            > self.check_shrink_interval_ms as u128
+        {
+            self.last_shrink_check_time = Instant::now();
+            let shrink_threshold_count =
+                Self::get_shrink_target(self.shrink_pct, self.total_allocated_count);
+
+            // If more than the shrink threshold of all allocated objects are sitting doing
+            // nothing, increment the `above_shrink_pct_count`.
+            if self.len() > self.minimum_object_count as usize
+                && self.len() > shrink_threshold_count as usize
+            {
+                self.above_shrink_pct_count += 1;
+            } else {
+                self.above_shrink_pct_count = 0;
+            }
+
+            if self.above_shrink_pct_count as usize >= self.max_above_shrink_pct_count as usize {
+                let mut recycler_shrink_elapsed = Measure::start("recycler_shrink");
+                // Do the shrink
+                let target_size = std::cmp::max(self.minimum_object_count, shrink_threshold_count);
+                let ideal_num_to_remove = self.total_allocated_count - target_size;
+                let mut shrink_removed_objects = Vec::with_capacity(ideal_num_to_remove as usize);
+                for _ in 0..ideal_num_to_remove {
+                    if let Some(mut expired_object) = self.objects.pop() {
+                        expired_object.unset_recycler();
+                        // Drop these outside of the lock because the `Drop` implementation for
+                        // certain objects like PinnedVec's can be expensive
+                        shrink_removed_objects.push(expired_object);
+                        // May not be able to shrink exactly `ideal_num_to_remove` objects since
+                        // in the case of new allocations, `total_allocated_count` is incremented
+                        // before the object is allocated (see `make_allocation_decision()`
+                        // below). This race allows a difference of up to the number of threads
+                        // allocating with this recycler.
+                        self.total_allocated_count -= 1;
+                    } else {
+                        break;
+                    }
+                }
+                recycler_shrink_elapsed.stop();
+                self.above_shrink_pct_count = 0;
+                Some((
+                    RecyclerShrinkStats {
+                        resulting_size: self.total_allocated_count,
+                        target_size,
+                        ideal_num_to_remove,
+                        shrink_elapsed: recycler_shrink_elapsed.as_us(),
+                        // Filled in later
+                        drop_elapsed: 0,
+                    },
+                    shrink_removed_objects,
+                ))
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+
+    fn make_allocation_decision(&mut self) -> AllocationDecision<T> {
+        if let Some(reused_object) = self.objects.pop() {
+            AllocationDecision::Reuse(reused_object)
+        } else if let Some(limit) = self.limit {
+            if self.total_allocated_count < limit {
+                self.total_allocated_count += 1;
+                AllocationDecision::Allocate(self.total_allocated_count, self.len())
+            } else {
+                AllocationDecision::AllocationLimitReached
+            }
+        } else {
+            self.total_allocated_count += 1;
+            AllocationDecision::Allocate(self.total_allocated_count, self.len())
+        }
+    }
+}
 
 #[derive(Debug)]
-pub struct RecyclerX<T> {
-    gc: Mutex<Vec<T>>,
+pub struct RecyclerX<T: Reset> {
+    gc: Mutex<ObjectPool<T>>,
     stats: RecyclerStats,
     id: usize,
 }
 
-impl<T: Default> Default for RecyclerX<T> {
+impl<T: Default + Reset> Default for RecyclerX<T> {
     fn default() -> RecyclerX<T> {
         let id = thread_rng().gen_range(0, 1000);
         trace!("new recycler..{}", id);
         RecyclerX {
-            gc: Mutex::new(vec![]),
+            gc: Mutex::new(ObjectPool::default()),
             stats: RecyclerStats::default(),
             id,
         }
     }
 }
 
+impl<T: Default + Reset> RecyclerX<T> {
+    fn new(limit: Option<u32>) -> Self {
+        RecyclerX {
+            gc: Mutex::new(ObjectPool::new(limit)),
+            ..Self::default()
+        }
+    }
+}
+
 pub trait Reset {
     fn reset(&mut self);
     fn warm(&mut self, size_hint: usize);
     fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>)
     where
         Self: std::marker::Sized;
+    fn unset_recycler(&mut self)
+    where
+        Self: std::marker::Sized;
 }
 
 lazy_static! {
@@ -56,12 +255,21 @@ fn warm_recyclers() -> bool {
 }
 
 impl<T: Default + Reset> Recycler<T> {
-    pub fn warmed(num: usize, size_hint: usize) -> Self {
-        let new = Self::default();
+    pub fn warmed(
+        num: u32,
+        size_hint: usize,
+        limit: Option<u32>,
+        shrink_metric_name: &'static str,
+    ) -> Self {
+        assert!(num <= limit.unwrap_or(std::u32::MAX));
+        let new = Self {
+            recycler: Arc::new(RecyclerX::new(limit)),
+            shrink_metric_name,
+        };
         if warm_recyclers() {
             let warmed_items: Vec<_> = (0..num)
                 .map(|_| {
-                    let mut item = new.allocate("warming");
+                    let mut item = new.allocate().unwrap();
                     item.warm(size_hint);
                     item
                 })
@@ -73,33 +281,55 @@ impl<T: Default + Reset> Recycler<T> {
         new
     }
 
-    pub fn allocate(&self, name: &'static str) -> T {
-        let new = self
-            .recycler
-            .gc
-            .lock()
-            .expect("recycler lock in pb fn allocate")
-            .pop();
-
-        if let Some(mut x) = new {
-            self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
-            x.reset();
-            return x;
-        }
-
-        let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
-        trace!(
-            "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
-            total,
-            name,
-            self.recycler.id,
-            self.recycler.stats.reuse.load(Ordering::Relaxed),
-            self.recycler.stats.max_gc.load(Ordering::Relaxed),
-        );
-
-        let mut t = T::default();
-        t.set_recycler(Arc::downgrade(&self.recycler));
-        t
+    pub fn allocate(&self) -> Option<T> {
+        let (allocation_decision, shrink_output) = {
+            let mut object_pool = self
+                .recycler
+                .gc
+                .lock()
+                .expect("recycler lock in pb fn allocate");
+
+            let shrink_output = object_pool.shrink_if_necessary(self.shrink_metric_name);
+
+            // Grab the allocation decision and shrinking stats, do the expensive
+            // allocations/deallocations outside of the lock.
+            (object_pool.make_allocation_decision(), shrink_output)
+        };
+
+        if let Some((mut shrink_stats, shrink_removed_objects)) = shrink_output {
+            let mut shrink_removed_object_elapsed = Measure::start("shrink_removed_object_elapsed");
+            drop(shrink_removed_objects);
+            shrink_removed_object_elapsed.stop();
+            shrink_stats.drop_elapsed = shrink_removed_object_elapsed.as_us();
+            shrink_stats.report(self.shrink_metric_name);
+        }
+
+        match allocation_decision {
+            AllocationDecision::Reuse(mut reused_object) => {
+                self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
+                reused_object.reset();
+                Some(reused_object)
+            }
+            AllocationDecision::Allocate(total_allocated_count, recycled_len) => {
+                let mut t = T::default();
+                t.set_recycler(Arc::downgrade(&self.recycler));
+                if total_allocated_count % 1000 == 0 {
+                    datapoint_info!(
+                        "recycler_total_allocated_count",
+                        ("name", self.shrink_metric_name, String),
+                        ("count", total_allocated_count as i64, i64),
+                        ("recycled_len", recycled_len as i64, i64),
+                    )
+                }
+                Some(t)
+            }
+            AllocationDecision::AllocationLimitReached => None,
+        }
+    }
+
+    pub fn recycle_for_test(&self, x: T) {
+        self.recycler.recycle(x);
     }
 }
@@ -107,7 +337,7 @@ impl<T: Default> RecyclerX<T> {
     pub fn recycle(&self, x: T) {
         let len = {
             let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
-            gc.push(x);
+            gc.objects.push(x);
             gc.len()
         };
@@ -137,6 +367,8 @@ impl<T: Default> RecyclerX<T> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::packet::PacketsRecycler;
+    use std::{thread::sleep, time::Duration};
 
     impl Reset for u64 {
         fn reset(&mut self) {
Recycler::new_without_limit(""); + let mut y: u64 = recycler.allocate().unwrap(); assert_eq!(y, 0); y = 20; let recycler2 = recycler.clone(); recycler2.recycler.recycle(y); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1); - let z = recycler.allocate("test_recycler2"); + let z = recycler.allocate().unwrap(); assert_eq!(z, 10); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); } + + #[test] + fn test_recycler_limit() { + let limit = 10; + assert!(limit <= DEFAULT_MINIMUM_OBJECT_COUNT); + // Use PacketRecycler so that dropping the allocated object + // actually recycles + let recycler = PacketsRecycler::new_with_limit("", limit); + let mut allocated_items = vec![]; + for i in 0..limit * 2 { + let x = recycler.allocate(); + if i < limit { + allocated_items.push(x.unwrap()); + } else { + assert!(x.is_none()); + } + } + assert_eq!( + recycler.recycler.gc.lock().unwrap().total_allocated_count, + limit + ); + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0_usize); + drop(allocated_items); + assert_eq!( + recycler.recycler.gc.lock().unwrap().total_allocated_count, + limit + ); + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), limit as usize); + } + + #[test] + fn test_recycler_shrink() { + let limit = DEFAULT_MINIMUM_OBJECT_COUNT * 2; + let max_above_shrink_pct_count = 2; + let shrink_pct = 80; + let recycler = PacketsRecycler::new_with_limit("", limit); + { + let mut locked_recycler = recycler.recycler.gc.lock().unwrap(); + // Make the shrink interval a long time so shrinking doesn't happen yet + locked_recycler.check_shrink_interval_ms = std::u32::MAX; + // Set the count to one so that we shrink on every other allocation later. + locked_recycler.max_above_shrink_pct_count = max_above_shrink_pct_count; + locked_recycler.shrink_pct = shrink_pct; + } + + let mut allocated_items = vec![]; + for _ in 0..limit { + allocated_items.push(recycler.allocate().unwrap()); + } + assert_eq!( + recycler.recycler.gc.lock().unwrap().total_allocated_count, + limit + ); + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); + drop(allocated_items); + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), limit as usize); + + let shrink_interval = 10; + { + let mut locked_recycler = recycler.recycler.gc.lock().unwrap(); + locked_recycler.check_shrink_interval_ms = shrink_interval; + } + + let mut current_total_allocated_count = + recycler.recycler.gc.lock().unwrap().total_allocated_count; + + // Shrink the recycler until it hits the minimum + let mut i = 0; + while current_total_allocated_count != DEFAULT_MINIMUM_OBJECT_COUNT { + sleep(Duration::from_millis(shrink_interval as u64 * 2)); + recycler.allocate().unwrap(); + let expected_above_shrink_pct_count = (i + 1) % max_above_shrink_pct_count; + assert_eq!( + recycler.recycler.gc.lock().unwrap().above_shrink_pct_count, + (i + 1) % max_above_shrink_pct_count + ); + if expected_above_shrink_pct_count == 0 { + // Shrink happened, update the expected `current_total_allocated_count`; + current_total_allocated_count = std::cmp::max( + ObjectPool::::get_shrink_target(shrink_pct, current_total_allocated_count), + DEFAULT_MINIMUM_OBJECT_COUNT, + ); + assert_eq!( + recycler.recycler.gc.lock().unwrap().total_allocated_count, + current_total_allocated_count + ); + assert_eq!( + recycler.recycler.gc.lock().unwrap().len(), + current_total_allocated_count as usize + ); + } + + i += 1; + } + } } diff --git a/perf/src/recycler_cache.rs b/perf/src/recycler_cache.rs index 5dcf77723090f3..b46875bd0fc6ec 100644 --- a/perf/src/recycler_cache.rs 
diff --git a/perf/src/recycler_cache.rs b/perf/src/recycler_cache.rs
index 5dcf77723090f3..b46875bd0fc6ec 100644
--- a/perf/src/recycler_cache.rs
+++ b/perf/src/recycler_cache.rs
@@ -2,17 +2,24 @@ use crate::cuda_runtime::PinnedVec;
 use crate::recycler::Recycler;
 use crate::sigverify::TxOffset;
 
-#[derive(Default, Clone)]
+#[derive(Clone)]
 pub struct RecyclerCache {
     recycler_offsets: Recycler<TxOffset>,
     recycler_buffer: Recycler<PinnedVec<u8>>,
 }
 
 impl RecyclerCache {
-    pub fn warmed() -> Self {
+    pub fn new(offsets_shrink_name: &'static str, buffer_shrink_name: &'static str) -> Self {
         Self {
-            recycler_offsets: Recycler::warmed(50, 4096),
-            recycler_buffer: Recycler::warmed(50, 4096),
+            recycler_offsets: Recycler::new_without_limit(offsets_shrink_name),
+            recycler_buffer: Recycler::new_without_limit(buffer_shrink_name),
+        }
+    }
+
+    pub fn warmed(offsets_shrink_name: &'static str, buffer_shrink_name: &'static str) -> Self {
+        Self {
+            recycler_offsets: Recycler::warmed(50, 4096, None, offsets_shrink_name),
+            recycler_buffer: Recycler::warmed(50, 4096, None, buffer_shrink_name),
         }
     }
     pub fn offsets(&self) -> &Recycler<TxOffset> {
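Sketch of the two constructors side by side (assuming `recycler_cache` is a public module of `solana_perf`; the metric names in the warmed call are placeholders of mine):

```rust
use solana_perf::recycler_cache::RecyclerCache;

fn example() {
    // Benches and tests skip warming and pass empty metric names.
    let _cold = RecyclerCache::new("", "");
    // Production warms 50 objects (size hint 4096) per recycler, with no limit.
    let _warm = RecyclerCache::warmed("offsets-shrink-stats", "buffer-shrink-stats");
}
```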
diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs
index 67f32f15c8a821..861d0162f08417 100644
--- a/perf/src/sigverify.rs
+++ b/perf/src/sigverify.rs
@@ -194,13 +194,13 @@ fn get_packet_offsets(packet: &Packet, current_offset: u32) -> PacketOffsets {
 pub fn generate_offsets(batches: &[Packets], recycler: &Recycler<TxOffset>) -> TxOffsets {
     debug!("allocating..");
-    let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets");
+    let mut signature_offsets: PinnedVec<_> = recycler.allocate().unwrap();
     signature_offsets.set_pinnable();
-    let mut pubkey_offsets: PinnedVec<_> = recycler.allocate("pubkey_offsets");
+    let mut pubkey_offsets: PinnedVec<_> = recycler.allocate().unwrap();
     pubkey_offsets.set_pinnable();
-    let mut msg_start_offsets: PinnedVec<_> = recycler.allocate("msg_start_offsets");
+    let mut msg_start_offsets: PinnedVec<_> = recycler.allocate().unwrap();
     msg_start_offsets.set_pinnable();
-    let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets");
+    let mut msg_sizes: PinnedVec<_> = recycler.allocate().unwrap();
     msg_sizes.set_pinnable();
     let mut current_packet = 0;
     let mut v_sig_lens = Vec::new();
@@ -347,7 +347,7 @@ pub fn ed25519_verify(
     debug!("CUDA ECDSA for {}", batch_size(batches));
     debug!("allocating out..");
-    let mut out = recycler_out.allocate("out_buffer");
+    let mut out = recycler_out.allocate().unwrap();
     out.set_pinnable();
     let mut elems = Vec::new();
     let mut rvs = Vec::new();
@@ -678,8 +678,8 @@ mod tests {
         let batches = generate_packet_vec(&packet, n, 2);
 
-        let recycler = Recycler::default();
-        let recycler_out = Recycler::default();
+        let recycler = Recycler::new_without_limit("");
+        let recycler_out = Recycler::new_without_limit("");
         // verify packets
         let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@@ -697,8 +697,8 @@ mod tests {
         let batches = generate_packet_vec(&packet, 1, 1);
 
-        let recycler = Recycler::default();
-        let recycler_out = Recycler::default();
+        let recycler = Recycler::new_without_limit("");
+        let recycler_out = Recycler::new_without_limit("");
         // verify packets
         let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@@ -735,8 +735,8 @@ mod tests {
         batches[0].packets.push(packet);
 
-        let recycler = Recycler::default();
-        let recycler_out = Recycler::default();
+        let recycler = Recycler::new_without_limit("");
+        let recycler_out = Recycler::new_without_limit("");
         // verify packets
         let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@@ -755,8 +755,8 @@ mod tests {
         let tx = test_multisig_tx();
         let packet = sigverify::make_packet_from_transaction(tx);
 
-        let recycler = Recycler::default();
-        let recycler_out = Recycler::default();
+        let recycler = Recycler::new_without_limit("");
+        let recycler_out = Recycler::new_without_limit("");
         for _ in 0..50 {
             let n = thread_rng().gen_range(1, 30);
             let num_batches = thread_rng().gen_range(2, 30);
diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs
index 926342b8769045..a5d005383d548a 100644
--- a/streamer/src/streamer.rs
+++ b/streamer/src/streamer.rs
@@ -42,7 +42,10 @@ fn recv_loop(
     let mut now = Instant::now();
     let mut num_max_received = 0; // Number of times maximum packets were received
     loop {
-        let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name);
+        let (mut msgs, should_send) =
+            Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH)
+                .map(|allocated| (allocated, true))
+                .unwrap_or((Packets::with_capacity(PACKETS_PER_BATCH), false));
         loop {
             // Check for exit signal, even if socket is busy
             // (for instance the leader transaction socket)
@@ -55,7 +58,7 @@ fn recv_loop(
             }
             recv_count += len;
             call_count += 1;
-            if len > 0 {
+            if len > 0 && should_send {
                 channel.send(msgs)?;
             }
             break;
@@ -198,7 +201,13 @@ mod test {
         let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
         let exit = Arc::new(AtomicBool::new(false));
         let (s_reader, r_reader) = channel();
-        let t_receiver = receiver(Arc::new(read), &exit, s_reader, Recycler::default(), "test");
+        let t_receiver = receiver(
+            Arc::new(read),
+            &exit,
+            s_reader,
+            Recycler::new_without_limit(""),
+            "test",
+        );
         let t_responder = {
             let (s_responder, r_responder) = channel();
             let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
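Restating the new `recv_loop` allocation behavior as a standalone sketch (the helper name is mine; this assumes `PACKETS_PER_BATCH` is the constant exported from `solana_perf::packet`): when the recycler is at its cap, the loop still drains the socket into an ordinary unpinned batch but clears the flag so the batch is dropped rather than sent, shedding load instead of growing pinned memory without bound.

```rust
use solana_perf::packet::{Packets, PacketsRecycler, PACKETS_PER_BATCH};

// Returns the batch plus a flag saying whether it may be forwarded downstream.
fn allocate_or_shed(recycler: &PacketsRecycler) -> (Packets, bool) {
    Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH)
        .map(|allocated| (allocated, true))
        .unwrap_or((Packets::with_capacity(PACKETS_PER_BATCH), false))
}
```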