diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs
index 3a755b37e66..b07c487bfac 100644
--- a/consensus/core/src/authority_node.rs
+++ b/consensus/core/src/authority_node.rs
@@ -23,7 +23,7 @@ use crate::{
     core::{Core, CoreSignals},
     core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
     dag_state::DagState,
-    leader_schedule::{LeaderSchedule, LeaderSwapTable},
+    leader_schedule::LeaderSchedule,
     leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
     metrics::initialise_metrics,
     network::{NetworkClient as _, NetworkManager, tonic_network::TonicManager},
@@ -202,20 +202,10 @@ where
         let block_manager =
             BlockManager::new(context.clone(), dag_state.clone(), block_verifier.clone());
 
-        let leader_schedule = if context
-            .protocol_config
-            .mysticeti_leader_scoring_and_schedule()
-        {
-            Arc::new(LeaderSchedule::from_store(
-                context.clone(),
-                dag_state.clone(),
-            ))
-        } else {
-            Arc::new(LeaderSchedule::new(
-                context.clone(),
-                LeaderSwapTable::default(),
-            ))
-        };
+        let leader_schedule = Arc::new(LeaderSchedule::from_store(
+            context.clone(),
+            dag_state.clone(),
+        ));
 
         let commit_consumer_monitor = commit_consumer.monitor();
         let commit_observer = CommitObserver::new(
diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs
index f085b9dd23e..8031ddd064a 100644
--- a/consensus/core/src/core.rs
+++ b/consensus/core/src/core.rs
@@ -533,117 +533,86 @@ impl Core {
             .with_label_values(&["Core::try_commit"])
             .start_timer();
 
-        if !self
-            .context
-            .protocol_config
-            .mysticeti_leader_scoring_and_schedule()
-        {
-            let decided_leaders = self.committer.try_decide(self.last_decided_leader);
-            if let Some(last) = decided_leaders.last() {
-                self.last_decided_leader = last.slot();
-                self.context
-                    .metrics
-                    .node_metrics
-                    .last_decided_leader_round
-                    .set(self.last_decided_leader.round as i64);
+        let mut committed_subdags = Vec::new();
+        // TODO: Add optimization to abort early without quorum for a round.
+        loop {
+            // LeaderSchedule has a limit to how many sequenced leaders can be committed
+            // before a change is triggered. Calling into leader schedule will get you
+            // how many commits till next leader change. We will loop back and recalculate
+            // any discarded leaders with the new schedule.
+            let mut commits_until_update = self
+                .leader_schedule
+                .commits_until_leader_schedule_update(self.dag_state.clone());
+            if commits_until_update == 0 {
+                let last_commit_index = self.dag_state.read().last_commit_index();
+                tracing::info!(
+                    "Leader schedule change triggered at commit index {last_commit_index}"
+                );
+                self.leader_schedule.update_leader_schedule(&self.dag_state);
+                commits_until_update = self
+                    .leader_schedule
+                    .commits_until_leader_schedule_update(self.dag_state.clone());
+
+                fail_point!("consensus-after-leader-schedule-change");
             }
+            assert!(commits_until_update > 0);
 
-            let committed_leaders = decided_leaders
+            // TODO: limit commits by commits_until_update, which may be needed when leader
+            // schedule length is reduced.
+            let decided_leaders = self.committer.try_decide(self.last_decided_leader);
+
+            let Some(last_decided) = decided_leaders.last().cloned() else {
+                break;
+            };
+            tracing::debug!(
+                "Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change",
+                decided_leaders.len()
+            );
+
+            let mut sequenced_leaders = decided_leaders
                 .into_iter()
                 .filter_map(|leader| leader.into_committed_block())
                 .collect::<Vec<_>>();
-            if !committed_leaders.is_empty() {
-                debug!(
-                    "Committing leaders: {}",
-                    committed_leaders
-                        .iter()
-                        .map(|b| b.reference().to_string())
-                        .join(",")
-                );
-            }
-            self.commit_observer.handle_commit(committed_leaders)
-        } else {
-            let mut committed_subdags = Vec::new();
-            // TODO: Add optimization to abort early without quorum for a round.
-            loop {
-                // LeaderSchedule has a limit to how many sequenced leaders can be committed
-                // before a change is triggered. Calling into leader schedule will get you
-                // how many commits till next leader change. We will loop back and recalculate
-                // any discarded leaders with the new schedule.
-                let mut commits_until_update = self
-                    .leader_schedule
-                    .commits_until_leader_schedule_update(self.dag_state.clone());
-                if commits_until_update == 0 {
-                    let last_commit_index = self.dag_state.read().last_commit_index();
-                    tracing::info!(
-                        "Leader schedule change triggered at commit index {last_commit_index}"
-                    );
-                    self.leader_schedule.update_leader_schedule(&self.dag_state);
-                    commits_until_update = self
-                        .leader_schedule
-                        .commits_until_leader_schedule_update(self.dag_state.clone());
-
-                    fail_point!("consensus-after-leader-schedule-change");
-                }
-                assert!(commits_until_update > 0);
-
-                // TODO: limit commits by commits_until_update, which may be needed when leader
-                // schedule length is reduced.
-                let decided_leaders = self.committer.try_decide(self.last_decided_leader);
-
-                let Some(last_decided) = decided_leaders.last().cloned() else {
-                    break;
-                };
-                tracing::debug!(
-                    "Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change",
-                    decided_leaders.len()
-                );
-
-                let mut sequenced_leaders = decided_leaders
-                    .into_iter()
-                    .filter_map(|leader| leader.into_committed_block())
-                    .collect::<Vec<_>>();
-
-                // If the sequenced leaders are truncated to fit the leader schedule, use the
-                // last sequenced leader as the last decided leader. Otherwise,
-                // use the last decided leader from try_commit().
-                let sequenced_leaders = if sequenced_leaders.len() >= commits_until_update {
-                    let _ = sequenced_leaders.split_off(commits_until_update);
-                    self.last_decided_leader = sequenced_leaders.last().unwrap().slot();
-                    sequenced_leaders
-                } else {
-                    self.last_decided_leader = last_decided.slot();
-                    sequenced_leaders
-                };
-                self.context
-                    .metrics
-                    .node_metrics
-                    .last_decided_leader_round
-                    .set(self.last_decided_leader.round as i64);
+            // If the sequenced leaders are truncated to fit the leader schedule, use the
+            // last sequenced leader as the last decided leader. Otherwise,
+            // use the last decided leader from try_commit().
+            let sequenced_leaders = if sequenced_leaders.len() >= commits_until_update {
+                let _ = sequenced_leaders.split_off(commits_until_update);
+                self.last_decided_leader = sequenced_leaders.last().unwrap().slot();
+                sequenced_leaders
+            } else {
+                self.last_decided_leader = last_decided.slot();
+                sequenced_leaders
+            };
 
-                if sequenced_leaders.is_empty() {
-                    break;
-                }
-                tracing::debug!(
-                    "Committing {} leaders: {}",
-                    sequenced_leaders.len(),
-                    sequenced_leaders
-                        .iter()
-                        .map(|b| b.reference().to_string())
-                        .join(",")
-                );
+            self.context
+                .metrics
+                .node_metrics
+                .last_decided_leader_round
+                .set(self.last_decided_leader.round as i64);
 
-                // TODO: refcount subdags
-                let subdags = self.commit_observer.handle_commit(sequenced_leaders)?;
-                self.dag_state
-                    .write()
-                    .add_unscored_committed_subdags(subdags.clone());
-                committed_subdags.extend(subdags);
+            if sequenced_leaders.is_empty() {
+                break;
             }
+            tracing::debug!(
+                "Committing {} leaders: {}",
+                sequenced_leaders.len(),
+                sequenced_leaders
+                    .iter()
+                    .map(|b| b.reference().to_string())
+                    .join(",")
+            );
 
-            Ok(committed_subdags)
+            // TODO: refcount subdags
+            let subdags = self.commit_observer.handle_commit(sequenced_leaders)?;
+            self.dag_state
+                .write()
+                .add_unscored_committed_subdags(subdags.clone());
+            committed_subdags.extend(subdags);
         }
+
+        Ok(committed_subdags)
     }
 
     pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
@@ -964,7 +933,7 @@ mod test {
         CommitConsumer, CommitIndex,
         block::{TestBlock, genesis_blocks},
        block_verifier::NoopBlockVerifier,
-        commit::{CommitAPI as _, CommitRange},
+        commit::CommitAPI as _,
         leader_scoring::ReputationScores,
         storage::{Store, WriteBatch, mem_store::MemStore},
         test_dag_builder::DagBuilder,
@@ -1793,130 +1762,6 @@ mod test {
         }
     }
 
-    #[tokio::test(flavor = "current_thread", start_paused = true)]
-    async fn test_no_leader_schedule_change() {
-        telemetry_subscribers::init_for_testing();
-        let default_params = Parameters::default();
-
-        let (mut context, _) = Context::new_for_test(4);
-        context
-            .protocol_config
-            .set_mysticeti_leader_scoring_and_schedule_for_testing(false);
-        // create the cores and their signals for all the authorities
-        let mut cores = create_cores(context, vec![1, 1, 1, 1]);
-
-        // Now iterate over a few rounds and ensure the corresponding signals are
-        // created while network advances
-        let mut last_round_blocks = Vec::new();
-        for round in 1..=30 {
-            let mut this_round_blocks = Vec::new();
-
-            for core_fixture in &mut cores {
-                // Wait for min round delay to allow blocks to be proposed.
-                sleep(default_params.min_round_delay).await;
-                // add the blocks from last round
-                // this will trigger a block creation for the round and a signal should be
-                // emitted
-                core_fixture
-                    .core
-                    .add_blocks(last_round_blocks.clone())
-                    .unwrap();
-
-                // A "new round" signal should be received given that all the blocks of previous
-                // round have been processed
-                let new_round = receive(
-                    Duration::from_secs(1),
-                    core_fixture.signal_receivers.new_round_receiver(),
-                )
-                .await;
-                assert_eq!(new_round, round);
-
-                // Check that a new block has been proposed.
-                let block = tokio::time::timeout(
-                    Duration::from_secs(1),
-                    core_fixture.block_receiver.recv(),
-                )
-                .await
-                .unwrap()
-                .unwrap();
-                assert_eq!(block.round(), round);
-                assert_eq!(block.author(), core_fixture.core.context.own_index);
-
-                // append the new block to this round blocks
-                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
-
-                let block = core_fixture.core.last_proposed_block();
-
-                // ensure that produced block is referring to the blocks of last_round
-                assert_eq!(
-                    block.ancestors().len(),
-                    core_fixture.core.context.committee.size()
-                );
-                for ancestor in block.ancestors() {
-                    if block.round() > 1 {
-                        // don't bother with round 1 block which just contains the genesis blocks.
-                        assert!(
-                            last_round_blocks
-                                .iter()
-                                .any(|block| block.reference() == *ancestor),
-                            "Reference from previous round should be added"
-                        );
-                    }
-                }
-            }
-
-            last_round_blocks = this_round_blocks;
-        }
-
-        for core_fixture in cores {
-            // Check commits have been persisted to store
-            let last_commit = core_fixture
-                .store
-                .read_last_commit()
-                .unwrap()
-                .expect("last commit should be set");
-            // There are 28 leader rounds with rounds completed up to and including
-            // round 29. Round 30 blocks will only include their own blocks, so the
-            // 28th leader will not be committed.
-            assert_eq!(last_commit.index(), 27);
-            let all_stored_commits = core_fixture
-                .store
-                .scan_commits((0..=CommitIndex::MAX).into())
-                .unwrap();
-            assert_eq!(all_stored_commits.len(), 27);
-            assert_eq!(
-                core_fixture
-                    .core
-                    .leader_schedule
-                    .leader_swap_table
-                    .read()
-                    .bad_nodes
-                    .len(),
-                0
-            );
-            assert_eq!(
-                core_fixture
-                    .core
-                    .leader_schedule
-                    .leader_swap_table
-                    .read()
-                    .good_nodes
-                    .len(),
-                0
-            );
-            let expected_reputation_scores = ReputationScores::new(CommitRange::default(), vec![]);
-            assert_eq!(
-                core_fixture
-                    .core
-                    .leader_schedule
-                    .leader_swap_table
-                    .read()
-                    .reputation_scores,
-                expected_reputation_scores
-            );
-        }
-    }
-
     #[tokio::test(flavor = "current_thread", start_paused = true)]
     async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
         telemetry_subscribers::init_for_testing();
diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs
index 1d4d8492fce..6ab63a8d445 100644
--- a/consensus/core/src/dag_state.rs
+++ b/consensus/core/src/dag_state.rs
@@ -127,17 +127,10 @@ impl DagState {
                     last_committed_rounds[block_ref.author] =
                         max(last_committed_rounds[block_ref.author], block_ref.round);
                 }
-                if context
-                    .protocol_config
-                    .mysticeti_leader_scoring_and_schedule()
-                {
-                    let committed_subdag = load_committed_subdag_from_store(
-                        store.as_ref(),
-                        commit.clone(),
-                        vec![],
-                    ); // We don't need to recover reputation scores for unscored_committed_subdags
-                    unscored_committed_subdags.push(committed_subdag);
-                }
+
+                let committed_subdag =
+                    load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]); // We don't need to recover reputation scores for unscored_committed_subdags
+                unscored_committed_subdags.push(committed_subdag);
             });
         }
 
