Skip to content

Commit

Permalink
Refactor workflow delegator in transaction generator (#15179)
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala authored Nov 4, 2024
1 parent f345832 commit 016e8e9
Showing 1 changed file with 79 additions and 66 deletions.
145 changes: 79 additions & 66 deletions crates/transaction-generator-lib/src/workflow_delegator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use aptos_sdk::{
types::{transaction::SignedTransaction, LocalAccount},
};
use std::{
cmp,
fmt::Debug,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
Expand All @@ -24,9 +24,10 @@ use std::{

#[derive(Clone)]
enum StageTracking {
// stage is externally modified
// Stage is externally modified. This is used by executor benchmark tests
ExternallySet(Arc<AtomicUsize>),
// we move to a next stage when all accounts have finished with the current stage
// We move to a next stage when all accounts have finished with the current stage
// This is used by transaction emitter (forge and tests on mainnet, devnet, testnet)
WhenDone {
stage_counter: Arc<AtomicUsize>,
stage_start_time: Arc<AtomicU64>,
Expand Down Expand Up @@ -70,7 +71,7 @@ impl StageTracking {
///
/// pool_i is filled by gen_i, and consumed by gen_i+1, and so there is one less pools than generators.
///
/// We start with stage 0, which calls gen_0 pool_per_stage times, which populates pool_0 with accounts.
/// We start with stage 0, which calls gen_0 stage_switch_conditions[0].len() times, which populates pool_0 with accounts.
///
/// After that, in stage 1, we call gen_1, which consumes accounts from pool_0, and moves them to pool_1.
/// We do this until pool_0 is empty.
Expand All @@ -86,26 +87,19 @@ impl StageTracking {
struct WorkflowTxnGenerator {
stage: StageTracking,
generators: Vec<Box<dyn TransactionGenerator>>,
pool_per_stage: Vec<Arc<ObjectPool<LocalAccount>>>,
num_for_first_stage: usize,
// Internal counter, so multiple workers (WorkflowTxnGenerator) can coordinate how many times to execute the first stage
completed_for_first_stage: Arc<AtomicUsize>,
stage_switch_conditions: Vec<StageSwitchCondition>,
}

impl WorkflowTxnGenerator {
fn new(
stage: StageTracking,
generators: Vec<Box<dyn TransactionGenerator>>,
pool_per_stage: Vec<Arc<ObjectPool<LocalAccount>>>,
num_for_first_stage: usize,
completed_for_first_stage: Arc<AtomicUsize>,
stage_switch_conditions: Vec<StageSwitchCondition>,
) -> Self {
Self {
stage,
generators,
pool_per_stage,
num_for_first_stage,
completed_for_first_stage,
stage_switch_conditions,
}
}
}
Expand All @@ -114,7 +108,7 @@ impl TransactionGenerator for WorkflowTxnGenerator {
fn generate_transactions(
&mut self,
account: &LocalAccount,
mut num_to_create: usize,
num_to_create: usize,
) -> Vec<SignedTransaction> {
assert_ne!(num_to_create, 0);
let stage = match self.stage.load_current_stage() {
Expand All @@ -128,42 +122,18 @@ impl TransactionGenerator for WorkflowTxnGenerator {
},
};

if stage == 0 {
// We can treat completed_for_first_stage as a stream of indices [0, +inf),
// where we want to execute only first num_for_first_stage (i.e. [0, num_for_first_stage) )
// So here we grab num_to_create "indices" from completed_for_first_stage counter,
// and then skip those that are in [num_for_first_stage, +inf) range.
let prev = self
.completed_for_first_stage
.fetch_add(num_to_create, Ordering::Relaxed);
num_to_create = cmp::min(num_to_create, self.num_for_first_stage.saturating_sub(prev));
}
// if stage is not 0, then grabing from the pool itself, inside of the generator.generate_transactions
// acts as coordinator, as it will generate as many transactions as number of accounts it could grab from the pool.

match &self.stage {
StageTracking::WhenDone {
stage_counter,
stage_start_time,
delay_between_stages,
} => {
if stage == 0 {
if num_to_create == 0 {
info!("TransactionGenerator Workflow: Stage 0 is full with {} accounts, moving to stage 1", self.pool_per_stage.first().unwrap().len());
stage_start_time.store(
StageTracking::current_timestamp() + delay_between_stages.as_secs(),
Ordering::Relaxed,
);
let _ = stage_counter.compare_exchange(
0,
1,
Ordering::Relaxed,
Ordering::Relaxed,
);
return Vec::new();
}
} else if stage < self.pool_per_stage.len()
&& self.pool_per_stage.get(stage - 1).unwrap().len() == 0
if stage < self.stage_switch_conditions.len()
&& self
.stage_switch_conditions
.get(stage)
.unwrap()
.should_switch()
{
info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1);
stage_start_time.store(
Expand All @@ -180,48 +150,94 @@ impl TransactionGenerator for WorkflowTxnGenerator {
}
},
StageTracking::ExternallySet(_) => {
if stage == 0 && num_to_create == 0 {
if stage >= self.stage_switch_conditions.len()
|| (stage < self.stage_switch_conditions.len()
&& self
.stage_switch_conditions
.get(stage)
.unwrap()
.should_switch())
{
info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1);
return Vec::new();
}
},
}

sample!(
SampleRate::Duration(Duration::from_secs(2)),
info!("Cur stage: {}, pool sizes: {:?}", stage, self.pool_per_stage.iter().map(|p| p.len()).collect::<Vec<_>>());
info!("Cur stage: {}, stage switch conditions: {:?}", stage, self.stage_switch_conditions);
);

let result = if let Some(generator) = self.generators.get_mut(stage) {
generator.generate_transactions(account, num_to_create)
} else {
Vec::new()
};

if let Some(switch_condition) = self.stage_switch_conditions.get_mut(stage) {
switch_condition.reduce_txn_count(result.len());
}
result
}
}

#[derive(Clone)]
enum StageSwitchCondition {
WhenPoolBecomesEmpty(Arc<ObjectPool<LocalAccount>>),
MaxTransactions(Arc<AtomicUsize>),
}

impl StageSwitchCondition {
fn should_switch(&self) -> bool {
match self {
StageSwitchCondition::WhenPoolBecomesEmpty(pool) => pool.len() == 0,
StageSwitchCondition::MaxTransactions(max) => max.load(Ordering::Relaxed) == 0,
}
}

fn reduce_txn_count(&mut self, count: usize) {
match self {
StageSwitchCondition::WhenPoolBecomesEmpty(_) => {},
StageSwitchCondition::MaxTransactions(max) => {
let current = max.load(Ordering::Relaxed);
if count > current {
max.store(0, Ordering::Relaxed);
} else {
max.fetch_sub(count, Ordering::Relaxed);
}
},
}
}
}
impl Debug for StageSwitchCondition {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StageSwitchCondition::WhenPoolBecomesEmpty(pool) => {
write!(f, "WhenPoolBecomesEmpty({})", pool.len())
},
StageSwitchCondition::MaxTransactions(max) => {
write!(f, "MaxTransactions({})", max.load(Ordering::Relaxed))
},
}
}
}

pub struct WorkflowTxnGeneratorCreator {
stage: StageTracking,
creators: Vec<Box<dyn TransactionGeneratorCreator>>,
pool_per_stage: Vec<Arc<ObjectPool<LocalAccount>>>,
num_for_first_stage: usize,
completed_for_first_stage: Arc<AtomicUsize>,
stage_switch_conditions: Vec<StageSwitchCondition>,
}

impl WorkflowTxnGeneratorCreator {
fn new(
stage: StageTracking,
creators: Vec<Box<dyn TransactionGeneratorCreator>>,
pool_per_stage: Vec<Arc<ObjectPool<LocalAccount>>>,
num_for_first_stage: usize,
stage_switch_conditions: Vec<StageSwitchCondition>,
) -> Self {
Self {
stage,
creators,
pool_per_stage,
num_for_first_stage,
completed_for_first_stage: Arc::new(AtomicUsize::new(0)),
stage_switch_conditions,
}
}

Expand Down Expand Up @@ -273,7 +289,7 @@ impl WorkflowTxnGeneratorCreator {
txn_executor,
num_modules,
mint_entry_point.package_name(),
Some(20_00000000),
Some(40_0000_0000),
)
.await;

Expand Down Expand Up @@ -327,12 +343,11 @@ impl WorkflowTxnGeneratorCreator {
Some(burnt_pool.clone()),
)),
];
Self::new(
stage_tracking,
creators,
vec![created_pool, minted_pool, burnt_pool],
count,
)
Self::new(stage_tracking, creators, vec![
StageSwitchCondition::MaxTransactions(Arc::new(AtomicUsize::new(count))),
StageSwitchCondition::WhenPoolBecomesEmpty(created_pool),
StageSwitchCondition::WhenPoolBecomesEmpty(minted_pool),
])
},
}
}
Expand All @@ -346,9 +361,7 @@ impl TransactionGeneratorCreator for WorkflowTxnGeneratorCreator {
.iter()
.map(|c| c.create_transaction_generator())
.collect(),
self.pool_per_stage.clone(),
self.num_for_first_stage,
self.completed_for_first_stage.clone(),
self.stage_switch_conditions.clone(),
))
}
}

0 comments on commit 016e8e9

Please sign in to comment.