Skip to content

Commit

Permalink
[consensus] fallback heuristics for optimistic quorum store (#14346)
Browse files Browse the repository at this point in the history
* [consensus] Fallback heuristics for optimistic quorum store
* [optqs] set minimum batch age for optimistic batch proposals
* new unit tests and existing test fixes
* allow handling OptQS payload by default
  • Loading branch information
ibalajiarun authored Oct 11, 2024
1 parent a84e2be commit b2781bf
Show file tree
Hide file tree
Showing 38 changed files with 980 additions and 238 deletions.
4 changes: 1 addition & 3 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::{
payload::{OptQuorumStorePayload, PayloadExecutionLimit},
proof_of_store::{BatchInfo, ProofCache, ProofOfStore},
};
use anyhow::bail;
use aptos_crypto::{
hash::{CryptoHash, CryptoHasher},
HashValue,
Expand Down Expand Up @@ -520,8 +519,7 @@ impl Payload {
(true, Payload::OptQuorumStore(opt_quorum_store)) => {
let proof_with_data = opt_quorum_store.proof_with_data();
Self::verify_with_cache(&proof_with_data.batch_summary, validator, proof_cache)?;
// TODO(ibalajiarun): Remove this log when OptQS is enabled.
bail!("OptQuorumStore Payload is not expected yet");
Ok(())
},
(_, _) => Err(anyhow::anyhow!(
"Wrong payload type. Expected Payload::InQuorumStore {} got {} ",
Expand Down
1 change: 1 addition & 0 deletions consensus/consensus-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod order_vote;
pub mod order_vote_msg;
pub mod order_vote_proposal;
pub mod payload;
pub mod payload_pull_params;
pub mod pipeline;
pub mod pipeline_execution_result;
pub mod pipelined_block;
Expand Down
91 changes: 91 additions & 0 deletions consensus/consensus-types/src/payload_pull_params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
common::{Author, PayloadFilter},
utils::PayloadTxnsSize,
};
use std::{collections::HashSet, time::Duration};

#[derive(Clone)]
pub struct OptQSPayloadPullParams {
pub exclude_authors: HashSet<Author>,
pub minimum_batch_age_usecs: u64,
}

pub struct PayloadPullParameters {
pub max_poll_time: Duration,
pub max_txns: PayloadTxnsSize,
pub max_txns_after_filtering: u64,
pub soft_max_txns_after_filtering: u64,
pub max_inline_txns: PayloadTxnsSize,
pub user_txn_filter: PayloadFilter,
pub pending_ordering: bool,
pub pending_uncommitted_blocks: usize,
pub recent_max_fill_fraction: f32,
pub block_timestamp: Duration,
pub maybe_optqs_payload_pull_params: Option<OptQSPayloadPullParams>,
}

impl std::fmt::Debug for OptQSPayloadPullParams {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OptQSPayloadPullParams")
.field("exclude_authors", &self.exclude_authors)
.field("minimum_batch_age_useds", &self.minimum_batch_age_usecs)
.finish()
}
}

impl PayloadPullParameters {
pub fn new_for_test(
max_poll_time: Duration,
max_txns: u64,
max_txns_bytes: u64,
max_txns_after_filtering: u64,
soft_max_txns_after_filtering: u64,
max_inline_txns: u64,
max_inline_txns_bytes: u64,
user_txn_filter: PayloadFilter,
pending_ordering: bool,
pending_uncommitted_blocks: usize,
recent_max_fill_fraction: f32,
block_timestamp: Duration,
) -> Self {
Self {
max_poll_time,
max_txns: PayloadTxnsSize::new(max_txns, max_txns_bytes),
max_txns_after_filtering,
soft_max_txns_after_filtering,
max_inline_txns: PayloadTxnsSize::new(max_inline_txns, max_inline_txns_bytes),
user_txn_filter,
pending_ordering,
pending_uncommitted_blocks,
recent_max_fill_fraction,
block_timestamp,
maybe_optqs_payload_pull_params: None,
}
}
}

impl std::fmt::Debug for PayloadPullParameters {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PayloadPullParameters")
.field("max_poll_time", &self.max_poll_time)
.field("max_items", &self.max_txns)
.field("max_unique_items", &self.max_txns_after_filtering)
.field(
"soft_max_txns_after_filtering",
&self.soft_max_txns_after_filtering,
)
.field("max_inline_items", &self.max_inline_txns)
.field("pending_ordering", &self.pending_ordering)
.field(
"pending_uncommitted_blocks",
&self.pending_uncommitted_blocks,
)
.field("recent_max_fill_fraction", &self.recent_max_fill_fraction)
.field("block_timestamp", &self.block_timestamp)
.field("optqs_params", &self.maybe_optqs_payload_pull_params)
.finish()
}
}
5 changes: 3 additions & 2 deletions consensus/consensus-types/src/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::{
common::{Payload, PayloadFilter},
payload_pull_params::OptQSPayloadPullParams,
utils::PayloadTxnsSize,
};
use anyhow::Result;
Expand All @@ -16,8 +17,8 @@ pub struct GetPayloadRequest {
pub max_txns_after_filtering: u64,
// soft max number of transactions after filtering in the block (i.e. include one that crosses it)
pub soft_max_txns_after_filtering: u64,
// target txns with opt batches in max_txns as pct
pub opt_batch_txns_pct: u8,
// opt payload pull params
pub maybe_optqs_payload_pull_params: Option<OptQSPayloadPullParams>,
// max number of inline transactions (transactions without a proof of store)
pub max_inline_txns: PayloadTxnsSize,
// return non full
Expand Down
9 changes: 8 additions & 1 deletion consensus/consensus-types/src/round_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,29 @@ use crate::{
timeout_2chain::TwoChainTimeout,
};
use anyhow::{ensure, Context};
use aptos_bitvec::BitVec;
use aptos_crypto::bls12381;
use aptos_short_hex_str::AsShortHexStr;
use aptos_types::validator_verifier::ValidatorVerifier;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)]
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, Hash, Debug)]
pub enum RoundTimeoutReason {
Unknown,
ProposalNotReceived,
PayloadUnavailable { missing_authors: BitVec },
NoQC,
}

