Skip to content

Commit

Permalink
[optqs] failure tracker based fallback
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 21, 2024
1 parent fbdc550 commit 92103ca
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 2 deletions.
80 changes: 78 additions & 2 deletions consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use super::proposer_election::ProposerElection;
use super::{proposer_election::ProposerElection, round_state::NewRoundReason};
use crate::{
block_storage::BlockReader,
counters::{
Expand All @@ -16,6 +16,7 @@ use crate::{
util::time_service::TimeService,
};
use anyhow::{bail, ensure, format_err, Context};
use aptos_collections::BoundedVecDeque;
use aptos_config::config::{
ChainHealthBackoffValues, ExecutionBackpressureConfig, PipelineBackpressureValues,
};
Expand Down Expand Up @@ -222,6 +223,67 @@ impl PipelineBackpressureConfig {
}
}

struct LastProposalStatuses {
statuses: BoundedVecDeque<NewRoundReason>,
last_consecutive_success_count: usize,
}

impl LastProposalStatuses {
fn new(max_proposals: usize) -> Self {
Self {
statuses: BoundedVecDeque::new(max_proposals),
last_consecutive_success_count: 0,
}
}

fn push(&mut self, status: NewRoundReason) {
self.push(status);

self.last_consecutive_success_count = if matches!(status, NewRoundReason::Timeout) {
0
} else {
max(self.last_consecutive_success_count + 1, self.statuses.len())
};
}

fn last_consecutive_successes(&self) -> usize {
self.last_consecutive_success_count
}

fn len(&self) -> usize {
self.statuses.len()
}
}

pub struct ExponentialWindowFailureTracker {
window: usize,
max_window: usize,
status: LastProposalStatuses,
}

impl ExponentialWindowFailureTracker {
fn new(max_window: usize) -> Self {
Self {
window: 2,
max_window,
status: LastProposalStatuses::new(max_window),
}
}

fn update_and_check_failures(&mut self, status: NewRoundReason) -> bool {
self.status.push(status);

if self.status.last_consecutive_successes() == 0 {
self.window *= 2;
self.window = self.window.max(self.max_window);
} else if self.status.last_consecutive_successes() == self.status.len() {
self.window = 2;
}

self.status.last_consecutive_successes() < self.window
}
}

/// ProposalGenerator is responsible for generating the proposed block on demand: it's typically
/// used by a validator that believes it's a valid candidate for serving as a proposer at a given
/// round.
Expand Down Expand Up @@ -267,6 +329,8 @@ pub struct ProposalGenerator {
vtxn_config: ValidatorTxnConfig,

allow_batches_without_pos_in_proposal: bool,

failure_tracker: Mutex<ExponentialWindowFailureTracker>,
}

impl ProposalGenerator {
Expand Down Expand Up @@ -305,6 +369,7 @@ impl ProposalGenerator {
quorum_store_enabled,
vtxn_config,
allow_batches_without_pos_in_proposal,
failure_tracker: Mutex::new(ExponentialWindowFailureTracker::new(100)),
}
}

Expand Down Expand Up @@ -342,6 +407,7 @@ impl ProposalGenerator {
pub async fn generate_proposal(
&self,
round: Round,
round_reason: NewRoundReason,
proposer_election: Arc<dyn ProposerElection + Send + Sync>,
wait_callback: BoxFuture<'static, ()>,
) -> anyhow::Result<BlockData> {
Expand All @@ -353,6 +419,16 @@ impl ProposalGenerator {
bail!("Already proposed in the round {}", round);
}
}
let opt_batch_txns_pct = if self.opt_qs_config {
let mut failure_tracker = self.failure_tracker.lock();
if failure_tracker.update_and_check_failures(round_reason) {
0
} else {
50
}
} else {
0
};

let hqc = self.ensure_highest_quorum_cert(round)?;

Expand Down Expand Up @@ -456,7 +532,7 @@ impl ProposalGenerator {
soft_max_txns_after_filtering: max_txns_from_block_to_execute
.unwrap_or(max_block_txns_after_filtering),
max_inline_txns: self.max_inline_txns,
opt_batch_txns_pct: 0,
opt_batch_txns_pct,
user_txn_filter: payload_filter,
pending_ordering,
pending_uncommitted_blocks: pending_blocks.len(),
Expand Down
4 changes: 4 additions & 0 deletions crates/aptos-collections/src/bounded_vec_deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ impl<T> BoundedVecDeque<T> {
pub fn iter(&self) -> Iter<'_, T> {
self.inner.iter()
}

pub fn len(&self) -> usize {
self.inner.len()
}
}

impl<T> IntoIterator for BoundedVecDeque<T> {
Expand Down

0 comments on commit 92103ca

Please sign in to comment.