Skip to content

Commit

Permalink
UseCaseAwareShuffler
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Jun 18, 2024
1 parent 86b9d21 commit cea90c4
Show file tree
Hide file tree
Showing 6 changed files with 552 additions and 0 deletions.
1 change: 1 addition & 0 deletions consensus/src/transaction_shuffler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::Arc;

mod deprecated_fairness;
mod sender_aware;
mod use_case_aware;

/// Interface to shuffle transactions
pub trait TransactionShuffler: Send + Sync {
Expand Down
259 changes: 259 additions & 0 deletions consensus/src/transaction_shuffler/use_case_aware/iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::transaction_shuffler::use_case_aware::{
transaction::{UseCaseKey, UseCaseTransaction},
utils::StrictMap,
Config,
};
use itertools::Itertools;
use std::collections::{BTreeMap, HashMap, VecDeque};

type OriginalIdx = usize;
type OutputIdx = usize;

#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
struct PriorityKey {
try_delay_till: OutputIdx,
original_idx: OriginalIdx,
}

struct Txn {
original_idx: OriginalIdx,
use_case_key: UseCaseKey,
}

impl Txn {
fn new(original_idx: OriginalIdx, use_case_key: UseCaseKey) -> Self {
Self {
original_idx,
use_case_key,
}
}
}

#[derive(Default)]
struct Account {
try_delay_till: OutputIdx,
txns: VecDeque<Txn>,
}

impl Account {
fn new_unselected(txns: Vec<Txn>) -> Self {
Self {
try_delay_till: 0,
txns: txns.into(),
}
}

fn priority_key(&self) -> PriorityKey {
PriorityKey {
try_delay_till: self.try_delay_till,
original_idx: self.expect_first_txn().original_idx,
}
}

fn use_case_key(&self) -> UseCaseKey {
self.expect_first_txn().use_case_key.clone()
}

fn expect_first_txn(&self) -> &Txn {
self.txns.front().expect("Known to exist.")
}

fn pop_first_transaction(&mut self) -> Txn {
self.txns.pop_front().expect("Known to exist.")
}

fn is_empty(&self) -> bool {
self.txns.is_empty()
}

fn update_try_delay_till(&mut self, output_idx: OutputIdx, config: &Config) {
self.try_delay_till = output_idx + 1 + config.sender_spread_factor;
}
}

struct UseCase {
key: UseCaseKey,
try_delay_till: OutputIdx,
/// note: maybe use a heap
account_by_priority: BTreeMap<PriorityKey, Account>,
}

impl UseCase {
fn new_unselected(key: UseCaseKey, accounts: Vec<Account>) -> Self {
let account_by_priority = accounts
.into_iter()
.map(|account| (account.priority_key(), account))
.collect();

Self {
key,
try_delay_till: 0,
account_by_priority,
}
}

fn priority_key(&self) -> PriorityKey {
let mut key = self.expect_first_account().priority_key();
if self.try_delay_till > key.try_delay_till {
key.try_delay_till = self.try_delay_till;
}
key
}

fn expect_first_account(&self) -> &Account {
let (_priority_key, account) = self
.account_by_priority
.first_key_value()
.expect("Known to exist.");
account
}

fn pop_first_account(&mut self) -> Account {
let (_priority_key, account) = self
.account_by_priority
.pop_first()
.expect("Known to exist.");
account
}

fn is_empty(&self) -> bool {
self.account_by_priority.is_empty()
}

fn update_try_delay_till(&mut self, output_idx: OutputIdx, config: &Config) {
let try_delay = match &self.key {
UseCaseKey::Platform => config.platform_use_case_spread_factor,
UseCaseKey::ContractAddress(_) | UseCaseKey::Others => {
config.user_use_case_spread_factor
},
};
self.try_delay_till = output_idx + 1 + try_delay;
}

fn add_account(&mut self, account: Account) {
self.account_by_priority
.strict_insert(account.priority_key(), account)
}
}

pub(super) struct ShuffledTransactionIterator {
use_case_by_key: HashMap<UseCaseKey, UseCase>,
use_case_key_by_priority: BTreeMap<PriorityKey, UseCaseKey>,
output_idx: OutputIdx,
config: Config,
}

impl ShuffledTransactionIterator {
pub(super) fn new(txns: &[impl UseCaseTransaction], config: Config) -> Self {
let txns_by_sender = txns
.iter()
.enumerate()
.map(|(original_idx, transaction)| {
(
transaction.parse_sender(),
Txn::new(original_idx, transaction.parse_use_case()),
)
})
.into_group_map();

let accounts_by_use_case = txns_by_sender
.into_values()
.map(|txns| {
let account = Account::new_unselected(txns);
(account.use_case_key(), account)
})
.into_group_map();

let use_case_by_key: HashMap<_, _> = accounts_by_use_case
.into_iter()
.map(|(use_case_key, accounts)| {
let use_case = UseCase::new_unselected(use_case_key.clone(), accounts);
(use_case_key, use_case)
})
.collect();

let use_case_key_by_priority = use_case_by_key
.iter()
.map(|(key, use_case)| (use_case.priority_key(), key.clone()))
.collect();

Self {
use_case_by_key,
use_case_key_by_priority,
output_idx: 0,
config,
}
}
}

impl Iterator for ShuffledTransactionIterator {
type Item = OriginalIdx;

fn next(&mut self) -> Option<Self::Item> {
if self.use_case_key_by_priority.is_empty() {
return None;
}

// Select highest priority use case and output the first txn from it.
let (_priority_key, use_case_key) = self
.use_case_key_by_priority
.pop_first()
.expect("Known to exist.");
let mut account = self
.use_case_by_key
.expect_mut(&use_case_key)
.pop_first_account();
let txn = account.pop_first_transaction();

// Update account priority (try_delay_till) and potentially move the account to a new use case.
if !account.is_empty() {
account.update_try_delay_till(self.output_idx, &self.config);
let new_use_case_key = account.use_case_key();
if new_use_case_key != use_case_key {
match self.use_case_by_key.get_mut(&new_use_case_key) {
Some(new_use_case) => {
self.use_case_key_by_priority
.strict_remove(&new_use_case.priority_key());

new_use_case.add_account(account);

self.use_case_key_by_priority
.strict_insert(new_use_case.priority_key(), new_use_case_key)
},
None => {
let mut new_use_case =
UseCase::new_unselected(new_use_case_key.clone(), vec![account]);
new_use_case.update_try_delay_till(self.output_idx, &self.config);

self.use_case_key_by_priority
.strict_insert(new_use_case.priority_key(), new_use_case_key.clone());
self.use_case_by_key
.strict_insert(new_use_case_key, new_use_case);
},
}
} else {
// add the account back to the original use case
self.use_case_by_key
.expect_mut(&use_case_key)
.add_account(account);
}
}

// Update priority (try_delay_till) of the selected use case
let use_case = self.use_case_by_key.expect_mut(&use_case_key);
if use_case.is_empty() {
self.use_case_by_key.strict_remove(&use_case_key);
} else {
let priority_key = use_case.priority_key();
use_case.update_try_delay_till(self.output_idx, &self.config);
self.use_case_key_by_priority
.strict_insert(priority_key, use_case_key);
}

self.output_idx += 1;
Some(txn.original_idx)
}
}
33 changes: 33 additions & 0 deletions consensus/src/transaction_shuffler/use_case_aware/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::transaction_shuffler::TransactionShuffler;
use aptos_types::transaction::SignedTransaction;
use iterator::ShuffledTransactionIterator;

mod iterator;
mod transaction;
mod utils;

#[cfg(test)]
mod tests;

#[derive(Clone)]
pub(crate) struct Config {
sender_spread_factor: usize,
platform_use_case_spread_factor: usize,
user_use_case_spread_factor: usize,
}

pub struct UseCaseAwareShuffler {
config: Config,
}

impl TransactionShuffler for UseCaseAwareShuffler {
fn shuffle(&self, txns: Vec<SignedTransaction>) -> Vec<SignedTransaction> {
ShuffledTransactionIterator::new(&txns, self.config.clone())
.map(|original_idx| &txns[original_idx])
.cloned()
.collect()
}
}
Loading

0 comments on commit cea90c4

Please sign in to comment.