impl std::fmt::Display for RoundTimeoutReason {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
RoundTimeoutReason::Unknown => write!(f, "Unknown"),
RoundTimeoutReason::ProposalNotReceived => write!(f, "ProposalNotReceived"),
RoundTimeoutReason::PayloadUnavailable { .. } => {
write!(f, "PayloadUnavailable",)
},
RoundTimeoutReason::NoQC => write!(f, "NoQC"),
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
util::time_service::TimeService,
};
use anyhow::{bail, ensure, format_err, Context};
use aptos_bitvec::BitVec;
use aptos_consensus_types::{
block::Block,
common::Round,
Expand Down Expand Up @@ -472,18 +473,19 @@ impl BlockStore {
self.pending_blocks.clone()
}

pub async fn wait_for_payload(&self, block: &Block) -> anyhow::Result<()> {
tokio::time::timeout(
Duration::from_secs(1),
self.payload_manager.get_transactions(block),
)
.await??;
pub async fn wait_for_payload(&self, block: &Block, deadline: Duration) -> anyhow::Result<()> {
let duration = deadline.saturating_sub(self.time_service.get_current_timestamp());
tokio::time::timeout(duration, self.payload_manager.get_transactions(block)).await??;
Ok(())
}

pub fn check_payload(&self, proposal: &Block) -> bool {
pub fn check_payload(&self, proposal: &Block) -> Result<(), BitVec> {
self.payload_manager.check_payload_availability(proposal)
}

pub fn get_block_for_round(&self, round: Round) -> Option<Arc<PipelinedBlock>> {
self.inner.read().get_block_for_round(round)
}
}

impl BlockReader for BlockStore {
Expand Down
34 changes: 31 additions & 3 deletions consensus/src/block_storage/block_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ use aptos_consensus_types::{
};
use aptos_crypto::HashValue;
use aptos_logger::prelude::*;
use aptos_types::{block_info::BlockInfo, ledger_info::LedgerInfoWithSignatures};
use aptos_types::{
block_info::{BlockInfo, Round},
ledger_info::LedgerInfoWithSignatures,
};
use mirai_annotations::{checked_verify_eq, precondition};
use std::{
collections::{vec_deque::VecDeque, HashMap, HashSet},
collections::{vec_deque::VecDeque, BTreeMap, HashMap, HashSet},
sync::Arc,
};

Expand Down Expand Up @@ -89,6 +92,9 @@ pub struct BlockTree {
pruned_block_ids: VecDeque<HashValue>,
/// Num pruned blocks to keep in memory.
max_pruned_blocks_in_mem: usize,

/// Round to Block index. We expect only one block per round.
round_to_ids: BTreeMap<Round, HashValue>,
}

impl BlockTree {
Expand All @@ -108,6 +114,8 @@ impl BlockTree {
let root_id = root.id();

let mut id_to_block = HashMap::new();
let mut round_to_ids = BTreeMap::new();
round_to_ids.insert(root.round(), root_id);
id_to_block.insert(root_id, LinkableBlock::new(root));
counters::NUM_BLOCKS_IN_TREE.set(1);

Expand All @@ -132,6 +140,7 @@ impl BlockTree {
pruned_block_ids,
max_pruned_blocks_in_mem,
highest_2chain_timeout_cert,
round_to_ids,
}
}

Expand Down Expand Up @@ -165,7 +174,10 @@ impl BlockTree {

fn remove_block(&mut self, block_id: HashValue) {
// Remove the block from the store
self.id_to_block.remove(&block_id);
if let Some(block) = self.id_to_block.remove(&block_id) {
let round = block.executed_block().round();
self.round_to_ids.remove(&round);
};
self.id_to_quorum_cert.remove(&block_id);
}

Expand All @@ -178,6 +190,12 @@ impl BlockTree {
.map(|lb| lb.executed_block().clone())
}

pub(super) fn get_block_for_round(&self, round: Round) -> Option<Arc<PipelinedBlock>> {
self.round_to_ids
.get(&round)
.and_then(|block_id| self.get_block(block_id))
}

pub(super) fn ordered_root(&self) -> Arc<PipelinedBlock> {
self.get_block(&self.ordered_root_id)
.expect("Root must exist")
Expand Down Expand Up @@ -241,6 +259,16 @@ impl BlockTree {
let linkable_block = LinkableBlock::new(block);
let arc_block = Arc::clone(linkable_block.executed_block());
assert!(self.id_to_block.insert(block_id, linkable_block).is_none());
// Note: the assumption is that we have/enforce unequivocal proposer election.
if let Some(old_block_id) = self.round_to_ids.get(&arc_block.round()) {
warn!(
"Multiple blocks received for round {}. Previous block id: {}",
arc_block.round(),
old_block_id
);
} else {
self.round_to_ids.insert(arc_block.round(), block_id);
}
counters::NUM_BLOCKS_IN_TREE.inc();
Ok(arc_block)
}
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ use crate::{
},
DAGRpcResult, RpcHandler,
},
payload_client::{PayloadClient, PayloadPullParameters},
payload_client::PayloadClient,
};
use anyhow::{bail, ensure};
use aptos_collections::BoundedVecDeque;
use aptos_config::config::DagPayloadConfig;
use aptos_consensus_types::{
common::{Author, Payload, PayloadFilter},
payload_pull_params::PayloadPullParameters,
utils::PayloadTxnsSize,
};
use aptos_crypto::hash::CryptoHash;
Expand Down Expand Up @@ -266,7 +267,7 @@ impl DagDriver {
max_txns_after_filtering: max_txns,
soft_max_txns_after_filtering: max_txns,
max_inline_txns: PayloadTxnsSize::new(100, 100 * 1024),
opt_batch_txns_pct: 0,
maybe_optqs_payload_pull_params: None,
user_txn_filter: payload_filter,
pending_ordering: false,
pending_uncommitted_blocks: 0,
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/dag/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
},
payload_manager::TPayloadManager,
};
use aptos_bitvec::BitVec;
use aptos_consensus_types::{
block::Block,
common::{Author, Payload, Round},
Expand All @@ -26,7 +27,7 @@ impl TPayloadManager for MockPayloadManager {

fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {}

fn check_payload_availability(&self, _block: &Block) -> bool {
fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> {
unimplemented!()
}

Expand Down
12 changes: 12 additions & 0 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
proposal_generator::{
ChainHealthBackoffConfig, PipelineBackpressureConfig, ProposalGenerator,
},
proposal_status_tracker::{ExponentialWindowFailureTracker, OptQSPullParamsProvider},
proposer_election::ProposerElection,
rotating_proposer_election::{choose_leader, RotatingProposer},
round_proposer_election::RoundProposer,
Expand Down Expand Up @@ -826,6 +827,15 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
self.pending_blocks.clone(),
));

let failures_tracker = Arc::new(Mutex::new(ExponentialWindowFailureTracker::new(
100,
epoch_state.verifier.get_ordered_account_addresses(),
)));
let opt_qs_payload_param_provider = Arc::new(OptQSPullParamsProvider::new(
self.config.quorum_store.enable_opt_quorum_store,
failures_tracker.clone(),
));

info!(epoch = epoch, "Create ProposalGenerator");
// txn manager is required both by proposal generator (to pull the proposers)
// and by event processor (to update their status).
Expand Down Expand Up @@ -854,6 +864,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
self.config
.quorum_store
.allow_batches_without_pos_in_proposal,
opt_qs_payload_param_provider,
);
let (round_manager_tx, round_manager_rx) = aptos_channel::new(
QueueStyle::KLAST,
Expand Down Expand Up @@ -887,6 +898,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
onchain_randomness_config,
onchain_jwk_consensus_config,
fast_rand_config,
failures_tracker,
);

round_manager.init(last_vote).await;
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ mod network_tests;
mod payload_client;
mod pending_order_votes;
mod pending_votes;
#[cfg(test)]
mod pending_votes_test;
pub mod persistent_liveness_storage;
mod pipeline;
pub mod quorum_store;
Expand Down
1 change: 1 addition & 0 deletions consensus/src/liveness/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
pub(crate) mod cached_proposer_election;
pub(crate) mod leader_reputation;
pub(crate) mod proposal_generator;
pub(crate) mod proposal_status_tracker;
pub(crate) mod proposer_election;
pub(crate) mod rotating_proposer_election;
pub(crate) mod round_proposer_election;
Expand Down
Loading

0 comments on commit b2781bf

Please sign in to comment.