@@ -656,10 +649,7 @@ impl DagState {
         // We empty the unscored committed subdags to calculate reputation scores.
         assert!(self.unscored_committed_subdags.is_empty());
 
-        let commit_info = CommitInfo {
-            committed_rounds: self.last_committed_rounds.clone(),
-            reputation_scores,
-        };
+        let commit_info = CommitInfo::new(self.last_committed_rounds.clone(), reputation_scores);
         let last_commit = self
             .last_commit
             .as_ref()
@@ -741,22 +731,7 @@ impl DagState {
         // Flush buffered data to storage.
         let blocks = std::mem::take(&mut self.blocks_to_write);
         let commits = std::mem::take(&mut self.commits_to_write);
-        let commit_info_to_write = if self
-            .context
-            .protocol_config
-            .mysticeti_leader_scoring_and_schedule()
-        {
-            std::mem::take(&mut self.commit_info_to_write)
-        } else if commits.is_empty() {
-            vec![]
-        } else {
-            let last_commit_ref = commits.last().as_ref().unwrap().reference();
-            let commit_info = CommitInfo::new(
-                self.last_committed_rounds.clone(),
-                ReputationScores::default(),
-            );
-            vec![(last_commit_ref, commit_info)]
-        };
+        let commit_info_to_write = std::mem::take(&mut self.commit_info_to_write);
 
         if blocks.is_empty() && commits.is_empty() {
             return;
diff --git a/consensus/core/src/leader_schedule.rs b/consensus/core/src/leader_schedule.rs
index f5bea6cff07..62b9e522b98 100644
--- a/consensus/core/src/leader_schedule.rs
+++ b/consensus/core/src/leader_schedule.rs
@@ -535,117 +535,6 @@ mod tests {
         );
     }
 
-    #[tokio::test]
-    async fn test_leader_schedule_from_store_with_no_scores() {
-        telemetry_subscribers::init_for_testing();
-        let mut context = Context::new_for_test(4).0;
-        context
-            .protocol_config
-            .set_mysticeti_leader_scoring_and_schedule_for_testing(false);
-        let context = Arc::new(context);
-        let store = Arc::new(MemStore::new());
-
-        let leader_timestamp = context.clock.timestamp_utc_ms();
-        let blocks = vec![
-            VerifiedBlock::new_for_test(
-                TestBlock::new(10, 2)
-                    .set_timestamp_ms(leader_timestamp)
-                    .build(),
-            ),
-            VerifiedBlock::new_for_test(TestBlock::new(9, 0).build()),
-            VerifiedBlock::new_for_test(TestBlock::new(9, 2).build()),
-            VerifiedBlock::new_for_test(TestBlock::new(9, 3).build()),
-        ];
-
-        let leader = blocks[0].clone();
-        let leader_ref = leader.reference();
-        let last_commit_index = 10;
-        let last_commit = TrustedCommit::new_for_test(
-            last_commit_index,
-            CommitDigest::MIN,
-            leader_timestamp,
-            leader_ref,
-            blocks
-                .iter()
-                .map(|block| block.reference())
-                .collect::<Vec<_>>(),
-        );
-
-        // The CommitInfo for the first 10 commits are written to store. This is the
-        // info that LeaderSchedule will be recovered from
-        let committed_rounds = vec![9, 9, 10, 9];
-        let commit_ref = CommitRef::new(10, CommitDigest::MIN);
-        let commit_info = CommitInfo {
-            reputation_scores: ReputationScores::default(),
-            committed_rounds,
-        };
-
-        store
-            .write(
-                WriteBatch::default()
-                    .commit_info(vec![(commit_ref, commit_info)])
-                    .blocks(blocks)
-                    .commits(vec![last_commit]),
-            )
-            .unwrap();
-
-        // CommitIndex '11' will be written to store. This should result in the cached
-        // last_committed_rounds & unscored subdags in DagState to be updated with the
-        // latest commit information on recovery.
-        let leader_timestamp = context.clock.timestamp_utc_ms();
-        let blocks = vec![
-            VerifiedBlock::new_for_test(
-                TestBlock::new(11, 3)
-                    .set_timestamp_ms(leader_timestamp)
-                    .build(),
-            ),
-            VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()),
-            VerifiedBlock::new_for_test(TestBlock::new(10, 1).build()),
-            VerifiedBlock::new_for_test(TestBlock::new(10, 3).build()),
-        ];
-
-        let leader = blocks[0].clone();
-        let leader_ref = leader.reference();
-        let last_commit_index = 11;
-        let expected_last_committed_rounds = vec![10, 10, 10, 11];
-        let last_commit = TrustedCommit::new_for_test(
-            last_commit_index,
-            CommitDigest::MIN,
-            leader_timestamp,
-            leader_ref,
-            blocks
-                .iter()
-                .map(|block| block.reference())
-                .collect::<Vec<_>>(),
-        );
-        store
-            .write(
-                WriteBatch::default()
-                    .blocks(blocks)
-                    .commits(vec![last_commit]),
-            )
-            .unwrap();
-
-        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
-
-        // Check that DagState recovery from stored CommitInfo worked correctly
-        assert_eq!(
-            expected_last_committed_rounds,
-            dag_state.read().last_committed_rounds()
-        );
-
-        // Leader Scoring & Schedule Change is disabled, unscored subdags should not be
-        // accumulated.
-        assert_eq!(0, dag_state.read().unscored_committed_subdags_count());
-
-        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
-
-        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
-        let leader_swap_table = leader_schedule.leader_swap_table.read();
-        assert_eq!(leader_swap_table.good_nodes.len(), 0);
-        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
-    }
-
     #[tokio::test]
     async fn test_leader_schedule_from_store() {
         telemetry_subscribers::init_for_testing();
@@ -692,10 +581,7 @@ mod tests {
         let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
         let committed_rounds = vec![9, 9, 10, 9];
         let commit_ref = expected_commits[9].reference();
-        let commit_info = CommitInfo {
-            reputation_scores,
-            committed_rounds,
-        };
+        let commit_info = CommitInfo::new(committed_rounds, reputation_scores);
 
         // CommitIndex '11' will be written to store. This should result in the cached
         // last_committed_rounds & unscored subdags in DagState to be updated with the
diff --git a/crates/iota-open-rpc/spec/openrpc.json b/crates/iota-open-rpc/spec/openrpc.json
index 44dd2ef8b52..6cd8db0c7de 100644
--- a/crates/iota-open-rpc/spec/openrpc.json
+++ b/crates/iota-open-rpc/spec/openrpc.json
@@ -1305,7 +1305,6 @@
         "enable_poseidon": true,
         "enable_vdf": true,
         "hardened_otw_check": true,
-        "mysticeti_leader_scoring_and_schedule": true,
         "no_extraneous_module_bytes": true,
         "passkey_auth": true,
         "rethrow_serialization_type_layout_errors": true,
diff --git a/crates/iota-protocol-config/src/lib.rs b/crates/iota-protocol-config/src/lib.rs
index 7d62f0910ce..7659fd85068 100644
--- a/crates/iota-protocol-config/src/lib.rs
+++ b/crates/iota-protocol-config/src/lib.rs
@@ -166,10 +166,6 @@ struct FeatureFlags {
     #[serde(skip_serializing_if = "Option::is_none")]
     zklogin_max_epoch_upper_bound_delta: Option<u64>,
 
-    // Controls leader scoring & schedule change in Mysticeti consensus.
-    #[serde(skip_serializing_if = "is_false")]
-    mysticeti_leader_scoring_and_schedule: bool,
-
     // Enable VDF
     #[serde(skip_serializing_if = "is_false")]
     enable_vdf: bool,
@@ -1037,10 +1033,6 @@ impl ProtocolConfig {
         self.feature_flags.consensus_network
     }
 
-    pub fn mysticeti_leader_scoring_and_schedule(&self) -> bool {
-        self.feature_flags.mysticeti_leader_scoring_and_schedule
-    }
-
     pub fn enable_vdf(&self) -> bool {
         self.feature_flags.enable_vdf
     }
@@ -1628,8 +1620,6 @@ impl ProtocolConfig {
                 cfg.feature_flags.consensus_choice = ConsensusChoice::Mysticeti;
                 // Use tonic networking for Mysticeti.
                 cfg.feature_flags.consensus_network = ConsensusNetwork::Tonic;
-                // Enable leader scoring & schedule change on mainnet for mysticeti.
-                cfg.feature_flags.mysticeti_leader_scoring_and_schedule = true;
 
                 cfg.feature_flags.per_object_congestion_control_mode =
                     PerObjectCongestionControlMode::TotalTxCount;
@@ -1782,10 +1772,6 @@ impl ProtocolConfig {
         self.feature_flags.bridge = false
     }
 
-    pub fn set_mysticeti_leader_scoring_and_schedule_for_testing(&mut self, val: bool) {
-        self.feature_flags.mysticeti_leader_scoring_and_schedule = val;
-    }
-
     pub fn set_passkey_auth_for_testing(&mut self, val: bool) {
         self.feature_flags.passkey_auth = val
     }
diff --git a/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__Mainnet_version_1.snap b/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__Mainnet_version_1.snap
index 19cbe528622..40ac4f18320 100644
--- a/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__Mainnet_version_1.snap
+++ b/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__Mainnet_version_1.snap
@@ -7,8 +7,6 @@ feature_flags:
   consensus_transaction_ordering: ByGasPrice
   per_object_congestion_control_mode: TotalTxCount
   zklogin_max_epoch_upper_bound_delta: 30
-  mysticeti_leader_scoring_and_schedule: true
-  rethrow_serialization_type_layout_errors: true
 max_tx_size_bytes: 131072
 max_input_objects: 2048
 max_size_written_objects: 5000000
diff --git a/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__Testnet_version_1.snap b/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__Testnet_version_1.snap
index d9c40394b74..1a37c689667 100644
--- a/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__Testnet_version_1.snap
+++ b/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__Testnet_version_1.snap
@@ -7,8 +7,6 @@ feature_flags:
   consensus_transaction_ordering: ByGasPrice
   per_object_congestion_control_mode: TotalTxCount
   zklogin_max_epoch_upper_bound_delta: 30
-  mysticeti_leader_scoring_and_schedule: true
-  mysticeti_num_leaders_per_round: 1
 max_tx_size_bytes: 131072
 max_input_objects: 2048
 max_size_written_objects: 5000000
diff --git a/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__version_1.snap b/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__version_1.snap
index 91c43303ac6..2a22e37ff45 100644
--- a/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__version_1.snap
+++ b/crates/iota-protocol-config/src/snapshots/iota_protocol_config__test__version_1.snap
@@ -9,9 +9,7 @@ feature_flags:
   enable_group_ops_native_function_msm: true
   per_object_congestion_control_mode: TotalTxCount
   zklogin_max_epoch_upper_bound_delta: 30
-  mysticeti_leader_scoring_and_schedule: true
   enable_vdf: true
-  mysticeti_num_leaders_per_round: 1
   passkey_auth: true
   authority_capabilities_v2: true
 max_tx_size_bytes: 131072