Skip to content

Commit

Permalink
bron_kerbosch done in parallel iter without changing the validity clo…
Browse files Browse the repository at this point in the history
…sure type
  • Loading branch information
GeemoCandama committed Oct 18, 2023
1 parent f77c5c5 commit d2bd6af
Showing 1 changed file with 119 additions and 104 deletions.
223 changes: 119 additions & 104 deletions beacon_node/operation_pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use types::{
Epoch, EthSpec, ProposerSlashing, SignedBeaconBlock, SignedBlsToExecutionChange,
SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, Validator,
};
use rayon::prelude::*;

type SyncContributions<T> = RwLock<HashMap<SyncAggregateId, Vec<SyncCommitteeContribution<T>>>>;

Expand Down Expand Up @@ -230,112 +231,120 @@ impl<T: EthSpec> OperationPool<T> {
mut validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send,
num_valid: &mut i64,
spec: &'a ChainSpec,
) -> Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> {
let mut cliqued_atts: Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> = vec![];

) -> Vec<(&CompactAttestationData, Vec<CompactIndexedAttestation<T>>)> {
if let Some(AttestationDataMap {
aggregate_attestations,
unaggregate_attestations,
}) = all_attestations.get_attestation_map(checkpoint_key)
aggregate_attestations,
unaggregate_attestations,
}) = all_attestations.get_attestation_map(checkpoint_key)
{
for (data, aggregates) in aggregate_attestations {
if data.slot + spec.min_attestation_inclusion_delay <= state.slot()
let mut cliques_from_aggregates: Vec<_> = aggregate_attestations.into_iter().filter(|(data, _)| {
data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= data.slot + T::slots_per_epoch()
{
let aggregates: Vec<&CompactIndexedAttestation<T>> = aggregates
.iter()
.filter(|indexed| {
validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed,
})
})
.collect();
*num_valid += aggregates.len() as i64;

// aggregate each cliques corresponding attestaiions
let mut clique_aggregates: Vec<CompactIndexedAttestation<_>> = bron_kerbosch(&aggregates, is_compatible)
.iter()
.map(|clique| {
clique.iter().skip(1).fold(aggregates[clique[0]].clone(), |mut acc, &ind| {
acc.aggregate(&aggregates[ind]);
acc
})
})
.map(|(data, aggregates)| {
let aggregates: Vec<&CompactIndexedAttestation<T>> = aggregates
.iter()
.filter(|indexed| {
validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed,
})
.collect();
let mut indices_to_remove = Vec::new();
clique_aggregates
.sort_unstable_by(|a, b| a.attesting_indices.len().cmp(&b.attesting_indices.len()));
for (index, clique) in clique_aggregates.iter().enumerate() {
for bigger_clique in clique_aggregates.iter().skip(index + 1) {
if clique
.aggregation_bits
})
.collect();
*num_valid += aggregates.len() as i64;
(data, aggregates)
})
.collect::<Vec<(&CompactAttestationData, Vec<&CompactIndexedAttestation<_>>)>>()
.into_par_iter()
.map(|(data, aggregates)| {
// aggregate each cliques corresponding attestaiions
let mut clique_aggregates: Vec<CompactIndexedAttestation<_>> = bron_kerbosch(&aggregates, is_compatible)
.iter()
.map(|clique| {
clique.iter().skip(1).fold(aggregates[clique[0]].clone(), |mut acc, &ind| {
acc.aggregate(&aggregates[ind]);
acc
})
})
.collect();
let mut indices_to_remove = Vec::new();
clique_aggregates
.sort_unstable_by(|a, b| a.attesting_indices.len().cmp(&b.attesting_indices.len()));
for (index, clique) in clique_aggregates.iter().enumerate() {
for bigger_clique in clique_aggregates.iter().skip(index + 1) {
if clique
.aggregation_bits
.is_subset(&bigger_clique.aggregation_bits)
{
indices_to_remove.push(index);
break;
}
}
}

for index in indices_to_remove.iter().rev() {
clique_aggregates.swap_remove(*index);
{
indices_to_remove.push(index);
break;
}
}
}

// aggregate unaggregate attestations into the clique aggregates
// if compatible
if let Some(unaggregate_attestations) = unaggregate_attestations.get(&data) {
for attestation in unaggregate_attestations.iter().filter(|indexed| {
validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed,
})
}) {
*num_valid += 1;
for clique_aggregate in &mut clique_aggregates {
if !clique_aggregate
.attesting_indices
for index in indices_to_remove.iter().rev() {
clique_aggregates.swap_remove(*index);
}
(data, clique_aggregates)
})
.collect::<Vec<(&CompactAttestationData, Vec<CompactIndexedAttestation<_>>)>>()
.into_iter()
.map(|(data, mut clique_aggregates)| {
// aggregate unaggregate attestations into the clique aggregates
// if compatible
if let Some(unaggregate_attestations) = unaggregate_attestations.get(&data) {
for attestation in unaggregate_attestations.iter().filter(|indexed| {
validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed,
})
}) {
*num_valid += 1;
for clique_aggregate in &mut clique_aggregates {
if !clique_aggregate
.attesting_indices
.contains(&attestation.attesting_indices[0])
{
clique_aggregate.aggregate(attestation);
}
}
{
clique_aggregate.aggregate(attestation);
}
}
}

cliqued_atts
.extend(clique_aggregates.into_iter().map(|indexed| (data, indexed)));
}
}
(data, clique_aggregates)
})
.collect();

