Skip to content

Commit

Permalink
[consensus] cancellation safety in payload manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 19, 2024
1 parent 2dad5ed commit 6d3d7b5
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 130 deletions.
20 changes: 7 additions & 13 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,22 +418,16 @@ impl Payload {
Payload::QuorumStoreInlineHybrid(b2, p3, m3)
},
(
Payload::QuorumStoreInlineHybrid(inline_batches, proofs, limit),
Payload::OptQuorumStore(opt_qs),
Payload::QuorumStoreInlineHybrid(_inline_batches, _proofs, _limit),
Payload::OptQuorumStore(_opt_qs),
)
| (
Payload::OptQuorumStore(opt_qs),
Payload::QuorumStoreInlineHybrid(inline_batches, proofs, limit),
Payload::OptQuorumStore(_opt_qs),
Payload::QuorumStoreInlineHybrid(_inline_batches, _proofs, _limit),
) => {
let execution_limits = PayloadExecutionLimit::max_txns_to_execute(limit);
let converted_payload = OptQuorumStorePayload::new(
inline_batches.into(),
Vec::new().into(),
proofs.into(),
execution_limits,
);
let opt_qs3 = opt_qs.extend(converted_payload);
Payload::OptQuorumStore(opt_qs3)
unimplemented!(
"Cannot extend OptQuorumStore with QuorumStoreInlineHybrid or viceversa"
)
},
(Payload::OptQuorumStore(opt_qs1), Payload::OptQuorumStore(opt_qs2)) => {
let opt_qs3 = opt_qs1.extend(opt_qs2);
Expand Down
53 changes: 38 additions & 15 deletions consensus/consensus-types/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@ use crate::{
common::{DataStatus, ProofWithData},
proof_of_store::{BatchInfo, ProofOfStore},
};
use aptos_executor_types::ExecutorResult;
use aptos_infallible::Mutex;
use aptos_types::{transaction::SignedTransaction, PeerId};
use core::fmt;
use futures::{
future::{BoxFuture, Shared},
stream::Stream,
FutureExt,
};
use serde::{Deserialize, Serialize};
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
sync::Arc,
};
Expand All @@ -24,11 +31,36 @@ pub trait TDataInfo {
fn signers(&self, ordered_authors: &[PeerId]) -> Vec<PeerId>;
}

pub struct DataFetchFut {
pub iteration: u32,
pub fut: Shared<BoxFuture<'static, ExecutorResult<Vec<SignedTransaction>>>>,
}

impl fmt::Debug for DataFetchFut {
fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}

impl DataFetchFut {
pub fn extend(&mut self, other: DataFetchFut) {
let self_fut = self.fut.clone();
self.fut = async move {
let result1 = self_fut.await?;
let result2 = other.fut.await?;
let result = [result1, result2].concat();
Ok(result)
}
.boxed()
.shared();
}
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct BatchPointer<T> {
pub batch_summary: Vec<T>,
#[serde(skip)]
pub status: Arc<Mutex<Option<DataStatus>>>,
pub data_fut: Arc<Mutex<Option<DataFetchFut>>>,
}

impl<T> BatchPointer<T>
Expand All @@ -38,14 +70,14 @@ where
pub fn new(metadata: Vec<T>) -> Self {
Self {
batch_summary: metadata,
status: Arc::new(Mutex::new(None)),
data_fut: Arc::new(Mutex::new(None)),
}
}

pub fn extend(&mut self, other: BatchPointer<T>) {
let other_data_status = other.status.lock().as_mut().unwrap().take();
let other_data_status = other.data_fut.lock().take().expect("must be initialized");
self.batch_summary.extend(other.batch_summary);
let mut status = self.status.lock();
let mut status = self.data_fut.lock();
*status = match &mut *status {
None => Some(other_data_status),
Some(status) => {
Expand Down Expand Up @@ -81,15 +113,15 @@ where
fn from(value: Vec<T>) -> Self {
Self {
batch_summary: value,
status: Arc::new(Mutex::new(None)),
data_fut: Arc::new(Mutex::new(None)),
}
}
}

impl<T: PartialEq> PartialEq for BatchPointer<T> {
fn eq(&self, other: &Self) -> bool {
self.batch_summary == other.batch_summary
&& Arc::as_ptr(&self.status) == Arc::as_ptr(&other.status)
&& Arc::as_ptr(&self.data_fut) == Arc::as_ptr(&other.data_fut)
}
}

Expand All @@ -112,15 +144,6 @@ impl<T> IntoIterator for BatchPointer<T> {
}
}

impl From<ProofWithData> for BatchPointer<ProofOfStore> {
fn from(value: ProofWithData) -> Self {
Self {
batch_summary: value.proofs,
status: value.status,
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum PayloadExecutionLimit {
None,
Expand Down
4 changes: 0 additions & 4 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ impl BatchInfo {
pub fn gas_bucket_start(&self) -> u64 {
self.gas_bucket_start
}

pub fn is_expired(&self) -> bool {
self.expiration() < aptos_infallible::duration_since_epoch().as_micros() as u64
}
}

impl Display for BatchInfo {
Expand Down
Loading

0 comments on commit 6d3d7b5

Please sign in to comment.