diff --git a/consensus/src/liveness/proposal_generator.rs b/consensus/src/liveness/proposal_generator.rs index 411d24c7ac2fae..364597700390df 100644 --- a/consensus/src/liveness/proposal_generator.rs +++ b/consensus/src/liveness/proposal_generator.rs @@ -2,7 +2,7 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use super::proposer_election::ProposerElection; +use super::{proposer_election::ProposerElection, round_state::NewRoundReason}; use crate::{ block_storage::BlockReader, counters::{ @@ -16,6 +16,7 @@ use crate::{ util::time_service::TimeService, }; use anyhow::{bail, ensure, format_err, Context}; +use aptos_collections::BoundedVecDeque; use aptos_config::config::{ ChainHealthBackoffValues, ExecutionBackpressureConfig, PipelineBackpressureValues, }; @@ -222,6 +223,67 @@ impl PipelineBackpressureConfig { } } +struct LastProposalStatuses { + statuses: BoundedVecDeque, + last_consecutive_success_count: usize, +} + +impl LastProposalStatuses { + fn new(max_proposals: usize) -> Self { + Self { + statuses: BoundedVecDeque::new(max_proposals), + last_consecutive_success_count: 0, + } + } + + fn push(&mut self, status: NewRoundReason) { + self.push(status); + + self.last_consecutive_success_count = if matches!(status, NewRoundReason::Timeout) { + 0 + } else { + max(self.last_consecutive_success_count + 1, self.statuses.len()) + }; + } + + fn last_consecutive_successes(&self) -> usize { + self.last_consecutive_success_count + } + + fn len(&self) -> usize { + self.statuses.len() + } +} + +pub struct ExponentialWindowFailureTracker { + window: usize, + max_window: usize, + status: LastProposalStatuses, +} + +impl ExponentialWindowFailureTracker { + fn new(max_window: usize) -> Self { + Self { + window: 2, + max_window, + status: LastProposalStatuses::new(max_window), + } + } + + fn update_and_check_failures(&mut self, status: NewRoundReason) -> bool { + self.status.push(status); + + if self.status.last_consecutive_successes() == 0 { + self.window *= 2; + self.window = self.window.max(self.max_window); + } else if self.status.last_consecutive_successes() == self.status.len() { + self.window = 2; + } + + self.status.last_consecutive_successes() < self.window + } +} + /// ProposalGenerator is responsible for generating the proposed block on demand: it's typically /// used by a validator that believes it's a valid candidate for serving as a proposer at a given /// round. @@ -267,6 +329,8 @@ pub struct ProposalGenerator { vtxn_config: ValidatorTxnConfig, allow_batches_without_pos_in_proposal: bool, + + failure_tracker: Mutex, } impl ProposalGenerator { @@ -305,6 +369,7 @@ impl ProposalGenerator { quorum_store_enabled, vtxn_config, allow_batches_without_pos_in_proposal, + failure_tracker: Mutex::new(ExponentialWindowFailureTracker::new(100)), } } @@ -342,6 +407,7 @@ impl ProposalGenerator { pub async fn generate_proposal( &self, round: Round, + round_reason: NewRoundReason, proposer_election: Arc, wait_callback: BoxFuture<'static, ()>, ) -> anyhow::Result { @@ -353,6 +419,16 @@ impl ProposalGenerator { bail!("Already proposed in the round {}", round); } } + let opt_batch_txns_pct = if self.opt_qs_config { + let mut failure_tracker = self.failure_tracker.lock(); + if failure_tracker.update_and_check_failures(round_reason) { + 0 + } else { + 50 + } + } else { + 0 + }; let hqc = self.ensure_highest_quorum_cert(round)?; @@ -456,7 +532,7 @@ impl ProposalGenerator { soft_max_txns_after_filtering: max_txns_from_block_to_execute .unwrap_or(max_block_txns_after_filtering), max_inline_txns: self.max_inline_txns, - opt_batch_txns_pct: 0, + opt_batch_txns_pct, user_txn_filter: payload_filter, pending_ordering, pending_uncommitted_blocks: pending_blocks.len(), diff --git a/crates/aptos-collections/src/bounded_vec_deque.rs b/crates/aptos-collections/src/bounded_vec_deque.rs index 6435b7371f3317..0a7b82d2bcc836 100644 --- a/crates/aptos-collections/src/bounded_vec_deque.rs +++ b/crates/aptos-collections/src/bounded_vec_deque.rs @@ -52,6 +52,10 @@ impl BoundedVecDeque { pub fn iter(&self) -> Iter<'_, T> { self.inner.iter() } + + pub fn len(&self) -> usize { + self.inner.len() + } } impl IntoIterator for BoundedVecDeque {