From d95db0ca83f949c6046cb3c371d332d789495252 Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Fri, 24 Sep 2021 11:18:38 -0500 Subject: [PATCH] banking forward tpu vote --- core/src/banking_stage.rs | 42 +++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 3204b26da438a0..ad7e3209b0585d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -244,6 +244,12 @@ pub enum BufferedPacketsDecision { Hold, } +pub enum ForwardOption { + NotForward, + ForwardTpuVote, + ForwardTransaction, +} + impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::new_ret_no_self)] @@ -292,13 +298,13 @@ impl BankingStage { assert!(num_threads >= MIN_THREADS_VOTES + MIN_THREADS_BANKING); let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { - let (verified_receiver, enable_forwarding) = match i.cmp(&(num_threads - 2)) { - std::cmp::Ordering::Less => (verified_receiver.clone(), true), - std::cmp::Ordering::Equal => (vote_verified_receiver.clone(), false), + let (verified_receiver, forward_option) = match i.cmp(&(num_threads - 2)) { + std::cmp::Ordering::Less => (verified_receiver.clone(), ForwardOption::ForwardTransaction), + std::cmp::Ordering::Equal => (vote_verified_receiver.clone(), ForwardOption::ForwardTpuVote), std::cmp::Ordering::Greater => { // Disable forwarding of vote transactions // from gossip. Note - votes can also arrive from tpu - (verified_vote_receiver.clone(), false) + (verified_vote_receiver.clone(), ForwardOption::NotForward) } }; @@ -319,7 +325,7 @@ impl BankingStage { &poh_recorder, &cluster_info, &mut recv_start, - enable_forwarding, + forward_option, i, batch_limit, transaction_status_sender, @@ -525,7 +531,7 @@ impl BankingStage { poh_recorder: &Arc>, cluster_info: &ClusterInfo, buffered_packets: &mut UnprocessedPackets, - enable_forwarding: bool, + forward_option: &ForwardOption, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, @@ -575,7 +581,7 @@ impl BankingStage { } BufferedPacketsDecision::Forward => { Self::handle_forwarding( - enable_forwarding, + forward_option, cluster_info, buffered_packets, poh_recorder, @@ -586,7 +592,7 @@ impl BankingStage { } BufferedPacketsDecision::ForwardAndHold => { Self::handle_forwarding( - enable_forwarding, + forward_option, cluster_info, buffered_packets, poh_recorder, @@ -601,7 +607,7 @@ impl BankingStage { } fn handle_forwarding( - enable_forwarding: bool, + forward_option: &ForwardOption, cluster_info: &ClusterInfo, buffered_packets: &mut UnprocessedPackets, poh_recorder: &Arc>, @@ -609,14 +615,12 @@ impl BankingStage { hold: bool, data_budget: &DataBudget, ) { - if !enable_forwarding { - if !hold { - buffered_packets.clear(); - } - return; - } - - let addr = match next_leader_tpu_forwards(cluster_info, poh_recorder) { + let addr = match forward_option { + ForwardOption::NotForward => {if !hold { buffered_packets.clear(); } return}, + ForwardOption::ForwardTransaction => next_leader_tpu_forwards(cluster_info, poh_recorder), + ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder), + }; + let addr = match addr { Some(addr) => addr, None => return, }; @@ -638,7 +642,7 @@ impl BankingStage { poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, - enable_forwarding: bool, + forward_option: ForwardOption, id: u32, batch_limit: usize, transaction_status_sender: Option, @@ -658,7 +662,7 @@ impl BankingStage { &poh_recorder, cluster_info, &mut buffered_packets, - enable_forwarding, + &forward_option, transaction_status_sender.clone(), &gossip_vote_sender, &banking_stage_stats,