Skip to content

Commit

Permalink
[pipeline] new pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Zekun Li authored and Zekun Li committed Oct 31, 2024
1 parent b92cbb8 commit 5ebd28c
Show file tree
Hide file tree
Showing 5 changed files with 575 additions and 3 deletions.
61 changes: 59 additions & 2 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,86 @@ use crate::{
block::Block,
common::{Payload, Round},
order_vote_proposal::OrderVoteProposal,
pipeline::commit_vote::CommitVote,
pipeline_execution_result::PipelineExecutionResult,
quorum_cert::QuorumCert,
vote_proposal::VoteProposal,
};
use anyhow::Error;
use aptos_crypto::hash::{HashValue, ACCUMULATOR_PLACEHOLDER_HASH};
use aptos_executor_types::{state_compute_result::StateComputeResult, ExecutorResult};
use aptos_infallible::Mutex;
use aptos_logger::{error, warn};
use aptos_types::{
block_info::BlockInfo,
contract_event::ContractEvent,
ledger_info::LedgerInfoWithSignatures,
randomness::Randomness,
transaction::{SignedTransaction, TransactionStatus},
transaction::{
signature_verified_transaction::SignatureVerifiedTransaction, SignedTransaction,
TransactionStatus,
},
validator_txn::ValidatorTransaction,
};
use derivative::Derivative;
use futures::future::BoxFuture;
use futures::future::{BoxFuture, Shared};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::{
fmt::{Debug, Display, Formatter},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{sync::oneshot, task::JoinError};

#[derive(Clone)]
pub enum TaskError {
JoinError(Arc<JoinError>),
InternalError(Arc<Error>),
}

impl Display for TaskError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
TaskError::JoinError(e) => write!(f, "JoinError: {}", e),
TaskError::InternalError(e) => write!(f, "InternalError: {}", e),
}
}
}

impl From<Error> for TaskError {
fn from(value: Error) -> Self {
Self::InternalError(Arc::new(value))
}
}
pub type TaskResult<T> = Result<T, TaskError>;
pub type TaskFuture<T> = Shared<BoxFuture<'static, TaskResult<T>>>;

pub struct PipelineFutures {
pub prepare_fut: TaskFuture<Vec<SignatureVerifiedTransaction>>,
pub execute_fut: TaskFuture<()>,
pub ledger_update_fut: TaskFuture<StateComputeResult>,
pub post_ledger_update_fut: TaskFuture<()>,
pub commit_vote_fut: TaskFuture<CommitVote>,
pub pre_commit_fut: TaskFuture<StateComputeResult>,
pub post_pre_commit_fut: TaskFuture<()>,
pub commit_ledger_fut: TaskFuture<LedgerInfoWithSignatures>,
pub post_commit_fut: TaskFuture<()>,
}

pub struct PipelineTx {
pub rand_tx: oneshot::Sender<Option<Randomness>>,
pub order_vote_tx: oneshot::Sender<()>,
pub order_proof_tx: tokio::sync::broadcast::Sender<()>,
pub commit_proof_tx: tokio::sync::broadcast::Sender<LedgerInfoWithSignatures>,
}

pub struct PipelineRx {
pub rand_rx: oneshot::Receiver<Option<Randomness>>,
pub order_vote_rx: oneshot::Receiver<()>,
pub order_proof_rx: tokio::sync::broadcast::Receiver<()>,
pub commit_proof_rx: tokio::sync::broadcast::Receiver<LedgerInfoWithSignatures>,
}

/// A representation of a block that has been added to the execution pipeline. It might either be in ordered
/// or in executed state. In the ordered state, the block is waiting to be executed. In the executed state,
Expand All @@ -51,6 +107,7 @@ pub struct PipelinedBlock {
execution_summary: Arc<OnceCell<ExecutionSummary>>,
#[derivative(PartialEq = "ignore")]
pre_commit_fut: Arc<Mutex<Option<BoxFuture<'static, ExecutorResult<()>>>>>,
// pipeline related fields
}

impl Serialize for PipelinedBlock {
Expand Down
33 changes: 33 additions & 0 deletions consensus/src/block_storage/block_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,39 @@ impl BlockTree {
self.process_pruned_blocks(ids_to_remove);
self.update_highest_commit_cert(commit_proof);
}

pub fn commit_callback_v2(
&mut self,
storage: Arc<dyn PersistentLivenessStorage>,
block_id: HashValue,
block_round: Round,
finality_proof: WrappedLedgerInfo,
commit_decision: LedgerInfoWithSignatures,
) {
let commit_proof = finality_proof
.create_merged_with_executed_state(commit_decision)
.expect("Inconsistent commit proof and evaluation decision, cannot commit block");

// let block_to_commit = blocks_to_commit.last().expect("pipeline is empty").clone();
// update_counters_for_committed_blocks(blocks_to_commit);
let current_round = self.commit_root().round();
let committed_round = block_round;
debug!(
LogSchema::new(LogEvent::CommitViaBlock).round(current_round),
committed_round = committed_round,
block_id = block_id,
);

let ids_to_remove = self.find_blocks_to_prune(block_id);
if let Err(e) = storage.prune_tree(ids_to_remove.clone().into_iter().collect()) {
// it's fine to fail here, as long as the commit succeeds, the next restart will clean
// up dangling blocks, and we need to prune the tree to keep the root consistent with
// executor.
warn!(error = ?e, "fail to delete block");
}
self.process_pruned_blocks(ids_to_remove);
self.update_highest_commit_cert(commit_proof);
}
}

#[cfg(any(test, feature = "fuzzing"))]
Expand Down
1 change: 1 addition & 0 deletions consensus/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ pub mod pipeline_phase;
pub mod signing_phase;

pub mod execution_client;
mod pipeline_builder;
#[cfg(test)]
mod tests;
Loading

0 comments on commit 5ebd28c

Please sign in to comment.