Skip to content

Commit

Permalink
Fix metric names
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin committed Feb 17, 2021
1 parent 8b2809f commit b32f340
Show file tree
Hide file tree
Showing 18 changed files with 112 additions and 114 deletions.
2 changes: 1 addition & 1 deletion bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn main() -> Result<()> {

let mut read_channels = Vec::new();
let mut read_threads = Vec::new();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new("bench-streamer-recycler-shrink-stats");
for _ in 0..num_sockets {
let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap();
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
Expand Down
12 changes: 5 additions & 7 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1826,7 +1826,7 @@ impl ClusterInfo {
let mut last_contact_info_trace = timestamp();
let mut last_contact_info_save = timestamp();
let mut entrypoints_processed = false;
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new("gossip-recycler-shrink-stats");
let crds_data = vec![
CrdsData::Version(Version::new(self.id())),
CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
Expand Down Expand Up @@ -2074,8 +2074,7 @@ impl ClusterInfo {
.process_pull_requests(callers.cloned(), timestamp());
let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packets =
Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests").unwrap();
let mut packets = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
let mut rng = rand::thread_rng();
let check_pull_request =
Expand Down Expand Up @@ -2360,8 +2359,7 @@ impl ClusterInfo {
if packets.is_empty() {
None
} else {
let packets =
Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets).unwrap();
let packets = Packets::new_with_recycler_data(recycler, packets).unwrap();
Some(packets)
}
}
Expand Down Expand Up @@ -2910,7 +2908,7 @@ impl ClusterInfo {
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new("cluster-info-listen-recycler-shrink-stats");
Builder::new()
.name("solana-listen".to_string())
.spawn(move || {
Expand Down Expand Up @@ -3355,7 +3353,7 @@ mod tests {
.iter()
.map(|ping| Pong::new(ping, &this_node).unwrap())
.collect();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new("");
let packets = cluster_info
.handle_ping_messages(
remote_nodes
Expand Down
3 changes: 2 additions & 1 deletion core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ impl FetchStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
limit: Option<u32>,
) -> Self {
let recycler: PacketsRecycler = Recycler::warmed(1000, 1024, limit);
let recycler: PacketsRecycler =
Recycler::warmed(1000, 1024, limit, "fetch_stage_recycler_shrink");

let tpu_threads = sockets.into_iter().map(|socket| {
streamer::receiver(
Expand Down
2 changes: 1 addition & 1 deletion core/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl GossipService {
gossip_socket.clone(),
&exit,
request_sender,
Recycler::default(),
Recycler::new("gossip-receiver-recycler-shrink-stats"),
"gossip_receiver",
);
let (response_sender, response_receiver) = channel();
Expand Down
25 changes: 8 additions & 17 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl ServeRepair {
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new("serve-repair-recycler-shrink-stats");
Builder::new()
.name("solana-repair-listen".to_string())
.spawn(move || {
Expand Down Expand Up @@ -498,12 +498,7 @@ impl ServeRepair {

if let Some(packet) = packet {
inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
return Some(Packets::new_with_recycler_data(
recycler,
"run_window_request",
vec![packet],
))
.unwrap();
return Some(Packets::new_with_recycler_data(recycler, vec![packet])).unwrap();
}
}

Expand Down Expand Up @@ -539,11 +534,7 @@ impl ServeRepair {
from_addr,
nonce,
)?;
return Packets::new_with_recycler_data(
recycler,
"run_highest_window_request",
vec![packet],
);
return Packets::new_with_recycler_data(recycler, vec![packet]);
}
None
}
Expand All @@ -556,7 +547,7 @@ impl ServeRepair {
max_responses: usize,
nonce: Nonce,
) -> Option<Packets> {
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan").unwrap();
let mut res = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
if let Some(blockstore) = blockstore {
// Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blockstore.meta(slot) {
Expand Down Expand Up @@ -610,7 +601,7 @@ mod tests {

/// test run_highest_window_request responds with the right shred, and does not overrun
fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new("");
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
Expand Down Expand Up @@ -678,7 +669,7 @@ mod tests {

/// test window requests respond with the right shred, and do not overrun
fn run_window_request(slot: Slot, nonce: Nonce) {
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new("");
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
Expand Down Expand Up @@ -846,7 +837,7 @@ mod tests {

fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
solana_logger::setup();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new("");
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
Expand Down Expand Up @@ -917,7 +908,7 @@ mod tests {
#[test]
fn run_orphan_corrupted_shred_size() {
solana_logger::setup();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new("");
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
Expand Down
2 changes: 1 addition & 1 deletion core/src/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl ServeRepairService {
serve_repair_socket.clone(),
&exit,
request_sender,
Recycler::default(),
Recycler::new("serve-repair-receiver-recycler-shrink-stats"),
"serve_repair_receiver",
);
let (response_sender, response_receiver) = channel();
Expand Down
3 changes: 2 additions & 1 deletion core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ impl ShredFetchStage {
exit: &Arc<AtomicBool>,
limit: Option<u32>,
) -> Self {
let recycler: PacketsRecycler = Recycler::warmed(100, 1024, limit);
let recycler: PacketsRecycler =
Recycler::warmed(100, 1024, limit, "shred_fetch_stage_recycler_shrink");

let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
sockets,
Expand Down
4 changes: 2 additions & 2 deletions core/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ impl Default for TransactionSigVerifier {
fn default() -> Self {
init();
Self {
recycler: Recycler::warmed(50, 4096, None),
recycler_out: Recycler::warmed(50, 4096, None),
recycler: Recycler::warmed(50, 4096, None, ""),
recycler_out: Recycler::warmed(50, 4096, None, ""),
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ impl ShredSigVerifier {
Self {
bank_forks,
leader_schedule_cache,
recycler_cache: RecyclerCache::warmed(),
recycler_cache: RecyclerCache::warmed(
"shred-sig-verifier-offsets-recycler-shrink-stats",
"shred-sig-verifier-buffer-recycler-shrink-stats",
),
}
}
fn read_slots(batches: &[Packets]) -> HashSet<u64> {
Expand Down
2 changes: 1 addition & 1 deletion ledger/benches/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const NUM_PACKETS: usize = 256;
const NUM_BATCHES: usize = 1;
#[bench]
fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
let recycler_cache = RecyclerCache::default();
let recycler_cache = RecyclerCache::new("", "");

let mut packets = Packets::default();
packets.packets.set_pinnable();
Expand Down
18 changes: 12 additions & 6 deletions ledger/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,21 @@ pub struct EntryVerificationState {
device_verification_data: DeviceVerificationData,
}

#[derive(Default, Clone)]
#[derive(Clone)]
pub struct VerifyRecyclers {
hash_recycler: Recycler<PinnedVec<Hash>>,
tick_count_recycler: Recycler<PinnedVec<u64>>,
}

impl Default for VerifyRecyclers {
fn default() -> Self {
Self {
hash_recycler: Recycler::new("hash_recycler_shrink_stats"),
tick_count_recycler: Recycler::new("tick_count_recycler_shrink_stats"),
}
}
}

#[derive(PartialEq, Clone, Copy, Debug)]
pub enum EntryVerificationStatus {
Failure,
Expand Down Expand Up @@ -554,15 +563,12 @@ impl EntrySlice for [Entry] {
.take(self.len())
.collect();

let mut hashes_pinned = recyclers.hash_recycler.allocate("poh_verify_hash").unwrap();
let mut hashes_pinned = recyclers.hash_recycler.allocate().unwrap();
hashes_pinned.set_pinnable();
hashes_pinned.resize(hashes.len(), Hash::default());
hashes_pinned.copy_from_slice(&hashes);

let mut num_hashes_vec = recyclers
.tick_count_recycler
.allocate("poh_verify_num_hashes")
.unwrap();
let mut num_hashes_vec = recyclers.tick_count_recycler.allocate().unwrap();
num_hashes_vec.reserve_and_pin(cmp::max(1, self.len()));
for entry in self {
num_hashes_vec.push(entry.num_hashes.saturating_sub(1));
Expand Down
39 changes: 12 additions & 27 deletions ledger/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,7 @@ fn slot_key_data_for_gpu<
.push(*slot);
}
}
let mut keyvec = recycler_cache
.buffer()
.allocate("shred_gpu_pubkeys")
.unwrap();
let mut keyvec = recycler_cache.buffer().allocate().unwrap();
keyvec.set_pinnable();
let mut slot_to_key_ix = HashMap::new();

Expand All @@ -155,7 +152,7 @@ fn slot_key_data_for_gpu<
slot_to_key_ix.insert(s, i);
}
}
let mut offsets = recycler_cache.offsets().allocate("shred_offsets").unwrap();
let mut offsets = recycler_cache.offsets().allocate().unwrap();
offsets.set_pinnable();
slots.iter().for_each(|packet_slots| {
packet_slots.iter().for_each(|slot| {
Expand Down Expand Up @@ -188,20 +185,11 @@ fn shred_gpu_offsets(
batches: &[Packets],
recycler_cache: &RecyclerCache,
) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) {
let mut signature_offsets = recycler_cache
.offsets()
.allocate("shred_signatures")
.unwrap();
let mut signature_offsets = recycler_cache.offsets().allocate().unwrap();
signature_offsets.set_pinnable();
let mut msg_start_offsets = recycler_cache
.offsets()
.allocate("shred_msg_starts")
.unwrap();
let mut msg_start_offsets = recycler_cache.offsets().allocate().unwrap();
msg_start_offsets.set_pinnable();
let mut msg_sizes = recycler_cache
.offsets()
.allocate("shred_msg_sizes")
.unwrap();
let mut msg_sizes = recycler_cache.offsets().allocate().unwrap();
msg_sizes.set_pinnable();
let mut v_sig_lens = vec![];
for batch in batches.iter() {
Expand Down Expand Up @@ -254,7 +242,7 @@ pub fn verify_shreds_gpu(
trace!("pubkeys_len: {}", pubkeys_len);
let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) =
shred_gpu_offsets(pubkeys_len, batches, recycler_cache);
let mut out = recycler_cache.buffer().allocate("out_buffer").unwrap();
let mut out = recycler_cache.buffer().allocate().unwrap();
out.set_pinnable();
elems.push(
perf_libs::Elems {
Expand Down Expand Up @@ -344,7 +332,7 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) {
}

pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec<u8> {
let mut vec = cache.buffer().allocate("pinned_keypair").unwrap();
let mut vec = cache.buffer().allocate().unwrap();
let pubkey = keypair.pubkey().to_bytes();
let secret = keypair.secret().to_bytes();
let mut hasher = Sha512::default();
Expand Down Expand Up @@ -382,20 +370,17 @@ pub fn sign_shreds_gpu(
let mut num_packets = num_keypair_packets;

//should be zero
let mut pubkey_offsets = recycler_cache.offsets().allocate("pubkey offsets").unwrap();
let mut pubkey_offsets = recycler_cache.offsets().allocate().unwrap();
pubkey_offsets.resize(count, 0);

let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets").unwrap();
let mut secret_offsets = recycler_cache.offsets().allocate().unwrap();
secret_offsets.resize(count, pubkey_size as u32);

trace!("offset: {}", offset);
let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) =
shred_gpu_offsets(offset, batches, recycler_cache);
let total_sigs = signature_offsets.len();
let mut signatures_out = recycler_cache
.buffer()
.allocate("ed25519 signatures")
.unwrap();
let mut signatures_out = recycler_cache.buffer().allocate().unwrap();
signatures_out.set_pinnable();
signatures_out.resize(total_sigs * sig_size, 0);
elems.push(
Expand Down Expand Up @@ -575,7 +560,7 @@ pub mod tests {

fn run_test_sigverify_shreds_gpu(slot: Slot) {
solana_logger::setup();
let recycler_cache = RecyclerCache::default();
let recycler_cache = RecyclerCache::new("", "");

let mut batch = [Packets::default()];
let mut shred = Shred::new_from_data(
Expand Down Expand Up @@ -639,7 +624,7 @@ pub mod tests {

fn run_test_sigverify_shreds_sign_gpu(slot: Slot) {
solana_logger::setup();
let recycler_cache = RecyclerCache::default();
let recycler_cache = RecyclerCache::new("", "");

let mut packets = Packets::default();
let num_packets = 32;
Expand Down
6 changes: 3 additions & 3 deletions perf/benches/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ fn bench_sigverify(bencher: &mut Bencher) {
// generate packet vector
let batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::<Vec<_>>(), 128);

let recycler = Recycler::default();
let recycler_out = Recycler::default();
let recycler = Recycler::new("");
let recycler_out = Recycler::new("");
// verify packets
bencher.iter(|| {
let _ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
Expand All @@ -30,7 +30,7 @@ fn bench_get_offsets(bencher: &mut Bencher) {
// generate packet vector
let batches = to_packets_chunked(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 1024);

let recycler = Recycler::default();
let recycler = Recycler::new("");
// verify packets
bencher.iter(|| {
let _ans = sigverify::generate_offsets(&batches, &recycler);
Expand Down
Loading

0 comments on commit b32f340

Please sign in to comment.