Skip to content

Commit

Permalink
draft 3
Browse files Browse the repository at this point in the history
  • Loading branch information
Zekun Li authored and Zekun Li committed Nov 5, 2024
1 parent fab3258 commit bcccbf5
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 9 deletions.
8 changes: 3 additions & 5 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ use crate::{
block::Block,
common::{Payload, Round},
order_vote_proposal::OrderVoteProposal,
pipeline::commit_vote::CommitVote,
pipeline_execution_result::PipelineExecutionResult,
quorum_cert::QuorumCert,
vote_proposal::VoteProposal,
};
use anyhow::Error;
use aptos_crypto::{
bls12381,
hash::{HashValue, ACCUMULATOR_PLACEHOLDER_HASH},
};
use aptos_crypto::hash::{HashValue, ACCUMULATOR_PLACEHOLDER_HASH};
use aptos_executor_types::{state_compute_result::StateComputeResult, ExecutorResult};
use aptos_infallible::Mutex;
use aptos_logger::{error, warn};
Expand Down Expand Up @@ -70,7 +68,7 @@ pub type PrepareResult = Arc<Vec<SignatureVerifiedTransaction>>;
pub type ExecuteResult = ();
pub type LedgerUpdateResult = (StateComputeResult, Option<u64>);
pub type PostLedgerUpdateResult = ();
pub type CommitVoteResult = bls12381::Signature;
pub type CommitVoteResult = CommitVote;
pub type PreCommitResult = StateComputeResult;
pub type PostPreCommitResult = ();
pub type CommitLedgerResult = Option<LedgerInfoWithSignatures>;
Expand Down
6 changes: 6 additions & 0 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,12 @@ impl NetworkSender {
self.broadcast(msg).await
}

pub async fn broadcast_commit_vote(&self, commit_vote: CommitVote) {
fail_point!("consensus::send::order_vote", |_| ());
let msg = ConsensusMsg::CommitMessage(Box::new(CommitMessage::Vote(commit_vote)));
self.broadcast(msg).await
}

pub async fn broadcast_fast_share(&self, share: FastShare<Share>) {
fail_point!("consensus::send::broadcast_share", |_| ());
let msg = tokio::task::spawn_blocking(|| {
Expand Down
13 changes: 10 additions & 3 deletions consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,11 @@ impl PipelineBuilder {
let prepare_fut = spawn_ready_fut(Arc::new(vec![]));
let execute_fut = spawn_ready_fut(());
let ledger_update_fut = spawn_ready_fut((compute_result.clone(), None));
let commit_vote_fut =
spawn_ready_fut(self.signer.sign(commit_proof.ledger_info()).unwrap());
let commit_vote_fut = spawn_ready_fut(CommitVote::new_with_signature(
self.signer.author(),
commit_proof.ledger_info().clone(),
self.signer.sign(commit_proof.ledger_info()).unwrap(),
));
let pre_commit_fut = spawn_ready_fut(compute_result);
let commit_ledger_fut = spawn_ready_fut(Some(commit_proof));
let post_ledger_update_fut = spawn_ready_fut(());
Expand Down Expand Up @@ -526,7 +529,11 @@ impl PipelineBuilder {
let ledger_info = LedgerInfo::new(block_info, HashValue::zero());
info!("[Pipeline] Signed ledger info {ledger_info}");
let signature = signer.sign(&ledger_info).unwrap();
Ok(signature)
Ok(CommitVote::new_with_signature(
signer.author(),
ledger_info,
signature,
))
}

async fn pre_commit(
Expand Down
1 change: 1 addition & 0 deletions consensus/src/pipeline/signing_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl StatelessPipeline for SigningPhase {
.commit_vote_fut
.clone()
.await
.map(|sig| sig.signature().clone())
.map_err(|_| Error::InternalError("Failed to sign commit vote".to_string())),
// signature_result: self
// .safety_rule_handle
Expand Down
26 changes: 26 additions & 0 deletions consensus/src/round_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,12 @@ impl RoundManager {
.await
.context("[RoundManager] Failed to execute_and_insert the block")?;

if !self.randomness_config.randomness_enabled() {
if let Some(tx) = block_arc.pipeline_tx().unwrap().lock().rand_tx.take() {
let _ = tx.send(None);
}
}

// Short circuit if already voted.
ensure!(
self.round_state.vote_sent().is_none(),
Expand Down Expand Up @@ -1318,6 +1324,26 @@ impl RoundManager {
self.new_log(LogEvent::BroadcastOrderVote),
"{}", order_vote_msg
);
if let Some(tx) = proposed_block
.pipeline_tx()
.unwrap()
.lock()
.order_vote_tx
.take()
{
let _ = tx.send(());
}
let commit_vote = proposed_block
.pipeline_fut()
.unwrap()
.commit_vote_fut
.clone();
let network = self.network.clone();
tokio::spawn(async move {
if let Ok(commit_vote) = commit_vote.await {
network.broadcast_commit_vote(commit_vote).await;
}
});
self.network.broadcast_order_vote(order_vote_msg).await;
ORDER_VOTE_BROADCASTED.inc();
}
Expand Down
3 changes: 2 additions & 1 deletion types/src/on_chain_config/randomness_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ impl OnChainRandomnessConfig {
}

pub fn default_for_genesis() -> Self {
OnChainRandomnessConfig::V2(ConfigV2::default())
Self::default_disabled()
// OnChainRandomnessConfig::V2(ConfigV2::default())
}

pub fn randomness_enabled(&self) -> bool {
Expand Down

0 comments on commit bcccbf5

Please sign in to comment.