Skip to content

Commit

Permalink
wen_restart: replace get_aggregate_result() with more methods (#254)
Browse files Browse the repository at this point in the history
* Replace AggregateResult with more methods.

* Rename slots_to_repair() to slots_to_repair_iter().
  • Loading branch information
wen-coding authored and willhickey committed Mar 16, 2024
1 parent 683e107 commit 5d72196
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 43 deletions.
72 changes: 38 additions & 34 deletions wen-restart/src/last_voted_fork_slots_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ pub struct LastVotedForkSlotsAggregate {
slots_to_repair: HashSet<Slot>,
}

pub struct LastVotedForkSlotsAggregateResult {
pub slots_to_repair: Vec<Slot>,
pub active_percent: f64, /* 0 ~ 100.0 */
}

impl LastVotedForkSlotsAggregate {
pub(crate) fn new(
root_slot: Slot,
Expand Down Expand Up @@ -131,16 +126,16 @@ impl LastVotedForkSlotsAggregate {
Some(record)
}

pub(crate) fn get_aggregate_result(&self) -> LastVotedForkSlotsAggregateResult {
pub(crate) fn active_percent(&self) -> f64 {
let total_stake = self.epoch_stakes.total_stake();
let total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| {
sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey))
});
let active_percent = total_active_stake as f64 / total_stake as f64 * 100.0;
LastVotedForkSlotsAggregateResult {
slots_to_repair: self.slots_to_repair.iter().cloned().collect(),
active_percent,
}
total_active_stake as f64 / total_stake as f64 * 100.0
}

pub(crate) fn slots_to_repair_iter(&self) -> impl Iterator<Item = &Slot> {
self.slots_to_repair.iter()
}
}

Expand Down Expand Up @@ -237,11 +232,15 @@ mod tests {
}),
);
}
let result = test_state.slots_aggregate.get_aggregate_result();
let mut expected_active_percent =
(initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0;
assert_eq!(result.active_percent, expected_active_percent);
assert!(result.slots_to_repair.is_empty());
assert_eq!(
test_state.slots_aggregate.active_percent(),
(initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0
);
assert!(test_state
.slots_aggregate
.slots_to_repair_iter()
.next()
.is_none());

let new_active_validator = test_state.validator_voting_keypairs
[initial_num_active_validators + 1]
Expand All @@ -267,11 +266,14 @@ mod tests {
wallclock: now,
}),
);
let result = test_state.slots_aggregate.get_aggregate_result();
expected_active_percent =
let expected_active_percent =
(initial_num_active_validators + 2) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0;
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, test_state.last_voted_fork_slots);

Expand Down Expand Up @@ -299,9 +301,12 @@ mod tests {
wallclock: now,
}),
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, vec![root_slot + 1]);

Expand All @@ -320,9 +325,12 @@ mod tests {
),
None,
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, vec![root_slot + 1]);
}
Expand All @@ -339,8 +347,7 @@ mod tests {
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: SHRED_VERSION as u32,
};
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 10.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 10.0);
assert_eq!(
test_state
.slots_aggregate
Expand All @@ -354,8 +361,7 @@ mod tests {
.unwrap(),
Some(record.clone()),
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);
// Now if you get the same result from Gossip again, it should be ignored.
assert_eq!(
test_state.slots_aggregate.aggregate(
Expand Down Expand Up @@ -399,8 +405,7 @@ mod tests {
}),
);
// percentage doesn't change since it's a replace.
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);

// Record from validator with zero stake should be ignored.
assert_eq!(
Expand All @@ -419,8 +424,7 @@ mod tests {
None,
);
// percentage doesn't change since the previous aggregate is ignored.
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);
}

#[test]
Expand Down
21 changes: 12 additions & 9 deletions wen-restart/src/wen_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,34 +154,37 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
.insert(from, record);
}
}
let result = last_voted_fork_slots_aggregate.get_aggregate_result();
// Because all operations on the aggregate are called from this single thread, we can
// fetch all results separately without worrying about them being out of sync. We can
// also use returned iterator without the vector changing underneath us.
let active_percent = last_voted_fork_slots_aggregate.active_percent();
let mut filtered_slots: Vec<Slot>;
{
let my_bank_forks = bank_forks.read().unwrap();
filtered_slots = result
.slots_to_repair
.into_iter()
filtered_slots = last_voted_fork_slots_aggregate
.slots_to_repair_iter()
.filter(|slot| {
if slot <= &root_slot || is_full_slots.contains(slot) {
if *slot <= &root_slot || is_full_slots.contains(*slot) {
return false;
}
let is_full = my_bank_forks
.get(*slot)
.get(**slot)
.map_or(false, |bank| bank.is_frozen());
if is_full {
is_full_slots.insert(*slot);
is_full_slots.insert(**slot);
}
!is_full
})
.cloned()
.collect();
}
filtered_slots.sort();
info!(
"Active peers: {} Slots to repair: {:?}",
result.active_percent, &filtered_slots
active_percent, &filtered_slots
);
if filtered_slots.is_empty()
&& result.active_percent > wait_for_supermajority_threshold_percent as f64
&& active_percent > wait_for_supermajority_threshold_percent as f64
{
*wen_restart_repair_slots.write().unwrap() = vec![];
break;
Expand Down

0 comments on commit 5d72196

Please sign in to comment.