Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Jul 9, 2024
1 parent 3b90acc commit 1789170
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 52 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ chrono = { workspace = true }
claims = { workspace = true }
clap = { workspace = true }
dashmap = { workspace = true }
derivative = { workspace = true }
enum_dispatch = { workspace = true }
fail = { workspace = true }
futures = { workspace = true }
Expand Down
47 changes: 39 additions & 8 deletions consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ impl UseCase {
}
}

/// Expects head account to exist (otherwise panic) and return both the DelayKey and the
/// account address for the entry.
fn expect_pop_head_account(&mut self) -> (DelayKey, AccountAddress) {
let (account_delay_key, address) = self.account_by_delay.pop_first().expect("Must exist.");
if let Some((next_account_delay_key, _)) = self.account_by_delay.first_key_value() {
Expand All @@ -202,20 +204,45 @@ impl UseCase {
}
}

/// Structure to track:
/// 1. all use cases and accounts that are subject to delaying, no matter they have pending txns
/// associated or not.
/// 2. all txns that are examined and delayed previously.
///
/// * A delayed txn is attached to an account and the account is attached to a priority queue in a use
/// case, which has an entry in the main priority queue.
/// * Empty accounts and use cases are still tracked for the delay so that a next txn in the
/// input stream is properly delayed if associated with such an account or use case.
#[derive(Debug, Default)]
pub(crate) struct DelayedQueue<Txn> {
/// Registry of all accounts, each of which includes the expected output_idx to delay until and
/// a queue (might be empty) of txns by that sender.
///
/// An empty account address is tracked in `account_placeholders_by_delay` while a non-empty
/// account address is tracked under `use_cases`.
accounts: HashMap<AccountAddress, Account<Txn>>,
/// Registry of all use cases, each of which includes the expected output_idx to delay until and
/// a priority queue (might be empty) of non-empty accounts whose head txn belongs to that use case.
///
/// An empty use case is tracked in `use_case_placeholders_by_delay` while a non-empty use case
/// is tracked in the top level `use_cases_by_delay`.
use_cases: HashMap<UseCaseKey, UseCase>,

/// empty accounts (those w/o known delayed txns), kept to track the delay
/// Main delay queue of txns. All use cases are non-empty of non-empty accounts.
/// All pending txns are reachable from this nested structure.
///
/// The DelayKey is derived from the head account's DelayKey combined with the use case's own
/// DelayKey.
///
/// The head txn of the head account of the head use case in this nested structure is the
/// next txn to be possibly ready.
use_cases_by_delay: BTreeMap<DelayKey, UseCaseKey>,
/// Empty account addresses by the DelayKey (those w/o known delayed txns), kept to track the delay.
account_placeholders_by_delay: BTreeMap<DelayKey, AccountAddress>,
/// empty use cases (those w/o known delayed txns), kept to track the delay
/// Empty UseCaseKeys by the DelayKey (those w/o known delayed txns), kept to track the delay.
use_case_placeholders_by_delay: BTreeMap<DelayKey, UseCaseKey>,

/// main delay queue, all use cases are non-empty of non-empty accounts
use_cases_by_delay: BTreeMap<DelayKey, UseCaseKey>,

/// externally set output index; when an item has try_delay_till <= output_idx, it's deemed ready
/// Externally set output index; when an item has try_delay_till <= output_idx, it's deemed ready
output_idx: OutputIdx,

config: Config,
Expand Down Expand Up @@ -263,8 +290,12 @@ where
}

pub fn bump_output_idx(&mut self, output_idx: OutputIdx) {
self.output_idx = output_idx;
self.drain_placeholders();
assert!(output_idx >= self.output_idx);
// It's possible that the queue returned nothing last round hence the output idx didn't move.
if output_idx > self.output_idx {
self.output_idx = output_idx;
self.drain_placeholders();
}
}

pub fn pop_head(&mut self, only_if_ready: bool) -> Option<Txn> {
Expand Down
22 changes: 0 additions & 22 deletions consensus/src/transaction_shuffler/use_case_aware/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,48 +43,26 @@ where
}

pub(super) fn select_next_txn_inner(&mut self) -> Option<Txn> {
// println!("\n### {}: selection started.\n", self.output_idx);
// println!("Starting with state:");
// println!("{:#?}\n", self);

self.delayed_queue.bump_output_idx(self.output_idx);
// println!("After bumping the output idx:");
// println!("{:#?}\n", self);

// 1. if anything delayed became ready, return it
if let Some(txn) = self.delayed_queue.pop_head(true) {
// println!(
// "--- {}: Selected {:?} from the delayed queue",
// self.output_idx, txn
// );
return Some(txn);
}

// 2. Otherwise, seek in the input queue for something that shouldn't be delayed due to either
// the sender or the use case.
while let Some(txn) = self.input_queue.pop_front() {
let input_idx = self.input_idx;
// println!(
// "--- {}: examining {:?} from the input queue. input_idx: {input_idx}",
// self.output_idx, txn
// );
self.input_idx += 1;

if let Some(txn) = self.delayed_queue.queue_or_return(input_idx, txn) {
// println!(
// "--- {}: Selected {:?} from the input queue",
// self.output_idx, txn
// );
return Some(txn);
}
}

// 3. If nothing is ready, return the next eligible from the delay queue
self.delayed_queue.pop_head(false)
// println!(
// "--- {}: force select head {:?} from the delay queue",
// self.output_idx, ret
// );
}
}

