Skip to content

Commit

Permalink
Separated into function. Now calculating maximal clique aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
GeemoCandama committed Jul 26, 2023
1 parent 3d8bbe8 commit 5329233
Showing 1 changed file with 127 additions and 152 deletions.
279 changes: 127 additions & 152 deletions beacon_node/operation_pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod sync_aggregate_id;

pub use crate::bls_to_execution_changes::ReceivedPreCapella;
pub use attestation::{earliest_attestation_validators, AttMaxCover};
use attestation_storage::{CompactIndexedAttestation, AttestationDataMap, CompactAttestationData};
use attestation_storage::{CompactIndexedAttestation, CompactAttestationData, AttestationDataMap};
pub use attestation_storage::{AttestationRef, SplitAttestation};
use bron_kerbosch::bron_kerbosch;
pub use max_cover::MaxCover;
Expand Down Expand Up @@ -248,21 +248,88 @@ impl<T: EthSpec> OperationPool<T> {
checkpoint_key: &'a CheckpointKey,
all_attestations: &'a AttestationMap<T>,
state: &'a BeaconState<T>,
reward_cache: &'a RewardCache,
total_active_balance: u64,
validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send,
mut validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send,
num_valid: &mut i64,
spec: &'a ChainSpec,
) -> impl Iterator<Item = AttMaxCover<'a, T>> + Send {
// all_attestations
// .get_attestations(checkpoint_key)
// .filter(|att| {
// att.data.slot + spec.min_attestation_inclusion_delay <= state.slot()
// && state.slot() <= att.data.slot + T::slots_per_epoch()
// })
// .filter(validity_filter)
// .filter_map(move |att| {
// AttMaxCover::new(att, state, reward_cache, total_active_balance, spec)
// })
) -> Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> {
let mut cliqued_atts: Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> = vec![];
if let Some(AttestationDataMap { aggregate_attestations, unaggregate_attestations })
= all_attestations.get_attestation_map(checkpoint_key) {
for (data, aggregates) in aggregate_attestations {
if data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= data.slot + T::slots_per_epoch() {
let aggregates: Vec<&CompactIndexedAttestation<T>> = aggregates
.iter()
.filter(|indexed| validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed
}))
.collect();
*num_valid += aggregates.len() as i64;

let cliques = bron_kerbosch(&aggregates, is_compatible);

// This assumes that the values from bron_kerbosch are valid indices of
// aggregates.
let mut clique_aggregates = cliques
.iter()
.map(|clique| {
let mut res_att = aggregates[clique[0]].clone();
for ind in clique.iter().skip(1) {
res_att.aggregate(&aggregates[*ind]);
}
res_att
});

if let Some(unaggregate_attestations) = unaggregate_attestations.get(&data) {
for attestation in unaggregate_attestations
.iter()
.filter(|indexed| validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed
}))
{
*num_valid += 1;
for mut clique_aggregate in &mut clique_aggregates {
if !clique_aggregate.attesting_indices.contains(&attestation.attesting_indices[0]) {
clique_aggregate.aggregate(attestation);
}
}
}
}

cliqued_atts.extend(
clique_aggregates
.map(|indexed| (data, indexed))
);
}
}
for (data, attestations) in unaggregate_attestations {
if data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= data.slot + T::slots_per_epoch() {
if !aggregate_attestations.contains_key(&data) {
let mut valid_attestations = attestations
.iter()
.filter(|indexed| validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed
}));

if let Some(first) = valid_attestations.next() {
let mut agg = first.clone();
for attestation in valid_attestations {
agg.aggregate(&attestation);
}
cliqued_atts.push((data, agg));
}
}
}
}
}
cliqued_atts
}

