Skip to content

Commit

Permalink
[pipeline] integration
Browse files Browse the repository at this point in the history
disable CO
  • Loading branch information
Zekun Li authored and Zekun Li committed Nov 5, 2024
1 parent 5ebd28c commit bcf68af
Show file tree
Hide file tree
Showing 14 changed files with 677 additions and 280 deletions.
4 changes: 2 additions & 2 deletions config/src/config/consensus_observer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use serde::{Deserialize, Serialize};
use serde_yaml::Value;

// Useful constants for enabling consensus observer on different node types
const ENABLE_ON_VALIDATORS: bool = true;
const ENABLE_ON_VALIDATOR_FULLNODES: bool = true;
const ENABLE_ON_VALIDATORS: bool = false;
const ENABLE_ON_VALIDATOR_FULLNODES: bool = false;
const ENABLE_ON_PUBLIC_FULLNODES: bool = false;

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
Expand Down
119 changes: 85 additions & 34 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use crate::{
block::Block,
common::{Payload, Round},
order_vote_proposal::OrderVoteProposal,
pipeline::commit_vote::CommitVote,
pipeline_execution_result::PipelineExecutionResult,
quorum_cert::QuorumCert,
vote_proposal::VoteProposal,
};
use anyhow::Error;
use aptos_crypto::hash::{HashValue, ACCUMULATOR_PLACEHOLDER_HASH};
use aptos_crypto::{
bls12381,
hash::{HashValue, ACCUMULATOR_PLACEHOLDER_HASH},
};
use aptos_executor_types::{state_compute_result::StateComputeResult, ExecutorResult};
use aptos_infallible::Mutex;
use aptos_logger::{error, warn};
Expand All @@ -36,9 +38,12 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::{sync::oneshot, task::JoinError};
use tokio::{
sync::oneshot,
task::{AbortHandle, JoinError},
};

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum TaskError {
JoinError(Arc<JoinError>),
InternalError(Arc<Error>),
Expand All @@ -61,21 +66,32 @@ impl From<Error> for TaskError {
pub type TaskResult<T> = Result<T, TaskError>;
pub type TaskFuture<T> = Shared<BoxFuture<'static, TaskResult<T>>>;

pub type PrepareResult = Vec<SignatureVerifiedTransaction>;
pub type ExecuteResult = ();
pub type LedgerUpdateResult = (StateComputeResult, Option<u64>);
pub type PostLedgerUpdateResult = ();
pub type CommitVoteResult = bls12381::Signature;
pub type PreCommitResult = StateComputeResult;
pub type PostPreCommitResult = ();
pub type CommitLedgerResult = Option<LedgerInfoWithSignatures>;
pub type PostCommitResult = ();

#[derive(Clone)]
pub struct PipelineFutures {
pub prepare_fut: TaskFuture<Vec<SignatureVerifiedTransaction>>,
pub execute_fut: TaskFuture<()>,
pub ledger_update_fut: TaskFuture<StateComputeResult>,
pub post_ledger_update_fut: TaskFuture<()>,
pub commit_vote_fut: TaskFuture<CommitVote>,
pub pre_commit_fut: TaskFuture<StateComputeResult>,
pub post_pre_commit_fut: TaskFuture<()>,
pub commit_ledger_fut: TaskFuture<LedgerInfoWithSignatures>,
pub post_commit_fut: TaskFuture<()>,
pub prepare_fut: TaskFuture<PrepareResult>,
pub execute_fut: TaskFuture<ExecuteResult>,
pub ledger_update_fut: TaskFuture<LedgerUpdateResult>,
pub post_ledger_update_fut: TaskFuture<PostLedgerUpdateResult>,
pub commit_vote_fut: TaskFuture<CommitVoteResult>,
pub pre_commit_fut: TaskFuture<PreCommitResult>,
pub post_pre_commit_fut: TaskFuture<PostPreCommitResult>,
pub commit_ledger_fut: TaskFuture<CommitLedgerResult>,
pub post_commit_fut: TaskFuture<PostCommitResult>,
}

pub struct PipelineTx {
pub rand_tx: oneshot::Sender<Option<Randomness>>,
pub order_vote_tx: oneshot::Sender<()>,
pub rand_tx: Option<oneshot::Sender<Option<Randomness>>>,
pub order_vote_tx: Option<oneshot::Sender<()>>,
pub order_proof_tx: tokio::sync::broadcast::Sender<()>,
pub commit_proof_tx: tokio::sync::broadcast::Sender<LedgerInfoWithSignatures>,
}
Expand Down Expand Up @@ -108,6 +124,12 @@ pub struct PipelinedBlock {
#[derivative(PartialEq = "ignore")]
pre_commit_fut: Arc<Mutex<Option<BoxFuture<'static, ExecutorResult<()>>>>>,
// pipeline related fields
#[derivative(PartialEq = "ignore")]
pipeline_futures: Option<PipelineFutures>,
#[derivative(PartialEq = "ignore")]
pipeline_tx: Option<Arc<Mutex<PipelineTx>>>,
#[derivative(PartialEq = "ignore")]
pipeline_abort_handle: Option<Vec<AbortHandle>>,
}

impl Serialize for PipelinedBlock {
Expand Down Expand Up @@ -151,15 +173,7 @@ impl<'de> Deserialize<'de> for PipelinedBlock {
randomness,
} = SerializedBlock::deserialize(deserializer)?;

let block = PipelinedBlock {
block,
input_transactions,
state_compute_result: StateComputeResult::new_dummy(),
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
pre_commit_fut: Arc::new(Mutex::new(None)),
};
let block = PipelinedBlock::new(block, input_transactions, StateComputeResult::new_dummy());
if let Some(r) = randomness {
block.set_randomness(r);
}
Expand All @@ -168,6 +182,10 @@ impl<'de> Deserialize<'de> for PipelinedBlock {
}

impl PipelinedBlock {
pub fn set_compute_result(&mut self, compute_result: StateComputeResult) {
self.state_compute_result = compute_result;
}

pub fn set_execution_result(
mut self,
pipeline_execution_result: PipelineExecutionResult,
Expand Down Expand Up @@ -234,7 +252,7 @@ impl PipelinedBlock {
}

pub fn set_randomness(&self, randomness: Randomness) {
assert!(self.randomness.set(randomness).is_ok());
assert!(self.randomness.set(randomness.clone()).is_ok());
}

pub fn set_insertion_time(&self) {
Expand Down Expand Up @@ -275,19 +293,14 @@ impl PipelinedBlock {
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
pre_commit_fut: Arc::new(Mutex::new(None)),
pipeline_futures: None,
pipeline_tx: None,
pipeline_abort_handle: None,
}
}

pub fn new_ordered(block: Block) -> Self {
Self {
block,
input_transactions: vec![],
state_compute_result: StateComputeResult::new_dummy(),
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
pre_commit_fut: Arc::new(Mutex::new(None)),
}
Self::new(block, vec![], StateComputeResult::new_dummy())
}

pub fn block(&self) -> &Block {
Expand Down Expand Up @@ -388,6 +401,44 @@ impl PipelinedBlock {
pub fn get_execution_summary(&self) -> Option<ExecutionSummary> {
self.execution_summary.get().cloned()
}

pub fn pipeline_fut(&self) -> Option<&PipelineFutures> {
self.pipeline_futures.as_ref()
}

pub fn set_pipeline_fut(&mut self, pipeline_futures: PipelineFutures) {
self.pipeline_futures = Some(pipeline_futures);
}

pub fn set_pipeline_tx(&mut self, pipeline_tx: PipelineTx) {
self.pipeline_tx = Some(Arc::new(Mutex::new(pipeline_tx)));
}

pub fn set_pipeline_abort_handles(&mut self, abort_handles: Vec<AbortHandle>) {
self.pipeline_abort_handle = Some(abort_handles);
}

pub fn pipeline_tx(&self) -> Option<&Arc<Mutex<PipelineTx>>> {
self.pipeline_tx.as_ref()
}

pub fn abort_pipeline(&self) {
if let Some(abort_handles) = &self.pipeline_abort_handle {
for handle in abort_handles {
handle.abort();
}
}
}

pub async fn wait_until_complete(&self) {
// for all stages that involve executor
if let Some(futs) = &self.pipeline_futures {
let _ = futs.execute_fut.clone().await;
let _ = futs.commit_ledger_fut.clone().await;
let _ = futs.pre_commit_fut.clone().await;
let _ = futs.commit_ledger_fut.clone().await;
}
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
Expand Down
72 changes: 59 additions & 13 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
persistent_liveness_storage::{
PersistentLivenessStorage, RecoveryData, RootInfo, RootMetadata,
},
pipeline::execution_client::TExecutionClient,
pipeline::{execution_client::TExecutionClient, pipeline_builder::PipelineBuilder},
util::time_service::TimeService,
};
use anyhow::{bail, ensure, format_err, Context};
Expand Down Expand Up @@ -88,6 +88,7 @@ pub struct BlockStore {
back_pressure_for_test: AtomicBool,
order_vote_enabled: bool,
pending_blocks: Arc<Mutex<PendingBlocks>>,
pipeline_builder: PipelineBuilder,
}

impl BlockStore {
Expand All @@ -101,6 +102,7 @@ impl BlockStore {
payload_manager: Arc<dyn TPayloadManager>,
order_vote_enabled: bool,
pending_blocks: Arc<Mutex<PendingBlocks>>,
pipeline_builder: PipelineBuilder,
) -> Self {
let highest_2chain_tc = initial_data.highest_2chain_timeout_certificate();
let (root, root_metadata, blocks, quorum_certs) = initial_data.take();
Expand All @@ -118,6 +120,8 @@ impl BlockStore {
payload_manager,
order_vote_enabled,
pending_blocks,
pipeline_builder,
None,
));
block_on(block_store.try_send_for_execution());
block_store
Expand Down Expand Up @@ -156,6 +160,8 @@ impl BlockStore {
payload_manager: Arc<dyn TPayloadManager>,
order_vote_enabled: bool,
pending_blocks: Arc<Mutex<PendingBlocks>>,
pipeline_builder: PipelineBuilder,
tree_to_replace: Option<Arc<RwLock<BlockTree>>>,
) -> Self {
let RootInfo(root_block, root_qc, root_ordered_cert, root_commit_cert) = root;

Expand Down Expand Up @@ -186,13 +192,17 @@ impl BlockStore {
));
assert_eq!(result.root_hash(), root_metadata.accu_hash);

let pipelined_root_block = PipelinedBlock::new(
let mut pipelined_root_block = PipelinedBlock::new(
*root_block,
vec![],
// Create a dummy state_compute_result with necessary fields filled in.
result,
result.clone(),
);

let pipeline_fut =
pipeline_builder.build_root(result, root_commit_cert.ledger_info().clone());
pipelined_root_block.set_pipeline_fut(pipeline_fut);

let tree = BlockTree::new(
pipelined_root_block,
root_qc,
Expand All @@ -201,9 +211,15 @@ impl BlockStore {
max_pruned_blocks_in_mem,
highest_2chain_timeout_cert.map(Arc::new),
);
let inner = if let Some(tree_to_replace) = tree_to_replace {
*tree_to_replace.write() = tree;
tree_to_replace
} else {
Arc::new(RwLock::new(tree))
};

let block_store = Self {
inner: Arc::new(RwLock::new(tree)),
inner,
execution_client,
storage,
time_service,
Expand All @@ -213,6 +229,7 @@ impl BlockStore {
back_pressure_for_test: AtomicBool::new(false),
order_vote_enabled,
pending_blocks,
pipeline_builder,
};

for block in blocks {
Expand Down Expand Up @@ -254,6 +271,12 @@ impl BlockStore {

assert!(!blocks_to_commit.is_empty());

// send order proof to pipeline
for block in &blocks_to_commit {
let pipeline_tx = block.pipeline_tx().unwrap().lock();
let _ = pipeline_tx.order_proof_tx.send(());
}

let block_tree = self.inner.clone();
let storage = self.storage.clone();
let finality_proof_clone = finality_proof.clone();
Expand Down Expand Up @@ -324,13 +347,11 @@ impl BlockStore {
self.payload_manager.clone(),
self.order_vote_enabled,
self.pending_blocks.clone(),
self.pipeline_builder.clone(),
Some(self.inner.clone()),
)
.await;

// Unwrap the new tree and replace the existing tree.
*self.inner.write() = Arc::try_unwrap(inner)
.unwrap_or_else(|_| panic!("New block tree is not shared"))
.into_inner();
self.try_send_for_execution().await;
}

Expand All @@ -351,7 +372,36 @@ impl BlockStore {
"Block with old round"
);

let pipelined_block = PipelinedBlock::new_ordered(block.clone());
if let Some(payload) = block.payload() {
self.payload_manager
.prefetch_payload_data(payload, block.timestamp_usecs());
}

let mut pipelined_block = PipelinedBlock::new_ordered(block.clone());

// build pipeline
let parent_block = self
.get_block(block.parent_id())
.ok_or_else(|| anyhow::anyhow!("Parent block not found"))?;

let block_tree = self.inner.clone();
let storage = self.storage.clone();
let id = block.id();
let round = block.round();
let callback = Box::new(move |commit_decision: LedgerInfoWithSignatures| {
block_tree
.write()
.commit_callback_v2(storage, id, round, commit_decision);
});
let (fut, tx, abort_handles) = self.pipeline_builder.build(
parent_block.pipeline_fut().unwrap(),
Arc::new(block),
callback,
);
pipelined_block.set_pipeline_fut(fut);
pipelined_block.set_pipeline_tx(tx);
pipelined_block.set_pipeline_abort_handles(abort_handles);

// ensure local time past the block time
let block_time = Duration::from_micros(pipelined_block.timestamp_usecs());
let current_timestamp = self.time_service.get_current_timestamp();
Expand All @@ -365,10 +415,6 @@ impl BlockStore {
}
self.time_service.wait_until(block_time).await;
}
if let Some(payload) = pipelined_block.block().payload() {
self.payload_manager
.prefetch_payload_data(payload, pipelined_block.block().timestamp_usecs());
}
self.storage
.save_tree(vec![pipelined_block.block().clone()], vec![])
.context("Insert block failed when saving block")?;
Expand Down
Loading

0 comments on commit bcf68af

Please sign in to comment.