Skip to content

Commit

Permalink
Merge branch 'develop' into tooling-dashboard/add-feature-flags
Browse files Browse the repository at this point in the history
  • Loading branch information
evavirseda authored Oct 25, 2024
2 parents 9a27180 + 4cff616 commit 68d912b
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 407 deletions.
20 changes: 5 additions & 15 deletions consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
core::{Core, CoreSignals},
core_thread::{ChannelCoreThreadDispatcher, CoreThreadHandle},
dag_state::DagState,
leader_schedule::{LeaderSchedule, LeaderSwapTable},
leader_schedule::LeaderSchedule,
leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle},
metrics::initialise_metrics,
network::{NetworkClient as _, NetworkManager, tonic_network::TonicManager},
Expand Down Expand Up @@ -202,20 +202,10 @@ where
let block_manager =
BlockManager::new(context.clone(), dag_state.clone(), block_verifier.clone());

let leader_schedule = if context
.protocol_config
.mysticeti_leader_scoring_and_schedule()
{
Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
))
} else {
Arc::new(LeaderSchedule::new(
context.clone(),
LeaderSwapTable::default(),
))
};
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));

let commit_consumer_monitor = commit_consumer.monitor();
let commit_observer = CommitObserver::new(
Expand Down
295 changes: 70 additions & 225 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,117 +533,86 @@ impl Core {
.with_label_values(&["Core::try_commit"])
.start_timer();

if !self
.context
.protocol_config
.mysticeti_leader_scoring_and_schedule()
{
let decided_leaders = self.committer.try_decide(self.last_decided_leader);
if let Some(last) = decided_leaders.last() {
self.last_decided_leader = last.slot();
self.context
.metrics
.node_metrics
.last_decided_leader_round
.set(self.last_decided_leader.round as i64);
let mut committed_subdags = Vec::new();
// TODO: Add optimization to abort early without quorum for a round.
loop {
// LeaderSchedule has a limit to how many sequenced leaders can be committed
// before a change is triggered. Calling into leader schedule will get you
// how many commits till next leader change. We will loop back and recalculate
// any discarded leaders with the new schedule.
let mut commits_until_update = self
.leader_schedule
.commits_until_leader_schedule_update(self.dag_state.clone());
if commits_until_update == 0 {
let last_commit_index = self.dag_state.read().last_commit_index();
tracing::info!(
"Leader schedule change triggered at commit index {last_commit_index}"
);
self.leader_schedule.update_leader_schedule(&self.dag_state);
commits_until_update = self
.leader_schedule
.commits_until_leader_schedule_update(self.dag_state.clone());

fail_point!("consensus-after-leader-schedule-change");
}
assert!(commits_until_update > 0);

let committed_leaders = decided_leaders
// TODO: limit commits by commits_until_update, which may be needed when leader
// schedule length is reduced.
let decided_leaders = self.committer.try_decide(self.last_decided_leader);

let Some(last_decided) = decided_leaders.last().cloned() else {
break;
};
tracing::debug!(
"Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change",
decided_leaders.len()
);

let mut sequenced_leaders = decided_leaders
.into_iter()
.filter_map(|leader| leader.into_committed_block())
.collect::<Vec<_>>();
if !committed_leaders.is_empty() {
debug!(
"Committing leaders: {}",
committed_leaders
.iter()
.map(|b| b.reference().to_string())
.join(",")
);
}
self.commit_observer.handle_commit(committed_leaders)
} else {
let mut committed_subdags = Vec::new();
// TODO: Add optimization to abort early without quorum for a round.
loop {
// LeaderSchedule has a limit to how many sequenced leaders can be committed
// before a change is triggered. Calling into leader schedule will get you
// how many commits till next leader change. We will loop back and recalculate
// any discarded leaders with the new schedule.
let mut commits_until_update = self
.leader_schedule
.commits_until_leader_schedule_update(self.dag_state.clone());
if commits_until_update == 0 {
let last_commit_index = self.dag_state.read().last_commit_index();
tracing::info!(
"Leader schedule change triggered at commit index {last_commit_index}"
);
self.leader_schedule.update_leader_schedule(&self.dag_state);
commits_until_update = self
.leader_schedule
.commits_until_leader_schedule_update(self.dag_state.clone());

fail_point!("consensus-after-leader-schedule-change");
}
assert!(commits_until_update > 0);

// TODO: limit commits by commits_until_update, which may be needed when leader
// schedule length is reduced.
let decided_leaders = self.committer.try_decide(self.last_decided_leader);

let Some(last_decided) = decided_leaders.last().cloned() else {
break;
};
tracing::debug!(
"Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change",
decided_leaders.len()
);

let mut sequenced_leaders = decided_leaders
.into_iter()
.filter_map(|leader| leader.into_committed_block())
.collect::<Vec<_>>();

// If the sequenced leaders are truncated to fit the leader schedule, use the
// last sequenced leader as the last decided leader. Otherwise,
// use the last decided leader from try_commit().
let sequenced_leaders = if sequenced_leaders.len() >= commits_until_update {
let _ = sequenced_leaders.split_off(commits_until_update);
self.last_decided_leader = sequenced_leaders.last().unwrap().slot();
sequenced_leaders
} else {
self.last_decided_leader = last_decided.slot();
sequenced_leaders
};

self.context
.metrics
.node_metrics
.last_decided_leader_round
.set(self.last_decided_leader.round as i64);
// If the sequenced leaders are truncated to fit the leader schedule, use the
// last sequenced leader as the last decided leader. Otherwise,
// use the last decided leader from try_commit().
let sequenced_leaders = if sequenced_leaders.len() >= commits_until_update {
let _ = sequenced_leaders.split_off(commits_until_update);
self.last_decided_leader = sequenced_leaders.last().unwrap().slot();
sequenced_leaders
} else {
self.last_decided_leader = last_decided.slot();
sequenced_leaders
};

if sequenced_leaders.is_empty() {
break;
}
tracing::debug!(
"Committing {} leaders: {}",
sequenced_leaders.len(),
sequenced_leaders
.iter()
.map(|b| b.reference().to_string())
.join(",")
);
self.context
.metrics
.node_metrics
.last_decided_leader_round
.set(self.last_decided_leader.round as i64);

// TODO: refcount subdags
let subdags = self.commit_observer.handle_commit(sequenced_leaders)?;
self.dag_state
.write()
.add_unscored_committed_subdags(subdags.clone());
committed_subdags.extend(subdags);
if sequenced_leaders.is_empty() {
break;
}
tracing::debug!(
"Committing {} leaders: {}",
sequenced_leaders.len(),
sequenced_leaders
.iter()
.map(|b| b.reference().to_string())
.join(",")
);

Ok(committed_subdags)
// TODO: refcount subdags
let subdags = self.commit_observer.handle_commit(sequenced_leaders)?;
self.dag_state
.write()
.add_unscored_committed_subdags(subdags.clone());
committed_subdags.extend(subdags);
}

Ok(committed_subdags)
}

pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
Expand Down Expand Up @@ -964,7 +933,7 @@ mod test {
CommitConsumer, CommitIndex,
block::{TestBlock, genesis_blocks},
block_verifier::NoopBlockVerifier,
commit::{CommitAPI as _, CommitRange},
commit::CommitAPI as _,
leader_scoring::ReputationScores,
storage::{Store, WriteBatch, mem_store::MemStore},
test_dag_builder::DagBuilder,
Expand Down Expand Up @@ -1793,130 +1762,6 @@ mod test {
}
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_no_leader_schedule_change() {
telemetry_subscribers::init_for_testing();
let default_params = Parameters::default();

let (mut context, _) = Context::new_for_test(4);
context
.protocol_config
.set_mysticeti_leader_scoring_and_schedule_for_testing(false);
// create the cores and their signals for all the authorities
let mut cores = create_cores(context, vec![1, 1, 1, 1]);

// Now iterate over a few rounds and ensure the corresponding signals are
// created while network advances
let mut last_round_blocks = Vec::new();
for round in 1..=30 {
let mut this_round_blocks = Vec::new();

for core_fixture in &mut cores {
// Wait for min round delay to allow blocks to be proposed.
sleep(default_params.min_round_delay).await;
// add the blocks from last round
// this will trigger a block creation for the round and a signal should be
// emitted
core_fixture
.core
.add_blocks(last_round_blocks.clone())
.unwrap();

// A "new round" signal should be received given that all the blocks of previous
// round have been processed
let new_round = receive(
Duration::from_secs(1),
core_fixture.signal_receivers.new_round_receiver(),
)
.await;
assert_eq!(new_round, round);

// Check that a new block has been proposed.
let block = tokio::time::timeout(
Duration::from_secs(1),
core_fixture.block_receiver.recv(),
)
.await
.unwrap()
.unwrap();
assert_eq!(block.round(), round);
assert_eq!(block.author(), core_fixture.core.context.own_index);

// append the new block to this round blocks
this_round_blocks.push(core_fixture.core.last_proposed_block().clone());

let block = core_fixture.core.last_proposed_block();

// ensure that produced block is referring to the blocks of last_round
assert_eq!(
block.ancestors().len(),
core_fixture.core.context.committee.size()
);
for ancestor in block.ancestors() {
if block.round() > 1 {
// don't bother with round 1 block which just contains the genesis blocks.
assert!(
last_round_blocks
.iter()
.any(|block| block.reference() == *ancestor),
"Reference from previous round should be added"
);
}
}
}

last_round_blocks = this_round_blocks;
}

for core_fixture in cores {
// Check commits have been persisted to store
let last_commit = core_fixture
.store
.read_last_commit()
.unwrap()
.expect("last commit should be set");
// There are 28 leader rounds with rounds completed up to and including
// round 29. Round 30 blocks will only include their own blocks, so the
// 28th leader will not be committed.
assert_eq!(last_commit.index(), 27);
let all_stored_commits = core_fixture
.store
.scan_commits((0..=CommitIndex::MAX).into())
.unwrap();
assert_eq!(all_stored_commits.len(), 27);
assert_eq!(
core_fixture
.core
.leader_schedule
.leader_swap_table
.read()
.bad_nodes
.len(),
0
);
assert_eq!(
core_fixture
.core
.leader_schedule
.leader_swap_table
.read()
.good_nodes
.len(),
0
);
let expected_reputation_scores = ReputationScores::new(CommitRange::default(), vec![]);
assert_eq!(
core_fixture
.core
.leader_schedule
.leader_swap_table
.read()
.reputation_scores,
expected_reputation_scores
);
}
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
telemetry_subscribers::init_for_testing();
Expand Down
Loading

0 comments on commit 68d912b

Please sign in to comment.