Skip to content

Commit

Permalink
Remove Tracer packet as a concept (#4043)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Dec 12, 2024
1 parent 3d43824 commit 3a6d593
Show file tree
Hide file tree
Showing 24 changed files with 123 additions and 736 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Release channels have their own copy of this changelog:
* CLI:
* Add global `--skip-preflight` option for skipping preflight checks on all transactions sent through RPC. This flag, along with `--use-rpc`, can improve success rate with program deployments using the public RPC nodes.
* Unhide `--accounts-db-access-storages-method` for agave-validator and agave-ledger-tool
* Remove tracer stats from banking-trace. `banking-trace` directory should be cleared when restarting on v2.2 for first time. It will not break if not cleared, but the file will be a mix of new/old format. (#4043)

## [2.1.0]
* Breaking:
Expand Down
2 changes: 1 addition & 1 deletion banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ fn main() {
timestamp(),
);
non_vote_sender
.send(BankingPacketBatch::new((vec![packet_batch.clone()], None)))
.send(BankingPacketBatch::new(vec![packet_batch.clone()]))
.unwrap();
}

Expand Down
12 changes: 5 additions & 7 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,16 +319,14 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
let mut sent = 0;
if let Some(vote_packets) = &vote_packets {
tpu_vote_sender
.send(BankingPacketBatch::new((
.send(BankingPacketBatch::new(
vote_packets[start..start + chunk_len].to_vec(),
None,
)))
))
.unwrap();
gossip_vote_sender
.send(BankingPacketBatch::new((
.send(BankingPacketBatch::new(
vote_packets[start..start + chunk_len].to_vec(),
None,
)))
))
.unwrap();
}
for v in verified[start..start + chunk_len].chunks(chunk_len / num_threads) {
Expand All @@ -343,7 +341,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
sent += xv.len();
}
non_vote_sender
.send(BankingPacketBatch::new((v.to_vec(), None)))
.send(BankingPacketBatch::new(v.to_vec()))
.unwrap();
}

Expand Down
36 changes: 8 additions & 28 deletions core/benches/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ extern crate test;
use {
itertools::Itertools,
solana_client::connection_cache::ConnectionCache,
solana_core::{
banking_stage::{
forwarder::Forwarder,
leader_slot_metrics::LeaderSlotMetricsTracker,
unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches},
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
BankingStageStats,
},
tracer_packet_stats::TracerPacketStats,
solana_core::banking_stage::{
forwarder::Forwarder,
leader_slot_metrics::LeaderSlotMetricsTracker,
unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches},
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
BankingStageStats,
},
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
Expand All @@ -38,7 +35,6 @@ struct BenchSetup {
unprocessed_packet_batches: UnprocessedTransactionStorage,
tracker: LeaderSlotMetricsTracker,
stats: BankingStageStats,
tracer_stats: TracerPacketStats,
}

fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup {
Expand Down Expand Up @@ -88,7 +84,6 @@ fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup {
transaction.message.account_keys[0] = solana_sdk::pubkey::Pubkey::new_unique();
}
let mut packet = Packet::from_data(None, transaction).unwrap();
packet.meta_mut().set_tracer(true);
packet.meta_mut().set_from_staked_node(true);
DeserializedPacket::new(packet).unwrap()
})
Expand Down Expand Up @@ -118,7 +113,6 @@ fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup {
unprocessed_packet_batches,
tracker: LeaderSlotMetricsTracker::new(0),
stats: BankingStageStats::default(),
tracer_stats: TracerPacketStats::new(0),
}
}

Expand All @@ -132,19 +126,12 @@ fn bench_forwarder_handle_forwading_contentious_transaction(bencher: &mut Benche
mut unprocessed_packet_batches,
mut tracker,
stats,
mut tracer_stats,
} = setup(num_packets, true);