/// Get a list of attestations for inclusion in a block.
Expand All @@ -274,8 +341,8 @@ impl<T: EthSpec> OperationPool<T> {
pub fn get_attestations(
&self,
state: &BeaconState<T>,
mut prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send,
mut curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send,
prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send,
curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send,
spec: &ChainSpec,
) -> Result<Vec<Attestation<T>>, OpPoolError> {
// Attestations for the current fork, which may be from the current or previous epoch.
Expand All @@ -297,142 +364,50 @@ impl<T: EthSpec> OperationPool<T> {
let mut num_prev_valid = 0_i64;
let mut num_curr_valid = 0_i64;

let mut prev_cliqued_atts: Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> = vec![];

if let Some(prev_attestation_map) = all_attestations.get_attestation_map(&prev_epoch_key) {
for (data, aggregates) in prev_attestation_map.aggregate_attestations.iter() {
let aggregates: Vec<&CompactIndexedAttestation<T>> = aggregates
.iter()
.map(|indexed| AttestationRef {
checkpoint: &prev_epoch_key,
data: &data,
indexed,
})
.filter(|att| {
att.data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= att.data.slot + T::slots_per_epoch()
})
.filter(&mut prev_epoch_validity_filter)
.map(|att_ref| att_ref.indexed)
.collect();
num_prev_valid += aggregates.len() as i64;


let cliques = bron_kerbosch(&aggregates, is_compatible);

let mut clique_aggregates: Vec<CompactIndexedAttestation<T>> = cliques
.iter()
.map(|clique| {
let mut res_att = aggregates[clique[0]].clone();
for ind in clique.iter().skip(1) {
res_att.aggregate(&aggregates[*ind]);
}
res_att
})
.collect();

// DONT ACTUALLY USE THIS
let empty_vec = vec![];
let unaggregate_attestations: Vec<&CompactIndexedAttestation<T>> = prev_attestation_map.unaggregate_attestations
.get(&data)
.or_else(|| Some(&empty_vec))
.unwrap()
.iter()
.map(|indexed| AttestationRef {
checkpoint: &prev_epoch_key,
data: &data,
indexed,
})
.filter(|att| {
att.data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= att.data.slot + T::slots_per_epoch()
})
.filter(&mut prev_epoch_validity_filter)
.map(|attestation| attestation.indexed)
.collect();

num_prev_valid += unaggregate_attestations.len() as i64;

for attestation in unaggregate_attestations {
for clique_aggregate in &mut clique_aggregates {
if !clique_aggregate.attesting_indices.contains(&attestation.attesting_indices[0]) {
clique_aggregate.aggregate(attestation);
}
}
}

// let clique_aggregates: Vec<AttMaxCover<T>> = clique_aggregates
// .iter()
// .map(|indexed| AttestationRef {
// checkpoint: &prev_epoch_key,
// data: &data,
// indexed,
// })
// .filter_map(move |att| {
// AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
// })
// .collect();
let clique_aggregates: Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> = clique_aggregates
.into_iter()
.map(|indexed| (data, indexed))
.collect();


prev_cliqued_atts.extend(clique_aggregates);
}

for (data, attestations) in &prev_attestation_map.unaggregate_attestations {
if !prev_attestation_map.aggregate_attestations.contains_key(&data) {
let valid_attestations: Vec<&CompactIndexedAttestation<T>> = attestations
.iter()
.map(|indexed| AttestationRef {
checkpoint: &prev_epoch_key,
data: &data,
indexed,
})
.filter(|att| {
att.data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= att.data.slot + T::slots_per_epoch()
})
.filter(&mut prev_epoch_validity_filter)
.map(|attestation| attestation.indexed)
.collect();

let mut agg = valid_attestations[0].clone();
for attestation in valid_attestations.iter().skip(1) {
agg.aggregate(&attestation);
}
prev_cliqued_atts.push((data, agg));
}
}
}
let prev_cliqued_atts = if prev_epoch_key != curr_epoch_key {
self.get_clique_aggregate_attestations_for_epoch(
&prev_epoch_key,
&*all_attestations,
state,
prev_epoch_validity_filter,
&mut num_prev_valid,
spec
)
} else {
vec![]
};

let prev_cliqued_atts_max_cover: Vec<AttMaxCover<T>> = prev_cliqued_atts
.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
.collect();
let prev_epoch_cliqued_atts: Vec<AttMaxCover<T>> = prev_cliqued_atts.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
.collect();

let curr_cliqued_atts = self.get_clique_aggregate_attestations_for_epoch(
&curr_epoch_key,
&*all_attestations,
state,
curr_epoch_validity_filter,
&mut num_curr_valid,
spec
);

let curr_epoch_cliqued_atts: Vec<AttMaxCover<T>> = curr_cliqued_atts.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
.collect();

let curr_epoch_att: Vec<AttMaxCover<_>> = self
.get_valid_attestations_for_epoch(
&curr_epoch_key,
&*all_attestations,
state,
&reward_cache,
total_active_balance,
curr_epoch_validity_filter,
spec,
)
.collect();
num_curr_valid += curr_epoch_att.len() as i64;
// .inspect(|_| num_curr_valid += 1);

let prev_epoch_limit = if let BeaconState::Base(base_state) = state {
std::cmp::min(
Expand All @@ -451,13 +426,13 @@ impl<T: EthSpec> OperationPool<T> {
if prev_epoch_key == curr_epoch_key {
vec![]
} else {
maximum_cover(prev_cliqued_atts_max_cover, prev_epoch_limit, "prev_epoch_attestations")
maximum_cover(prev_epoch_cliqued_atts, prev_epoch_limit, "prev_epoch_attestations")
}
},
move || {
let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME);
maximum_cover(
curr_epoch_att,
curr_epoch_cliqued_atts,
T::MaxAttestations::to_usize(),
"curr_epoch_attestations",
)
Expand Down

0 comments on commit 5329233

Please sign in to comment.