Expand Down
3 changes: 1 addition & 2 deletions testsuite/forge-cli/src/suites/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ fn dag_realistic_env_max_load_test(
}
OnChainExecutionConfig::V4(config_v4) => {
config_v4.block_gas_limit_type = BlockGasLimitType::NoLimit;
config_v4.transaction_shuffler_type = TransactionShufflerType::SenderAwareV2(32);
config_v4.transaction_shuffler_type = TransactionShufflerType::default_for_genesis();
}
}
helm_values["chain"]["on_chain_execution_config"] =
Expand Down Expand Up @@ -207,7 +207,6 @@ fn dag_reconfig_enable_test() -> ForgeConfig {
}
OnChainExecutionConfig::V4(config_v4) => {
config_v4.block_gas_limit_type = BlockGasLimitType::NoLimit;
config_v4.transaction_shuffler_type = TransactionShufflerType::SenderAwareV2(32);
}
}
helm_values["chain"]["on_chain_execution_config"] =
Expand Down
2 changes: 1 addition & 1 deletion testsuite/smoke-test/src/aptos_cli/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ async fn test_onchain_shuffling_change() {

assert_eq!(
current_execution_config.transaction_shuffler_type(),
TransactionShufflerType::SenderAwareV2(32),
TransactionShufflerType::default_for_genesis(),
);

assert_reordering(&mut swarm, true).await;
Expand Down
40 changes: 23 additions & 17 deletions types/src/on_chain_config/execution_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ impl OnChainExecutionConfig {
/// Features that are ready for deployment can be enabled here.
pub fn default_for_genesis() -> Self {
OnChainExecutionConfig::V4(ExecutionConfigV4 {
transaction_shuffler_type: TransactionShufflerType::UseCaseAware {
sender_spread_factor: 32,
platform_use_case_spread_factor: 0,
user_use_case_spread_factor: 32,
},
transaction_shuffler_type: TransactionShufflerType::default_for_genesis(),
block_gas_limit_type: BlockGasLimitType::default_for_genesis(),
transaction_deduper_type: TransactionDeduperType::TxnHashAndAuthenticatorV1,
})
Expand Down Expand Up @@ -164,6 +160,16 @@ pub enum TransactionShufflerType {
},
}

impl TransactionShufflerType {
pub fn default_for_genesis() -> Self {
TransactionShufflerType::UseCaseAware {
sender_spread_factor: 32,
platform_use_case_spread_factor: 0,
user_use_case_spread_factor: 4,
}
}
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")] // cannot use tag = "type" as nested enums cannot work, and bcs doesn't support it
pub enum TransactionDeduperType {
Expand Down Expand Up @@ -346,46 +352,46 @@ mod test {
#[test]
fn test_config_serialization() {
let config = OnChainExecutionConfig::V1(ExecutionConfigV1 {
transaction_shuffler_type: TransactionShufflerType::SenderAwareV2(32),
transaction_shuffler_type: TransactionShufflerType::default_for_genesis(),
});

let s = serde_yaml::to_string(&config).unwrap();
let result = serde_yaml::from_str::<OnChainExecutionConfig>(&s).unwrap();
assert!(matches!(
assert_eq!(
result.transaction_shuffler_type(),
TransactionShufflerType::SenderAwareV2(32)
));
TransactionShufflerType::default_for_genesis(),
);

// V2 test with random per-block gas limit
let rand_gas_limit = rand::thread_rng().gen_range(0, 1000000) as u64;
let config = OnChainExecutionConfig::V2(ExecutionConfigV2 {
transaction_shuffler_type: TransactionShufflerType::SenderAwareV2(32),
transaction_shuffler_type: TransactionShufflerType::default_for_genesis(),
block_gas_limit: Some(rand_gas_limit),
});

let s = serde_yaml::to_string(&config).unwrap();
let result = serde_yaml::from_str::<OnChainExecutionConfig>(&s).unwrap();
assert!(matches!(
assert_eq!(
result.transaction_shuffler_type(),
TransactionShufflerType::SenderAwareV2(32)
));
TransactionShufflerType::default_for_genesis(),
);
assert_eq!(
result.block_gas_limit_type(),
BlockGasLimitType::Limit(rand_gas_limit)
);

// V2 test with no per-block gas limit
let config = OnChainExecutionConfig::V2(ExecutionConfigV2 {
transaction_shuffler_type: TransactionShufflerType::SenderAwareV2(32),
transaction_shuffler_type: TransactionShufflerType::default_for_genesis(),
block_gas_limit: None,
});

let s = serde_yaml::to_string(&config).unwrap();
let result = serde_yaml::from_str::<OnChainExecutionConfig>(&s).unwrap();
assert!(matches!(
assert_eq!(
result.transaction_shuffler_type(),
TransactionShufflerType::SenderAwareV2(32)
));
TransactionShufflerType::default_for_genesis(),
);
assert_eq!(result.block_gas_limit_type(), BlockGasLimitType::NoLimit);
}

Expand Down

0 comments on commit 1789170

Please sign in to comment.