From 5ebd28cc3bf7cda01786f2f49b77533e151ce0d4 Mon Sep 17 00:00:00 2001 From: Zekun Li Date: Mon, 28 Oct 2024 15:19:33 -0700 Subject: [PATCH] [pipeline] new pipeline --- .../consensus-types/src/pipelined_block.rs | 61 ++- consensus/src/block_storage/block_tree.rs | 33 ++ consensus/src/pipeline/mod.rs | 1 + consensus/src/pipeline/pipeline_builder.rs | 481 ++++++++++++++++++ consensus/src/state_computer.rs | 2 +- 5 files changed, 575 insertions(+), 3 deletions(-) create mode 100644 consensus/src/pipeline/pipeline_builder.rs diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs index d080279e6aa6a..c197dac8cbe57 100644 --- a/consensus/consensus-types/src/pipelined_block.rs +++ b/consensus/consensus-types/src/pipelined_block.rs @@ -6,10 +6,12 @@ use crate::{ block::Block, common::{Payload, Round}, order_vote_proposal::OrderVoteProposal, + pipeline::commit_vote::CommitVote, pipeline_execution_result::PipelineExecutionResult, quorum_cert::QuorumCert, vote_proposal::VoteProposal, }; +use anyhow::Error; use aptos_crypto::hash::{HashValue, ACCUMULATOR_PLACEHOLDER_HASH}; use aptos_executor_types::{state_compute_result::StateComputeResult, ExecutorResult}; use aptos_infallible::Mutex; @@ -17,12 +19,16 @@ use aptos_logger::{error, warn}; use aptos_types::{ block_info::BlockInfo, contract_event::ContractEvent, + ledger_info::LedgerInfoWithSignatures, randomness::Randomness, - transaction::{SignedTransaction, TransactionStatus}, + transaction::{ + signature_verified_transaction::SignatureVerifiedTransaction, SignedTransaction, + TransactionStatus, + }, validator_txn::ValidatorTransaction, }; use derivative::Derivative; -use futures::future::BoxFuture; +use futures::future::{BoxFuture, Shared}; use once_cell::sync::OnceCell; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::{ @@ -30,6 +36,56 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use tokio::{sync::oneshot, task::JoinError}; + +#[derive(Clone)] +pub enum TaskError { + JoinError(Arc), + InternalError(Arc), +} + +impl Display for TaskError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + TaskError::JoinError(e) => write!(f, "JoinError: {}", e), + TaskError::InternalError(e) => write!(f, "InternalError: {}", e), + } + } +} + +impl From for TaskError { + fn from(value: Error) -> Self { + Self::InternalError(Arc::new(value)) + } +} +pub type TaskResult = Result; +pub type TaskFuture = Shared>>; + +pub struct PipelineFutures { + pub prepare_fut: TaskFuture>, + pub execute_fut: TaskFuture<()>, + pub ledger_update_fut: TaskFuture, + pub post_ledger_update_fut: TaskFuture<()>, + pub commit_vote_fut: TaskFuture, + pub pre_commit_fut: TaskFuture, + pub post_pre_commit_fut: TaskFuture<()>, + pub commit_ledger_fut: TaskFuture, + pub post_commit_fut: TaskFuture<()>, +} + +pub struct PipelineTx { + pub rand_tx: oneshot::Sender>, + pub order_vote_tx: oneshot::Sender<()>, + pub order_proof_tx: tokio::sync::broadcast::Sender<()>, + pub commit_proof_tx: tokio::sync::broadcast::Sender, +} + +pub struct PipelineRx { + pub rand_rx: oneshot::Receiver>, + pub order_vote_rx: oneshot::Receiver<()>, + pub order_proof_rx: tokio::sync::broadcast::Receiver<()>, + pub commit_proof_rx: tokio::sync::broadcast::Receiver, +} /// A representation of a block that has been added to the execution pipeline. It might either be in ordered /// or in executed state. In the ordered state, the block is waiting to be executed. In the executed state, @@ -51,6 +107,7 @@ pub struct PipelinedBlock { execution_summary: Arc>, #[derivative(PartialEq = "ignore")] pre_commit_fut: Arc>>>>, + // pipeline related fields } impl Serialize for PipelinedBlock { diff --git a/consensus/src/block_storage/block_tree.rs b/consensus/src/block_storage/block_tree.rs index 4d13c742747d4..6c5db6acc831d 100644 --- a/consensus/src/block_storage/block_tree.rs +++ b/consensus/src/block_storage/block_tree.rs @@ -479,6 +479,39 @@ impl BlockTree { self.process_pruned_blocks(ids_to_remove); self.update_highest_commit_cert(commit_proof); } + + pub fn commit_callback_v2( + &mut self, + storage: Arc, + block_id: HashValue, + block_round: Round, + finality_proof: WrappedLedgerInfo, + commit_decision: LedgerInfoWithSignatures, + ) { + let commit_proof = finality_proof + .create_merged_with_executed_state(commit_decision) + .expect("Inconsistent commit proof and evaluation decision, cannot commit block"); + + // let block_to_commit = blocks_to_commit.last().expect("pipeline is empty").clone(); + // update_counters_for_committed_blocks(blocks_to_commit); + let current_round = self.commit_root().round(); + let committed_round = block_round; + debug!( + LogSchema::new(LogEvent::CommitViaBlock).round(current_round), + committed_round = committed_round, + block_id = block_id, + ); + + let ids_to_remove = self.find_blocks_to_prune(block_id); + if let Err(e) = storage.prune_tree(ids_to_remove.clone().into_iter().collect()) { + // it's fine to fail here, as long as the commit succeeds, the next restart will clean + // up dangling blocks, and we need to prune the tree to keep the root consistent with + // executor. + warn!(error = ?e, "fail to delete block"); + } + self.process_pruned_blocks(ids_to_remove); + self.update_highest_commit_cert(commit_proof); + } } #[cfg(any(test, feature = "fuzzing"))] diff --git a/consensus/src/pipeline/mod.rs b/consensus/src/pipeline/mod.rs index 11269562e9cfa..7a0163d2ee015 100644 --- a/consensus/src/pipeline/mod.rs +++ b/consensus/src/pipeline/mod.rs @@ -36,5 +36,6 @@ pub mod pipeline_phase; pub mod signing_phase; pub mod execution_client; +mod pipeline_builder; #[cfg(test)] mod tests; diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs new file mode 100644 index 0000000000000..ebe7e091707ae --- /dev/null +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -0,0 +1,481 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + block_preparer::BlockPreparer, counters, execution_pipeline::SIG_VERIFY_POOL, monitor, + payload_manager::TPayloadManager, txn_notifier::TxnNotifier, +}; +use anyhow::anyhow; +use aptos_consensus_notifications::ConsensusNotificationSender; +use aptos_consensus_types::{ + block::Block, + common::Round, + pipeline::commit_vote::CommitVote, + pipelined_block::{ + PipelineFutures, PipelineRx, PipelineTx, PipelinedBlock, TaskError, TaskFuture, TaskResult, + }, +}; +use aptos_crypto::HashValue; +use aptos_executor_types::{state_compute_result::StateComputeResult, BlockExecutorTrait}; +use aptos_experimental_runtimes::thread_manager::optimal_min_len; +use aptos_logger::{error, warn}; +use aptos_types::{ + block_executor::config::BlockExecutorConfigFromOnchain, + ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, + randomness::Randomness, + transaction::{ + signature_verified_transaction::{SignatureVerifiedTransaction, TransactionProvider}, + SignedTransaction, Transaction, + }, + validator_signer::ValidatorSigner, +}; +use futures::FutureExt; +use move_core_types::account_address::AccountAddress; +use rayon::prelude::*; +use std::{ + future::Future, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::{select, sync::oneshot}; + +struct PipelineBuilder { + block_preparer: Arc, + executor: Arc, + validators: Arc<[AccountAddress]>, + block_executor_onchain_config: BlockExecutorConfigFromOnchain, + is_randomness_enabled: bool, + signer: Arc, + state_sync_notifier: Arc, + payload_manager: Arc, + txn_notifier: Arc, +} + +fn spawn_shared_fut< + T: Send + Clone + 'static, + F: Future> + Send + 'static, +>( + f: F, +) -> TaskFuture { + let join_handle = tokio::spawn(f); + async move { + match join_handle.await { + Ok(Ok(res)) => Ok(res), + Ok(Err(e)) => Err(e), + Err(e) => Err(TaskError::JoinError(Arc::new(e))), + } + } + .boxed() + .shared() +} + +async fn wait_and_log_error>>(f: F, msg: String) { + if let Err(TaskError::InternalError(e)) = f.await { + warn!("{} failed: {}", msg, e); + } +} + +// TODO: add counters for each phase +impl PipelineBuilder { + fn channel() -> (PipelineTx, PipelineRx) { + let (rand_tx, rand_rx) = oneshot::channel(); + let (order_vote_tx, order_vote_rx) = oneshot::channel(); + let (order_proof_tx, order_proof_rx) = tokio::sync::broadcast::channel(1); + let (commit_proof_tx, commit_proof_rx) = tokio::sync::broadcast::channel(1); + ( + PipelineTx { + rand_tx, + order_vote_tx, + order_proof_tx, + commit_proof_tx, + }, + PipelineRx { + rand_rx, + order_vote_rx, + order_proof_rx, + commit_proof_rx, + }, + ) + } + + fn build( + &self, + parent: &PipelineFutures, + block: Arc, + block_store_callback: Box< + dyn FnOnce(HashValue, Round, LedgerInfoWithSignatures) + Send + Sync, + >, + ) -> (PipelineFutures, PipelineTx) { + let (tx, rx) = Self::channel(); + let PipelineRx { + rand_rx, + order_vote_rx, + order_proof_rx, + commit_proof_rx, + } = rx; + + let prepare_fut = + spawn_shared_fut(Self::prepare(self.block_preparer.clone(), block.clone())); + let execute_fut = spawn_shared_fut(Self::execute( + prepare_fut.clone(), + parent.execute_fut.clone(), + rand_rx, + self.executor.clone(), + block.clone(), + self.is_randomness_enabled, + self.validators.clone(), + self.block_executor_onchain_config.clone(), + )); + let ledger_update_fut = spawn_shared_fut(Self::ledger_update( + execute_fut.clone(), + parent.ledger_update_fut.clone(), + self.executor.clone(), + block.clone(), + )); + let commit_vote_fut = spawn_shared_fut(Self::sign_commit_vote( + ledger_update_fut.clone(), + order_vote_rx, + order_proof_rx.resubscribe(), + commit_proof_rx.resubscribe(), + self.signer.clone(), + block.clone(), + )); + let pre_commit_fut = spawn_shared_fut(Self::pre_commit( + ledger_update_fut.clone(), + parent.pre_commit_fut.clone(), + order_proof_rx, + self.executor.clone(), + block.id(), + )); + let commit_ledger_fut = spawn_shared_fut(Self::commit_ledger( + commit_proof_rx, + parent.commit_ledger_fut.clone(), + self.executor.clone(), + )); + + let post_ledger_update_fut = spawn_shared_fut(Self::post_ledger_update( + prepare_fut.clone(), + ledger_update_fut.clone(), + self.txn_notifier.clone(), + )); + let post_pre_commit_fut = spawn_shared_fut(Self::post_pre_commit( + pre_commit_fut.clone(), + parent.post_pre_commit_fut.clone(), + self.state_sync_notifier.clone(), + self.payload_manager.clone(), + block.clone(), + )); + let post_commit_fut = spawn_shared_fut(Self::post_commit_ledger( + commit_ledger_fut.clone(), + parent.post_commit_fut.clone(), + block_store_callback, + block, + )); + ( + PipelineFutures { + prepare_fut, + execute_fut, + ledger_update_fut, + post_ledger_update_fut, + commit_vote_fut, + pre_commit_fut, + post_pre_commit_fut, + commit_ledger_fut, + post_commit_fut, + }, + tx, + ) + } + + async fn prepare( + preparer: Arc, + block: Arc, + ) -> TaskResult> { + let input_txns = loop { + match preparer.prepare_block(&block).await { + Ok(input_txns) => break input_txns, + Err(e) => { + warn!( + "[BlockPreparer] failed to prepare block {}, retrying: {}", + block.id(), + e + ); + tokio::time::sleep(Duration::from_millis(100)).await; + }, + } + }; + let sig_verification_start = Instant::now(); + let sig_verified_txns: Vec = SIG_VERIFY_POOL.install(|| { + let num_txns = input_txns.len(); + input_txns + .into_par_iter() + .with_min_len(optimal_min_len(num_txns, 32)) + .map(|t| Transaction::UserTransaction(t).into()) + .collect::>() + }); + counters::PREPARE_BLOCK_SIG_VERIFICATION_TIME + .observe_duration(sig_verification_start.elapsed()); + Ok(sig_verified_txns) + } + + async fn execute( + prepare_phase: TaskFuture>, + parent_block_execute_phase: TaskFuture<()>, + randomness_rx: oneshot::Receiver>, + executor: Arc, + block: Arc, + is_randomness_enabled: bool, + validator: Arc<[AccountAddress]>, + onchain_execution_config: BlockExecutorConfigFromOnchain, + ) -> TaskResult<()> { + parent_block_execute_phase.await?; + let user_txns = prepare_phase.await?; + let maybe_rand = randomness_rx + .await + .map_err(|_| anyhow!("randomness tx cancelled"))?; + let metadata_txn = if is_randomness_enabled { + block.new_metadata_with_randomness(&validator, maybe_rand) + } else { + block.new_block_metadata(&validator).into() + }; + let txns = [ + vec![SignatureVerifiedTransaction::from(Transaction::from( + metadata_txn, + ))], + block + .validator_txns() + .cloned() + .unwrap_or_default() + .into_iter() + .map(Transaction::ValidatorTransaction) + .map(SignatureVerifiedTransaction::from) + .collect(), + user_txns, + ] + .concat(); + tokio::task::spawn_blocking(move || { + executor + .execute_and_state_checkpoint( + (block.id(), txns).into(), + block.parent_id(), + onchain_execution_config, + ) + .map_err(anyhow::Error::from) + }) + .await + .expect("spawn blocking failed")?; + Ok(()) + } + + async fn ledger_update( + execute_phase: TaskFuture<()>, + parent_block_ledger_update_phase: TaskFuture, + executor: Arc, + block: Arc, + ) -> TaskResult { + parent_block_ledger_update_phase.await?; + execute_phase.await?; + let result = tokio::task::spawn_blocking(move || { + executor + .ledger_update(block.id(), block.parent_id()) + .map_err(anyhow::Error::from) + }) + .await + .expect("spawn blocking failed")?; + Ok(result) + } + + async fn post_ledger_update( + prepare_fut: TaskFuture>, + ledger_update: TaskFuture, + mempool_notifier: Arc, + ) -> TaskResult<()> { + let user_txns = prepare_fut.await?; + let compute_result = ledger_update.await?; + let compute_status = compute_result.compute_status_for_input_txns(); + // the length of compute_status is user_txns.len() + num_vtxns + 1 due to having blockmetadata + if user_txns.len() >= compute_status.len() { + // reconfiguration suffix blocks don't have any transactions + // otherwise, this is an error + if !compute_status.is_empty() { + error!( + "Expected compute_status length and actual compute_status length mismatch! user_txns len: {}, compute_status len: {}, has_reconfiguration: {}", + user_txns.len(), + compute_status.len(), + compute_result.has_reconfiguration(), + ); + } + } else { + let user_txn_status = &compute_status[compute_status.len() - user_txns.len()..]; + // todo: avoid clone + let txns: Vec = user_txns + .iter() + .flat_map(|txn| txn.get_transaction().map(|t| t.try_as_signed_user_txn())) + .flatten() + .cloned() + .collect(); + + // notify mempool about failed transaction + if let Err(e) = mempool_notifier + .notify_failed_txn(&txns, user_txn_status) + .await + { + error!( + error = ?e, "Failed to notify mempool of rejected txns", + ); + } + } + Ok(()) + } + + async fn sign_commit_vote( + ledger_update_phase: TaskFuture, + order_vote_rx: oneshot::Receiver<()>, + mut order_proof_rx: tokio::sync::broadcast::Receiver<()>, + mut commit_proof_rx: tokio::sync::broadcast::Receiver, + signer: Arc, + block: Arc, + ) -> TaskResult { + let compute_result = ledger_update_phase.await?; + let block_info = block.gen_block_info( + compute_result.root_hash(), + compute_result.last_version_or_0(), + compute_result.epoch_state().clone(), + ); + let ledger_info = LedgerInfo::new(block_info, HashValue::zero()); + // either order_vote_rx or order_proof_rx can trigger the next phase + select! { + Ok(_) = order_vote_rx => { + } + Ok(_) = order_proof_rx.recv() => { + } + Ok(_) = commit_proof_rx.recv() => { + } + } + let signature = signer.sign(&ledger_info).unwrap(); + Ok(CommitVote::new_with_signature( + signer.author(), + ledger_info, + signature, + )) + } + + async fn pre_commit( + ledger_update_phase: TaskFuture, + // TODO bound parent_commit_ledger too + parent_block_pre_commit_phase: TaskFuture, + mut order_proof_rx: tokio::sync::broadcast::Receiver<()>, + executor: Arc, + block_id: HashValue, + ) -> TaskResult { + let compute_result = ledger_update_phase.await?; + parent_block_pre_commit_phase.await?; + order_proof_rx + .recv() + .await + .map_err(|_| anyhow!("order proof tx cancelled"))?; + tokio::task::spawn_blocking(move || { + executor + .pre_commit_block(block_id) + .map_err(anyhow::Error::from) + }) + .await + .expect("spawn blocking failed")?; + Ok(compute_result) + } + + async fn post_pre_commit( + pre_commit: TaskFuture, + parent_post_pre_commit: TaskFuture<()>, + state_sync_notifier: Arc, + payload_manager: Arc, + block: Arc, + ) -> TaskResult<()> { + let compute_result = pre_commit.await?; + parent_post_pre_commit.await?; + let payload = block.payload().cloned(); + let timestamp = block.timestamp_usecs(); + let _timer = counters::OP_COUNTERS.timer("pre_commit_notify"); + + let txns = compute_result.transactions_to_commit().to_vec(); + let subscribable_events = compute_result.subscribable_events().to_vec(); + if let Err(e) = monitor!( + "notify_state_sync", + state_sync_notifier + .notify_new_commit(txns, subscribable_events) + .await + ) { + error!(error = ?e, "Failed to notify state synchronizer"); + } + + let payload_vec = payload.into_iter().collect(); + payload_manager.notify_commit(timestamp, payload_vec); + Ok(()) + } + + async fn commit_ledger( + mut commit_proof_rx: tokio::sync::broadcast::Receiver, + parent_block_commit_phase: TaskFuture, + executor: Arc, + ) -> TaskResult { + parent_block_commit_phase.await?; + let ledger_info_with_sigs = commit_proof_rx + .recv() + .await + .map_err(|_| anyhow!("commit rx cancelled"))?; + let ledger_info_with_sigs_clone = ledger_info_with_sigs.clone(); + tokio::task::spawn_blocking(move || { + executor + .commit_ledger(ledger_info_with_sigs_clone) + .map_err(anyhow::Error::from) + }) + .await + .expect("spawn blocking failed")?; + Ok(ledger_info_with_sigs) + } + + async fn post_commit_ledger( + commit_ledger: TaskFuture, + parent_post_commit: TaskFuture<()>, + block_store_callback: Box< + dyn FnOnce(HashValue, Round, LedgerInfoWithSignatures) + Send + Sync, + >, + block: Arc, + ) -> TaskResult<()> { + let ledger_info_with_sigs = commit_ledger.await?; + parent_post_commit.await?; + block_store_callback(block.id(), block.round(), ledger_info_with_sigs); + Ok(()) + } + + async fn monitor(epoch: u64, round: Round, block_id: HashValue, all_futs: PipelineFutures) { + let PipelineFutures { + prepare_fut, + execute_fut, + ledger_update_fut, + post_ledger_update_fut: _, + commit_vote_fut: _, + pre_commit_fut, + post_pre_commit_fut: _, + commit_ledger_fut, + post_commit_fut: _, + } = all_futs; + wait_and_log_error(prepare_fut, format!("{epoch} {round} {block_id} prepare")).await; + wait_and_log_error(execute_fut, format!("{epoch} {round} {block_id} execute")).await; + wait_and_log_error( + ledger_update_fut, + format!("{epoch} {round} {block_id} ledger update"), + ) + .await; + wait_and_log_error( + pre_commit_fut, + format!("{epoch} {round} {block_id} pre commit"), + ) + .await; + wait_and_log_error( + commit_ledger_fut, + format!("{epoch} {round} {block_id} commit ledger"), + ) + .await; + } +} diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index f63bc70e77e7a..93eceae6b4492 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -264,7 +264,7 @@ impl StateComputer for ExecutionProxy { // notify mempool about failed transaction if let Err(e) = txn_notifier - .notify_failed_txn(user_txns, user_txn_status) + .notify_failed_txn(&user_txns, user_txn_status) .await { error!(