[consensus] Dedicated channel for proposal buffering and batch process
zekun000 authored and Zekun Wang committed Oct 13, 2023
1 parent ed3b35e commit 9956fed
Showing 9 changed files with 137 additions and 70 deletions.
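In short, this commit takes proposal handling off the shared round-manager queue: incoming ProposalMsgs are routed to a dedicated FIFO precheck channel, a spawned task validates the proposer against the (now Arc-shared) ProposerElection, and only validated proposals reach the round manager through a bounded keep-last buffer. A minimal sketch of that two-stage pipeline, using plain tokio mpsc channels and a hypothetical Proposal type in place of the aptos_channel and ProposalMsg types from the diff:

use tokio::sync::mpsc;

// Hypothetical stand-in for ProposalMsg; the real type carries a full block.
#[derive(Debug)]
struct Proposal {
    proposer: u64,
    round: u64,
}

// Placeholder for ProposerElection::is_valid_proposer.
fn is_valid_proposer(proposer: u64, round: u64) -> bool {
    proposer == round % 4
}

#[tokio::main]
async fn main() {
    // Stage 1: raw proposals from the network land here.
    let (precheck_tx, mut precheck_rx) = mpsc::channel::<Proposal>(8);
    // Stage 2: only prechecked proposals are handed to the round manager.
    let (checked_tx, mut checked_rx) = mpsc::channel::<Proposal>(8);

    // Dedicated buffering task: validation happens off the round manager's
    // hot path, so a flood of bogus proposals cannot crowd out other events.
    tokio::spawn(async move {
        while let Some(p) = precheck_rx.recv().await {
            if is_valid_proposer(p.proposer, p.round) {
                let _ = checked_tx.send(p).await;
            } else {
                eprintln!("dropping proposal from invalid proposer: {:?}", p);
            }
        }
    });

    precheck_tx.send(Proposal { proposer: 2, round: 2 }).await.unwrap();
    precheck_tx.send(Proposal { proposer: 3, round: 2 }).await.unwrap();
    drop(precheck_tx);

    // The round manager's event loop would consume from checked_rx.
    while let Some(p) = checked_rx.recv().await {
        println!("round manager received: {:?}", p);
    }
}

The real code differs in that aptos_channel is keyed per author and the second stage is sized and eviction-ordered differently, but the producer/validator/consumer split is the same.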
67 changes: 56 additions & 11 deletions consensus/src/epoch_manager.rs
@@ -59,6 +59,7 @@ use aptos_config::config::{ConsensusConfig, NodeConfig, SecureBackend};
use aptos_consensus_types::{
common::{Author, Round},
epoch_retrieval::EpochRetrievalRequest,
+ proposal_msg::ProposalMsg,
};
use aptos_event_notifications::ReconfigNotificationListener;
use aptos_global_constants::CONSENSUS_KEY;
@@ -100,10 +101,10 @@ use std::{

/// Range of rounds (window) that we might be calling proposer election
/// functions with at any given time, in addition to the proposer history length.
- const PROPSER_ELECTION_CACHING_WINDOW_ADDITION: usize = 3;
+ const PROPOSER_ELECTION_CACHING_WINDOW_ADDITION: usize = 3;
/// Number of rounds we expect storage to be ahead of the proposer round,
/// used for fetching data from DB.
- const PROPSER_ROUND_BEHIND_STORAGE_BUFFER: usize = 10;
+ const PROPOSER_ROUND_BEHIND_STORAGE_BUFFER: usize = 10;

#[allow(clippy::large_enum_variant)]
pub enum LivenessStorageData {
@@ -133,6 +134,7 @@ pub struct EpochManager<P: OnChainConfigProvider> {
round_manager_tx: Option<
aptos_channel::Sender<(Author, Discriminant<VerifiedEvent>), (Author, VerifiedEvent)>,
>,
+ proposal_precheck_tx: Option<aptos_channel::Sender<Author, Box<ProposalMsg>>>,
round_manager_close_tx: Option<oneshot::Sender<oneshot::Sender<()>>>,
epoch_state: Option<Arc<EpochState>>,
block_retrieval_tx:
@@ -183,6 +185,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
buffer_manager_reset_tx: None,
round_manager_tx: None,
round_manager_close_tx: None,
+ proposal_precheck_tx: None,
epoch_state: None,
block_retrieval_tx: None,
quorum_store_msg_tx: None,
@@ -222,19 +225,19 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
&self,
epoch_state: &EpochState,
onchain_config: &OnChainConsensusConfig,
- ) -> Box<dyn ProposerElection + Send + Sync> {
+ ) -> Arc<dyn ProposerElection + Send + Sync> {
let proposers = epoch_state
.verifier
.get_ordered_account_addresses_iter()
.collect::<Vec<_>>();
match &onchain_config.proposer_election_type() {
ProposerElectionType::RotatingProposer(contiguous_rounds) => {
- Box::new(RotatingProposer::new(proposers, *contiguous_rounds))
+ Arc::new(RotatingProposer::new(proposers, *contiguous_rounds))
},
// We don't really have a fixed proposer!
ProposerElectionType::FixedProposer(contiguous_rounds) => {
let proposer = choose_leader(proposers);
- Box::new(RotatingProposer::new(vec![proposer], *contiguous_rounds))
+ Arc::new(RotatingProposer::new(vec![proposer], *contiguous_rounds))
},
ProposerElectionType::LeaderReputation(leader_reputation_type) => {
let (
@@ -271,7 +274,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {

let seek_len = onchain_config.leader_reputation_exclude_round() as usize
+ onchain_config.max_failed_authors_to_store()
- + PROPSER_ROUND_BEHIND_STORAGE_BUFFER;
+ + PROPOSER_ROUND_BEHIND_STORAGE_BUFFER;

let backend = Box::new(AptosDBBackend::new(
window_size,
@@ -335,17 +338,17 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
self.config.window_for_chain_health,
));
// LeaderReputation is not cheap, so we can cache the amount of rounds round_manager needs.
- Box::new(CachedProposerElection::new(
+ Arc::new(CachedProposerElection::new(
epoch_state.epoch,
proposer_election,
onchain_config.max_failed_authors_to_store()
- + PROPSER_ELECTION_CACHING_WINDOW_ADDITION,
+ + PROPOSER_ELECTION_CACHING_WINDOW_ADDITION,
))
},
ProposerElectionType::RoundProposer(round_proposers) => {
// Hardcoded to the first proposer
let default_proposer = proposers.first().unwrap();
- Box::new(RoundProposer::new(
+ Arc::new(RoundProposer::new(
round_proposers.clone(),
*default_proposer,
))
@@ -809,7 +812,42 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
Some(&counters::ROUND_MANAGER_CHANNEL_MSGS),
);

+ let (proposal_precheck_tx, mut proposal_precheck_rx) = aptos_channel::new(
+ QueueStyle::FIFO,
+ 1,
+ Some(&counters::ROUND_MANAGER_CHANNEL_MSGS),
+ );
self.round_manager_tx = Some(round_manager_tx.clone());
+ self.proposal_precheck_tx = Some(proposal_precheck_tx);

+ let (checked_proposal_tx, checked_proposal_rx) = aptos_channel::new(
+ QueueStyle::KLAST,
+ onchain_consensus_config.leader_reputation_exclude_round() as usize,
+ None,
+ );
+ let proposer_election_clone = proposer_election.clone();
+ let checked_proposal_tx_clone = checked_proposal_tx.clone();
+
+ // Spawn task to buffer valid proposals
+ tokio::spawn(async move {
+ while let Some(proposal_msg) = proposal_precheck_rx.next().await {
+ if proposer_election_clone
+ .is_valid_proposer(proposal_msg.proposer(), proposal_msg.proposal().round())
+ {
+ if let Err(e) =
+ checked_proposal_tx_clone.push((), VerifiedEvent::ProposalMsg(proposal_msg))
+ {
+ warn!("Failed to send to proposal channel {:?}", e);
+ }
+ } else {
+ warn!(
+ "Invalid proposal {} from {}",
+ proposal_msg.proposal(),
+ proposal_msg.proposer(),
+ );
+ }
+ }
+ });

self.set_epoch_start_metrics(&epoch_state);

@@ -823,15 +861,15 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
network_sender,
self.storage.clone(),
onchain_consensus_config,
- round_manager_tx,
+ checked_proposal_tx,
self.config.clone(),
);

round_manager.init(last_vote).await;

let (close_tx, close_rx) = oneshot::channel();
self.round_manager_close_tx = Some(close_tx);
- tokio::spawn(round_manager.start(round_manager_rx, close_rx));
+ tokio::spawn(round_manager.start(round_manager_rx, checked_proposal_rx, close_rx));

self.spawn_block_retrieval_task(epoch, block_store);
}
@@ -932,6 +970,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
let epoch_state = self.epoch_state.clone().unwrap();
let quorum_store_enabled = self.quorum_store_enabled;
let quorum_store_msg_tx = self.quorum_store_msg_tx.clone();
+ let proposal_precheck_tx = self.proposal_precheck_tx.clone();
let round_manager_tx = self.round_manager_tx.clone();
let my_peer_id = self.author;
let max_num_batches = self.config.quorum_store.receiver_max_num_batches;
@@ -951,6 +990,7 @@
Self::forward_event(
quorum_store_msg_tx,
round_manager_tx,
+ proposal_precheck_tx,
peer_id,
verified_event,
);
@@ -1071,6 +1111,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
round_manager_tx: Option<
aptos_channel::Sender<(Author, Discriminant<VerifiedEvent>), (Author, VerifiedEvent)>,
>,
+ proposal_precheck_tx: Option<aptos_channel::Sender<Author, Box<ProposalMsg>>>,
peer_id: AccountAddress,
event: VerifiedEvent,
) {
@@ -1087,6 +1128,10 @@
Self::forward_event_to(quorum_store_msg_tx, peer_id, quorum_store_event)
.context("quorum store sender")
},
+ VerifiedEvent::ProposalMsg(proposal_msg) => {
+ Self::forward_event_to(proposal_precheck_tx, peer_id, proposal_msg)
+ .context("proposal precheck sender")
+ },
round_manager_event => Self::forward_event_to(
round_manager_tx,
(peer_id, discriminant(&round_manager_event)),
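One detail worth noting in the epoch_manager.rs change above: the precheck channel is FIFO with a small per-author bound, while the checked-proposal channel uses QueueStyle::KLAST sized by leader_reputation_exclude_round(). If my reading of aptos_channel is right, a keep-last queue evicts the oldest entry on overflow, so backpressure sheds stale proposals rather than fresh ones. A rough model of keep-last-K semantics using a plain VecDeque (not the real aptos_channel implementation):

use std::collections::VecDeque;

// Keep-last-K queue: pushing beyond capacity evicts the oldest item,
// so the consumer always sees the most recent K messages.
struct KLast<T> {
    cap: usize,
    items: VecDeque<T>,
}

impl<T> KLast<T> {
    fn new(cap: usize) -> Self {
        Self { cap, items: VecDeque::with_capacity(cap) }
    }

    fn push(&mut self, item: T) {
        if self.items.len() == self.cap {
            self.items.pop_front(); // drop the stalest entry
        }
        self.items.push_back(item);
    }

    fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}

fn main() {
    let mut q = KLast::new(2);
    q.push("round 5");
    q.push("round 6");
    q.push("round 7"); // evicts "round 5"
    assert_eq!(q.pop(), Some("round 6"));
    assert_eq!(q.pop(), Some("round 7"));
}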
8 changes: 4 additions & 4 deletions consensus/src/liveness/proposal_generator_test.rs
@@ -44,7 +44,7 @@ async fn test_proposal_generation_empty_tree() {
false,
);
let mut proposer_election =
- UnequivocalProposerElection::new(Box::new(RotatingProposer::new(vec![signer.author()], 1)));
+ UnequivocalProposerElection::new(Arc::new(RotatingProposer::new(vec![signer.author()], 1)));
let genesis = block_store.ordered_root();

// Generate proposals for an empty tree.
@@ -83,7 +83,7 @@ async fn test_proposal_generation_parent() {
ChainHealthBackoffConfig::new_no_backoff(),
false,
);
- let mut proposer_election = UnequivocalProposerElection::new(Box::new(RotatingProposer::new(
+ let mut proposer_election = UnequivocalProposerElection::new(Arc::new(RotatingProposer::new(
vec![inserter.signer().author()],
1,
)));
@@ -155,7 +155,7 @@ async fn test_old_proposal_generation() {
ChainHealthBackoffConfig::new_no_backoff(),
false,
);
- let mut proposer_election = UnequivocalProposerElection::new(Box::new(RotatingProposer::new(
+ let mut proposer_election = UnequivocalProposerElection::new(Arc::new(RotatingProposer::new(
vec![inserter.signer().author()],
1,
)));
@@ -192,7 +192,7 @@ async fn test_correct_failed_authors() {
ChainHealthBackoffConfig::new_no_backoff(),
false,
);
- let mut proposer_election = UnequivocalProposerElection::new(Box::new(RotatingProposer::new(
+ let mut proposer_election = UnequivocalProposerElection::new(Arc::new(RotatingProposer::new(
vec![author, peer1, peer2],
1,
)));
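The Box::new → Arc::new churn in these tests mirrors the production change: the ProposerElection is now consulted from two places at once, the precheck task and the round manager, so it needs shared ownership. A small sketch of why Arc<dyn Trait> is the right shape here, with a hypothetical RoundRobin type standing in for the real RotatingProposer:

use std::sync::Arc;

trait ProposerElection {
    fn is_valid_proposer(&self, author: u64, round: u64) -> bool;
}

struct RoundRobin {
    validators: Vec<u64>,
}

impl ProposerElection for RoundRobin {
    fn is_valid_proposer(&self, author: u64, round: u64) -> bool {
        self.validators[(round as usize) % self.validators.len()] == author
    }
}

fn main() {
    // Arc lets the same election instance be cloned into the spawned
    // precheck task while the round manager keeps its own handle;
    // Box<dyn ...> would allow only a single owner.
    let election: Arc<dyn ProposerElection + Send + Sync> =
        Arc::new(RoundRobin { validators: vec![10, 11, 12] });
    let for_precheck_task = election.clone();

    assert!(for_precheck_task.is_valid_proposer(12, 2));
    assert!(election.is_valid_proposer(10, 3));
}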
12 changes: 4 additions & 8 deletions consensus/src/liveness/unequivocal_proposer_election.rs
@@ -9,14 +9,14 @@ use aptos_consensus_types::{
use aptos_crypto::HashValue;
use aptos_infallible::Mutex;
use aptos_logger::{error, SecurityEvent};
- use std::cmp::Ordering;
+ use std::{cmp::Ordering, sync::Arc};

// Wrapper around ProposerElection.
//
// Provides is_valid_proposal that remembers, and rejects if
// the same leader proposes multiple blocks.
pub struct UnequivocalProposerElection {
- proposer_election: Box<dyn ProposerElection + Send + Sync>,
+ proposer_election: Arc<dyn ProposerElection + Send + Sync>,
already_proposed: Mutex<(Round, HashValue)>,
}

@@ -32,7 +32,7 @@ impl ProposerElection for UnequivocalProposerElection {
}

impl UnequivocalProposerElection {
- pub fn new(proposer_election: Box<dyn ProposerElection + Send + Sync>) -> Self {
+ pub fn new(proposer_election: Arc<dyn ProposerElection + Send + Sync>) -> Self {
Self {
proposer_election,
already_proposed: Mutex::new((0, HashValue::zero())),
@@ -56,7 +56,6 @@ impl UnequivocalProposerElection {
block.id()
);

println!("Not a valid author");
return false;
}
let mut already_proposed = self.already_proposed.lock();
@@ -82,10 +81,7 @@ impl UnequivocalProposerElection {
true
}
},
- Ordering::Less => {
- println!("Older Block");
- false
- },
+ Ordering::Less => false,
}
})
}
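For context on the file above: the equivocation check amounts to remembering the highest (round, block id) pair accepted so far, rejecting a different block for the same round, and rejecting anything older. A simplified sketch of that logic with a hypothetical Block type (the real code also verifies the author against the wrapped ProposerElection first):

use std::cmp::Ordering;
use std::sync::Mutex;

// Hypothetical stand-in for the consensus Block type.
struct Block {
    round: u64,
    id: u64, // stands in for the block's HashValue
}

struct Unequivocal {
    already_proposed: Mutex<(u64, u64)>, // (round, block id)
}

impl Unequivocal {
    fn is_valid_proposal(&self, block: &Block) -> bool {
        let mut seen = self.already_proposed.lock().unwrap();
        match block.round.cmp(&seen.0) {
            // New round: remember it and accept.
            Ordering::Greater => {
                *seen = (block.round, block.id);
                true
            },
            // Same round: only the identical block is acceptable;
            // a different id means the leader equivocated.
            Ordering::Equal => block.id == seen.1,
            // Older round: reject outright.
            Ordering::Less => false,
        }
    }
}

fn main() {
    let pe = Unequivocal { already_proposed: Mutex::new((0, 0)) };
    assert!(pe.is_valid_proposal(&Block { round: 1, id: 42 }));
    assert!(pe.is_valid_proposal(&Block { round: 1, id: 42 })); // same block, ok
    assert!(!pe.is_valid_proposal(&Block { round: 1, id: 43 })); // equivocation
    assert!(!pe.is_valid_proposal(&Block { round: 0, id: 7 })); // stale round
}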
4 changes: 2 additions & 2 deletions consensus/src/liveness/unequivocal_proposer_election_test.rs
@@ -8,7 +8,7 @@ use aptos_consensus_types::{
common::{Author, Payload, Round},
};
use aptos_types::validator_signer::ValidatorSigner;
- use std::collections::HashMap;
+ use std::{collections::HashMap, sync::Arc};

struct MockProposerElection {
proposers: HashMap<Round, Author>,
@@ -83,7 +83,7 @@ fn test_is_valid_proposal() {
.unwrap();

let pe =
- UnequivocalProposerElection::new(Box::new(MockProposerElection::new(HashMap::from([
+ UnequivocalProposerElection::new(Arc::new(MockProposerElection::new(HashMap::from([
(1, chosen_author),
(2, chosen_author),
]))));
1 change: 1 addition & 0 deletions consensus/src/logging.rs
@@ -19,6 +19,7 @@ pub struct LogSchema {
pub enum LogEvent {
CommitViaBlock,
CommitViaSync,
NetworkReceiveProposal,
NewEpoch,
NewRound,
Propose,
8 changes: 7 additions & 1 deletion consensus/src/network.rs
@@ -7,7 +7,7 @@ use crate::{
counters,
dag::{DAGMessage, DAGNetworkMessage, ProofNotifier, RpcWithFallback, TDAGNetworkSender},
experimental::commit_reliable_broadcast::CommitMessage,
- logging::LogEvent,
+ logging::{LogEvent, LogSchema},
monitor,
network_interface::{ConsensusMsg, ConsensusNetworkClient, RPC},
quorum_store::types::{Batch, BatchMsg, BatchRequest},
@@ -641,6 +641,12 @@ impl NetworkTask {
proposal.proposal().timestamp_usecs(),
BlockStage::NETWORK_RECEIVED,
);
+ info!(
+ LogSchema::new(LogEvent::NetworkReceiveProposal)
+ .remote_peer(peer_id),
+ block_round = proposal.proposal().round(),
+ block_hash = proposal.proposal().id(),
+ );
}
Self::push_msg(peer_id, consensus_msg, &self.consensus_messages_tx);
},