From e1287fd413a67c1929bcf90e06602814a7ed0623 Mon Sep 17 00:00:00 2001 From: aldenhu Date: Tue, 18 Jun 2024 00:10:36 +0000 Subject: [PATCH] UseCaseAwareShuffler --- Cargo.lock | 1 + consensus/Cargo.toml | 1 + consensus/src/transaction_shuffler/mod.rs | 17 + .../use_case_aware/delayed_queue.rs | 535 ++++++++++++++++++ .../use_case_aware/iterator.rs | 78 +++ .../use_case_aware/mod.rs | 48 ++ .../use_case_aware/tests/manual.rs | 203 +++++++ .../use_case_aware/tests/mod.rs | 94 +++ .../use_case_aware/tests/proptests.rs | 49 ++ .../use_case_aware/types.rs | 58 ++ .../use_case_aware/utils.rs | 43 ++ testsuite/forge-cli/src/suites/dag.rs | 3 +- .../smoke-test/src/aptos_cli/validator.rs | 2 +- types/src/on_chain_config/execution_config.rs | 41 +- 14 files changed, 1157 insertions(+), 16 deletions(-) create mode 100644 consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs create mode 100644 consensus/src/transaction_shuffler/use_case_aware/iterator.rs create mode 100644 consensus/src/transaction_shuffler/use_case_aware/mod.rs create mode 100644 consensus/src/transaction_shuffler/use_case_aware/tests/manual.rs create mode 100644 consensus/src/transaction_shuffler/use_case_aware/tests/mod.rs create mode 100644 consensus/src/transaction_shuffler/use_case_aware/tests/proptests.rs create mode 100644 consensus/src/transaction_shuffler/use_case_aware/types.rs create mode 100644 consensus/src/transaction_shuffler/use_case_aware/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 3bf2a461eae60..ba4c885477ce9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -919,6 +919,7 @@ dependencies = [ "once_cell", "ordered-float 3.9.2", "proptest", + "proptest-derive", "rand 0.7.3", "rayon", "scopeguard", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 4d6f00ed82159..73b103fc080ba 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -100,6 +100,7 @@ claims = { workspace = true } mockall = { workspace = true } move-core-types = { workspace = true } proptest = { workspace = true } +proptest-derive = { workspace = true } tempfile = { workspace = true } [features] diff --git a/consensus/src/transaction_shuffler/mod.rs b/consensus/src/transaction_shuffler/mod.rs index bb1bfdc311c87..a75ce60e0cdd6 100644 --- a/consensus/src/transaction_shuffler/mod.rs +++ b/consensus/src/transaction_shuffler/mod.rs @@ -8,6 +8,7 @@ use std::sync::Arc; mod deprecated_fairness; mod sender_aware; +mod use_case_aware; /// Interface to shuffle transactions pub trait TransactionShuffler: Send + Sync { @@ -61,5 +62,21 @@ pub fn create_transaction_shuffler( entry_fun_conflict_window_size: entry_fun_conflict_window_size as usize, }) }, + UseCaseAware { + sender_spread_factor, + platform_use_case_spread_factor, + user_use_case_spread_factor, + } => { + let config = use_case_aware::Config { + sender_spread_factor, + platform_use_case_spread_factor, + user_use_case_spread_factor, + }; + info!( + config = ?config, + "Using use case aware transaction shuffling." + ); + Arc::new(use_case_aware::UseCaseAwareShuffler { config }) + }, } } diff --git a/consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs b/consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs new file mode 100644 index 0000000000000..9bc49f1bec152 --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs @@ -0,0 +1,535 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::use_case_aware::{ + types::{InputIdx, OutputIdx, UseCaseAwareTransaction, UseCaseKey}, + utils::StrictMap, + Config, +}; +use move_core_types::account_address::AccountAddress; +use std::{ + collections::{hash_map, BTreeMap, HashMap, VecDeque}, + fmt::Debug, +}; + +/// Key used in priority queues. +/// Part of the key is a txn's input index which guarantees in any priority queue, of use cases or +/// accounts, there are not two entries that share the same delay key. Also, when `try_delay_till` +/// is identical, an entry relating to a earlier txn is prioritized. +#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] +struct DelayKey { + try_delay_till: OutputIdx, + input_idx: InputIdx, +} + +impl DelayKey { + fn new(try_delay_till: OutputIdx, input_idx: InputIdx) -> Self { + Self { + try_delay_till, + input_idx, + } + } +} + +impl Debug for DelayKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DelayKey({}, {})", self.try_delay_till, self.input_idx) + } +} + +struct TxnWithInputIdx { + input_idx: InputIdx, + txn: Txn, +} + +impl Debug for TxnWithInputIdx +where + Txn: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Txn({}: {:?})", self.input_idx, self.txn) + } +} + +#[derive(Debug)] +struct Account { + try_delay_till: OutputIdx, + /// Head txn input_idx, tracked for use when the txns queue is empty, in which case + /// it keeps the value before the last txn was dequeued. + input_idx: InputIdx, + txns: VecDeque>, +} + +impl Account +where + Txn: UseCaseAwareTransaction, +{ + fn new_with_txn(try_delay_till: OutputIdx, input_idx: InputIdx, txn: Txn) -> Self { + let txns = vec![TxnWithInputIdx { input_idx, txn }].into(); + Self { + try_delay_till, + input_idx, + txns, + } + } + + fn new_empty(try_delay_till: OutputIdx, input_idx: InputIdx) -> Self { + Self { + try_delay_till, + input_idx, + txns: VecDeque::new(), + } + } + + fn is_empty(&self) -> bool { + self.txns.is_empty() + } + + fn delay_key(&self) -> DelayKey { + DelayKey { + try_delay_till: self.try_delay_till, + input_idx: self.input_idx, + } + } + + fn expect_first_txn(&self) -> &TxnWithInputIdx { + self.txns.front().expect("Must exist.") + } + + fn expect_use_case_key(&self) -> UseCaseKey { + self.expect_first_txn().txn.parse_use_case() + } + + fn queue_txn(&mut self, input_idx: InputIdx, txn: Txn) { + if let Some(last_txn) = self.txns.back() { + assert!(last_txn.input_idx < input_idx); + } else { + self.input_idx = input_idx; + } + self.txns.push_back(TxnWithInputIdx { input_idx, txn }); + } + + fn expect_dequeue_txn(&mut self) -> TxnWithInputIdx { + let txn = self.txns.pop_front().expect("Must exist."); + if let Some(next_txn) = self.txns.front() { + self.input_idx = next_txn.input_idx; + } + txn + } + + fn update_try_delay_till(&mut self, try_delay_till: OutputIdx) { + self.try_delay_till = try_delay_till; + } +} + +#[derive(Debug)] +struct UseCase { + try_delay_till: OutputIdx, + /// Head account input_idx, tracked for use when the accounts queue is empty, in which case + /// it keeps the value before the last account was removed. + input_idx: InputIdx, + account_by_delay: BTreeMap, +} + +impl UseCase { + fn new_empty(try_delay_till: OutputIdx, input_idx: InputIdx) -> Self { + Self { + try_delay_till, + input_idx, + account_by_delay: BTreeMap::new(), + } + } + + fn new_with_account( + try_delay_till: OutputIdx, + address: AccountAddress, + account: &Account, + ) -> Self + where + Txn: UseCaseAwareTransaction, + { + let mut account_by_delay = BTreeMap::new(); + account_by_delay.strict_insert(account.delay_key(), address); + Self { + try_delay_till, + input_idx: account.input_idx, + account_by_delay, + } + } + + fn is_empty(&self) -> bool { + self.account_by_delay.is_empty() + } + + fn delay_key(&self) -> DelayKey { + // If head account will be ready later than the use case itself, respect that. + let try_delay_till = std::cmp::max( + self.try_delay_till, + self.account_by_delay + .first_key_value() + .map_or(0, |(k, _)| k.try_delay_till), + ); + + DelayKey { + try_delay_till, + input_idx: self.input_idx, + } + } + + /// Expects head account to exist (otherwise panic) and return both the DelayKey and the + /// account address for the entry. + fn expect_pop_head_account(&mut self) -> (DelayKey, AccountAddress) { + let (account_delay_key, address) = self.account_by_delay.pop_first().expect("Must exist."); + if let Some((next_account_delay_key, _)) = self.account_by_delay.first_key_value() { + self.input_idx = next_account_delay_key.input_idx; + } + (account_delay_key, address) + } + + fn update_try_delay_till(&mut self, try_delay_till: OutputIdx) { + self.try_delay_till = try_delay_till; + } + + fn add_account(&mut self, address: AccountAddress, account: &Account) + where + Txn: UseCaseAwareTransaction, + { + let account_delay_key = account.delay_key(); + self.account_by_delay + .strict_insert(account_delay_key, address); + let (_, head_address) = self.account_by_delay.first_key_value().unwrap(); + if head_address == &address { + self.input_idx = account_delay_key.input_idx; + } + } +} + +/// Structure to track: +/// 1. all use cases and accounts that are subject to delaying, no matter they have pending txns +/// associated or not. +/// 2. all txns that are examined and delayed previously. +/// +/// * A delayed txn is attached to an account and the account is attached to a priority queue in a use +/// case, which has an entry in the main priority queue. +/// * Empty accounts and use cases are still tracked for the delay so that a next txn in the +/// input stream is properly delayed if associated with such an account or use case. +#[derive(Debug, Default)] +pub(crate) struct DelayedQueue { + /// Registry of all accounts, each of which includes the expected output_idx to delay until and + /// a queue (might be empty) of txns by that sender. + /// + /// An empty account address is tracked in `account_placeholders_by_delay` while a non-empty + /// account address is tracked under `use_cases`. + accounts: HashMap>, + /// Registry of all use cases, each of which includes the expected output_idx to delay until and + /// a priority queue (might be empty) of non-empty accounts whose head txn belongs to that use case. + /// + /// An empty use case is tracked in `use_case_placeholders_by_delay` while a non-empty use case + /// is tracked in the top level `use_cases_by_delay`. + use_cases: HashMap, + + /// Main delay queue of txns. All use cases are non-empty of non-empty accounts. + /// All pending txns are reachable from this nested structure. + /// + /// The DelayKey is derived from the head account's DelayKey combined with the use case's own + /// DelayKey. + /// + /// The head txn of the head account of the head use case in this nested structure is the + /// next txn to be possibly ready. + use_cases_by_delay: BTreeMap, + /// Empty account addresses by the DelayKey (those w/o known delayed txns), kept to track the delay. + account_placeholders_by_delay: BTreeMap, + /// Empty UseCaseKeys by the DelayKey (those w/o known delayed txns), kept to track the delay. + use_case_placeholders_by_delay: BTreeMap, + + /// Externally set output index; when an item has try_delay_till <= output_idx, it's deemed ready + output_idx: OutputIdx, + + config: Config, +} + +impl DelayedQueue +where + Txn: UseCaseAwareTransaction, +{ + pub fn new(config: Config) -> Self { + Self { + accounts: HashMap::new(), + use_cases: HashMap::new(), + + account_placeholders_by_delay: BTreeMap::new(), + use_case_placeholders_by_delay: BTreeMap::new(), + + use_cases_by_delay: BTreeMap::new(), + + output_idx: 0, + + config, + } + } + + /// Remove stale (empty use cases and accounts with try_delay_till <= self.output_idx) placeholders. + fn drain_placeholders(&mut self) { + let least_to_keep = DelayKey::new(self.output_idx + 1, 0); + + let remaining_use_case_placeholders = self + .use_case_placeholders_by_delay + .split_off(&least_to_keep); + let remaining_account_placeholders = + self.account_placeholders_by_delay.split_off(&least_to_keep); + + self.use_case_placeholders_by_delay + .iter() + .for_each(|(_delay_key, use_case_key)| self.use_cases.strict_remove(use_case_key)); + self.account_placeholders_by_delay + .iter() + .for_each(|(_delay_key, address)| self.accounts.strict_remove(address)); + + self.use_case_placeholders_by_delay = remaining_use_case_placeholders; + self.account_placeholders_by_delay = remaining_account_placeholders; + } + + pub fn bump_output_idx(&mut self, output_idx: OutputIdx) { + assert!(output_idx >= self.output_idx); + // It's possible that the queue returned nothing last round hence the output idx didn't move. + if output_idx > self.output_idx { + self.output_idx = output_idx; + self.drain_placeholders(); + } + } + + pub fn pop_head(&mut self, only_if_ready: bool) -> Option { + // See if any delayed txn exists. If not, return None. + let use_case_entry = match self.use_cases_by_delay.first_entry() { + None => { + return None; + }, + Some(occupied_entry) => occupied_entry, + }; + let use_case_delay_key = use_case_entry.key(); + + // Check readiness. + if only_if_ready && use_case_delay_key.try_delay_till > self.output_idx { + return None; + } + + // Gonna return the front txn of the front account of the front use case. + + // First, both the use case and account need to be removed from the priority queues. + let use_case_delay_key = *use_case_delay_key; + let use_case_key = use_case_entry.remove(); + let use_case = self.use_cases.expect_mut(&use_case_key); + let (account_delay_key, address) = use_case.expect_pop_head_account(); + assert!(account_delay_key.try_delay_till <= use_case_delay_key.try_delay_till); + assert_eq!(account_delay_key.input_idx, use_case_delay_key.input_idx); + + // Pop first txn from account (for returning it later). + let account = self.accounts.expect_mut(&address); + let txn = account.expect_dequeue_txn(); + + // Update priorities. + account.update_try_delay_till(self.output_idx + 1 + self.config.sender_spread_factor()); + use_case.update_try_delay_till( + self.output_idx + 1 + self.config.use_case_spread_factor(&use_case_key), + ); + + // Add account and original use case back to delay queues. + + if account.is_empty() { + self.account_placeholders_by_delay + .strict_insert(account.delay_key(), address); + if use_case.is_empty() { + self.use_case_placeholders_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + } else { + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + } + } else { + // See if account now belongs to a different use case. + let new_use_case_key = account.expect_use_case_key(); + if new_use_case_key == use_case_key { + use_case.add_account(address, account); + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + } else { + // Account now belongs to a different use case. + + // Add original use case back to delay queue. + if use_case.is_empty() { + self.use_case_placeholders_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + } else { + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + } + + // Add the account to the new use case. + match self.use_cases.entry(new_use_case_key.clone()) { + hash_map::Entry::Occupied(mut occupied_entry) => { + // Existing use case, remove from priority queues. + let new_use_case = occupied_entry.get_mut(); + if new_use_case.is_empty() { + self.use_case_placeholders_by_delay + .strict_remove(&new_use_case.delay_key()); + } else { + self.use_cases_by_delay + .strict_remove(&new_use_case.delay_key()); + } + // Add account to use case. + new_use_case.add_account(address, account); + // Add new use case back to delay queue. + self.use_cases_by_delay + .strict_insert(new_use_case.delay_key(), new_use_case_key.clone()); + }, + hash_map::Entry::Vacant(entry) => { + // Use case not tracked previously, try_delay_till = output_idx + 1 + let new_use_case = entry.insert(UseCase::new_with_account( + self.output_idx + 1, + address, + account, + )); + self.use_cases_by_delay + .strict_insert(new_use_case.delay_key(), new_use_case_key.clone()); + }, + } + } + } + + Some(txn.txn) + } + + /// Txn has to be delayed, attach it to respective account and use case. + fn queue_txn( + &mut self, + input_idx: InputIdx, + address: AccountAddress, + use_case_key: UseCaseKey, + txn: Txn, + ) { + match self.accounts.get_mut(&address) { + Some(account) => { + if account.is_empty() { + // Account placeholder exists, move it from the placeholder queue to the main queue. + self.account_placeholders_by_delay + .remove(&account.delay_key()); + account.queue_txn(input_idx, txn); + match self.use_cases.entry(use_case_key.clone()) { + hash_map::Entry::Occupied(occupied) => { + let use_case = occupied.into_mut(); + if use_case.is_empty() { + self.use_case_placeholders_by_delay + .strict_remove(&use_case.delay_key()); + } else { + self.use_cases_by_delay.strict_remove(&use_case.delay_key()); + } + use_case.add_account(address, account); + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + }, + hash_map::Entry::Vacant(vacant) => { + // Use case not tracked previously, the use case is ready at the current + // output_idx, instead of output_idx +1 -- it makes a difference if + // a txn later in the input queue that's of the same use case but not + // blocked by account delay is tested for readiness. + let use_case = + UseCase::new_with_account(self.output_idx, address, account); + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + vacant.insert(use_case); + }, + } + } else { + // Account tracked and not empty, so appending a new txn to it won't affect positions + // in delay queues + account.queue_txn(input_idx, txn); + } + }, + None => { + // Account not previously tracked. + let account = Account::new_with_txn(self.output_idx + 1, input_idx, txn); + // Account didn't exist before, so use case must have been tracked, otherwise the + // txn whould've been selected for output, bypassing the queue. + let use_case = self.use_cases.expect_mut(&use_case_key); + if use_case.is_empty() { + self.use_case_placeholders_by_delay + .strict_remove(&use_case.delay_key()); + } else { + self.use_cases_by_delay.strict_remove(&use_case.delay_key()); + } + use_case.add_account(address, &account); + + self.accounts.strict_insert(address, account); + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + }, + } + } + + /// Txn from input queue directly selected for output, needs to bump delays for relevant + /// account and use case. + fn update_delays_for_selected_txn( + &mut self, + input_idx: InputIdx, + address: AccountAddress, + use_case_key: UseCaseKey, + ) { + let account_try_delay_till = self.output_idx + 1 + self.config.sender_spread_factor(); + let use_case_try_delay_till = + self.output_idx + 1 + self.config.use_case_spread_factor(&use_case_key); + + match self.use_cases.entry(use_case_key.clone()) { + hash_map::Entry::Occupied(occupied) => { + let use_case = occupied.into_mut(); + // Txn wouldn't have been selected for output if the use case is empty (tracking + // for a try_delay_till > self.output_idx) + assert!(!use_case.is_empty()); + + self.use_cases_by_delay.strict_remove(&use_case.delay_key()); + use_case.update_try_delay_till(use_case_try_delay_till); + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key); + }, + hash_map::Entry::Vacant(vacant) => { + let use_case = UseCase::new_empty(use_case_try_delay_till, input_idx); + self.use_case_placeholders_by_delay + .strict_insert(use_case.delay_key(), use_case_key); + vacant.insert(use_case); + }, + } + + // Notice this function is called after the txn is selected for output due to no delaying + // needed, so the account must not have been tracked before, otherwise it wouldn't have been + // selected for output. + let new_account = Account::new_empty(account_try_delay_till, input_idx); + let new_account_delay_key = new_account.delay_key(); + self.accounts.strict_insert(address, new_account); + self.account_placeholders_by_delay + .strict_insert(new_account_delay_key, address); + } + + /// Return the txn back if relevant use case and sender are not subject to delaying. Otherwise, + /// Queue it up. + pub fn queue_or_return(&mut self, input_idx: InputIdx, txn: Txn) -> Option { + let address = txn.parse_sender(); + let account_opt = self.accounts.get_mut(&address); + let use_case_key = txn.parse_use_case(); + let use_case_opt = self.use_cases.get_mut(&use_case_key); + + let account_should_delay = account_opt.as_ref().map_or(false, |account| { + !account.is_empty() // needs delaying due to queued txns under the same account + || account.try_delay_till > self.output_idx + }); + let use_case_should_delay = use_case_opt + .as_ref() + .map_or(false, |use_case| use_case.try_delay_till > self.output_idx); + + if account_should_delay || use_case_should_delay { + self.queue_txn(input_idx, address, use_case_key, txn); + None + } else { + self.update_delays_for_selected_txn(input_idx, address, use_case_key); + Some(txn) + } + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/iterator.rs b/consensus/src/transaction_shuffler/use_case_aware/iterator.rs new file mode 100644 index 0000000000000..5645cf9907b52 --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/iterator.rs @@ -0,0 +1,78 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::use_case_aware::{ + delayed_queue::DelayedQueue, + types::{InputIdx, OutputIdx, UseCaseAwareTransaction}, + Config, +}; +use std::{collections::VecDeque, fmt::Debug}; + +#[derive(Debug)] +pub(super) struct ShuffledTransactionIterator { + input_queue: VecDeque, + delayed_queue: DelayedQueue, + input_idx: InputIdx, + output_idx: OutputIdx, +} + +impl ShuffledTransactionIterator +where + Txn: UseCaseAwareTransaction + Debug, +{ + pub(super) fn new(config: Config) -> Self { + Self { + input_queue: VecDeque::new(), + delayed_queue: DelayedQueue::new(config), + input_idx: 0, + output_idx: 0, + } + } + + pub(super) fn extended_with(mut self, txns: impl IntoIterator) -> Self { + self.input_queue.extend(txns); + self + } + + pub(super) fn select_next_txn(&mut self) -> Option { + let ret = self.select_next_txn_inner(); + if ret.is_some() { + self.output_idx += 1; + } + ret + } + + pub(super) fn select_next_txn_inner(&mut self) -> Option { + self.delayed_queue.bump_output_idx(self.output_idx); + + // 1. if anything delayed became ready, return it + if let Some(txn) = self.delayed_queue.pop_head(true) { + return Some(txn); + } + + // 2. Otherwise, seek in the input queue for something that shouldn't be delayed due to either + // the sender or the use case. + while let Some(txn) = self.input_queue.pop_front() { + let input_idx = self.input_idx; + self.input_idx += 1; + + if let Some(txn) = self.delayed_queue.queue_or_return(input_idx, txn) { + return Some(txn); + } + } + + // 3. If nothing is ready, return the next eligible from the delay queue + self.delayed_queue.pop_head(false) + } +} + +impl Iterator for ShuffledTransactionIterator +where + Txn: UseCaseAwareTransaction + Debug, +{ + type Item = Txn; + + fn next(&mut self) -> Option { + self.select_next_txn() + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/mod.rs b/consensus/src/transaction_shuffler/use_case_aware/mod.rs new file mode 100644 index 0000000000000..5a6daff997bbb --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/mod.rs @@ -0,0 +1,48 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::{use_case_aware::types::UseCaseKey, TransactionShuffler}; +use aptos_types::transaction::SignedTransaction; +use iterator::ShuffledTransactionIterator; + +pub(crate) mod iterator; +pub(crate) mod types; +pub(crate) mod utils; + +pub(crate) mod delayed_queue; +#[cfg(test)] +mod tests; + +#[derive(Clone, Debug, Default)] +pub(crate) struct Config { + pub sender_spread_factor: usize, + pub platform_use_case_spread_factor: usize, + pub user_use_case_spread_factor: usize, +} + +impl Config { + pub(crate) fn sender_spread_factor(&self) -> usize { + self.sender_spread_factor + } + + pub(crate) fn use_case_spread_factor(&self, use_case_key: &UseCaseKey) -> usize { + use UseCaseKey::*; + + match use_case_key { + Platform => self.platform_use_case_spread_factor, + ContractAddress(..) | Others => self.user_use_case_spread_factor, + } + } +} + +pub struct UseCaseAwareShuffler { + pub config: Config, +} + +impl TransactionShuffler for UseCaseAwareShuffler { + fn shuffle(&self, txns: Vec) -> Vec { + ShuffledTransactionIterator::new(self.config.clone()) + .extended_with(txns) + .collect() + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/tests/manual.rs b/consensus/src/transaction_shuffler/use_case_aware/tests/manual.rs new file mode 100644 index 0000000000000..5b2a7d1a25c83 --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/tests/manual.rs @@ -0,0 +1,203 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::use_case_aware::{ + iterator::ShuffledTransactionIterator, + tests, + tests::{Account, Contract}, + Config, +}; +use itertools::Itertools; + +const PP: Contract = Contract::Platform; +const OO: Contract = Contract::Others; +const C1: Contract = Contract::User(0xF1); +const C2: Contract = Contract::User(0xF2); +const C3: Contract = Contract::User(0xF3); +const A1: Account = Account(1); +const A2: Account = Account(2); +const A3: Account = Account(3); +const A4: Account = Account(4); + +fn assert_shuffle_result( + config: Config, + txns: impl IntoIterator, + expected_order: impl IntoIterator, +) { + let txns = tests::into_txns(txns); + let actual_order = ShuffledTransactionIterator::new(config) + .extended_with(txns) + .map(|txn| txn.original_idx) + .collect_vec(); + let expected_order = expected_order.into_iter().collect_vec(); + assert_eq!(actual_order, expected_order, "actual != expected"); +} + +fn three_senders_txns() -> [(Contract, Account); 10] { + [ + // 5 txns from A1 + (PP, A1), + (OO, A1), + (C1, A1), + (C2, A1), + (C3, A1), + // 3 txns from A2 + (PP, A2), + (PP, A2), + (PP, A2), + // 2 txns from A3 + (C1, A3), + (C1, A3), + ] +} + +#[test] +fn test_no_spreading() { + let config = Config { + sender_spread_factor: 0, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 0, + }; + let txns = three_senders_txns(); + + assert_shuffle_result(config, txns, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); +} + +#[test] +fn test_spread_by_sender_1() { + let config = Config { + sender_spread_factor: 1, + // ignore use case conflicts + platform_use_case_spread_factor: 0, + // ignore use case conflicts + user_use_case_spread_factor: 0, + }; + let txns = three_senders_txns(); + + assert_shuffle_result(config, txns, [0, 5, 1, 6, 2, 7, 3, 8, 4, 9]); +} + +#[test] +fn test_spread_by_sender_by_large_factor() { + for sender_spread_factor in [2, 3, 4] { + let config = Config { + sender_spread_factor, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 0, + }; + let txns = three_senders_txns(); + + assert_shuffle_result(config, txns, [0, 5, 8, 1, 6, 9, 2, 7, 3, 4]); + } +} + +fn three_contracts_txns() -> [(Contract, Account); 10] { + [ + // 5 txns from C1 + (C1, A1), + (C1, A1), + (C1, A1), + (C1, A1), + (C1, A1), + // 3 txns from C2 + (C2, A2), + (C2, A2), + (C2, A2), + // 2 txns from C3 + (C3, A3), + (C3, A3), + ] +} + +#[test] +fn test_spread_by_use_case_1() { + let config = Config { + sender_spread_factor: 0, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 1, + }; + let txns = three_contracts_txns(); + + assert_shuffle_result(config, txns, [0, 5, 1, 6, 2, 7, 3, 8, 4, 9]); +} + +#[test] +fn test_spread_by_use_case_by_large_factor() { + for user_use_case_spread_factor in [2, 3, 4] { + let config = Config { + sender_spread_factor: 0, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor, + }; + let txns = three_contracts_txns(); + + assert_shuffle_result(config, txns, [0, 5, 8, 1, 6, 9, 2, 7, 3, 4]); + } +} + +fn user_and_platform_use_cases() -> [(Contract, Account); 10] { + [ + // 5 txns from C1 + (C1, A1), + (C1, A1), + (C1, A1), + (C1, A1), + (C1, A1), + // 3 txns from C2 + (PP, A2), + (PP, A2), + (PP, A2), + // 2 txns from C3 + (PP, A3), + (PP, A3), + ] +} + +#[test] +fn test_platform_txn_priority_0() { + let config = Config { + sender_spread_factor: 0, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 3, + }; + let txns = user_and_platform_use_cases(); + + assert_shuffle_result(config, txns, [0, 5, 6, 7, 1, 8, 9, 2, 3, 4]); +} + +#[test] +fn test_platform_txn_priority_1() { + let config = Config { + sender_spread_factor: 0, + platform_use_case_spread_factor: 1, + user_use_case_spread_factor: 3, + }; + let txns = user_and_platform_use_cases(); + + assert_shuffle_result(config, txns, [0, 5, 6, 1, 7, 8, 2, 9, 3, 4]); +} + +#[test] +fn test_spread_sender_within_use_case() { + let config = Config { + sender_spread_factor: 2, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 1, + }; + let txns = [ + // 5 txns from C1 + (C1, A1), + (C1, A1), + (C1, A2), + (C1, A2), + (C1, A2), + // 3 txns from C2 + (C2, A3), + (C2, A3), + (C2, A3), + (C2, A4), + (C2, A4), + ]; + + assert_shuffle_result(config, txns, [0, 5, 2, 8, 1, 6, 3, 9, 4, 7]); +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/tests/mod.rs b/consensus/src/transaction_shuffler/use_case_aware/tests/mod.rs new file mode 100644 index 0000000000000..58860bd4c11cc --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/tests/mod.rs @@ -0,0 +1,94 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::use_case_aware::types::{UseCaseAwareTransaction, UseCaseKey}; +use move_core_types::account_address::AccountAddress; +use proptest_derive::Arbitrary; +use std::fmt::Debug; + +mod manual; +mod proptests; + +#[derive(Arbitrary)] +enum Contract { + Platform, + Others, + User(u8), +} + +impl Debug for Contract { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use Contract::*; + + write!(f, "c{}", match self { + Platform => "PP".to_string(), + Others => "OO".to_string(), + User(addr) => hex::encode_upper(addr.to_be_bytes()), + }) + } +} + +#[derive(Arbitrary)] +struct Account(u8); + +impl Debug for Account { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "a{}", hex::encode_upper(self.0.to_be_bytes())) + } +} + +impl Account { + fn as_account_address(&self) -> AccountAddress { + let mut addr = [0u8; 32]; + addr[31..].copy_from_slice(&self.0.to_be_bytes()); + AccountAddress::new(addr) + } +} + +struct Transaction { + contract: Contract, + sender: Account, + original_idx: usize, +} + +impl Debug for Transaction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "t{}:{:?}{:?}", + self.original_idx, self.contract, self.sender + ) + } +} + +impl UseCaseAwareTransaction for Transaction { + fn parse_sender(&self) -> AccountAddress { + self.sender.as_account_address() + } + + fn parse_use_case(&self) -> UseCaseKey { + use UseCaseKey::*; + + match self.contract { + Contract::Platform => Platform, + Contract::Others => Others, + Contract::User(c) => ContractAddress(Account(c).as_account_address()), + } + } +} + +fn into_txns(txns: impl IntoIterator) -> Vec { + let mut original_idx = 0; + txns.into_iter() + .map(|(contract, sender)| { + let txn = Transaction { + contract, + sender, + original_idx, + }; + + original_idx += 1; + txn + }) + .collect() +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/tests/proptests.rs b/consensus/src/transaction_shuffler/use_case_aware/tests/proptests.rs new file mode 100644 index 0000000000000..ef84499776d34 --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/tests/proptests.rs @@ -0,0 +1,49 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::use_case_aware::{ + iterator::ShuffledTransactionIterator, + tests::{into_txns, Account, Contract, Transaction}, + Config, +}; +use itertools::Itertools; +use proptest::{collection::vec, prelude::*}; +use std::collections::HashMap; + +fn txn_indices_by_account(txns: &[Transaction]) -> HashMap> { + txns.iter() + .map(|txn| (txn.sender.0, txn.original_idx)) + .into_group_map() +} + +proptest! { + #[test] + fn test_no_panic( + txns in vec(any::<(Contract, Account)>(), 0..100) + .prop_map(into_txns), + sender_factor in 0..100usize, + platform_factor in 0..100usize, + user_contract_factor in 0..100usize, + ) { + let num_txns = txns.len(); + let txns_by_account = txn_indices_by_account(&txns); + + let config = Config { + sender_spread_factor: sender_factor, + platform_use_case_spread_factor: platform_factor, + user_use_case_spread_factor: user_contract_factor, + }; + + let shuffled_txns = ShuffledTransactionIterator::new(config) + .extended_with(txns) + .collect_vec(); + + prop_assert_eq!( + txn_indices_by_account(&shuffled_txns), + txns_by_account + ); + + let txn_indices = shuffled_txns.into_iter().map(|txn| txn.original_idx).sorted().collect_vec(); + prop_assert_eq!(txn_indices, (0..num_txns).collect_vec()); + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/types.rs b/consensus/src/transaction_shuffler/use_case_aware/types.rs new file mode 100644 index 0000000000000..3ffda393e0640 --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/types.rs @@ -0,0 +1,58 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::transaction::SignedTransaction; +use move_core_types::account_address::AccountAddress; + +pub(crate) type InputIdx = usize; +pub(crate) type OutputIdx = usize; + +#[derive(Clone, Eq, Hash, PartialEq)] +pub(crate) enum UseCaseKey { + Platform, + ContractAddress(AccountAddress), + // ModuleBundle (deprecated anyway), scripts, Multisig. + Others, +} + +impl std::fmt::Debug for UseCaseKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use UseCaseKey::*; + + match self { + Platform => write!(f, "PP"), + ContractAddress(addr) => write!(f, "c{}", hex::encode_upper(&addr[31..])), + Others => write!(f, "OO"), + } + } +} + +pub(crate) trait UseCaseAwareTransaction { + fn parse_sender(&self) -> AccountAddress; + + fn parse_use_case(&self) -> UseCaseKey; +} + +impl UseCaseAwareTransaction for SignedTransaction { + fn parse_sender(&self) -> AccountAddress { + self.sender() + } + + fn parse_use_case(&self) -> UseCaseKey { + use aptos_types::transaction::TransactionPayload::*; + use UseCaseKey::*; + + match self.payload() { + Script(_) | ModuleBundle(_) | Multisig(_) => Others, + EntryFunction(entry_fun) => { + let module_id = entry_fun.module(); + if module_id.address().is_special() { + Platform + } else { + // n.b. Generics ignored. + ContractAddress(*module_id.address()) + } + }, + } + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/utils.rs b/consensus/src/transaction_shuffler/use_case_aware/utils.rs new file mode 100644 index 0000000000000..a0e75d075766e --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/utils.rs @@ -0,0 +1,43 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::{BTreeMap, HashMap}, + hash::Hash, +}; + +pub(crate) trait StrictMap { + fn strict_insert(&mut self, key: K, value: V); + + fn strict_remove(&mut self, key: &K); + + fn expect_mut(&mut self, key: &K) -> &mut V; +} + +impl StrictMap for HashMap { + fn strict_insert(&mut self, key: K, value: V) { + assert!(self.insert(key, value).is_none()) + } + + fn strict_remove(&mut self, key: &K) { + assert!(self.remove(key).is_some()) + } + + fn expect_mut(&mut self, key: &K) -> &mut V { + self.get_mut(key).expect("Known to exist.") + } +} + +impl StrictMap for BTreeMap { + fn strict_insert(&mut self, key: K, value: V) { + assert!(self.insert(key, value).is_none()) + } + + fn strict_remove(&mut self, key: &K) { + assert!(self.remove(key).is_some()) + } + + fn expect_mut(&mut self, key: &K) -> &mut V { + self.get_mut(key).expect("Known to exist.") + } +} diff --git a/testsuite/forge-cli/src/suites/dag.rs b/testsuite/forge-cli/src/suites/dag.rs index 1d7eabdd5263e..0365d2aef66b5 100644 --- a/testsuite/forge-cli/src/suites/dag.rs +++ b/testsuite/forge-cli/src/suites/dag.rs @@ -108,7 +108,7 @@ fn dag_realistic_env_max_load_test( } OnChainExecutionConfig::V4(config_v4) => { config_v4.block_gas_limit_type = BlockGasLimitType::NoLimit; - config_v4.transaction_shuffler_type = TransactionShufflerType::SenderAwareV2(32); + config_v4.transaction_shuffler_type = TransactionShufflerType::default_for_genesis(); } } helm_values["chain"]["on_chain_execution_config"] = @@ -207,7 +207,6 @@ fn dag_reconfig_enable_test() -> ForgeConfig { } OnChainExecutionConfig::V4(config_v4) => { config_v4.block_gas_limit_type = BlockGasLimitType::NoLimit; - config_v4.transaction_shuffler_type = TransactionShufflerType::SenderAwareV2(32); } } helm_values["chain"]["on_chain_execution_config"] = diff --git a/testsuite/smoke-test/src/aptos_cli/validator.rs b/testsuite/smoke-test/src/aptos_cli/validator.rs index 5de3d15fa3e27..69e7bd3429074 100644 --- a/testsuite/smoke-test/src/aptos_cli/validator.rs +++ b/testsuite/smoke-test/src/aptos_cli/validator.rs @@ -388,7 +388,7 @@ async fn test_onchain_shuffling_change() { assert_eq!( current_execution_config.transaction_shuffler_type(), - TransactionShufflerType::SenderAwareV2(32), + TransactionShufflerType::default_for_genesis(), ); assert_reordering(&mut swarm, true).await; diff --git a/types/src/on_chain_config/execution_config.rs b/types/src/on_chain_config/execution_config.rs index 6f091d11cd30c..97ce9773022af 100644 --- a/types/src/on_chain_config/execution_config.rs +++ b/types/src/on_chain_config/execution_config.rs @@ -70,7 +70,7 @@ impl OnChainExecutionConfig { /// Features that are ready for deployment can be enabled here. pub fn default_for_genesis() -> Self { OnChainExecutionConfig::V4(ExecutionConfigV4 { - transaction_shuffler_type: TransactionShufflerType::SenderAwareV2(32), + transaction_shuffler_type: TransactionShufflerType::default_for_genesis(), block_gas_limit_type: BlockGasLimitType::default_for_genesis(), transaction_deduper_type: TransactionDeduperType::TxnHashAndAuthenticatorV1, }) @@ -153,6 +153,21 @@ pub enum TransactionShufflerType { module_conflict_window_size: u32, entry_fun_conflict_window_size: u32, }, + UseCaseAware { + sender_spread_factor: usize, + platform_use_case_spread_factor: usize, + user_use_case_spread_factor: usize, + }, +} + +impl TransactionShufflerType { + pub fn default_for_genesis() -> Self { + TransactionShufflerType::UseCaseAware { + sender_spread_factor: 32, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 4, + } + } } #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -337,29 +352,29 @@ mod test { #[test] fn test_config_serialization() { let config = OnChainExecutionConfig::V1(ExecutionConfigV1 { - transaction_shuffler_type: TransactionShufflerType::SenderAwareV2(32), + transaction_shuffler_type: TransactionShufflerType::default_for_genesis(), }); let s = serde_yaml::to_string(&config).unwrap(); let result = serde_yaml::from_str::(&s).unwrap(); - assert!(matches!( + assert_eq!( result.transaction_shuffler_type(), - TransactionShufflerType::SenderAwareV2(32) - )); + TransactionShufflerType::default_for_genesis(), + ); // V2 test with random per-block gas limit let rand_gas_limit = rand::thread_rng().gen_range(0, 1000000) as u64; let config = OnChainExecutionConfig::V2(ExecutionConfigV2 { - transaction_shuffler_type: TransactionShufflerType::SenderAwareV2(32), + transaction_shuffler_type: TransactionShufflerType::default_for_genesis(), block_gas_limit: Some(rand_gas_limit), }); let s = serde_yaml::to_string(&config).unwrap(); let result = serde_yaml::from_str::(&s).unwrap(); - assert!(matches!( + assert_eq!( result.transaction_shuffler_type(), - TransactionShufflerType::SenderAwareV2(32) - )); + TransactionShufflerType::default_for_genesis(), + ); assert_eq!( result.block_gas_limit_type(), BlockGasLimitType::Limit(rand_gas_limit) @@ -367,16 +382,16 @@ mod test { // V2 test with no per-block gas limit let config = OnChainExecutionConfig::V2(ExecutionConfigV2 { - transaction_shuffler_type: TransactionShufflerType::SenderAwareV2(32), + transaction_shuffler_type: TransactionShufflerType::default_for_genesis(), block_gas_limit: None, }); let s = serde_yaml::to_string(&config).unwrap(); let result = serde_yaml::from_str::(&s).unwrap(); - assert!(matches!( + assert_eq!( result.transaction_shuffler_type(), - TransactionShufflerType::SenderAwareV2(32) - )); + TransactionShufflerType::default_for_genesis(), + ); assert_eq!(result.block_gas_limit_type(), BlockGasLimitType::NoLimit); }