From b32f34093e5e99e363ce21daeeddae232db3d658 Mon Sep 17 00:00:00 2001
From: Carl Lin
Date: Tue, 16 Feb 2021 03:33:50 -0800
Subject: [PATCH] Fix metric names

---
 bench-streamer/src/main.rs         |  2 +-
 core/src/cluster_info.rs           | 12 ++++-----
 core/src/fetch_stage.rs            |  3 ++-
 core/src/gossip_service.rs         |  2 +-
 core/src/serve_repair.rs           | 25 ++++++-------------
 core/src/serve_repair_service.rs   |  2 +-
 core/src/shred_fetch_stage.rs      |  3 ++-
 core/src/sigverify.rs              |  4 +--
 core/src/sigverify_shreds.rs       |  5 +++-
 ledger/benches/sigverify_shreds.rs |  2 +-
 ledger/src/entry.rs                | 18 +++++++++-----
 ledger/src/sigverify_shreds.rs     | 39 +++++++++---------------------
 perf/benches/sigverify.rs          |  6 ++---
 perf/src/packet.rs                 | 22 +++++------------
 perf/src/recycler.rs               | 36 +++++++++++++++++++--------
 perf/src/recycler_cache.rs         | 15 +++++++++---
 perf/src/sigverify.rs              | 26 ++++++++++----------
 streamer/src/streamer.rs           |  4 +--
 18 files changed, 112 insertions(+), 114 deletions(-)

diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs
index 4ab14153a5436f..92c1b04ded2b49 100644
--- a/bench-streamer/src/main.rs
+++ b/bench-streamer/src/main.rs
@@ -74,7 +74,7 @@ fn main() -> Result<()> {
     let mut read_channels = Vec::new();
     let mut read_threads = Vec::new();

-    let recycler = PacketsRecycler::default();
+    let recycler = PacketsRecycler::new("bench-streamer-recycler-shrink-stats");
     for _ in 0..num_sockets {
         let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap();
         read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 73a72bee03d0d3..43a69c2ca3194e 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -1826,7 +1826,7 @@ impl ClusterInfo {
         let mut last_contact_info_trace = timestamp();
         let mut last_contact_info_save = timestamp();
         let mut entrypoints_processed = false;
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new("gossip-recycler-shrink-stats");
         let crds_data = vec![
             CrdsData::Version(Version::new(self.id())),
             CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
@@ -2074,8 +2074,7 @@ impl ClusterInfo {
             .process_pull_requests(callers.cloned(), timestamp());
         let output_size_limit =
             self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
-        let mut packets =
-            Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests").unwrap();
+        let mut packets = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
         let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
             let mut rng = rand::thread_rng();
             let check_pull_request =
@@ -2360,8 +2359,7 @@ impl ClusterInfo {
         if packets.is_empty() {
             None
         } else {
-            let packets =
-                Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets).unwrap();
+            let packets = Packets::new_with_recycler_data(recycler, packets).unwrap();
             Some(packets)
         }
     }
@@ -2910,7 +2908,7 @@ impl ClusterInfo {
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
         let exit = exit.clone();
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new("cluster-info-listen-recycler-shrink-stats");
         Builder::new()
             .name("solana-listen".to_string())
             .spawn(move || {
@@ -3355,7 +3353,7 @@ mod tests {
             .iter()
             .map(|ping| Pong::new(ping, &this_node).unwrap())
             .collect();
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new("");
         let packets = cluster_info
             .handle_ping_messages(
                 remote_nodes
diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs
index 6c5ca6e9803053..f2a660e61f69b5 100644
--- a/core/src/fetch_stage.rs
+++ b/core/src/fetch_stage.rs
@@ -103,7 +103,8 @@ impl FetchStage {
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         limit: Option<u32>,
     ) -> Self {
-        let recycler: PacketsRecycler = Recycler::warmed(1000, 1024, limit);
+        let recycler: PacketsRecycler =
+            Recycler::warmed(1000, 1024, limit, "fetch_stage_recycler_shrink");
         let tpu_threads = sockets.into_iter().map(|socket| {
             streamer::receiver(
diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs
index e8b321cd59958f..4d094c614b9e48 100644
--- a/core/src/gossip_service.rs
+++ b/core/src/gossip_service.rs
@@ -47,7 +47,7 @@ impl GossipService {
             gossip_socket.clone(),
             &exit,
             request_sender,
-            Recycler::default(),
+            Recycler::new("gossip-receiver-recycler-shrink-stats"),
             "gossip_receiver",
         );
         let (response_sender, response_receiver) = channel();
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index 45dbb0ce1c0201..3a8e62f9a48cf2 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -279,7 +279,7 @@ impl ServeRepair {
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
         let exit = exit.clone();
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new("serve-repair-recycler-shrink-stats");
         Builder::new()
             .name("solana-repair-listen".to_string())
             .spawn(move || {
@@ -498,12 +498,7 @@ impl ServeRepair {
             if let Some(packet) = packet {
                 inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
-                return Some(Packets::new_with_recycler_data(
-                    recycler,
-                    "run_window_request",
-                    vec![packet],
-                ))
-                .unwrap();
+                return Some(Packets::new_with_recycler_data(recycler, vec![packet])).unwrap();
             }
         }
@@ -539,11 +534,7 @@ impl ServeRepair {
                 from_addr,
                 nonce,
             )?;
-            return Packets::new_with_recycler_data(
-                recycler,
-                "run_highest_window_request",
-                vec![packet],
-            );
+            return Packets::new_with_recycler_data(recycler, vec![packet]);
         }
         None
     }
@@ -556,7 +547,7 @@ impl ServeRepair {
         max_responses: usize,
         nonce: Nonce,
     ) -> Option<Packets> {
-        let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan").unwrap();
+        let mut res = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
         if let Some(blockstore) = blockstore {
             // Try to find the next "n" parent slots of the input slot
             while let Ok(Some(meta)) = blockstore.meta(slot) {
@@ -610,7 +601,7 @@ mod tests {
     /// test run_window_request responds with the right shred, and do not overrun
    fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new("");
         solana_logger::setup();
         let ledger_path = get_tmp_ledger_path!();
         {
@@ -678,7 +669,7 @@ mod tests {
     /// test window requests respond with the right shred, and do not overrun
     fn run_window_request(slot: Slot, nonce: Nonce) {
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new("");
         solana_logger::setup();
         let ledger_path = get_tmp_ledger_path!();
         {
@@ -846,7 +837,7 @@ mod tests {
     fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
         solana_logger::setup();
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new("");
         let ledger_path = get_tmp_ledger_path!();
         {
             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
@@ -917,7 +908,7 @@ mod tests {
     #[test]
     fn run_orphan_corrupted_shred_size() {
         solana_logger::setup();
-        let recycler = PacketsRecycler::default();
+        let recycler = PacketsRecycler::new("");
         let ledger_path = get_tmp_ledger_path!();
         {
             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs
index f252ab1e4ba593..cf0bf3ec0e387f 100644
--- a/core/src/serve_repair_service.rs
+++ b/core/src/serve_repair_service.rs
@@ -30,7 +30,7 @@ impl ServeRepairService {
             serve_repair_socket.clone(),
             &exit,
             request_sender,
-            Recycler::default(),
+            Recycler::new("serve-repair-receiver-recycler-shrink-stats"),
             "serve_repair_receiver",
         );
         let (response_sender, response_receiver) = channel();
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index 22a6e0fe5091d7..01b1f177b34eab 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -169,7 +169,8 @@ impl ShredFetchStage {
         exit: &Arc<AtomicBool>,
         limit: Option<u32>,
     ) -> Self {
-        let recycler: PacketsRecycler = Recycler::warmed(100, 1024, limit);
+        let recycler: PacketsRecycler =
+            Recycler::warmed(100, 1024, limit, "shred_fetch_stage_recycler_shrink");
         let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
             sockets,
diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs
index fba72c3c618019..8228cbeeb92f0b 100644
--- a/core/src/sigverify.rs
+++ b/core/src/sigverify.rs
@@ -23,8 +23,8 @@ impl Default for TransactionSigVerifier {
     fn default() -> Self {
         init();
         Self {
-            recycler: Recycler::warmed(50, 4096, None),
-            recycler_out: Recycler::warmed(50, 4096, None),
+            recycler: Recycler::warmed(50, 4096, None, ""),
+            recycler_out: Recycler::warmed(50, 4096, None, ""),
         }
     }
 }
diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs
index c256af65579138..d7e44622c65798 100644
--- a/core/src/sigverify_shreds.rs
+++ b/core/src/sigverify_shreds.rs
@@ -26,7 +26,10 @@ impl ShredSigVerifier {
         Self {
             bank_forks,
             leader_schedule_cache,
-            recycler_cache: RecyclerCache::warmed(),
+            recycler_cache: RecyclerCache::warmed(
+                "shred-sig-verifier-offsets-recycler-shrink-stats",
+                "shred-sig-verifier-buffer-recycler-shrink-stats",
+            ),
         }
     }
     fn read_slots(batches: &[Packets]) -> HashSet<u64> {
diff --git a/ledger/benches/sigverify_shreds.rs b/ledger/benches/sigverify_shreds.rs
index 116e1fd77ae7a4..0fd83dd0376a96 100644
--- a/ledger/benches/sigverify_shreds.rs
+++ b/ledger/benches/sigverify_shreds.rs
@@ -16,7 +16,7 @@ const NUM_PACKETS: usize = 256;
 const NUM_BATCHES: usize = 1;
 #[bench]
 fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
-    let recycler_cache = RecyclerCache::default();
+    let recycler_cache = RecyclerCache::new("", "");
     let mut packets = Packets::default();
     packets.packets.set_pinnable();
diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs
index fabf7b9b3ca1db..311ac42bc64763 100644
--- a/ledger/src/entry.rs
+++ b/ledger/src/entry.rs
@@ -225,12 +225,21 @@ pub struct EntryVerificationState {
     device_verification_data: DeviceVerificationData,
 }

-#[derive(Default, Clone)]
+#[derive(Clone)]
 pub struct VerifyRecyclers {
     hash_recycler: Recycler<PinnedVec<Hash>>,
     tick_count_recycler: Recycler<PinnedVec<u64>>,
 }

+impl Default for VerifyRecyclers {
+    fn default() -> Self {
+        Self {
+            hash_recycler: Recycler::new("hash_recycler_shrink_stats"),
+            tick_count_recycler: Recycler::new("tick_count_recycler_shrink_stats"),
+        }
+    }
+}
+
 #[derive(PartialEq, Clone, Copy, Debug)]
 pub enum EntryVerificationStatus {
     Failure,
@@ -554,15 +563,12 @@ impl EntrySlice for [Entry] {
             .take(self.len())
             .collect();

-        let mut hashes_pinned = recyclers.hash_recycler.allocate("poh_verify_hash").unwrap();
+        let mut hashes_pinned = recyclers.hash_recycler.allocate().unwrap();
         hashes_pinned.set_pinnable();
         hashes_pinned.resize(hashes.len(), Hash::default());
         hashes_pinned.copy_from_slice(&hashes);

-        let mut num_hashes_vec = recyclers
-            .tick_count_recycler
-            .allocate("poh_verify_num_hashes")
-            .unwrap();
+        let mut num_hashes_vec = recyclers.tick_count_recycler.allocate().unwrap();
         num_hashes_vec.reserve_and_pin(cmp::max(1, self.len()));
         for entry in self {
             num_hashes_vec.push(entry.num_hashes.saturating_sub(1));
diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs
index d9dac15a3cf081..6601f07a36d142 100644
--- a/ledger/src/sigverify_shreds.rs
+++ b/ledger/src/sigverify_shreds.rs
@@ -137,10 +137,7 @@ fn slot_key_data_for_gpu<
                 .push(*slot);
         }
     }
-    let mut keyvec = recycler_cache
-        .buffer()
-        .allocate("shred_gpu_pubkeys")
-        .unwrap();
+    let mut keyvec = recycler_cache.buffer().allocate().unwrap();
     keyvec.set_pinnable();
     let mut slot_to_key_ix = HashMap::new();
@@ -155,7 +152,7 @@ fn slot_key_data_for_gpu<
             slot_to_key_ix.insert(s, i);
         }
     }
-    let mut offsets = recycler_cache.offsets().allocate("shred_offsets").unwrap();
+    let mut offsets = recycler_cache.offsets().allocate().unwrap();
     offsets.set_pinnable();
     slots.iter().for_each(|packet_slots| {
         packet_slots.iter().for_each(|slot| {
@@ -188,20 +185,11 @@ fn shred_gpu_offsets(
     batches: &[Packets],
     recycler_cache: &RecyclerCache,
 ) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) {
-    let mut signature_offsets = recycler_cache
-        .offsets()
-        .allocate("shred_signatures")
-        .unwrap();
+    let mut signature_offsets = recycler_cache.offsets().allocate().unwrap();
     signature_offsets.set_pinnable();
-    let mut msg_start_offsets = recycler_cache
-        .offsets()
-        .allocate("shred_msg_starts")
-        .unwrap();
+    let mut msg_start_offsets = recycler_cache.offsets().allocate().unwrap();
     msg_start_offsets.set_pinnable();
-    let mut msg_sizes = recycler_cache
-        .offsets()
-        .allocate("shred_msg_sizes")
-        .unwrap();
+    let mut msg_sizes = recycler_cache.offsets().allocate().unwrap();
     msg_sizes.set_pinnable();
     let mut v_sig_lens = vec![];
     for batch in batches.iter() {
@@ -254,7 +242,7 @@ pub fn verify_shreds_gpu(
     trace!("pubkeys_len: {}", pubkeys_len);
     let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) =
         shred_gpu_offsets(pubkeys_len, batches, recycler_cache);
-    let mut out = recycler_cache.buffer().allocate("out_buffer").unwrap();
+    let mut out = recycler_cache.buffer().allocate().unwrap();
     out.set_pinnable();
     elems.push(
         perf_libs::Elems {
@@ -344,7 +332,7 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) {
 }

 pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec<u8> {
-    let mut vec = cache.buffer().allocate("pinned_keypair").unwrap();
+    let mut vec = cache.buffer().allocate().unwrap();
     let pubkey = keypair.pubkey().to_bytes();
     let secret = keypair.secret().to_bytes();
     let mut hasher = Sha512::default();
@@ -382,20 +370,17 @@ pub fn sign_shreds_gpu(
     let mut num_packets = num_keypair_packets;

     //should be zero
-    let mut pubkey_offsets = recycler_cache.offsets().allocate("pubkey offsets").unwrap();
+    let mut pubkey_offsets = recycler_cache.offsets().allocate().unwrap();
     pubkey_offsets.resize(count, 0);

-    let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets").unwrap();
+    let mut secret_offsets = recycler_cache.offsets().allocate().unwrap();
     secret_offsets.resize(count, pubkey_size as u32);

     trace!("offset: {}", offset);
     let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) =
         shred_gpu_offsets(offset, batches, recycler_cache);
     let total_sigs = signature_offsets.len();
-    let mut signatures_out = recycler_cache
-        .buffer()
signatures") - .unwrap(); + let mut signatures_out = recycler_cache.buffer().allocate().unwrap(); signatures_out.set_pinnable(); signatures_out.resize(total_sigs * sig_size, 0); elems.push( @@ -575,7 +560,7 @@ pub mod tests { fn run_test_sigverify_shreds_gpu(slot: Slot) { solana_logger::setup(); - let recycler_cache = RecyclerCache::default(); + let recycler_cache = RecyclerCache::new("", ""); let mut batch = [Packets::default()]; let mut shred = Shred::new_from_data( @@ -639,7 +624,7 @@ pub mod tests { fn run_test_sigverify_shreds_sign_gpu(slot: Slot) { solana_logger::setup(); - let recycler_cache = RecyclerCache::default(); + let recycler_cache = RecyclerCache::new("", ""); let mut packets = Packets::default(); let num_packets = 32; diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index f1f41ced9702ae..e07790d9cf7530 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -15,8 +15,8 @@ fn bench_sigverify(bencher: &mut Bencher) { // generate packet vector let batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::>(), 128); - let recycler = Recycler::default(); - let recycler_out = Recycler::default(); + let recycler = Recycler::new(""); + let recycler_out = Recycler::new(""); // verify packets bencher.iter(|| { let _ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); @@ -30,7 +30,7 @@ fn bench_get_offsets(bencher: &mut Bencher) { // generate packet vector let batches = to_packets_chunked(&std::iter::repeat(tx).take(1024).collect::>(), 1024); - let recycler = Recycler::default(); + let recycler = Recycler::new(""); // verify packets bencher.iter(|| { let _ans = sigverify::generate_offsets(&batches, &recycler); diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 751bce9515dde3..b433a535f485fa 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -29,12 +29,8 @@ impl Packets { Packets { packets } } - pub fn new_with_recycler( - recycler: PacketsRecycler, - size: usize, - name: &'static str, - ) -> Option { - let maybe_packets = recycler.allocate(name); + pub fn new_with_recycler(recycler: PacketsRecycler, size: usize) -> Option { + let maybe_packets = recycler.allocate(); maybe_packets.map(|mut packets| { packets.reserve_and_pin(size); Packets { packets } @@ -42,10 +38,9 @@ impl Packets { } pub fn new_with_recycler_data( recycler: &PacketsRecycler, - name: &'static str, mut packets: Vec, ) -> Option { - Self::new_with_recycler(recycler.clone(), packets.len(), name).map(|mut vec| { + Self::new_with_recycler(recycler.clone(), packets.len()).map(|mut vec| { vec.packets.append(&mut packets); vec }) @@ -84,12 +79,7 @@ pub fn to_packets_with_destination( recycler: PacketsRecycler, dests_and_data: &[(SocketAddr, T)], ) -> Packets { - let mut out = Packets::new_with_recycler( - recycler, - dests_and_data.len(), - "to_packets_with_destination", - ) - .unwrap(); + let mut out = Packets::new_with_recycler(recycler, dests_and_data.len()).unwrap(); out.packets.resize(dests_and_data.len(), Packet::default()); for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) { if !dest_and_data.0.ip().is_unspecified() && dest_and_data.0.port() != 0 { @@ -147,9 +137,9 @@ mod tests { #[test] fn test_to_packets_pinning() { - let recycler = PacketsRecycler::default(); + let recycler = PacketsRecycler::new(""); for i in 0..2 { - let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1, "first one"); + let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1); } } } diff --git 
index 6d84f73eec544a..f0524cd77d3cf0 100644
--- a/perf/src/recycler.rs
+++ b/perf/src/recycler.rs
@@ -28,9 +28,9 @@ struct RecyclerShrinkStats {
 }

 impl RecyclerShrinkStats {
-    fn report(&self) {
+    fn report(&self, shrink_metric_name: &'static str) {
         datapoint_info!(
-            "recycler_shrink",
+            shrink_metric_name,
             ("target_size", self.target_size as i64, i64),
             ("resulting_size", self.resulting_size as i64, i64),
             ("ideal_num_to_remove", self.ideal_num_to_remove as i64, i64),
@@ -40,9 +40,19 @@ impl RecyclerShrinkStats {
     }
 }

-#[derive(Clone, Default)]
+#[derive(Clone)]
 pub struct Recycler {
     recycler: Arc<RecyclerX<T>>,
+    shrink_metric_name: &'static str,
+}
+
+impl Recycler {
+    pub fn new(shrink_metric_name: &'static str) -> Self {
+        Self {
+            recycler: Arc::new(RecyclerX::default()),
+            shrink_metric_name,
+        }
+    }
 }

 #[derive(Debug)]
@@ -138,15 +148,21 @@ fn warm_recyclers() -> bool {
 }

 impl Recycler {
-    pub fn warmed(num: u32, size_hint: usize, limit: Option<u32>) -> Self {
+    pub fn warmed(
+        num: u32,
+        size_hint: usize,
+        limit: Option<u32>,
+        shrink_metric_name: &'static str,
+    ) -> Self {
         assert!(num <= limit.unwrap_or(std::u32::MAX));
         let new = Self {
             recycler: Arc::new(RecyclerX::new(limit)),
+            shrink_metric_name,
         };
         if warm_recyclers() {
             let warmed_items: Vec<_> = (0..num)
                 .map(|_| {
-                    let mut item = new.allocate("warming").unwrap();
+                    let mut item = new.allocate().unwrap();
                     item.warm(size_hint);
                     item
                 })
@@ -158,7 +174,7 @@ impl Recycler {
         new
     }

-    pub fn allocate(&self, name: &'static str) -> Option<T> {
+    pub fn allocate(&self) -> Option<T> {
         let mut shrink_removed_objects = vec![];
         let (mut allocated_object, did_reuse, should_allocate_new, mut shrink_stats) = {
             let mut object_pool = self
@@ -245,7 +261,7 @@ impl Recycler {

         if let Some(shrink_stats) = shrink_stats.as_mut() {
             shrink_stats.drop_elapsed = shrink_removed_object_elapsed.as_us();
-            shrink_stats.report();
+            shrink_stats.report(self.shrink_metric_name);
         }

         if did_reuse {
@@ -312,14 +328,14 @@ mod tests {

     #[test]
     fn test_recycler() {
-        let recycler = Recycler::default();
-        let mut y: u64 = recycler.allocate("test_recycler1");
+        let recycler = Recycler::new("");
+        let mut y: u64 = recycler.allocate().unwrap();
         assert_eq!(y, 0);
         y = 20;
         let recycler2 = recycler.clone();
         recycler2.recycler.recycle(y);
         assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1);
-        let z = recycler.allocate("test_recycler2");
+        let z = recycler.allocate().unwrap();
         assert_eq!(z, 10);
         assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
     }
diff --git a/perf/src/recycler_cache.rs b/perf/src/recycler_cache.rs
index ef720f676ec164..f6005a1d687c3d 100644
--- a/perf/src/recycler_cache.rs
+++ b/perf/src/recycler_cache.rs
@@ -2,17 +2,24 @@ use crate::cuda_runtime::PinnedVec;
 use crate::recycler::Recycler;
 use crate::sigverify::TxOffset;

-#[derive(Default, Clone)]
+#[derive(Clone)]
 pub struct RecyclerCache {
     recycler_offsets: Recycler<TxOffset>,
     recycler_buffer: Recycler<PinnedVec<u8>>,
 }

 impl RecyclerCache {
-    pub fn warmed() -> Self {
+    pub fn new(offsets_shrink_name: &'static str, buffer_shrink_name: &'static str) -> Self {
         Self {
-            recycler_offsets: Recycler::warmed(50, 4096, None),
-            recycler_buffer: Recycler::warmed(50, 4096, None),
+            recycler_offsets: Recycler::new(offsets_shrink_name),
+            recycler_buffer: Recycler::new(buffer_shrink_name),
+        }
+    }
+
+    pub fn warmed(offsets_shrink_name: &'static str, buffer_shrink_name: &'static str) -> Self {
+        Self {
+            recycler_offsets: Recycler::warmed(50, 4096, None, offsets_shrink_name),
+            recycler_buffer: Recycler::warmed(50, 4096, None, buffer_shrink_name),
         }
     }
     pub fn offsets(&self) -> &Recycler<TxOffset> {
diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs
index e17a0bf99cc9bd..b1789acb61b1d1 100644
--- a/perf/src/sigverify.rs
+++ b/perf/src/sigverify.rs
@@ -194,13 +194,13 @@ fn get_packet_offsets(packet: &Packet, current_offset: u32) -> PacketOffsets {

 pub fn generate_offsets(batches: &[Packets], recycler: &Recycler<TxOffset>) -> TxOffsets {
     debug!("allocating..");
-    let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets").unwrap();
+    let mut signature_offsets: PinnedVec<_> = recycler.allocate().unwrap();
     signature_offsets.set_pinnable();
-    let mut pubkey_offsets: PinnedVec<_> = recycler.allocate("pubkey_offsets").unwrap();
+    let mut pubkey_offsets: PinnedVec<_> = recycler.allocate().unwrap();
     pubkey_offsets.set_pinnable();
-    let mut msg_start_offsets: PinnedVec<_> = recycler.allocate("msg_start_offsets").unwrap();
+    let mut msg_start_offsets: PinnedVec<_> = recycler.allocate().unwrap();
     msg_start_offsets.set_pinnable();
-    let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets").unwrap();
+    let mut msg_sizes: PinnedVec<_> = recycler.allocate().unwrap();
     msg_sizes.set_pinnable();
     let mut current_packet = 0;
     let mut v_sig_lens = Vec::new();
@@ -347,7 +347,7 @@ pub fn ed25519_verify(

     debug!("CUDA ECDSA for {}", batch_size(batches));
     debug!("allocating out..");
-    let mut out = recycler_out.allocate("out_buffer").unwrap();
+    let mut out = recycler_out.allocate().unwrap();
     out.set_pinnable();
     let mut elems = Vec::new();
     let mut rvs = Vec::new();
@@ -678,8 +678,8 @@ mod tests {

         let batches = generate_packet_vec(&packet, n, 2);

-        let recycler = Recycler::default();
-        let recycler_out = Recycler::default();
+        let recycler = Recycler::new("");
+        let recycler_out = Recycler::new("");
         // verify packets
         let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@@ -697,8 +697,8 @@ mod tests {

         let batches = generate_packet_vec(&packet, 1, 1);

-        let recycler = Recycler::default();
-        let recycler_out = Recycler::default();
+        let recycler = Recycler::new("");
+        let recycler_out = Recycler::new("");
         // verify packets
         let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@@ -735,8 +735,8 @@ mod tests {

         batches[0].packets.push(packet);

-        let recycler = Recycler::default();
-        let recycler_out = Recycler::default();
+        let recycler = Recycler::new("");
+        let recycler_out = Recycler::new("");
         // verify packets
         let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
@@ -755,8 +755,8 @@ mod tests {
         let tx = test_multisig_tx();
         let packet = sigverify::make_packet_from_transaction(tx);

-        let recycler = Recycler::default();
-        let recycler_out = Recycler::default();
+        let recycler = Recycler::new("");
+        let recycler_out = Recycler::new("");
         for _ in 0..50 {
             let n = thread_rng().gen_range(1, 30);
             let num_batches = thread_rng().gen_range(2, 30);
diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs
index 8a8c7f28f8f1d2..34e58212abee02 100644
--- a/streamer/src/streamer.rs
+++ b/streamer/src/streamer.rs
@@ -43,7 +43,7 @@ fn recv_loop(
     let mut num_max_received = 0; // Number of times maximum packets were received
     loop {
         let (mut msgs, should_send) =
-            Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name)
+            Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH)
                 .map(|allocated| (allocated, true))
                 .unwrap_or((Packets::with_capacity(PACKETS_PER_BATCH), false));
         loop {
@@ -201,7 +201,7 @@ mod test {
         let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
         let exit = Arc::new(AtomicBool::new(false));
         let (s_reader, r_reader) = channel();
-        let t_receiver = receiver(Arc::new(read), &exit, s_reader, Recycler::default(), "test");
+        let t_receiver = receiver(Arc::new(read), &exit, s_reader, Recycler::new(""), "test");
         let t_responder = {
             let (s_responder, r_responder) = channel();
             let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);