diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index fdbce2e0d4558..7dbc1888b7203 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -418,22 +418,16 @@ impl Payload { Payload::QuorumStoreInlineHybrid(b2, p3, m3) }, ( - Payload::QuorumStoreInlineHybrid(inline_batches, proofs, limit), - Payload::OptQuorumStore(opt_qs), + Payload::QuorumStoreInlineHybrid(_inline_batches, _proofs, _limit), + Payload::OptQuorumStore(_opt_qs), ) | ( - Payload::OptQuorumStore(opt_qs), - Payload::QuorumStoreInlineHybrid(inline_batches, proofs, limit), + Payload::OptQuorumStore(_opt_qs), + Payload::QuorumStoreInlineHybrid(_inline_batches, _proofs, _limit), ) => { - let execution_limits = PayloadExecutionLimit::max_txns_to_execute(limit); - let converted_payload = OptQuorumStorePayload::new( - inline_batches.into(), - Vec::new().into(), - proofs.into(), - execution_limits, - ); - let opt_qs3 = opt_qs.extend(converted_payload); - Payload::OptQuorumStore(opt_qs3) + unimplemented!( + "Cannot extend OptQuorumStore with QuorumStoreInlineHybrid or vice versa" + ) }, (Payload::OptQuorumStore(opt_qs1), Payload::OptQuorumStore(opt_qs2)) => { let opt_qs3 = opt_qs1.extend(opt_qs2); diff --git a/consensus/consensus-types/src/payload.rs b/consensus/consensus-types/src/payload.rs index 04482106e41ad..7226b622b4cb7 100644 --- a/consensus/consensus-types/src/payload.rs +++ b/consensus/consensus-types/src/payload.rs @@ -5,11 +5,18 @@ use crate::{ common::{DataStatus, ProofWithData}, proof_of_store::{BatchInfo, ProofOfStore}, }; +use aptos_executor_types::ExecutorResult; use aptos_infallible::Mutex; use aptos_types::{transaction::SignedTransaction, PeerId}; use core::fmt; +use futures::{ + future::{BoxFuture, Shared}, + stream::Stream, + FutureExt, +}; use serde::{Deserialize, Serialize}; use std::{ + fmt::Debug, ops::{Deref, DerefMut}, sync::Arc, }; @@ -24,11 +31,36 @@ pub trait TDataInfo { fn signers(&self, 
ordered_authors: &[PeerId]) -> Vec; } +pub struct DataFetchFut { + pub iteration: u32, + pub fut: Shared>>>, +} + +impl fmt::Debug for DataFetchFut { + fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Ok(()) + } +} + +impl DataFetchFut { + pub fn extend(&mut self, other: DataFetchFut) { + let self_fut = self.fut.clone(); + self.fut = async move { + let result1 = self_fut.await?; + let result2 = other.fut.await?; + let result = [result1, result2].concat(); + Ok(result) + } + .boxed() + .shared(); + } +} + #[derive(Deserialize, Serialize, Clone, Debug)] pub struct BatchPointer { pub batch_summary: Vec, #[serde(skip)] - pub status: Arc>>, + pub data_fut: Arc>>, } impl BatchPointer @@ -38,14 +70,14 @@ where pub fn new(metadata: Vec) -> Self { Self { batch_summary: metadata, - status: Arc::new(Mutex::new(None)), + data_fut: Arc::new(Mutex::new(None)), } } pub fn extend(&mut self, other: BatchPointer) { - let other_data_status = other.status.lock().as_mut().unwrap().take(); + let other_data_status = other.data_fut.lock().take().expect("must be initialized"); self.batch_summary.extend(other.batch_summary); - let mut status = self.status.lock(); + let mut status = self.data_fut.lock(); *status = match &mut *status { None => Some(other_data_status), Some(status) => { @@ -81,7 +113,7 @@ where fn from(value: Vec) -> Self { Self { batch_summary: value, - status: Arc::new(Mutex::new(None)), + data_fut: Arc::new(Mutex::new(None)), } } } @@ -89,7 +121,7 @@ where impl PartialEq for BatchPointer { fn eq(&self, other: &Self) -> bool { self.batch_summary == other.batch_summary - && Arc::as_ptr(&self.status) == Arc::as_ptr(&other.status) + && Arc::as_ptr(&self.data_fut) == Arc::as_ptr(&other.data_fut) } } @@ -112,15 +144,6 @@ impl IntoIterator for BatchPointer { } } -impl From for BatchPointer { - fn from(value: ProofWithData) -> Self { - Self { - batch_summary: value.proofs, - status: value.status, - } - } -} - #[derive(Serialize, Deserialize, Debug, Clone, 
PartialEq, Eq)] pub enum PayloadExecutionLimit { None, diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index 066e5704b5d03..3f06a6cf0f274 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -138,10 +138,6 @@ impl BatchInfo { pub fn gas_bucket_start(&self) -> u64 { self.gas_bucket_start } - - pub fn is_expired(&self) -> bool { - self.expiration() < aptos_infallible::duration_since_epoch().as_micros() as u64 - } } impl Display for BatchInfo { diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs index 0a8e65fb9dcda..af04d0035ca94 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager.rs @@ -13,7 +13,7 @@ use crate::{ use aptos_consensus_types::{ block::Block, common::{DataStatus, Payload, ProofWithData, Round}, - payload::{BatchPointer, TDataInfo}, + payload::{BatchPointer, DataFetchFut, TDataInfo}, proof_of_store::BatchInfo, }; use aptos_crypto::HashValue; @@ -25,11 +25,13 @@ use aptos_infallible::Mutex; use aptos_logger::prelude::*; use aptos_types::{transaction::SignedTransaction, PeerId}; use async_trait::async_trait; -use futures::channel::mpsc::Sender; +use futures::{channel::mpsc::Sender, future::BoxFuture, FutureExt, Stream, StreamExt}; use std::{ + borrow::Borrow, collections::{btree_map::Entry, BTreeMap}, ops::Deref, sync::Arc, + task::Poll, }; use tokio::sync::oneshot; @@ -120,8 +122,8 @@ impl QuorumStorePayloadManager { } } - fn request_transactions<'a>( - batches: impl Iterator)>, + fn request_transactions( + batches: Vec<(BatchInfo, Vec)>, block_timestamp: u64, batch_reader: Arc, ) -> Vec<( @@ -218,7 +220,13 @@ impl TPayloadManager for QuorumStorePayloadManager { proof_with_status .proofs .iter() - .map(|proof| (proof.info(), proof.shuffled_signers(&self.ordered_authors))), + .map(|proof| { + ( + proof.info().clone(), + proof.shuffled_signers(&self.ordered_authors), + ) + 
}) + .collect(), timestamp, batch_reader, ); @@ -234,21 +242,25 @@ impl TPayloadManager for QuorumStorePayloadManager { timestamp: u64, ordered_authors: &[PeerId], ) { - if data_pointer.status.lock().is_some() { + let mut data_fut = data_pointer.data_fut.lock(); + if data_fut.is_some() { return; } - let receivers = QuorumStorePayloadManager::request_transactions( - data_pointer - .batch_summary - .iter() - .map(|proof| (proof.info(), proof.signers(ordered_authors))), - timestamp, - batch_reader, - ); - data_pointer - .status - .lock() - .replace(DataStatus::Requested(receivers)); + + let batches_and_responders = data_pointer + .batch_summary + .iter() + .map(|proof| { + let mut signers = proof.signers(ordered_authors); + // TODO(ibalajiarun): Add block author to signers + (proof.info().clone(), signers) + }) + .collect(); + let fut = + request_txns_from_quorum_store(batches_and_responders, timestamp, batch_reader) + .boxed() + .shared(); + *data_fut = Some(DataFetchFut { fut, iteration: 0 }) } match payload { @@ -477,86 +489,80 @@ async fn get_transactions_for_observer( )) } +async fn request_txns_from_quorum_store( + batches_and_responders: Vec<(BatchInfo, Vec)>, + timestamp: u64, + batch_reader: Arc, +) -> ExecutorResult> { + let mut vec_ret = Vec::new(); + let receivers = QuorumStorePayloadManager::request_transactions( + batches_and_responders, + timestamp, + batch_reader, + ); + for (digest, rx) in receivers { + match rx.await { + Err(e) => { + // We probably advanced epoch already. 
+ warn!( + "Oneshot channel to get a batch was dropped with error {:?}", + e + ); + return Err(DataNotFound(digest)); + }, + Ok(Ok(data)) => { + vec_ret.push(data); + }, + Ok(Err(e)) => { + return Err(e); + }, + } + } + let ret: Vec = vec_ret.into_iter().flatten().collect(); + Ok(ret) +} + async fn process_payload_helper( data_ptr: &BatchPointer, batch_reader: Arc, block: &Block, ordered_authors: &[PeerId], ) -> ExecutorResult> { - let status = data_ptr.status.lock().take(); - match status.expect("Should have been updated before.") { - DataStatus::Cached(data) => { - counters::QUORUM_BATCH_READY_COUNT.inc(); - data_ptr - .status - .lock() - .replace(DataStatus::Cached(data.clone())); - Ok(data) - }, - DataStatus::Requested(receivers) => { - let _timer = counters::BATCH_WAIT_DURATION.start_timer(); - let mut vec_ret = Vec::new(); - if !receivers.is_empty() { - debug!( - "QSE: waiting for data on {} receivers, block_round {}", - receivers.len(), - block.round() - ); - } - let batches_and_responders = data_ptr.batch_summary.iter().map(|proof| { - let mut signers = proof.signers(ordered_authors); - if let Some(author) = block.author() { - signers.push(author); - } - (proof.info(), signers) - }); - for (digest, rx) in receivers { - match rx.await { - Err(e) => { - // We probably advanced epoch already. 
- warn!( - "Oneshot channel to get a batch was dropped with error {:?}", - e - ); - let new_receivers = QuorumStorePayloadManager::request_transactions( - batches_and_responders, - block.timestamp_usecs(), - batch_reader.clone(), - ); - // Could not get all data so requested again - data_ptr - .status - .lock() - .replace(DataStatus::Requested(new_receivers)); - return Err(DataNotFound(digest)); - }, - Ok(Ok(data)) => { - vec_ret.push(data); - }, - Ok(Err(e)) => { - let new_receivers = QuorumStorePayloadManager::request_transactions( - batches_and_responders, - block.timestamp_usecs(), - batch_reader.clone(), - ); - // Could not get all data so requested again - data_ptr - .status - .lock() - .replace(DataStatus::Requested(new_receivers)); - return Err(e); - }, - } - } - let ret: Vec = vec_ret.into_iter().flatten().collect(); - // execution asks for the data twice, so data is cached here for the second time. - data_ptr - .status - .lock() - .replace(DataStatus::Cached(ret.clone())); - Ok(ret) - }, + let (iteration, fut) = { + let data_fut_guard = data_ptr.data_fut.lock(); + let data_fut = data_fut_guard.as_ref().expect("must be initialized"); + (data_fut.iteration, data_fut.fut.clone()) + }; + + let result = fut.await; + // If error, reschedule before returning the result + if result.is_err() { + let mut data_fut_guard = data_ptr.data_fut.lock(); + let data_fut = data_fut_guard.as_mut().expect("must be initialized"); + // Protection against race, check the iteration number before rescheduling. 
+ if data_fut.iteration == iteration { + let batches_and_responders = data_ptr + .batch_summary + .iter() + .map(|proof| { + let mut signers = proof.signers(ordered_authors); + if let Some(author) = block.author() { + signers.push(author); + } + (proof.info().clone(), signers) + }) + .collect(); + data_fut.fut = request_txns_from_quorum_store( + batches_and_responders, + block.timestamp_usecs(), + batch_reader, + ) + .boxed() + .shared(); + data_fut.iteration = iteration + 1; + } } + result } /// This is deprecated. Use `process_payload_helper` instead after migrating to @@ -596,9 +602,16 @@ async fn process_payload( e ); let new_receivers = QuorumStorePayloadManager::request_transactions( - proof_with_data.proofs.iter().map(|proof| { - (proof.info(), proof.shuffled_signers(ordered_authors)) - }), + proof_with_data + .proofs + .iter() + .map(|proof| { + ( + proof.info().clone(), + proof.shuffled_signers(ordered_authors), + ) + }) + .collect(), block.timestamp_usecs(), batch_reader.clone(), ); @@ -614,9 +627,16 @@ async fn process_payload( }, Ok(Err(e)) => { let new_receivers = QuorumStorePayloadManager::request_transactions( - proof_with_data.proofs.iter().map(|proof| { - (proof.info(), proof.shuffled_signers(ordered_authors)) - }), + proof_with_data + .proofs + .iter() + .map(|proof| { + ( + proof.info().clone(), + proof.shuffled_signers(ordered_authors), + ) + }) + .collect(), block.timestamp_usecs(), batch_reader.clone(), ); diff --git a/execution/executor-types/src/error.rs b/execution/executor-types/src/error.rs index a09324dbe9925..0e3367d0ec3ca 100644 --- a/execution/executor-types/src/error.rs +++ b/execution/executor-types/src/error.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use std::fmt::Display; use thiserror::Error; -#[derive(Debug, Deserialize, Error, PartialEq, Eq, Serialize)] +#[derive(Debug, Deserialize, Error, PartialEq, Eq, Serialize, Clone)] /// Different reasons for proposal rejection pub enum ExecutorError { #[error("Cannot find 
speculation result for block id {0}")]