// hold packets so they can be reused for benching
let hold = true;
bencher.iter(|| {
forwarder.handle_forwarding(
&mut unprocessed_packet_batches,
hold,
&mut tracker,
&stats,
&mut tracer_stats,
);
forwarder.handle_forwarding(&mut unprocessed_packet_batches, hold, &mut tracker, &stats);
// reset packet.forwarded flag to reuse `unprocessed_packet_batches`
if let UnprocessedTransactionStorage::LocalTransactionStorage(unprocessed_packets) =
&mut unprocessed_packet_batches
Expand All @@ -169,19 +156,12 @@ fn bench_forwarder_handle_forwading_parallel_transactions(bencher: &mut Bencher)
mut unprocessed_packet_batches,
mut tracker,
stats,
mut tracer_stats,
} = setup(num_packets, false);

// hold packets so they can be reused for benching
let hold = true;
bencher.iter(|| {
forwarder.handle_forwarding(
&mut unprocessed_packet_batches,
hold,
&mut tracker,
&stats,
&mut tracer_stats,
);
forwarder.handle_forwarding(&mut unprocessed_packet_batches, hold, &mut tracker, &stats);
// reset packet.forwarded flag to reuse `unprocessed_packet_batches`
if let UnprocessedTransactionStorage::LocalTransactionStorage(unprocessed_packets) =
&mut unprocessed_packet_batches
Expand Down
20 changes: 6 additions & 14 deletions core/benches/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use {
},
solana_sdk::{
hash::Hash,
packet::PacketFlags,
signature::{Keypair, Signer},
system_transaction,
},
Expand Down Expand Up @@ -58,7 +57,7 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) {
info!("total packets: {}", total);

bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000, |_| ());
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
let mut num_packets = 0;
for batch in batches.iter_mut() {
for p in batch.iter_mut() {
Expand Down Expand Up @@ -105,7 +104,7 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) {
}
}
bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000, |_| ());
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
let mut num_packets = 0;
for batch in batches.iter_mut() {
for packet in batch.iter_mut() {
Expand Down Expand Up @@ -171,24 +170,17 @@ fn bench_sigverify_stage(bencher: &mut Bencher, use_same_tx: bool) {
);

let mut sent_len = 0;
for mut batch in batches.into_iter() {
for batch in batches.into_iter() {
sent_len += batch.len();
batch
.iter_mut()
.for_each(|packet| packet.meta_mut().flags |= PacketFlags::TRACER_PACKET);
packet_s.send(batch).unwrap();
}
let mut received = 0;
let mut total_tracer_packets_received_in_sigverify_stage = 0;
trace!("sent: {}", sent_len);
loop {
if let Ok(message) = verified_r.recv_timeout(Duration::from_millis(10)) {
let (verifieds, tracer_packet_stats) = (&message.0, message.1.as_ref().unwrap());
if let Ok(verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) {
received += verifieds.iter().map(|batch| batch.len()).sum::<usize>();
total_tracer_packets_received_in_sigverify_stage +=
tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage;
test::black_box(message);
if total_tracer_packets_received_in_sigverify_stage >= sent_len {
test::black_box(verifieds);
if received >= sent_len {
break;
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,8 +822,7 @@ impl BankingSimulator {
let timed_batches_to_send = packet_batches_by_time.split_off(&base_event_time);
let batch_and_tx_counts = timed_batches_to_send
.values()
.map(|(_label, batches_with_stats)| {
let batches = &batches_with_stats.0;
.map(|(_label, batches)| {
(
batches.len(),
batches.iter().map(|batch| batch.len()).sum::<usize>(),
Expand Down
19 changes: 4 additions & 15 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use {
},
},
banking_trace::BankingPacketReceiver,
tracer_packet_stats::TracerPacketStats,
validator::BlockProductionMethod,
},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
Expand Down Expand Up @@ -319,8 +318,6 @@ pub enum ForwardOption {
#[derive(Debug, Default)]
pub struct FilterForwardingResults {
pub(crate) total_forwardable_packets: usize,
pub(crate) total_tracer_packets_in_buffer: usize,
pub(crate) total_forwardable_tracer_packets: usize,
pub(crate) total_dropped_packets: usize,
pub(crate) total_packet_conversion_us: u64,
pub(crate) total_filter_packets_us: u64,
Expand Down Expand Up @@ -686,7 +683,6 @@ impl BankingStage {
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
tracer_packet_stats: &mut TracerPacketStats,
) {
if unprocessed_transaction_storage.should_not_process() {
return;
Expand Down Expand Up @@ -722,7 +718,6 @@ impl BankingStage {
false,
slot_metrics_tracker,
banking_stage_stats,
tracer_packet_stats,
));
slot_metrics_tracker.increment_forward_us(forward_us);
// Take metrics action after forwarding packets to include forwarded
Expand All @@ -735,7 +730,6 @@ impl BankingStage {
true,
slot_metrics_tracker,
banking_stage_stats,
tracer_packet_stats,
));
slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_us);
// Take metrics action after forwarding packets
Expand All @@ -754,7 +748,6 @@ impl BankingStage {
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
) {
let mut banking_stage_stats = BankingStageStats::new(id);
let mut tracer_packet_stats = TracerPacketStats::new(id);

let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
let mut last_metrics_update = Instant::now();
Expand All @@ -770,19 +763,15 @@ impl BankingStage {
&mut unprocessed_transaction_storage,
&banking_stage_stats,
&mut slot_metrics_tracker,
&mut tracer_packet_stats,
));
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_us);
last_metrics_update = Instant::now();
}

tracer_packet_stats.report(1000);

match packet_receiver.receive_and_buffer_packets(
&mut unprocessed_transaction_storage,
&mut banking_stage_stats,
&mut tracer_packet_stats,
&mut slot_metrics_tracker,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Expand Down Expand Up @@ -1073,7 +1062,7 @@ mod tests {
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
non_vote_sender // no_ver, anf, tx
.send(BankingPacketBatch::new((packet_batches, None)))
.send(BankingPacketBatch::new(packet_batches))
.unwrap();

drop(non_vote_sender);
Expand Down Expand Up @@ -1152,7 +1141,7 @@ mod tests {
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
non_vote_sender
.send(BankingPacketBatch::new((packet_batches, None)))
.send(BankingPacketBatch::new(packet_batches))
.unwrap();

// Process a second batch that uses the same from account, so conflicts with above TX
Expand All @@ -1165,7 +1154,7 @@ mod tests {
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
non_vote_sender
.send(BankingPacketBatch::new((packet_batches, None)))
.send(BankingPacketBatch::new(packet_batches))
.unwrap();

let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
Expand Down Expand Up @@ -1472,7 +1461,7 @@ mod tests {
Builder::new()
.spawn(move || {
sender
.send(BankingPacketBatch::new((packet_batches, None)))
.send(BankingPacketBatch::new(packet_batches))
.unwrap()
})
.unwrap()
Expand Down
15 changes: 1 addition & 14 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use {
immutable_deserialized_packet::ImmutableDeserializedPacket, LikeClusterInfo,
},
next_leader::{next_leader, next_leader_tpu_vote},
tracer_packet_stats::TracerPacketStats,
},
solana_client::connection_cache::ConnectionCache,
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
Expand Down Expand Up @@ -96,7 +95,6 @@ impl<T: LikeClusterInfo> Forwarder<T> {
hold: bool,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
banking_stage_stats: &BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats,
) {
let forward_option = unprocessed_transaction_storage.forward_option();

Expand Down Expand Up @@ -139,19 +137,13 @@ impl<T: LikeClusterInfo> Forwarder<T> {
slot_metrics_tracker.increment_forwardable_batches_count(1);

let batched_forwardable_packets_count = forward_batch.len();
let (_forward_result, successful_forwarded_packets_count, leader_pubkey) = self
let (_forward_result, successful_forwarded_packets_count, _leader_pubkey) = self
.forward_buffered_packets(
&forward_option,
forward_batch.get_forwardable_packets(),
banking_stage_stats,
);

if let Some(leader_pubkey) = leader_pubkey {
tracer_packet_stats.increment_total_forwardable_tracer_packets(
filter_forwarding_result.total_forwardable_tracer_packets,
leader_pubkey,
);
}
let failed_forwarded_packets_count = batched_forwardable_packets_count
.saturating_sub(successful_forwarded_packets_count);

Expand All @@ -174,9 +166,6 @@ impl<T: LikeClusterInfo> Forwarder<T> {
slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count(
filter_forwarding_result.total_forwardable_packets as u64,
);
tracer_packet_stats.increment_total_cleared_from_buffer_after_forward(
filter_forwarding_result.total_tracer_packets_in_buffer,
);
unprocessed_transaction_storage.clear_forwarded_packets();
}
}
Expand Down Expand Up @@ -485,7 +474,6 @@ mod tests {
true,
&mut LeaderSlotMetricsTracker::new(0),
&stats,
&mut TracerPacketStats::new(0),
);

let recv_socket = &local_node.sockets.tpu_forwards_quic[0];
Expand Down Expand Up @@ -584,7 +572,6 @@ mod tests {
hold,
&mut LeaderSlotMetricsTracker::new(0),
&stats,
&mut TracerPacketStats::new(0),
);

let recv_socket = &local_node.sockets.tpu_forwards_quic[0];
Expand Down
Loading

0 comments on commit 3a6d593

Please sign in to comment.