From 3a6d593401bfbbf3ecf459e1fa7ead1613041954 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 12 Dec 2024 16:46:04 -0600 Subject: [PATCH] Remove Tracer packet as a concept (#4043) --- CHANGELOG.md | 1 + banking-bench/src/main.rs | 2 +- core/benches/banking_stage.rs | 12 +- core/benches/forwarder.rs | 36 +-- core/benches/sigverify_stage.rs | 20 +- core/src/banking_simulation.rs | 3 +- core/src/banking_stage.rs | 19 +- core/src/banking_stage/forwarder.rs | 15 +- core/src/banking_stage/packet_deserializer.rs | 40 +--- core/src/banking_stage/packet_receiver.rs | 14 +- .../scheduler_controller.rs | 6 +- .../unprocessed_packet_batches.rs | 13 +- .../unprocessed_transaction_storage.rs | 114 ++-------- core/src/banking_trace.rs | 7 +- core/src/cluster_info_vote_listener.rs | 2 +- core/src/lib.rs | 1 - core/src/sigverify.rs | 86 +------ core/src/sigverify_stage.rs | 143 +++--------- core/src/tracer_packet_stats.rs | 214 ------------------ perf/benches/dedup.rs | 2 +- perf/benches/shrink.rs | 2 +- perf/src/deduper.rs | 42 ++-- perf/src/sigverify.rs | 50 +--- sdk/packet/src/lib.rs | 15 +- 24 files changed, 123 insertions(+), 736 deletions(-) delete mode 100644 core/src/tracer_packet_stats.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ef3bbb57bb3f2..666e2c7cfa24b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Release channels have their own copy of this changelog: * CLI: * Add global `--skip-preflight` option for skipping preflight checks on all transactions sent through RPC. This flag, along with `--use-rpc`, can improve success rate with program deployments using the public RPC nodes. * Unhide `--accounts-db-access-storages-method` for agave-validator and agave-ledger-tool + * Remove tracer stats from banking-trace. The `banking-trace` directory should be cleared when restarting on v2.2 for the first time. Nothing will break if it is not cleared, but the trace file will contain a mix of the old and new formats. 
(#4043) ## [2.1.0] * Breaking: diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 287930bf4c0fe0..a95cbd320a8d37 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -504,7 +504,7 @@ fn main() { timestamp(), ); non_vote_sender - .send(BankingPacketBatch::new((vec![packet_batch.clone()], None))) + .send(BankingPacketBatch::new(vec![packet_batch.clone()])) .unwrap(); } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 0f449719ce34cb..673b993274fb66 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -319,16 +319,14 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let mut sent = 0; if let Some(vote_packets) = &vote_packets { tpu_vote_sender - .send(BankingPacketBatch::new(( + .send(BankingPacketBatch::new( vote_packets[start..start + chunk_len].to_vec(), - None, - ))) + )) .unwrap(); gossip_vote_sender - .send(BankingPacketBatch::new(( + .send(BankingPacketBatch::new( vote_packets[start..start + chunk_len].to_vec(), - None, - ))) + )) .unwrap(); } for v in verified[start..start + chunk_len].chunks(chunk_len / num_threads) { @@ -343,7 +341,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { sent += xv.len(); } non_vote_sender - .send(BankingPacketBatch::new((v.to_vec(), None))) + .send(BankingPacketBatch::new(v.to_vec())) .unwrap(); } diff --git a/core/benches/forwarder.rs b/core/benches/forwarder.rs index 10a050f3d97d4b..80f401d9f5c834 100644 --- a/core/benches/forwarder.rs +++ b/core/benches/forwarder.rs @@ -3,15 +3,12 @@ extern crate test; use { itertools::Itertools, solana_client::connection_cache::ConnectionCache, - solana_core::{ - banking_stage::{ - forwarder::Forwarder, - leader_slot_metrics::LeaderSlotMetricsTracker, - unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches}, - unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, - BankingStageStats, - }, - tracer_packet_stats::TracerPacketStats, + solana_core::banking_stage::{ + forwarder::Forwarder, + leader_slot_metrics::LeaderSlotMetricsTracker, + unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches}, + unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, + BankingStageStats, }, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ @@ -38,7 +35,6 @@ struct BenchSetup { unprocessed_packet_batches: UnprocessedTransactionStorage, tracker: LeaderSlotMetricsTracker, stats: BankingStageStats, - tracer_stats: TracerPacketStats, } fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup { @@ -88,7 +84,6 @@ fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup { transaction.message.account_keys[0] = solana_sdk::pubkey::Pubkey::new_unique(); } let mut packet = Packet::from_data(None, transaction).unwrap(); - packet.meta_mut().set_tracer(true); packet.meta_mut().set_from_staked_node(true); DeserializedPacket::new(packet).unwrap() }) @@ -118,7 +113,6 @@ fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup { unprocessed_packet_batches, tracker: LeaderSlotMetricsTracker::new(0), stats: BankingStageStats::default(), - tracer_stats: TracerPacketStats::new(0), } } @@ -132,19 +126,12 @@ fn bench_forwarder_handle_forwading_contentious_transaction(bencher: &mut Benche mut unprocessed_packet_batches, mut tracker, stats, - mut tracer_stats, } = setup(num_packets, true); // hold packets so they can be reused for benching let hold = true; 
bencher.iter(|| { - forwarder.handle_forwarding( - &mut unprocessed_packet_batches, - hold, - &mut tracker, - &stats, - &mut tracer_stats, - ); + forwarder.handle_forwarding(&mut unprocessed_packet_batches, hold, &mut tracker, &stats); // reset packet.forwarded flag to reuse `unprocessed_packet_batches` if let UnprocessedTransactionStorage::LocalTransactionStorage(unprocessed_packets) = &mut unprocessed_packet_batches @@ -169,19 +156,12 @@ fn bench_forwarder_handle_forwading_parallel_transactions(bencher: &mut Bencher) mut unprocessed_packet_batches, mut tracker, stats, - mut tracer_stats, } = setup(num_packets, false); // hold packets so they can be reused for benching let hold = true; bencher.iter(|| { - forwarder.handle_forwarding( - &mut unprocessed_packet_batches, - hold, - &mut tracker, - &stats, - &mut tracer_stats, - ); + forwarder.handle_forwarding(&mut unprocessed_packet_batches, hold, &mut tracker, &stats); // reset packet.forwarded flag to reuse `unprocessed_packet_batches` if let UnprocessedTransactionStorage::LocalTransactionStorage(unprocessed_packets) = &mut unprocessed_packet_batches diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 3f11cc150574d3..30220b0aa1d113 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -23,7 +23,6 @@ use { }, solana_sdk::{ hash::Hash, - packet::PacketFlags, signature::{Keypair, Signer}, system_transaction, }, @@ -58,7 +57,7 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) { info!("total packets: {}", total); bencher.iter(move || { - SigVerifyStage::discard_excess_packets(&mut batches, 10_000, |_| ()); + SigVerifyStage::discard_excess_packets(&mut batches, 10_000); let mut num_packets = 0; for batch in batches.iter_mut() { for p in batch.iter_mut() { @@ -105,7 +104,7 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) { } } bencher.iter(move || { - SigVerifyStage::discard_excess_packets(&mut batches, 10_000, |_| ()); + SigVerifyStage::discard_excess_packets(&mut batches, 10_000); let mut num_packets = 0; for batch in batches.iter_mut() { for packet in batch.iter_mut() { @@ -171,24 +170,17 @@ fn bench_sigverify_stage(bencher: &mut Bencher, use_same_tx: bool) { ); let mut sent_len = 0; - for mut batch in batches.into_iter() { + for batch in batches.into_iter() { sent_len += batch.len(); - batch - .iter_mut() - .for_each(|packet| packet.meta_mut().flags |= PacketFlags::TRACER_PACKET); packet_s.send(batch).unwrap(); } let mut received = 0; - let mut total_tracer_packets_received_in_sigverify_stage = 0; trace!("sent: {}", sent_len); loop { - if let Ok(message) = verified_r.recv_timeout(Duration::from_millis(10)) { - let (verifieds, tracer_packet_stats) = (&message.0, message.1.as_ref().unwrap()); + if let Ok(verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { received += verifieds.iter().map(|batch| batch.len()).sum::(); - total_tracer_packets_received_in_sigverify_stage += - tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage; - test::black_box(message); - if total_tracer_packets_received_in_sigverify_stage >= sent_len { + test::black_box(verifieds); + if received >= sent_len { break; } } diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index 6625011c88d0a7..3cd9ecab5977fc 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -822,8 +822,7 @@ impl BankingSimulator { let timed_batches_to_send = packet_batches_by_time.split_off(&base_event_time); let 
batch_and_tx_counts = timed_batches_to_send .values() - .map(|(_label, batches_with_stats)| { - let batches = &batches_with_stats.0; + .map(|(_label, batches)| { ( batches.len(), batches.iter().map(|batch| batch.len()).sum::(), diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 49ccdb6ae15eff..2bc7148743657c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -25,7 +25,6 @@ use { }, }, banking_trace::BankingPacketReceiver, - tracer_packet_stats::TracerPacketStats, validator::BlockProductionMethod, }, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, @@ -319,8 +318,6 @@ pub enum ForwardOption { #[derive(Debug, Default)] pub struct FilterForwardingResults { pub(crate) total_forwardable_packets: usize, - pub(crate) total_tracer_packets_in_buffer: usize, - pub(crate) total_forwardable_tracer_packets: usize, pub(crate) total_dropped_packets: usize, pub(crate) total_packet_conversion_us: u64, pub(crate) total_filter_packets_us: u64, @@ -686,7 +683,6 @@ impl BankingStage { unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - tracer_packet_stats: &mut TracerPacketStats, ) { if unprocessed_transaction_storage.should_not_process() { return; @@ -722,7 +718,6 @@ impl BankingStage { false, slot_metrics_tracker, banking_stage_stats, - tracer_packet_stats, )); slot_metrics_tracker.increment_forward_us(forward_us); // Take metrics action after forwarding packets to include forwarded @@ -735,7 +730,6 @@ impl BankingStage { true, slot_metrics_tracker, banking_stage_stats, - tracer_packet_stats, )); slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_us); // Take metrics action after forwarding packets @@ -754,7 +748,6 @@ impl BankingStage { mut unprocessed_transaction_storage: UnprocessedTransactionStorage, ) { let mut banking_stage_stats = BankingStageStats::new(id); - let mut tracer_packet_stats = TracerPacketStats::new(id); let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); let mut last_metrics_update = Instant::now(); @@ -770,19 +763,15 @@ impl BankingStage { &mut unprocessed_transaction_storage, &banking_stage_stats, &mut slot_metrics_tracker, - &mut tracer_packet_stats, )); slot_metrics_tracker .increment_process_buffered_packets_us(process_buffered_packets_us); last_metrics_update = Instant::now(); } - tracer_packet_stats.report(1000); - match packet_receiver.receive_and_buffer_packets( &mut unprocessed_transaction_storage, &mut banking_stage_stats, - &mut tracer_packet_stats, &mut slot_metrics_tracker, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), @@ -1073,7 +1062,7 @@ mod tests { .collect(); let packet_batches = convert_from_old_verified(packet_batches); non_vote_sender // no_ver, anf, tx - .send(BankingPacketBatch::new((packet_batches, None))) + .send(BankingPacketBatch::new(packet_batches)) .unwrap(); drop(non_vote_sender); @@ -1152,7 +1141,7 @@ mod tests { .collect(); let packet_batches = convert_from_old_verified(packet_batches); non_vote_sender - .send(BankingPacketBatch::new((packet_batches, None))) + .send(BankingPacketBatch::new(packet_batches)) .unwrap(); // Process a second batch that uses the same from account, so conflicts with above TX @@ -1165,7 +1154,7 @@ mod tests { .collect(); let packet_batches = convert_from_old_verified(packet_batches); non_vote_sender - .send(BankingPacketBatch::new((packet_batches, None))) + .send(BankingPacketBatch::new(packet_batches)) .unwrap(); let 
(tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); @@ -1472,7 +1461,7 @@ mod tests { Builder::new() .spawn(move || { sender - .send(BankingPacketBatch::new((packet_batches, None))) + .send(BankingPacketBatch::new(packet_batches)) .unwrap() }) .unwrap() diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index de4a5d913b6f25..0a3cfd35b2a9e5 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -10,7 +10,6 @@ use { immutable_deserialized_packet::ImmutableDeserializedPacket, LikeClusterInfo, }, next_leader::{next_leader, next_leader_tpu_vote}, - tracer_packet_stats::TracerPacketStats, }, solana_client::connection_cache::ConnectionCache, solana_connection_cache::client_connection::ClientConnection as TpuConnection, @@ -96,7 +95,6 @@ impl Forwarder { hold: bool, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, banking_stage_stats: &BankingStageStats, - tracer_packet_stats: &mut TracerPacketStats, ) { let forward_option = unprocessed_transaction_storage.forward_option(); @@ -139,19 +137,13 @@ impl Forwarder { slot_metrics_tracker.increment_forwardable_batches_count(1); let batched_forwardable_packets_count = forward_batch.len(); - let (_forward_result, successful_forwarded_packets_count, leader_pubkey) = self + let (_forward_result, successful_forwarded_packets_count, _leader_pubkey) = self .forward_buffered_packets( &forward_option, forward_batch.get_forwardable_packets(), banking_stage_stats, ); - if let Some(leader_pubkey) = leader_pubkey { - tracer_packet_stats.increment_total_forwardable_tracer_packets( - filter_forwarding_result.total_forwardable_tracer_packets, - leader_pubkey, - ); - } let failed_forwarded_packets_count = batched_forwardable_packets_count .saturating_sub(successful_forwarded_packets_count); @@ -174,9 +166,6 @@ impl Forwarder { slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count( filter_forwarding_result.total_forwardable_packets as u64, ); - tracer_packet_stats.increment_total_cleared_from_buffer_after_forward( - filter_forwarding_result.total_tracer_packets_in_buffer, - ); unprocessed_transaction_storage.clear_forwarded_packets(); } } @@ -485,7 +474,6 @@ mod tests { true, &mut LeaderSlotMetricsTracker::new(0), &stats, - &mut TracerPacketStats::new(0), ); let recv_socket = &local_node.sockets.tpu_forwards_quic[0]; @@ -584,7 +572,6 @@ mod tests { hold, &mut LeaderSlotMetricsTracker::new(0), &stats, - &mut TracerPacketStats::new(0), ); let recv_socket = &local_node.sockets.tpu_forwards_quic[0]; diff --git a/core/src/banking_stage/packet_deserializer.rs b/core/src/banking_stage/packet_deserializer.rs index 78fab3718252f4..33f41cf377cc48 100644 --- a/core/src/banking_stage/packet_deserializer.rs +++ b/core/src/banking_stage/packet_deserializer.rs @@ -5,10 +5,7 @@ use { immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, packet_filter::PacketFilterFailure, }, - crate::{ - banking_trace::{BankingPacketBatch, BankingPacketReceiver}, - sigverify::SigverifyTracerPacketStats, - }, + crate::banking_trace::{BankingPacketBatch, BankingPacketReceiver}, crossbeam_channel::RecvTimeoutError, solana_perf::packet::PacketBatch, solana_sdk::saturating_add_assign, @@ -19,8 +16,6 @@ use { pub struct ReceivePacketResults { /// Deserialized packets from all received packet batches pub deserialized_packets: Vec, - /// Aggregate tracer stats for all received packet batches - pub new_tracer_stats_option: Option, /// Counts of packets 
received and errors recorded during deserialization /// and filtering pub packet_stats: PacketReceiverStats, @@ -112,10 +107,9 @@ impl PacketDeserializer { ) -> ReceivePacketResults { let mut packet_stats = PacketReceiverStats::default(); let mut deserialized_packets = Vec::with_capacity(packet_count); - let mut aggregated_tracer_packet_stats_option = None::; for banking_batch in banking_batches { - for packet_batch in &banking_batch.0 { + for packet_batch in banking_batch.iter() { let packet_indexes = Self::generate_packet_indexes(packet_batch); saturating_add_assign!( @@ -134,23 +128,10 @@ impl PacketDeserializer { &packet_filter, )); } - - if let Some(tracer_packet_stats) = &banking_batch.1 { - if let Some(aggregated_tracer_packet_stats) = - &mut aggregated_tracer_packet_stats_option - { - aggregated_tracer_packet_stats.aggregate(tracer_packet_stats); - } else { - // BankingPacketBatch is owned by Arc; so we have to clone its internal field - // (SigverifyTracerPacketStats). - aggregated_tracer_packet_stats_option = Some(tracer_packet_stats.clone()); - } - } } ReceivePacketResults { deserialized_packets, - new_tracer_stats_option: aggregated_tracer_packet_stats_option, packet_stats, } } @@ -163,22 +144,20 @@ impl PacketDeserializer { ) -> Result<(usize, Vec), RecvTimeoutError> { let start = Instant::now(); - let message = self.packet_batch_receiver.recv_timeout(recv_timeout)?; - let packet_batches = &message.0; + let packet_batches = self.packet_batch_receiver.recv_timeout(recv_timeout)?; let mut num_packets_received = packet_batches .iter() .map(|batch| batch.len()) .sum::(); - let mut messages = vec![message]; + let mut messages = vec![packet_batches]; - while let Ok(message) = self.packet_batch_receiver.try_recv() { - let packet_batches = &message.0; + while let Ok(packet_batches) = self.packet_batch_receiver.try_recv() { trace!("got more packet batches in packet deserializer"); num_packets_received += packet_batches .iter() .map(|batch| batch.len()) .sum::(); - messages.push(message); + messages.push(packet_batches); if start.elapsed() >= recv_timeout || num_packets_received >= packet_count_upperbound { break; @@ -240,7 +219,6 @@ mod tests { fn test_deserialize_and_collect_packets_empty() { let results = PacketDeserializer::deserialize_and_collect_packets(0, &[], Ok); assert_eq!(results.deserialized_packets.len(), 0); - assert!(results.new_tracer_stats_option.is_none()); assert_eq!(results.packet_stats.passed_sigverify_count, 0); assert_eq!(results.packet_stats.failed_sigverify_count, 0); } @@ -254,11 +232,10 @@ mod tests { let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum(); let results = PacketDeserializer::deserialize_and_collect_packets( packet_count, - &[BankingPacketBatch::new((packet_batches, None))], + &[BankingPacketBatch::new(packet_batches)], Ok, ); assert_eq!(results.deserialized_packets.len(), 2); - assert!(results.new_tracer_stats_option.is_none()); assert_eq!(results.packet_stats.passed_sigverify_count, 2); assert_eq!(results.packet_stats.failed_sigverify_count, 0); } @@ -273,11 +250,10 @@ mod tests { let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum(); let results = PacketDeserializer::deserialize_and_collect_packets( packet_count, - &[BankingPacketBatch::new((packet_batches, None))], + &[BankingPacketBatch::new(packet_batches)], Ok, ); assert_eq!(results.deserialized_packets.len(), 1); - assert!(results.new_tracer_stats_option.is_none()); assert_eq!(results.packet_stats.passed_sigverify_count, 1); 
assert_eq!(results.packet_stats.failed_sigverify_count, 1); } diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index 6b77d103c69670..e95b20c3df4f1e 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -6,7 +6,7 @@ use { unprocessed_transaction_storage::UnprocessedTransactionStorage, BankingStageStats, }, - crate::{banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats}, + crate::banking_trace::BankingPacketReceiver, crossbeam_channel::RecvTimeoutError, solana_measure::{measure::Measure, measure_us}, solana_sdk::{saturating_add_assign, timing::timestamp}, @@ -31,7 +31,6 @@ impl PacketReceiver { &mut self, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, banking_stage_stats: &mut BankingStageStats, - tracer_packet_stats: &mut TracerPacketStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> Result<(), RecvTimeoutError> { let (result, recv_time_us) = measure_us!({ @@ -53,7 +52,6 @@ impl PacketReceiver { receive_packet_results, unprocessed_transaction_storage, banking_stage_stats, - tracer_packet_stats, slot_metrics_tracker, ); recv_and_buffer_measure.stop(); @@ -93,21 +91,16 @@ impl PacketReceiver { &self, ReceivePacketResults { deserialized_packets, - new_tracer_stats_option, packet_stats, }: ReceivePacketResults, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, banking_stage_stats: &mut BankingStageStats, - tracer_packet_stats: &mut TracerPacketStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { let packet_count = deserialized_packets.len(); debug!("@{:?} txs: {} id: {}", timestamp(), packet_count, self.id); slot_metrics_tracker.increment_received_packet_counts(packet_stats); - if let Some(new_sigverify_stats) = &new_tracer_stats_option { - tracer_packet_stats.aggregate_sigverify_tracer_packet_stats(new_sigverify_stats); - } let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; @@ -120,7 +113,6 @@ impl PacketReceiver { &mut newly_buffered_forwarded_packets_count, banking_stage_stats, slot_metrics_tracker, - tracer_packet_stats, ); banking_stage_stats @@ -145,7 +137,6 @@ impl PacketReceiver { newly_buffered_forwarded_packets_count: &mut usize, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - tracer_packet_stats: &mut TracerPacketStats, ) { if !deserialized_packets.is_empty() { let _ = banking_stage_stats @@ -168,9 +159,6 @@ impl PacketReceiver { *dropped_packets_count, insert_packet_batches_summary.total_dropped_packets() ); - tracer_packet_stats.increment_total_exceeded_banking_stage_buffer( - insert_packet_batches_summary.dropped_tracer_packets(), - ); } } } diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 14a175b2018260..17012db79257a2 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -444,7 +444,6 @@ mod tests { transaction_scheduler::receive_and_buffer::SanitizedTransactionReceiveAndBuffer, }, banking_trace::BankingPacketBatch, - sigverify::SigverifyTracerPacketStats, }, crossbeam_channel::{unbounded, Receiver, Sender}, itertools::Itertools, @@ -486,7 +485,7 @@ mod tests { _entry_receiver: Receiver, _record_receiver: Receiver, poh_recorder: Arc>, - banking_packet_sender: Sender, Option)>>, + banking_packet_sender: 
Sender>>, consume_work_receivers: Vec>>>, @@ -589,8 +588,7 @@ mod tests { } fn to_banking_packet_batch(txs: &[Transaction]) -> BankingPacketBatch { - let packet_batch = to_packet_batches(txs, NUM_PACKETS); - Arc::new((packet_batch, None)) + BankingPacketBatch::new(to_packet_batches(txs, NUM_PACKETS)) } // Helper function to let test receive and then schedule packets. diff --git a/core/src/banking_stage/unprocessed_packet_batches.rs b/core/src/banking_stage/unprocessed_packet_batches.rs index 3c4e0f66664dd2..493025e9cd635e 100644 --- a/core/src/banking_stage/unprocessed_packet_batches.rs +++ b/core/src/banking_stage/unprocessed_packet_batches.rs @@ -57,7 +57,6 @@ impl Ord for DeserializedPacket { #[derive(Debug)] pub struct PacketBatchInsertionMetrics { pub(crate) num_dropped_packets: usize, - pub(crate) num_dropped_tracer_packets: usize, } /// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store @@ -103,23 +102,13 @@ impl UnprocessedPacketBatches { deserialized_packets: impl Iterator, ) -> PacketBatchInsertionMetrics { let mut num_dropped_packets = 0; - let mut num_dropped_tracer_packets = 0; for deserialized_packet in deserialized_packets { - if let Some(dropped_packet) = self.push(deserialized_packet) { + if let Some(_dropped_packet) = self.push(deserialized_packet) { num_dropped_packets += 1; - if dropped_packet - .immutable_section() - .original_packet() - .meta() - .is_tracer_packet() - { - num_dropped_tracer_packets += 1; - } } } PacketBatchInsertionMetrics { num_dropped_packets, - num_dropped_tracer_packets, } } diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index e86780002ea694..e28501c065300f 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -93,13 +93,6 @@ impl InsertPacketBatchSummary { _ => 0, } } - - pub fn dropped_tracer_packets(&self) -> usize { - match self { - Self::PacketBatchInsertionMetrics(metrics) => metrics.num_dropped_tracer_packets, - _ => 0, - } - } } impl From for InsertPacketBatchSummary { @@ -638,8 +631,6 @@ impl ThreadLocalUnprocessedPackets { bank: Arc, forward_buffer: &mut ForwardPacketBatchesByAccounts, ) -> FilterForwardingResults { - let mut total_forwardable_tracer_packets: usize = 0; - let mut total_tracer_packets_in_buffer: usize = 0; let mut total_forwardable_packets: usize = 0; let mut total_packet_conversion_us: u64 = 0; let mut total_filter_packets_us: u64 = 0; @@ -660,11 +651,8 @@ impl ThreadLocalUnprocessedPackets { .into_iter() .flat_map(|packets_to_process| { // Only process packets not yet forwarded - let (forwarded_packets, packets_to_forward, is_tracer_packet) = self - .prepare_packets_to_forward( - packets_to_process, - &mut total_tracer_packets_in_buffer, - ); + let (forwarded_packets, packets_to_forward) = + self.prepare_packets_to_forward(packets_to_process); [ forwarded_packets, @@ -689,15 +677,10 @@ impl ThreadLocalUnprocessedPackets { &mut total_dropped_packets )); saturating_add_assign!(total_filter_packets_us, filter_packets_us); - - for forwardable_transaction_index in &forwardable_transaction_indexes { - saturating_add_assign!(total_forwardable_packets, 1); - let forwardable_packet_index = - transaction_to_packet_indexes[*forwardable_transaction_index]; - if is_tracer_packet[forwardable_packet_index] { - saturating_add_assign!(total_forwardable_tracer_packets, 1); - } - } + saturating_add_assign!( + total_forwardable_packets, + 
forwardable_transaction_indexes.len() + ); let accepted_packet_indexes = Self::add_filtered_packets_to_forward_buffer( @@ -750,8 +733,6 @@ impl ThreadLocalUnprocessedPackets { FilterForwardingResults { total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, total_dropped_packets, total_packet_conversion_us, total_filter_packets_us, @@ -982,36 +963,27 @@ impl ThreadLocalUnprocessedPackets { fn prepare_packets_to_forward( &self, packets_to_forward: impl Iterator>, - total_tracer_packets_in_buffer: &mut usize, ) -> ( Vec>, Vec>, - Vec, ) { let mut forwarded_packets: Vec> = vec![]; - let (forwardable_packets, is_tracer_packet) = packets_to_forward + let forwardable_packets = packets_to_forward .into_iter() .filter_map(|immutable_deserialized_packet| { - let is_tracer_packet = immutable_deserialized_packet - .original_packet() - .meta() - .is_tracer_packet(); - if is_tracer_packet { - saturating_add_assign!(*total_tracer_packets_in_buffer, 1); - } if !self .unprocessed_packet_batches .is_forwarded(&immutable_deserialized_packet) { - Some((immutable_deserialized_packet, is_tracer_packet)) + Some(immutable_deserialized_packet) } else { forwarded_packets.push(immutable_deserialized_packet); None } }) - .unzip(); + .collect(); - (forwarded_packets, forwardable_packets, is_tracer_packet) + (forwarded_packets, forwardable_packets) } } @@ -1116,7 +1088,6 @@ mod tests { .map(|(packets_id, transaction)| { let mut p = Packet::from_data(None, transaction).unwrap(); p.meta_mut().port = packets_id as u16; - p.meta_mut().set_tracer(true); DeserializedPacket::new(p).unwrap() }) .collect_vec(); @@ -1134,16 +1105,12 @@ mod tests { let FilterForwardingResults { total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, .. } = transaction_storage.filter_forwardable_packets_and_add_batches( current_bank.clone(), &mut forward_packet_batches_by_accounts, ); assert_eq!(total_forwardable_packets, 256); - assert_eq!(total_tracer_packets_in_buffer, 256); - assert_eq!(total_forwardable_tracer_packets, 256); // packets in a batch are forwarded in arbitrary order; verify the ports match after // sorting @@ -1172,8 +1139,6 @@ mod tests { ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); let FilterForwardingResults { total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, .. } = transaction_storage.filter_forwardable_packets_and_add_batches( current_bank.clone(), @@ -1183,11 +1148,6 @@ mod tests { total_forwardable_packets, packets.len() - num_already_forwarded ); - assert_eq!(total_tracer_packets_in_buffer, packets.len()); - assert_eq!( - total_forwardable_tracer_packets, - packets.len() - num_already_forwarded - ); } // some packets are invalid (already processed) @@ -1206,8 +1166,6 @@ mod tests { ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); let FilterForwardingResults { total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, .. 
} = transaction_storage.filter_forwardable_packets_and_add_batches( current_bank, @@ -1217,11 +1175,6 @@ mod tests { total_forwardable_packets, packets.len() - num_already_processed ); - assert_eq!(total_tracer_packets_in_buffer, packets.len()); - assert_eq!( - total_forwardable_tracer_packets, - packets.len() - num_already_processed - ); } } @@ -1383,17 +1336,14 @@ mod tests { .map(|(packets_id, transaction)| { let mut p = Packet::from_data(None, transaction).unwrap(); p.meta_mut().port = packets_id as u16; - p.meta_mut().set_tracer(true); DeserializedPacket::new(p).unwrap() }) .collect_vec(); // test preparing buffered packets for forwarding let test_prepareing_buffered_packets_for_forwarding = - |buffered_packet_batches: UnprocessedPacketBatches| -> (usize, usize, usize) { - let mut total_tracer_packets_in_buffer: usize = 0; + |buffered_packet_batches: UnprocessedPacketBatches| -> usize { let mut total_packets_to_forward: usize = 0; - let mut total_tracer_packets_to_forward: usize = 0; let mut unprocessed_transactions = ThreadLocalUnprocessedPackets { unprocessed_packet_batches: buffered_packet_batches, @@ -1406,35 +1356,21 @@ mod tests { .chunks(128usize) .into_iter() .flat_map(|packets_to_process| { - let (_, packets_to_forward, is_tracer_packet) = unprocessed_transactions - .prepare_packets_to_forward( - packets_to_process, - &mut total_tracer_packets_in_buffer, - ); + let (_, packets_to_forward) = + unprocessed_transactions.prepare_packets_to_forward(packets_to_process); total_packets_to_forward += packets_to_forward.len(); - total_tracer_packets_to_forward += is_tracer_packet.len(); packets_to_forward }) .collect::>>(); - ( - total_tracer_packets_in_buffer, - total_packets_to_forward, - total_tracer_packets_to_forward, - ) + total_packets_to_forward }; - // all tracer packets are forwardable { let buffered_packet_batches: UnprocessedPacketBatches = UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let ( - total_tracer_packets_in_buffer, - total_packets_to_forward, - total_tracer_packets_to_forward, - ) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches); - assert_eq!(total_tracer_packets_in_buffer, 256); + let total_packets_to_forward = + test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches); assert_eq!(total_packets_to_forward, 256); - assert_eq!(total_tracer_packets_to_forward, 256); } // some packets are forwarded @@ -1445,14 +1381,9 @@ mod tests { } let buffered_packet_batches: UnprocessedPacketBatches = UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let ( - total_tracer_packets_in_buffer, - total_packets_to_forward, - total_tracer_packets_to_forward, - ) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches); - assert_eq!(total_tracer_packets_in_buffer, 256); + let total_packets_to_forward = + test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches); assert_eq!(total_packets_to_forward, 256 - num_already_forwarded); - assert_eq!(total_tracer_packets_to_forward, 256 - num_already_forwarded); } // all packets are forwarded @@ -1462,14 +1393,9 @@ mod tests { } let buffered_packet_batches: UnprocessedPacketBatches = UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let ( - total_tracer_packets_in_buffer, - total_packets_to_forward, - total_tracer_packets_to_forward, - ) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches); - assert_eq!(total_tracer_packets_in_buffer, 256); + let total_packets_to_forward = + 
test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches); assert_eq!(total_packets_to_forward, 0); - assert_eq!(total_tracer_packets_to_forward, 0); } } } diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs index f9d560f5d17087..4591cf0dcdf7e7 100644 --- a/core/src/banking_trace.rs +++ b/core/src/banking_trace.rs @@ -1,5 +1,4 @@ use { - crate::sigverify::SigverifyTracerPacketStats, bincode::serialize_into, chrono::{DateTime, Local}, crossbeam_channel::{unbounded, Receiver, SendError, Sender, TryRecvError}, @@ -20,7 +19,7 @@ use { thiserror::Error, }; -pub type BankingPacketBatch = Arc<(Vec, Option)>; +pub type BankingPacketBatch = Arc>; pub type BankingPacketSender = TracedSender; pub type BankingPacketReceiver = Receiver; pub type TracerThreadResult = Result<(), TraceError>; @@ -373,7 +372,7 @@ pub mod for_test { }; pub fn sample_packet_batch() -> BankingPacketBatch { - BankingPacketBatch::new((to_packet_batches(&vec![test_tx(); 4], 10), None)) + BankingPacketBatch::new(to_packet_batches(&vec![test_tx(); 4], 10)) } pub fn drop_and_clean_temp_dir_unless_suppressed(temp_dir: TempDir) { @@ -430,7 +429,7 @@ mod tests { }); non_vote_sender - .send(BankingPacketBatch::new((vec![], None))) + .send(BankingPacketBatch::new(vec![])) .unwrap(); for_test::terminate_tracer(tracer, None, dummy_main_thread, non_vote_sender, None); } diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 56869624812940..87ac964c3978a9 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -265,7 +265,7 @@ impl ClusterInfoVoteListener { if !votes.is_empty() { let (vote_txs, packets) = Self::verify_votes(votes, root_bank_cache); verified_vote_transactions_sender.send(vote_txs)?; - verified_packets_sender.send(BankingPacketBatch::new((packets, None)))?; + verified_packets_sender.send(BankingPacketBatch::new(packets))?; } sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); } diff --git a/core/src/lib.rs b/core/src/lib.rs index a7639993871fcb..c40eebdc09c4db 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -39,7 +39,6 @@ pub mod stats_reporter_service; pub mod system_monitor_service; pub mod tpu; mod tpu_entry_notifier; -pub mod tracer_packet_stats; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; pub mod validator; diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 18984ecc4ef836..61da8cf9ef70dd 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -13,51 +13,10 @@ use { sigverify_stage::{SigVerifier, SigVerifyServiceError}, }, solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify}, - solana_sdk::{packet::Packet, saturating_add_assign}, }; -#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] -#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct SigverifyTracerPacketStats { - pub total_removed_before_sigverify_stage: usize, - pub total_tracer_packets_received_in_sigverify_stage: usize, - pub total_tracer_packets_deduped: usize, - pub total_excess_tracer_packets: usize, - pub total_tracker_packets_passed_sigverify: usize, -} - -impl SigverifyTracerPacketStats { - pub fn is_default(&self) -> bool { - *self == SigverifyTracerPacketStats::default() - } - - pub fn aggregate(&mut self, other: &SigverifyTracerPacketStats) { - saturating_add_assign!( - self.total_removed_before_sigverify_stage, - other.total_removed_before_sigverify_stage - ); - saturating_add_assign!( - 
self.total_tracer_packets_received_in_sigverify_stage, - other.total_tracer_packets_received_in_sigverify_stage - ); - saturating_add_assign!( - self.total_tracer_packets_deduped, - other.total_tracer_packets_deduped - ); - saturating_add_assign!( - self.total_excess_tracer_packets, - other.total_excess_tracer_packets - ); - saturating_add_assign!( - self.total_tracker_packets_passed_sigverify, - other.total_tracker_packets_passed_sigverify - ); - } -} - pub struct TransactionSigVerifier { packet_sender: BankingPacketSender, - tracer_packet_stats: SigverifyTracerPacketStats, recycler: Recycler, recycler_out: Recycler>, reject_non_vote: bool, @@ -74,7 +33,6 @@ impl TransactionSigVerifier { init(); Self { packet_sender, - tracer_packet_stats: SigverifyTracerPacketStats::default(), recycler: Recycler::warmed(50, 4096), recycler_out: Recycler::warmed(50, 4096), reject_non_vote: false, @@ -85,52 +43,12 @@ impl TransactionSigVerifier { impl SigVerifier for TransactionSigVerifier { type SendType = BankingPacketBatch; - #[inline(always)] - fn process_received_packet( - &mut self, - packet: &mut Packet, - removed_before_sigverify_stage: bool, - is_dup: bool, - ) { - sigverify::check_for_tracer_packet(packet); - if packet.meta().is_tracer_packet() { - if removed_before_sigverify_stage { - self.tracer_packet_stats - .total_removed_before_sigverify_stage += 1; - } else { - self.tracer_packet_stats - .total_tracer_packets_received_in_sigverify_stage += 1; - if is_dup { - self.tracer_packet_stats.total_tracer_packets_deduped += 1; - } - } - } - } - - #[inline(always)] - fn process_excess_packet(&mut self, packet: &Packet) { - if packet.meta().is_tracer_packet() { - self.tracer_packet_stats.total_excess_tracer_packets += 1; - } - } - - #[inline(always)] - fn process_passed_sigverify_packet(&mut self, packet: &Packet) { - if packet.meta().is_tracer_packet() { - self.tracer_packet_stats - .total_tracker_packets_passed_sigverify += 1; - } - } - fn send_packets( &mut self, packet_batches: Vec, ) -> Result<(), SigVerifyServiceError> { - let tracer_packet_stats_to_send = std::mem::take(&mut self.tracer_packet_stats); - self.packet_sender.send(BankingPacketBatch::new(( - packet_batches, - Some(tracer_packet_stats_to_send), - )))?; + self.packet_sender + .send(BankingPacketBatch::new(packet_batches))?; Ok(()) } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index ac7d9889db0ed8..a59df5ae36fea3 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -13,7 +13,7 @@ use { solana_measure::measure::Measure, solana_perf::{ deduper::{self, Deduper}, - packet::{Packet, PacketBatch}, + packet::PacketBatch, sigverify::{ count_discarded_packets, count_packets_in_batches, count_valid_packets, shrink_batches, }, @@ -57,15 +57,6 @@ pub struct SigVerifyStage { pub trait SigVerifier { type SendType: std::fmt::Debug; fn verify_batches(&self, batches: Vec, valid_packets: usize) -> Vec; - fn process_received_packet( - &mut self, - _packet: &mut Packet, - _removed_before_sigverify_stage: bool, - _is_dup: bool, - ) { - } - fn process_excess_packet(&mut self, _packet: &Packet) {} - fn process_passed_sigverify_packet(&mut self, _packet: &Packet) {} fn send_packets(&mut self, packet_batches: Vec) -> Result<(), Self::SendType>; } @@ -251,11 +242,7 @@ impl SigVerifyStage { Self { thread_hdl } } - pub fn discard_excess_packets( - batches: &mut [PacketBatch], - mut max_packets: usize, - mut process_excess_packet: impl FnMut(&Packet), - ) { + pub fn discard_excess_packets(batches: &mut 
[PacketBatch], mut max_packets: usize) { // Group packets by their incoming IP address. let mut addrs = batches .iter_mut() @@ -276,7 +263,6 @@ impl SigVerifyStage { } // Discard excess packets from each address. for packet in addrs.into_values().flatten() { - process_excess_packet(packet); packet.meta_mut().set_discard(true); } } @@ -322,30 +308,15 @@ impl SigVerifyStage { discard_random_time.stop(); let mut dedup_time = Measure::start("sigverify_dedup_time"); - let discard_or_dedup_fail = deduper::dedup_packets_and_count_discards( - deduper, - &mut batches, - #[inline(always)] - |received_packet, removed_before_sigverify_stage, is_dup| { - verifier.process_received_packet( - received_packet, - removed_before_sigverify_stage, - is_dup, - ); - }, - ) as usize; + let discard_or_dedup_fail = + deduper::dedup_packets_and_count_discards(deduper, &mut batches) as usize; dedup_time.stop(); let num_unique = non_discarded_packets.saturating_sub(discard_or_dedup_fail); let mut discard_time = Measure::start("sigverify_discard_time"); let mut num_packets_to_verify = num_unique; if num_unique > MAX_SIGVERIFY_BATCH { - Self::discard_excess_packets( - &mut batches, - MAX_SIGVERIFY_BATCH, - #[inline(always)] - |excess_packet| verifier.process_excess_packet(excess_packet), - ); + Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH); num_packets_to_verify = MAX_SIGVERIFY_BATCH; } let excess_fail = num_unique.saturating_sub(MAX_SIGVERIFY_BATCH); @@ -356,11 +327,7 @@ impl SigVerifyStage { let mut verify_time = Measure::start("sigverify_batch_time"); let mut batches = verifier.verify_batches(batches, num_packets_to_verify); - let num_valid_packets = count_valid_packets( - &batches, - #[inline(always)] - |valid_packet| verifier.process_passed_sigverify_packet(valid_packet), - ); + let num_valid_packets = count_valid_packets(&batches); verify_time.stop(); // Post-shrink packet batches if many packets are discarded from sigverify @@ -472,7 +439,6 @@ mod tests { packet::{to_packet_batches, Packet}, test_tx::test_tx, }, - solana_sdk::packet::PacketFlags, }; fn count_non_discard(packet_batches: &[PacketBatch]) -> usize { @@ -488,31 +454,15 @@ mod tests { solana_logger::setup(); let batch_size = 10; let mut batch = PacketBatch::with_capacity(batch_size); - let mut tracer_packet = Packet::default(); - tracer_packet.meta_mut().flags |= PacketFlags::TRACER_PACKET; - batch.resize(batch_size, tracer_packet); + let packet = Packet::default(); + batch.resize(batch_size, packet); batch[3].meta_mut().addr = std::net::IpAddr::from([1u16; 8]); batch[3].meta_mut().set_discard(true); - let num_discarded_before_filter = 1; batch[4].meta_mut().addr = std::net::IpAddr::from([2u16; 8]); - let total_num_packets = batch.len(); let mut batches = vec![batch]; let max = 3; - let mut total_tracer_packets_discarded = 0; - SigVerifyStage::discard_excess_packets(&mut batches, max, |packet| { - if packet.meta().is_tracer_packet() { - total_tracer_packets_discarded += 1; - } - }); + SigVerifyStage::discard_excess_packets(&mut batches, max); let total_non_discard = count_non_discard(&batches); - let total_discarded = total_num_packets - total_non_discard; - // Every packet except the packets already marked `discard` before the call - // to `discard_excess_packets()` should count towards the - // `total_tracer_packets_discarded` - assert_eq!( - total_tracer_packets_discarded, - total_discarded - num_discarded_before_filter - ); assert_eq!(total_non_discard, max); assert!(!batches[0][0].meta().discard()); 
assert!(batches[0][3].meta().discard()); @@ -565,69 +515,38 @@ mod tests { ); let mut sent_len = 0; - for mut batch in batches.into_iter() { + for batch in batches.into_iter() { sent_len += batch.len(); - batch - .iter_mut() - .for_each(|packet| packet.meta_mut().flags |= PacketFlags::TRACER_PACKET); assert_eq!(batch.len(), packets_per_batch); packet_s.send(batch).unwrap(); } - let mut received = 0; - let mut total_tracer_packets_received_in_sigverify_stage = 0; + let mut packet_s = Some(packet_s); + let mut valid_received = 0; trace!("sent: {}", sent_len); loop { - if let Ok(message) = verified_r.recv() { - let (verifieds, tracer_packet_stats) = (&message.0, message.1.as_ref().unwrap()); - total_tracer_packets_received_in_sigverify_stage += - tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage; - assert_eq!( - tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage - % packets_per_batch, - 0, - ); - - if use_same_tx { - // Every transaction other than the very first one in the very first batch - // should be deduped. - - // Also have to account for the fact that deduper could be cleared periodically, - // in which case the first transaction in the next batch won't be deduped - assert!( - (tracer_packet_stats.total_tracer_packets_deduped - == tracer_packet_stats - .total_tracer_packets_received_in_sigverify_stage - - 1) - || (tracer_packet_stats.total_tracer_packets_deduped - == tracer_packet_stats - .total_tracer_packets_received_in_sigverify_stage) - ); - assert!( - (tracer_packet_stats.total_tracker_packets_passed_sigverify == 1) - || (tracer_packet_stats.total_tracker_packets_passed_sigverify == 0) - ); - } else { - assert_eq!(tracer_packet_stats.total_tracer_packets_deduped, 0); - assert!( - (tracer_packet_stats.total_tracker_packets_passed_sigverify - == tracer_packet_stats - .total_tracer_packets_received_in_sigverify_stage) - ); - } - assert_eq!(tracer_packet_stats.total_excess_tracer_packets, 0); - received += verifieds.iter().map(|batch| batch.len()).sum::(); + if let Ok(verifieds) = verified_r.recv() { + valid_received += verifieds + .iter() + .map(|batch| batch.iter().filter(|p| !p.meta().discard()).count()) + .sum::(); + } else { + break; } - if total_tracer_packets_received_in_sigverify_stage >= sent_len { - break; + // Check if all the sent batches have been picked up by sigverify stage. + // Drop sender to exit the loop on next receive call, once the channel is + // drained. + if packet_s.as_ref().map(|s| s.is_empty()).unwrap_or(true) { + packet_s.take(); } } - trace!("received: {}", received); - assert_eq!( - total_tracer_packets_received_in_sigverify_stage, - total_packets - ); - drop(packet_s); + trace!("received: {}", valid_received); + + if use_same_tx { + assert_eq!(valid_received, 1); + } else { + assert_eq!(valid_received, total_packets); + } stage.join().unwrap(); } diff --git a/core/src/tracer_packet_stats.rs b/core/src/tracer_packet_stats.rs deleted file mode 100644 index 2269b35cc702fb..00000000000000 --- a/core/src/tracer_packet_stats.rs +++ /dev/null @@ -1,214 +0,0 @@ -use { - crate::sigverify::SigverifyTracerPacketStats, - solana_sdk::{pubkey::Pubkey, saturating_add_assign, timing::timestamp}, - std::collections::HashSet, -}; - -#[derive(Debug, Default)] -pub struct BankingStageTracerPacketStats { - total_exceeded_banking_stage_buffer: usize, - // This is the total number of tracer packets removed from the buffer - // after a leader's set of slots. 
Of these, only a subset that were in - // the buffer were actually forwardable (didn't arrive on forward port and haven't been - // forwarded before) - total_cleared_from_buffer_after_forward: usize, - total_forwardable_tracer_packets: usize, - forward_target_leaders: HashSet, -} - -#[derive(Debug, Default)] -pub struct ModifiableTracerPacketStats { - sigverify_tracer_packet_stats: SigverifyTracerPacketStats, - banking_stage_tracer_packet_stats: BankingStageTracerPacketStats, -} - -#[derive(Debug, Default)] -pub struct TracerPacketStats { - id: String, - last_report: u64, - modifiable_tracer_packet_stats: Option, -} - -impl TracerPacketStats { - pub fn new(id: u32) -> Self { - Self { - id: id.to_string(), - ..Self::default() - } - } - - fn reset(id: String) -> Self { - Self { - id, - ..Self::default() - } - } - - pub fn get_mutable_stats(&mut self) -> &mut ModifiableTracerPacketStats { - if self.modifiable_tracer_packet_stats.is_none() { - self.modifiable_tracer_packet_stats = Some(ModifiableTracerPacketStats::default()); - } - self.modifiable_tracer_packet_stats.as_mut().unwrap() - } - - pub fn aggregate_sigverify_tracer_packet_stats( - &mut self, - new_sigverify_stats: &SigverifyTracerPacketStats, - ) { - if !new_sigverify_stats.is_default() { - let stats = self.get_mutable_stats(); - stats - .sigverify_tracer_packet_stats - .aggregate(new_sigverify_stats); - } - } - - pub fn increment_total_exceeded_banking_stage_buffer( - &mut self, - total_exceeded_banking_stage_buffer: usize, - ) { - if total_exceeded_banking_stage_buffer != 0 { - let stats = self.get_mutable_stats(); - saturating_add_assign!( - stats - .banking_stage_tracer_packet_stats - .total_exceeded_banking_stage_buffer, - total_exceeded_banking_stage_buffer - ); - } - } - - pub fn increment_total_cleared_from_buffer_after_forward( - &mut self, - total_cleared_from_buffer_after_forward: usize, - ) { - if total_cleared_from_buffer_after_forward != 0 { - let stats = self.get_mutable_stats(); - saturating_add_assign!( - stats - .banking_stage_tracer_packet_stats - .total_cleared_from_buffer_after_forward, - total_cleared_from_buffer_after_forward - ); - } - } - - pub fn increment_total_forwardable_tracer_packets( - &mut self, - total_forwardable_tracer_packets: usize, - forward_target_leader: Pubkey, - ) { - if total_forwardable_tracer_packets != 0 { - let stats = self.get_mutable_stats(); - stats - .banking_stage_tracer_packet_stats - .forward_target_leaders - .insert(forward_target_leader); - saturating_add_assign!( - stats - .banking_stage_tracer_packet_stats - .total_forwardable_tracer_packets, - total_forwardable_tracer_packets - ); - } - } - - pub fn report(&mut self, report_interval_ms: u64) { - let now = timestamp(); - const LEADER_REPORT_LIMIT: usize = 4; - if now.saturating_sub(self.last_report) > report_interval_ms { - // We don't want to report unless we actually saw/forwarded a tracer packet - // to prevent noisy metrics - if let Some(modifiable_tracer_packet_stats) = self.modifiable_tracer_packet_stats.take() - { - datapoint_info!( - "tracer-packet-stats", - "id" => &self.id, - ( - "total_removed_before_sigverify", - modifiable_tracer_packet_stats - .sigverify_tracer_packet_stats - .total_removed_before_sigverify_stage as i64, - i64 - ), - ( - "total_tracer_packets_received_in_sigverify", - modifiable_tracer_packet_stats - .sigverify_tracer_packet_stats - .total_tracer_packets_received_in_sigverify_stage - as i64, - i64 - ), - ( - "total_tracer_packets_deduped_in_sigverify", - modifiable_tracer_packet_stats - 
.sigverify_tracer_packet_stats - .total_tracer_packets_deduped as i64, - i64 - ), - ( - "total_excess_tracer_packets_discarded_in_sigverify", - modifiable_tracer_packet_stats - .sigverify_tracer_packet_stats - .total_excess_tracer_packets as i64, - i64 - ), - ( - "total_tracker_packets_passed_sigverify", - modifiable_tracer_packet_stats - .sigverify_tracer_packet_stats - .total_tracker_packets_passed_sigverify as i64, - i64 - ), - ( - "total_exceeded_banking_stage_buffer", - modifiable_tracer_packet_stats - .banking_stage_tracer_packet_stats - .total_exceeded_banking_stage_buffer as i64, - i64 - ), - ( - "total_cleared_from_buffer_after_forward", - modifiable_tracer_packet_stats - .banking_stage_tracer_packet_stats - .total_cleared_from_buffer_after_forward as i64, - i64 - ), - ( - "total_forwardable_tracer_packets", - modifiable_tracer_packet_stats - .banking_stage_tracer_packet_stats - .total_forwardable_tracer_packets as i64, - i64 - ), - ( - "exceeded_expected_forward_leader_count", - modifiable_tracer_packet_stats - .banking_stage_tracer_packet_stats - .forward_target_leaders - .len() - > LEADER_REPORT_LIMIT, - bool - ), - ( - "forward_target_leaders", - itertools::Itertools::intersperse( - modifiable_tracer_packet_stats - .banking_stage_tracer_packet_stats - .forward_target_leaders - .iter() - .take(LEADER_REPORT_LIMIT) - .map(|leader_pubkey| leader_pubkey.to_string()), - ", ".to_string() - ) - .collect::(), - String - ) - ); - - *self = Self::reset(self.id.clone()); - self.last_report = timestamp(); - } - } - } -} diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index 8d2198d319b22f..3c00baa609e363 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -27,7 +27,7 @@ fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec) let mut rng = rand::thread_rng(); let mut deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979); bencher.iter(|| { - let _ans = deduper::dedup_packets_and_count_discards(&deduper, &mut batches, |_, _, _| ()); + let _ans = deduper::dedup_packets_and_count_discards(&deduper, &mut batches); deduper.maybe_reset( &mut rng, 0.001, // false_positive_rate diff --git a/perf/benches/shrink.rs b/perf/benches/shrink.rs index 4459f5f9a6b0c3..92f377548cf2d5 100644 --- a/perf/benches/shrink.rs +++ b/perf/benches/shrink.rs @@ -79,6 +79,6 @@ fn bench_shrink_count_packets(bencher: &mut Bencher) { }); bencher.iter(|| { - let _ = sigverify::count_valid_packets(&batches, |_| ()); + let _ = sigverify::count_valid_packets(&batches); }); } diff --git a/perf/src/deduper.rs b/perf/src/deduper.rs index afb60bd9419593..b25bcc93ed4449 100644 --- a/perf/src/deduper.rs +++ b/perf/src/deduper.rs @@ -1,7 +1,7 @@ //! Utility to deduplicate baches of incoming network packets. use { - crate::packet::{Packet, PacketBatch}, + crate::packet::PacketBatch, ahash::RandomState, rand::Rng, std::{ @@ -90,23 +90,18 @@ fn new_random_state(rng: &mut R) -> RandomState { pub fn dedup_packets_and_count_discards( deduper: &Deduper, batches: &mut [PacketBatch], - mut process_received_packet: impl FnMut(&mut Packet, bool, bool), ) -> u64 { batches .iter_mut() .flat_map(PacketBatch::iter_mut) .map(|packet| { - if packet.meta().discard() { - process_received_packet(packet, true, false); - } else if packet - .data(..) - .map(|data| deduper.dedup(data)) - .unwrap_or(true) + if !packet.meta().discard() + && packet + .data(..) 
+ .map(|data| deduper.dedup(data)) + .unwrap_or(true) { packet.meta_mut().set_discard(true); - process_received_packet(packet, false, true); - } else { - process_received_packet(packet, false, false); } u64::from(packet.meta().discard()) }) @@ -118,7 +113,11 @@ pub fn dedup_packets_and_count_discards( mod tests { use { super::*, - crate::{packet::to_packet_batches, sigverify, test_tx::test_tx}, + crate::{ + packet::{to_packet_batches, Packet}, + sigverify, + test_tx::test_tx, + }, rand::SeedableRng, rand_chacha::ChaChaRng, solana_packet::{Meta, PACKET_DATA_SIZE}, @@ -134,15 +133,7 @@ mod tests { let packet_count = sigverify::count_packets_in_batches(&batches); let mut rng = rand::thread_rng(); let filter = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979); - let mut num_deduped = 0; - let discard = dedup_packets_and_count_discards( - &filter, - &mut batches, - |_deduped_packet, _removed_before_sigverify_stage, _is_dup| { - num_deduped += 1; - }, - ) as usize; - assert_eq!(num_deduped, discard + 1); + let discard = dedup_packets_and_count_discards(&filter, &mut batches) as usize; assert_eq!(packet_count, discard + 1); } @@ -151,8 +142,7 @@ mod tests { let mut rng = rand::thread_rng(); let mut filter = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979); let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); - let discard = - dedup_packets_and_count_discards(&filter, &mut batches, |_, _, _| ()) as usize; + let discard = dedup_packets_and_count_discards(&filter, &mut batches) as usize; // because dedup uses a threadpool, there maybe up to N threads of txs that go through assert_eq!(discard, 0); assert!(!filter.maybe_reset( @@ -182,8 +172,7 @@ mod tests { for i in 0..1000 { let mut batches = to_packet_batches(&(0..1000).map(|_| test_tx()).collect::>(), 128); - discard += - dedup_packets_and_count_discards(&filter, &mut batches, |_, _, _| ()) as usize; + discard += dedup_packets_and_count_discards(&filter, &mut batches) as usize; trace!("{} {}", i, discard); if filter.popcount.load(Ordering::Relaxed) > capacity { break; @@ -206,8 +195,7 @@ mod tests { for i in 0..10 { let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); - discard += - dedup_packets_and_count_discards(&filter, &mut batches, |_, _, _| ()) as usize; + discard += dedup_packets_and_count_discards(&filter, &mut batches) as usize; debug!("false positive rate: {}/{}", discard, i * 1024); } //allow for 1 false positive even if extremely unlikely diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 5394d0462b263f..e07da9bcecf732 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -20,13 +20,6 @@ use { std::{convert::TryFrom, mem::size_of}, }; -// Representing key tKeYE4wtowRb8yRroZShTipE18YVnqwXjsSAoNsFU6g -const TRACER_KEY_BYTES: [u8; 32] = [ - 13, 37, 180, 170, 252, 137, 36, 194, 183, 143, 161, 193, 201, 207, 211, 23, 189, 93, 33, 110, - 155, 90, 30, 39, 116, 115, 238, 38, 126, 21, 232, 133, -]; -const TRACER_KEY: Pubkey = Pubkey::new_from_array(TRACER_KEY_BYTES); -const TRACER_KEY_OFFSET_IN_TRANSACTION: usize = 69; // Empirically derived to constrain max verify latency to ~8ms at lower packet counts pub const VERIFY_PACKET_CHUNK_SIZE: usize = 128; @@ -153,24 +146,10 @@ pub fn count_packets_in_batches(batches: &[PacketBatch]) -> usize { batches.iter().map(|batch| batch.len()).sum() } -pub fn count_valid_packets( - batches: &[PacketBatch], - mut process_valid_packet: impl FnMut(&Packet), -) -> usize { +pub fn 
count_valid_packets(batches: &[PacketBatch]) -> usize { batches .iter() - .map(|batch| { - batch - .iter() - .filter(|p| { - let should_keep = !p.meta().discard(); - if should_keep { - process_valid_packet(p); - } - should_keep - }) - .count() - }) + .map(|batch| batch.iter().filter(|p| !p.meta().discard()).count()) .sum() } @@ -308,21 +287,6 @@ fn do_get_packet_offsets( )) } -pub fn check_for_tracer_packet(packet: &mut Packet) -> bool { - let first_pubkey_start: usize = TRACER_KEY_OFFSET_IN_TRANSACTION; - let Some(first_pubkey_end) = first_pubkey_start.checked_add(size_of::()) else { - return false; - }; - // Check for tracer pubkey - match packet.data(first_pubkey_start..first_pubkey_end) { - Some(pubkey) if pubkey == TRACER_KEY.as_ref() => { - packet.meta_mut().set_tracer(true); - true - } - _ => false, - } -} - fn get_packet_offsets( packet: &mut Packet, current_offset: usize, @@ -1484,7 +1448,7 @@ mod tests { }); start.sort_by(|a, b| a.data(..).cmp(&b.data(..))); - let packet_count = count_valid_packets(&batches, |_| ()); + let packet_count = count_valid_packets(&batches); shrink_batches(&mut batches); //make sure all the non discarded packets are the same @@ -1495,7 +1459,7 @@ mod tests { .for_each(|p| end.push(p.clone())) }); end.sort_by(|a, b| a.data(..).cmp(&b.data(..))); - let packet_count2 = count_valid_packets(&batches, |_| ()); + let packet_count2 = count_valid_packets(&batches); assert_eq!(packet_count, packet_count2); assert_eq!(start, end); } @@ -1659,13 +1623,13 @@ mod tests { PACKETS_PER_BATCH, ); assert_eq!(batches.len(), BATCH_COUNT); - assert_eq!(count_valid_packets(&batches, |_| ()), PACKET_COUNT); + assert_eq!(count_valid_packets(&batches), PACKET_COUNT); batches.iter_mut().enumerate().for_each(|(i, b)| { b.iter_mut() .enumerate() .for_each(|(j, p)| p.meta_mut().set_discard(set_discard(i, j))) }); - assert_eq!(count_valid_packets(&batches, |_| ()), *expect_valid_packets); + assert_eq!(count_valid_packets(&batches), *expect_valid_packets); debug!("show valid packets for case {}", i); batches.iter_mut().enumerate().for_each(|(i, b)| { b.iter_mut().enumerate().for_each(|(j, p)| { @@ -1679,7 +1643,7 @@ mod tests { let shrunken_batch_count = batches.len(); debug!("shrunk batch test {} count: {}", i, shrunken_batch_count); assert_eq!(shrunken_batch_count, *expect_batch_count); - assert_eq!(count_valid_packets(&batches, |_| ()), *expect_valid_packets); + assert_eq!(count_valid_packets(&batches), *expect_valid_packets); } } } diff --git a/sdk/packet/src/lib.rs b/sdk/packet/src/lib.rs index fe25e4ffa108f4..871f9050c41336 100644 --- a/sdk/packet/src/lib.rs +++ b/sdk/packet/src/lib.rs @@ -52,9 +52,10 @@ bitflags! { const FORWARDED = 0b0000_0010; const REPAIR = 0b0000_0100; const SIMPLE_VOTE_TX = 0b0000_1000; - const TRACER_PACKET = 0b0001_0000; // Previously used - this can now be re-used for something else. - const UNUSED = 0b0010_0000; + const UNUSED_0 = 0b0001_0000; + // Previously used - this can now be re-used for something else. 
+ const UNUSED_1 = 0b0010_0000; /// For tracking performance const PERF_TRACK_PACKET = 0b0100_0000; /// For marking packets from staked nodes @@ -264,11 +265,6 @@ impl Meta { self.flags.set(PacketFlags::DISCARD, discard); } - #[inline] - pub fn set_tracer(&mut self, is_tracer: bool) { - self.flags.set(PacketFlags::TRACER_PACKET, is_tracer); - } - #[inline] pub fn set_track_performance(&mut self, is_performance_track: bool) { self.flags @@ -295,11 +291,6 @@ impl Meta { self.flags.contains(PacketFlags::SIMPLE_VOTE_TX) } - #[inline] - pub fn is_tracer_packet(&self) -> bool { - self.flags.contains(PacketFlags::TRACER_PACKET) - } - #[inline] pub fn is_perf_track_packet(&self) -> bool { self.flags.contains(PacketFlags::PERF_TRACK_PACKET)
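
The core API change in this patch is the `BankingPacketBatch` alias in `core/src/banking_trace.rs`: it goes from `Arc<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>)>` to `Arc<Vec<PacketBatch>>`, so producers build it straight from the batch vector and consumers no longer unpack a tuple or forward tracer stats. The following is a minimal, self-contained sketch of that shape for downstream code adapting to the change; the `PacketBatch` stand-in is an assumption introduced here so the snippet compiles on its own (the real type lives in `solana_perf::packet`, and only `len()` is borrowed from it).

```rust
use std::sync::Arc;

// Local stand-in for `solana_perf::packet::PacketBatch`, so this sketch is
// self-contained; only `len()` is assumed from the real type.
#[derive(Default)]
struct PacketBatch(Vec<Vec<u8>>);

impl PacketBatch {
    fn len(&self) -> usize {
        self.0.len()
    }
}

// Old alias (before this patch): batches were paired with optional tracer stats.
//   type BankingPacketBatch = Arc<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>)>;
// New alias (after this patch): just the batches.
type BankingPacketBatch = Arc<Vec<PacketBatch>>;

fn main() {
    let batches = vec![PacketBatch::default(), PacketBatch::default()];

    // Producers (sigverify, the vote listener, tests) now construct the Arc
    // directly from the Vec instead of wrapping a `(batches, None)` tuple.
    let msg: BankingPacketBatch = BankingPacketBatch::new(batches);

    // Consumers iterate the batches directly; there is no `.0` / `.1` to
    // unpack and no tracer-stats side channel to aggregate.
    let total_packets: usize = msg.iter().map(|batch| batch.len()).sum();
    println!("{} packets in {} batches", total_packets, msg.len());
}
```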