From 4c2abca2b279129b2febfffc1ad8869d6d926281 Mon Sep 17 00:00:00 2001 From: aldenhu Date: Wed, 19 Jun 2024 01:34:32 +0000 Subject: [PATCH] wip --- .../use_case_aware/delayed_queue.rs | 506 ++++++++++++++++++ .../use_case_aware/iterator.rs | 275 ++-------- .../use_case_aware/mod.rs | 27 +- .../use_case_aware/tests.rs | 91 ++-- .../{transaction.rs => types.rs} | 9 +- 5 files changed, 622 insertions(+), 286 deletions(-) create mode 100644 consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs rename consensus/src/transaction_shuffler/use_case_aware/{transaction.rs => types.rs} (83%) diff --git a/consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs b/consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs new file mode 100644 index 00000000000000..375c17661d6423 --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs @@ -0,0 +1,506 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// FIXME(aldenhu): remove +#![allow(unused_variables)] + +use crate::transaction_shuffler::use_case_aware::{ + types::{InputIdx, OutputIdx, UseCaseAwareTransaction, UseCaseKey}, + utils::StrictMap, + Config, +}; +use move_core_types::account_address::AccountAddress; +use std::collections::{hash_map, BTreeMap, HashMap, VecDeque}; + +#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] +struct PriorityKey { + try_delay_till: OutputIdx, + input_idx: InputIdx, +} + +struct TxnWithInputIdx { + input_idx: InputIdx, + txn: Txn, +} + +struct Account { + try_delay_till: OutputIdx, + /// Head txn input_idx, tracked for use when the txns queue is empty, in which case + /// it keeps the value before the last txn was dequeued. + input_idx: InputIdx, + /// Head txn use case key, tracked for use when the txns queue is empty, in which case + /// it keeps the value before the last txn was dequeued. + use_case_key: UseCaseKey, + txns: VecDeque>, +} + +impl Account +where + Txn: UseCaseAwareTransaction, +{ + fn new_with_txn(try_delay_till: OutputIdx, input_idx: InputIdx, txn: Txn) -> Self { + let use_case_key = txn.parse_use_case(); + let txns = vec![TxnWithInputIdx { input_idx, txn }].into(); + Self { + try_delay_till, + input_idx, + use_case_key, + txns, + } + } + + fn new_empty(try_delay_till: OutputIdx, input_idx: InputIdx, use_case_key: UseCaseKey) -> Self { + Self { + try_delay_till, + input_idx, + use_case_key, + txns: VecDeque::new(), + } + } + + fn is_empty(&self) -> bool { + self.txns.is_empty() + } + + fn priority_key(&self) -> PriorityKey { + PriorityKey { + try_delay_till: self.try_delay_till, + input_idx: self.input_idx, + } + } + + fn use_case_key(&self) -> &UseCaseKey { + &self.use_case_key + } + + fn queue_txn(&mut self, input_idx: InputIdx, txn: Txn) { + if let Some(last_txn) = self.txns.back() { + assert!(last_txn.input_idx < input_idx); + } else { + self.use_case_key = txn.parse_use_case(); + self.input_idx = input_idx; + } + self.txns.push_back(TxnWithInputIdx { input_idx, txn }); + } + + fn expect_dequeue_txn(&mut self) -> Txn { + let txn = self.txns.pop_front().expect("Must exist."); + if let Some(next_txn) = self.txns.front() { + self.use_case_key = next_txn.txn.parse_use_case(); + self.input_idx = next_txn.input_idx; + } + txn.txn + } + + fn update_try_delay_till(&mut self, try_delay_till: OutputIdx) { + self.try_delay_till = try_delay_till; + } +} + +struct UseCase { + try_delay_till: OutputIdx, + /// Head account input_idx, tracked for use when the accounts queue is empty, in which case + /// it keeps the value before the last account was removed. + input_idx: InputIdx, + account_by_priority: BTreeMap, +} + +impl UseCase { + fn new_with_account( + try_delay_till: OutputIdx, + address: AccountAddress, + account: &Account, + ) -> Self + where + Txn: UseCaseAwareTransaction, + { + let account_prio_key = account.priority_key(); + let input_idx = account_prio_key.input_idx; + let mut account_by_priority = BTreeMap::new(); + account_by_priority.insert(account_prio_key, address); + + Self { + try_delay_till, + input_idx, + account_by_priority, + } + } + + fn is_empty(&self) -> bool { + self.account_by_priority.is_empty() + } + + fn priority_key(&self) -> PriorityKey { + // If head account will be ready later than the use case itself, respect that. + let try_delay_till = std::cmp::max( + self.try_delay_till, + self.account_by_priority + .first_key_value() + .map_or(0, |(k, _)| k.try_delay_till), + ); + + PriorityKey { + try_delay_till, + input_idx: self.input_idx, + } + } + + fn expect_pop_head_account(&mut self) -> (PriorityKey, AccountAddress) { + let (account_prio_key, address) = + self.account_by_priority.pop_first().expect("Must exist."); + if let Some((next_account_prio_key, _)) = self.account_by_priority.first_key_value() { + self.input_idx = next_account_prio_key.input_idx; + } + (account_prio_key, address) + } + + fn update_try_delay_till(&mut self, try_delay_till: OutputIdx) { + self.try_delay_till = try_delay_till; + } + + fn add_account(&mut self, address: AccountAddress, account: &Account) + where + Txn: UseCaseAwareTransaction, + { + let account_prio_key = account.priority_key(); + self.account_by_priority.insert(account_prio_key, address); + let (_, head_address) = self.account_by_priority.first_key_value().unwrap(); + if *head_address == address { + self.input_idx = account_prio_key.input_idx; + } + } +} + +enum State { + Initialized, + ExpectOutputIdxBump, + OutputIdxBumped, +} + +pub(crate) struct DelayedQueue { + state: State, + accounts: HashMap>, + use_cases: HashMap, + use_case_key_by_priority: BTreeMap, + output_idx: OutputIdx, + config: Config, +} + +impl DelayedQueue +where + Txn: UseCaseAwareTransaction, +{ + pub fn new(config: Config) -> Self { + Self { + state: State::Initialized, + accounts: HashMap::new(), + use_cases: HashMap::new(), + use_case_key_by_priority: BTreeMap::new(), + output_idx: 0, + config, + } + } + + fn assert_and_bump_output_idx(&mut self, output_idx: OutputIdx) { + match self.state { + State::Initialized => { + assert_eq!(output_idx, 0); + }, + State::ExpectOutputIdxBump => { + assert!(output_idx > self.output_idx); + self.output_idx = output_idx; + }, + State::OutputIdxBumped => { + panic!("Found bug: trying to bump output_idx without processing the last bump."); + }, + } + } + + fn assert_output_idx_bumped(&self) { + match self.state { + State::Initialized | State::ExpectOutputIdxBump => { + panic!("Found bug: output_idx not bumped yet."); + }, + State::OutputIdxBumped => {}, + } + } + + /// Remove stale (empty use cases with try_delay_till <= self.output_idx) placeholders. + fn drain_placeholders(&mut self) { + while let Some(use_case_entry) = self.use_case_key_by_priority.first_entry() { + let use_case_prio_key = use_case_entry.key(); + let use_case_key = use_case_entry.get().clone(); + + if use_case_prio_key.try_delay_till > self.output_idx { + break; + } + let use_case = self.use_cases.expect_mut(&use_case_key); + + let mut maybe_use_case_prio_changed = false; + while let Some(account_entry) = use_case.account_by_priority.first_entry() { + let account_prio_key = account_entry.key(); + let address = account_entry.get(); + + if account_prio_key.try_delay_till > self.output_idx { + break; + } + + let account = self.accounts.expect_mut(address); + if account.is_empty() { + maybe_use_case_prio_changed = true; + self.accounts.remove(address); + account_entry.remove(); + } else { + break; + } + } + + if use_case.is_empty() { + self.use_cases.remove(&use_case_key); + use_case_entry.remove(); + } else if maybe_use_case_prio_changed { + use_case_entry.remove(); + self.use_case_key_by_priority + .insert(use_case.priority_key(), use_case_key); + } + } + } + + pub fn bump_output_idx(&mut self, output_idx: OutputIdx) { + self.assert_and_bump_output_idx(output_idx); + self.drain_placeholders(); + self.state = State::OutputIdxBumped; + } + + fn pop_head_impl(&mut self, max_output_idx: Option) -> Option { + // See if the highest priority txn is ready. If not, return None. + let use_case_entry = match self.use_case_key_by_priority.first_entry() { + None => return None, + Some(ocupied_entry) => ocupied_entry, + }; + let use_case_prio_key = use_case_entry.key(); + + // If there's a required max_output_idx, check for it. + if let Some(output_idx) = max_output_idx { + if use_case_prio_key.try_delay_till > output_idx { + return None; + } + } + + // A previously delayed txn is ready. Pop it and update priorities. + + // Both the use case and account need to be removed from the priority queues. + let use_case_prio_key = *use_case_prio_key; + let use_case_key = use_case_entry.remove(); + let use_case = self.use_cases.expect_mut(&use_case_key); + let (account_prio_key, address) = use_case.expect_pop_head_account(); + assert!(account_prio_key.try_delay_till <= use_case_prio_key.try_delay_till); + assert_eq!(account_prio_key.input_idx, use_case_prio_key.input_idx); + + // Pop first txn from account (for returning it later). + let account = self.accounts.expect_mut(&address); + let txn = account.expect_dequeue_txn(); + + // Update priorities. + account.update_try_delay_till(self.output_idx + 1 + self.config.sender_spread_factor()); + use_case.update_try_delay_till( + self.output_idx + 1 + self.config.use_case_spread_factor(&use_case_key), + ); + + // n.b. Add account and original use case back to priority queue even if they are + // empty because they serve as placeholders to track the per account / per use case + // delays. + + // See if account now belongs to a different use case. + let new_use_case_key = account.use_case_key(); + + if *new_use_case_key != use_case_key { + // add the use case back to the priority queue + self.use_case_key_by_priority + .strict_insert(use_case.priority_key(), use_case_key.clone()); + // Add the account to the new use case. + match self.use_cases.entry(new_use_case_key.clone()) { + hash_map::Entry::Occupied(mut entry) => { + // existing use case, remove from priority queue + let new_use_case = entry.get_mut(); + self.use_case_key_by_priority + .strict_remove(&new_use_case.priority_key()); + new_use_case.add_account(address, account); + // add the new use case to the priority queue + self.use_case_key_by_priority + .strict_insert(new_use_case.priority_key(), new_use_case_key.clone()); + }, + hash_map::Entry::Vacant(entry) => { + // use case not tracked previously, try_delay_till = output_idx + 1 + let new_use_case = entry.insert(UseCase::new_with_account( + self.output_idx + 1, + address, + account, + )); + // add the new use case to the priority queue + self.use_case_key_by_priority + .strict_insert(new_use_case.priority_key(), new_use_case_key.clone()); + }, + }; + } else { + // add the account back to the original use case + use_case.add_account(address, account); + // and add the use case back to the priority queue (with updated priority) + self.use_case_key_by_priority + .strict_insert(use_case.priority_key(), use_case_key.clone()); + } + + self.state = State::ExpectOutputIdxBump; + Some(txn) + } + + pub fn pop_head_if_ready(&mut self) -> Option { + self.assert_output_idx_bumped(); + self.pop_head_impl(Some(self.output_idx)) + } + + /// No matter if eligible already at current output_idx, pop and return the earliest txn. + pub fn pop_head(&mut self) -> Option { + self.assert_output_idx_bumped(); + self.pop_head_impl(None) + } + + /// Txn from input queue directly selected for output, needs to bump delays for relevant + /// account and use case. + fn update_delays( + &mut self, + input_idx: InputIdx, + txn: &Txn, + address: AccountAddress, + use_case_key: UseCaseKey, + ) { + let account_try_delay_till = self.output_idx + 1 + self.config.sender_spread_factor(); + match self.accounts.get_mut(&address) { + Some(account) => { + // Account must be empty, otherwise the txn wouldn't have been selected for output. + assert!(account.is_empty()); + let old_use_case_key = account.use_case_key(); + // Account must be tracked under a different use case, otherwise it would've been purged + // in `.bump_output_idx()`. + assert_ne!(old_use_case_key, &use_case_key); + + let old_use_case = self.use_cases.expect_mut(old_use_case_key); + // Remove it from the old use case's priority queue and update the old use case's + // position in the priority queue. + self.use_case_key_by_priority + .strict_remove(&old_use_case.priority_key()); + old_use_case + .account_by_priority + .strict_remove(&account.priority_key()); + self.use_case_key_by_priority + .strict_insert(old_use_case.priority_key(), old_use_case_key.clone()); + + // Replace account with different keys. + *account = + Account::new_empty(account_try_delay_till, input_idx, use_case_key.clone()); + }, + None => { + let account = + Account::new_empty(account_try_delay_till, input_idx, use_case_key.clone()); + self.accounts.strict_insert(address, account); + }, + } + + match self.use_cases.entry(use_case_key.clone()) { + hash_map::Entry::Occupied(..) => { + // Use case must not been tracked, otherwise a txn would have been selected + // from it for this round or any new input txn for this use case would need to be + // delayed. + unreachable!("Bug: use case should not have been tracked.") + }, + hash_map::Entry::Vacant(entry) => { + // Use case must be subject to a greater delay than our newly tracked account, + // so no need to more it within the priority queue. + let use_case = UseCase::new_with_account( + self.output_idx + 1 + self.config.use_case_spread_factor(&use_case_key), + address, + &self.accounts[&address], + ); + self.use_case_key_by_priority + .strict_insert(use_case.priority_key(), use_case_key.clone()); + self.use_cases.strict_insert(use_case_key, use_case); + }, + } + } + + /// Txn has to be delayed, attach it to respective account and use case. + fn add_txn( + &mut self, + input_idx: InputIdx, + txn: Txn, + address: AccountAddress, + use_case_key: UseCaseKey, + ) { + match self.accounts.get_mut(&address) { + Some(account) => { + if account.is_empty() { + // account was empty (placeholder), it's not in any use case's priority queue. + account.queue_txn(input_idx, txn); + match self.use_cases.get_mut(&use_case_key) { + Some(use_case) => { + self.use_case_key_by_priority + .strict_remove(&use_case.priority_key()); + use_case.add_account(address, account); + self.use_case_key_by_priority + .strict_insert(use_case.priority_key(), use_case_key.clone()); + }, + None => { + let use_case = + UseCase::new_with_account(self.output_idx + 1, address, account); + self.use_case_key_by_priority + .strict_insert(use_case.priority_key(), use_case_key.clone()); + }, + } + } else { + // Account is not empty, appending a txn won't affect its position in the + // priority queue. + account.queue_txn(input_idx, txn); + } + }, + None => { + // Use case must exist but not ready, otherwise the txn wouldn't need to be delayed. + let use_case = self.use_cases.expect_mut(&use_case_key); + // ... Use case must be subject to a greater delay than our newly tracked account, + // so no need to more it within the priority queue. + assert!(use_case.try_delay_till > self.output_idx); + + // Account was not tracked before, create it and attach to respective use case. + let account = Account::new_with_txn(self.output_idx + 1, input_idx, txn); + use_case.add_account(address, &account); + self.accounts.insert(address, account); + }, + }; + } + + /// Return the txn back if relevant use case and sender are not subject to delaying. Otherwise, + /// Queue it up. + pub fn add_or_return(&mut self, input_idx: InputIdx, txn: Txn) -> Option { + self.assert_output_idx_bumped(); + + let address = txn.parse_sender(); + let account_opt = self.accounts.get_mut(&address); + let use_case_key = txn.parse_use_case(); + let use_case_opt = self.use_cases.get_mut(&use_case_key); + + let account_should_delay = account_opt + .as_ref() + .map_or(false, |account| account.try_delay_till > self.output_idx); + let use_case_should_delay = use_case_opt + .as_ref() + .map_or(false, |use_case| use_case.try_delay_till > self.output_idx); + + if !account_should_delay && !use_case_should_delay { + self.update_delays(input_idx, &txn, address, use_case_key); + self.state = State::ExpectOutputIdxBump; + Some(txn) + } else { + self.add_txn(input_idx, txn, address, use_case_key); + None + } + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/iterator.rs b/consensus/src/transaction_shuffler/use_case_aware/iterator.rs index c95dafb16c5f01..00dd006fa016e8 100644 --- a/consensus/src/transaction_shuffler/use_case_aware/iterator.rs +++ b/consensus/src/transaction_shuffler/use_case_aware/iterator.rs @@ -2,257 +2,76 @@ // SPDX-License-Identifier: Apache-2.0 use crate::transaction_shuffler::use_case_aware::{ - transaction::{UseCaseKey, UseCaseTransaction}, - utils::StrictMap, + delayed_queue::DelayedQueue, + types::{InputIdx, OutputIdx, UseCaseAwareTransaction}, Config, }; -use itertools::Itertools; -use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::collections::VecDeque; -type OriginalIdx = usize; -type OutputIdx = usize; - -#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] -struct PriorityKey { - try_delay_till: OutputIdx, - original_idx: OriginalIdx, -} - -struct Txn { - original_idx: OriginalIdx, - use_case_key: UseCaseKey, +pub(super) struct ShuffledTransactionIterator { + input_queue: VecDeque, + delayed_queue: DelayedQueue, + input_idx: InputIdx, + output_idx: OutputIdx, } -impl Txn { - fn new(original_idx: OriginalIdx, use_case_key: UseCaseKey) -> Self { +impl ShuffledTransactionIterator +where + Txn: UseCaseAwareTransaction, +{ + pub(super) fn new(config: Config) -> Self { Self { - original_idx, - use_case_key, + input_queue: VecDeque::new(), + delayed_queue: DelayedQueue::new(config), + input_idx: 0, + output_idx: 0, } } -} - -#[derive(Default)] -struct Account { - try_delay_till: OutputIdx, - txns: VecDeque, -} -impl Account { - fn new_unselected(txns: Vec) -> Self { - Self { - try_delay_till: 0, - txns: txns.into(), - } + pub(super) fn extended_with(mut self, txns: impl IntoIterator) -> Self { + self.input_queue.extend(txns); + self } - fn priority_key(&self) -> PriorityKey { - PriorityKey { - try_delay_till: self.try_delay_till, - original_idx: self.expect_first_txn().original_idx, + pub(super) fn select_next_txn(&mut self) -> Option { + let ret = self.select_next_txn_inner(); + if ret.is_some() { + self.output_idx += 1; } + ret } - fn use_case_key(&self) -> UseCaseKey { - self.expect_first_txn().use_case_key.clone() - } - - fn expect_first_txn(&self) -> &Txn { - self.txns.front().expect("Known to exist.") - } - - fn pop_first_transaction(&mut self) -> Txn { - self.txns.pop_front().expect("Known to exist.") - } - - fn is_empty(&self) -> bool { - self.txns.is_empty() - } - - fn update_try_delay_till(&mut self, output_idx: OutputIdx, config: &Config) { - self.try_delay_till = output_idx + config.sender_spread_factor; - } -} - -struct UseCase { - key: UseCaseKey, - try_delay_till: OutputIdx, - /// note: maybe use a heap - account_by_priority: BTreeMap, -} - -impl UseCase { - fn new_unselected(key: UseCaseKey, accounts: Vec) -> Self { - let account_by_priority = accounts - .into_iter() - .map(|account| (account.priority_key(), account)) - .collect(); + pub(super) fn select_next_txn_inner(&mut self) -> Option { + self.delayed_queue.bump_output_idx(self.output_idx); - Self { - key, - try_delay_till: 0, - account_by_priority, - } - } - - fn priority_key(&self) -> PriorityKey { - let mut key = self.expect_first_account().priority_key(); - if self.try_delay_till > key.try_delay_till { - key.try_delay_till = self.try_delay_till; + // 1. if anything delayed became ready, return it + if let Some(txn) = self.delayed_queue.pop_head_if_ready() { + return Some(txn); } - key - } - - fn expect_first_account(&self) -> &Account { - let (_priority_key, account) = self - .account_by_priority - .first_key_value() - .expect("Known to exist."); - account - } - fn pop_first_account(&mut self) -> Account { - let (_priority_key, account) = self - .account_by_priority - .pop_first() - .expect("Known to exist."); - account - } - - fn is_empty(&self) -> bool { - self.account_by_priority.is_empty() - } - - fn update_try_delay_till(&mut self, output_idx: OutputIdx, config: &Config) { - let try_delay = match &self.key { - UseCaseKey::Platform => config.platform_use_case_spread_factor, - UseCaseKey::ContractAddress(_) | UseCaseKey::Others => { - config.user_use_case_spread_factor - }, - }; - self.try_delay_till = output_idx + try_delay; - } - - fn add_account(&mut self, account: Account) { - self.account_by_priority - .strict_insert(account.priority_key(), account) - } -} - -pub(super) struct ShuffledTransactionIterator { - use_case_by_key: HashMap, - use_case_key_by_priority: BTreeMap, - output_idx: OutputIdx, - config: Config, -} - -impl ShuffledTransactionIterator { - pub(super) fn new(txns: &[impl UseCaseTransaction], config: Config) -> Self { - let txns_by_sender = txns - .iter() - .enumerate() - .map(|(original_idx, transaction)| { - ( - transaction.parse_sender(), - Txn::new(original_idx, transaction.parse_use_case()), - ) - }) - .into_group_map(); - - let accounts_by_use_case = txns_by_sender - .into_values() - .map(|txns| { - let account = Account::new_unselected(txns); - (account.use_case_key(), account) - }) - .into_group_map(); - - let use_case_by_key: HashMap<_, _> = accounts_by_use_case - .into_iter() - .map(|(use_case_key, accounts)| { - let use_case = UseCase::new_unselected(use_case_key.clone(), accounts); - (use_case_key, use_case) - }) - .collect(); - - let use_case_key_by_priority = use_case_by_key - .iter() - .map(|(key, use_case)| (use_case.priority_key(), key.clone())) - .collect(); + // 2. Otherwise, seek in the input queue for something that shouldn't be delayed due to either + // the sender or the use case. + while let Some(txn) = self.input_queue.pop_front() { + let input_idx = self.input_idx; + self.input_idx += 1; - Self { - use_case_by_key, - use_case_key_by_priority, - output_idx: 0, - config, + if let Some(txn) = self.delayed_queue.add_or_return(input_idx, txn) { + return Some(txn); + } } + + // 3. If nothing is ready, return the next eligible from the delay queue + self.delayed_queue.pop_head() } } -impl Iterator for ShuffledTransactionIterator { - type Item = OriginalIdx; +impl Iterator for ShuffledTransactionIterator +where + Txn: UseCaseAwareTransaction, +{ + type Item = Txn; fn next(&mut self) -> Option { - if self.use_case_key_by_priority.is_empty() { - return None; - } - - // Select highest priority use case and output the first txn from it. - let (_priority_key, use_case_key) = self - .use_case_key_by_priority - .pop_first() - .expect("Known to exist."); - let mut account = self - .use_case_by_key - .expect_mut(&use_case_key) - .pop_first_account(); - let txn = account.pop_first_transaction(); - - // Update account priority (try_delay_till) and potentially move the account to a new use case. - if !account.is_empty() { - account.update_try_delay_till(self.output_idx, &self.config); - let new_use_case_key = account.use_case_key(); - if new_use_case_key != use_case_key { - match self.use_case_by_key.get_mut(&new_use_case_key) { - Some(new_use_case) => { - self.use_case_key_by_priority - .strict_remove(&new_use_case.priority_key()); - - new_use_case.add_account(account); - - self.use_case_key_by_priority - .strict_insert(new_use_case.priority_key(), new_use_case_key) - }, - None => { - let mut new_use_case = - UseCase::new_unselected(new_use_case_key.clone(), vec![account]); - new_use_case.update_try_delay_till(self.output_idx, &self.config); - - self.use_case_key_by_priority - .strict_insert(new_use_case.priority_key(), new_use_case_key.clone()); - self.use_case_by_key - .strict_insert(new_use_case_key, new_use_case); - }, - } - } else { - // add the account back to the original use case - self.use_case_by_key - .expect_mut(&use_case_key) - .add_account(account); - } - } - - // Update priority (try_delay_till) of the selected use case - let use_case = self.use_case_by_key.expect_mut(&use_case_key); - if use_case.is_empty() { - self.use_case_by_key.strict_remove(&use_case_key); - } else { - use_case.update_try_delay_till(self.output_idx, &self.config); - self.use_case_key_by_priority - .strict_insert(use_case.priority_key(), use_case_key); - } - - self.output_idx += 1; - Some(txn.original_idx) + self.select_next_txn() } } diff --git a/consensus/src/transaction_shuffler/use_case_aware/mod.rs b/consensus/src/transaction_shuffler/use_case_aware/mod.rs index 61f911a68f8d35..a74abd64153dbd 100644 --- a/consensus/src/transaction_shuffler/use_case_aware/mod.rs +++ b/consensus/src/transaction_shuffler/use_case_aware/mod.rs @@ -1,14 +1,15 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::transaction_shuffler::TransactionShuffler; +use crate::transaction_shuffler::{use_case_aware::types::UseCaseKey, TransactionShuffler}; use aptos_types::transaction::SignedTransaction; use iterator::ShuffledTransactionIterator; -mod iterator; -mod transaction; -mod utils; +pub(crate) mod iterator; +pub(crate) mod types; +pub(crate) mod utils; +pub(crate) mod delayed_queue; #[cfg(test)] mod tests; @@ -19,15 +20,27 @@ pub(crate) struct Config { user_use_case_spread_factor: usize, } +impl Config { + pub(crate) fn sender_spread_factor(&self) -> usize { + self.sender_spread_factor + } + + pub(crate) fn use_case_spread_factor(&self, use_case_key: &UseCaseKey) -> usize { + match use_case_key { + UseCaseKey::Platform => self.platform_use_case_spread_factor, + _ => self.user_use_case_spread_factor, + } + } +} + pub struct UseCaseAwareShuffler { config: Config, } impl TransactionShuffler for UseCaseAwareShuffler { fn shuffle(&self, txns: Vec) -> Vec { - ShuffledTransactionIterator::new(&txns, self.config.clone()) - .map(|original_idx| &txns[original_idx]) - .cloned() + ShuffledTransactionIterator::new(self.config.clone()) + .extended_with(txns) .collect() } } diff --git a/consensus/src/transaction_shuffler/use_case_aware/tests.rs b/consensus/src/transaction_shuffler/use_case_aware/tests.rs index dc849a1b89550a..e9cd09283b722b 100644 --- a/consensus/src/transaction_shuffler/use_case_aware/tests.rs +++ b/consensus/src/transaction_shuffler/use_case_aware/tests.rs @@ -3,17 +3,14 @@ use crate::transaction_shuffler::use_case_aware::{ iterator::ShuffledTransactionIterator, - transaction::{UseCaseKey, UseCaseTransaction}, + types::{UseCaseAwareTransaction, UseCaseKey}, Config, }; use itertools::Itertools; use move_core_types::account_address::AccountAddress; -use std::collections::HashMap; -#[derive(Clone, Copy)] struct Contract(usize); -#[derive(Clone, Copy, Eq, Hash, PartialEq)] struct Account(usize); impl Account { @@ -24,12 +21,26 @@ impl Account { } } -struct SeqNum(usize); - struct Transaction { contract: Contract, sender: Account, - seq_num: SeqNum, + original_idx: usize, +} + +impl UseCaseAwareTransaction for Transaction { + fn parse_sender(&self) -> AccountAddress { + self.sender.as_account_address() + } + + fn parse_use_case(&self) -> UseCaseKey { + use UseCaseKey::*; + + match self.contract.0 { + 0 => Platform, + 1 => Others, + c => ContractAddress(Account(c).as_account_address()), + } + } } const PP: Contract = Contract(0); @@ -41,39 +52,34 @@ const A1: Account = Account(1); const A2: Account = Account(2); const A3: Account = Account(3); -fn txns(txns: impl IntoIterator) -> Vec { - let mut account_seq_num = HashMap::new(); - +fn into_txns(txns: impl IntoIterator) -> Vec { + let mut original_idx = 0; txns.into_iter() .map(|(contract, sender)| { - let s = account_seq_num.entry(sender).or_insert(0); - let seq_num = SeqNum(*s); let txn = Transaction { contract, sender, - seq_num, + original_idx, }; - *s += 1; + original_idx += 1; txn }) .collect() } -impl UseCaseTransaction for Transaction { - fn parse_sender(&self) -> AccountAddress { - self.sender.as_account_address() - } - - fn parse_use_case(&self) -> UseCaseKey { - use UseCaseKey::*; - - match self.contract.0 { - 0 => Platform, - 1 => Others, - c => ContractAddress(Account(c).as_account_address()), - } - } +fn assert_shuffle_result( + config: Config, + txns: impl IntoIterator, + expected_order: impl IntoIterator, +) { + let txns = into_txns(txns); + let actual_order = ShuffledTransactionIterator::new(config) + .extended_with(txns) + .map(|txn| txn.original_idx) + .collect_vec(); + let expected_order = expected_order.into_iter().collect_vec(); + assert_eq!(expected_order, actual_order); } #[test] @@ -86,7 +92,7 @@ fn test_spread_by_sender() { user_use_case_spread_factor: 0, }; - let txns = txns([ + let txns = [ // 5 txns from A1 (PP, A1), (OO, A1), @@ -100,12 +106,9 @@ fn test_spread_by_sender() { // 2 txns from A3 (C1, A3), (C1, A3), - ]); + ]; - assert_eq!( - ShuffledTransactionIterator::new(&txns, config).collect_vec(), - vec![0, 5, 8, 1, 6, 9, 2, 7, 3, 4], - ) + assert_shuffle_result(config, txns, [0, 5, 8, 1, 6, 9, 2, 7, 3, 4]); } #[test] @@ -117,7 +120,7 @@ fn test_spread_by_use_case() { user_use_case_spread_factor: 3, }; - let txns = txns([ + let txns = [ // 5 txns from C1 (C1, A1), (C1, A1), @@ -131,15 +134,11 @@ fn test_spread_by_use_case() { // 2 txns from C3 (C3, A3), (C3, A3), - ]); + ]; - assert_eq!( - ShuffledTransactionIterator::new(&txns, config).collect_vec(), - vec![0, 5, 8, 1, 6, 9, 2, 7, 3, 4], - ) + assert_shuffle_result(config, txns, [0, 5, 8, 1, 6, 9, 2, 7, 3, 4]) } -/* #[test] fn test_platform_txn_priority() { let config = Config { @@ -149,7 +148,7 @@ fn test_platform_txn_priority() { user_use_case_spread_factor: 3, }; - let txns = txns([ + let txns = [ // 5 txns from C1 (C1, A1), (C1, A1), @@ -163,11 +162,7 @@ fn test_platform_txn_priority() { // 2 txns from C3 (PP, A3), (PP, A3), - ]); + ]; - assert_eq!( - ShuffledTransactionIterator::new(&txns, config).collect_vec(), - vec![0, 5, 6], - ) + assert_shuffle_result(config, txns, [0, 5, 6]); } -*/ diff --git a/consensus/src/transaction_shuffler/use_case_aware/transaction.rs b/consensus/src/transaction_shuffler/use_case_aware/types.rs similarity index 83% rename from consensus/src/transaction_shuffler/use_case_aware/transaction.rs rename to consensus/src/transaction_shuffler/use_case_aware/types.rs index 6a94f50ae7de1f..197ad98d10a786 100644 --- a/consensus/src/transaction_shuffler/use_case_aware/transaction.rs +++ b/consensus/src/transaction_shuffler/use_case_aware/types.rs @@ -4,7 +4,10 @@ use aptos_types::transaction::SignedTransaction; use move_core_types::account_address::AccountAddress; -#[derive(Clone, Eq, Hash, PartialEq)] +pub(crate) type InputIdx = usize; +pub(crate) type OutputIdx = usize; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub(crate) enum UseCaseKey { Platform, ContractAddress(AccountAddress), @@ -12,13 +15,13 @@ pub(crate) enum UseCaseKey { Others, } -pub(crate) trait UseCaseTransaction { +pub(crate) trait UseCaseAwareTransaction { fn parse_sender(&self) -> AccountAddress; fn parse_use_case(&self) -> UseCaseKey; } -impl UseCaseTransaction for SignedTransaction { +impl UseCaseAwareTransaction for SignedTransaction { fn parse_sender(&self) -> AccountAddress { self.sender() }