// include aggregated attestations from unaggregated attestations whose
// attestation data doesn't appear in aggregated_attestations
for (data, attestations) in unaggregate_attestations {
if data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= data.slot + T::slots_per_epoch()
{
if !aggregate_attestations.contains_key(&data) {
let mut valid_attestations = attestations.iter().filter(|indexed| {
validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed,
})
unaggregate_attestations
.iter()
.filter(|(data, _)| {
data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= data.slot + T::slots_per_epoch()
&& !aggregate_attestations.contains_key(&data)
})
.for_each(|(data, unaggregates)| {
let mut valid_attestations = unaggregates.iter().filter(|indexed| {
validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed,
})
});
if let Some(att) = valid_attestations.next() {
let mut att = att.clone();
valid_attestations.for_each(|valid_att| {
att.aggregate(valid_att)
});

if let Some(first) = valid_attestations.next() {
let mut agg = first.clone();
for attestation in valid_attestations {
agg.aggregate(&attestation);
}
cliqued_atts.push((data, agg));
}
cliques_from_aggregates.push((data, vec![att]))
}
}
}
});
cliques_from_aggregates
} else {
vec![]
}
cliqued_atts
}

/// Get a list of attestations for inclusion in a block.
Expand Down Expand Up @@ -386,14 +395,17 @@ impl<T: EthSpec> OperationPool<T> {

let prev_epoch_cliqued_atts = prev_epoch_cliqued_atts
.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
.map(|(data, atts)| {
atts.iter().map(|indexed| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
});
.flat_map(|att_max_cover| att_max_cover);

let curr_epoch_cliqued_atts = self.get_clique_aggregate_attestations_for_epoch(
&curr_epoch_key,
Expand All @@ -406,14 +418,17 @@ impl<T: EthSpec> OperationPool<T> {

let curr_epoch_cliqued_atts = curr_epoch_cliqued_atts
.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &curr_epoch_key,
data,
indexed,
.map(|(data, atts)| {
atts.iter().map(|indexed| AttestationRef {
checkpoint: &curr_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
});
.flat_map(|att_max_cover| att_max_cover);

let prev_epoch_limit = if let BeaconState::Base(base_state) = state {
std::cmp::min(
Expand Down

0 comments on commit d2bd6af

Please sign in to comment.