diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index e33beebe939..598026316d0 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -12,7 +12,7 @@ mod sync_aggregate_id; pub use crate::bls_to_execution_changes::ReceivedPreCapella; pub use attestation::{earliest_attestation_validators, AttMaxCover}; -use attestation_storage::{CompactIndexedAttestation, AttestationDataMap, CompactAttestationData}; +use attestation_storage::{CompactIndexedAttestation, CompactAttestationData, AttestationDataMap}; pub use attestation_storage::{AttestationRef, SplitAttestation}; use bron_kerbosch::bron_kerbosch; pub use max_cover::MaxCover; @@ -248,21 +248,88 @@ impl OperationPool { checkpoint_key: &'a CheckpointKey, all_attestations: &'a AttestationMap, state: &'a BeaconState, - reward_cache: &'a RewardCache, - total_active_balance: u64, - validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send, + mut validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send, + num_valid: &mut i64, spec: &'a ChainSpec, - ) -> impl Iterator> + Send { - // all_attestations - // .get_attestations(checkpoint_key) - // .filter(|att| { - // att.data.slot + spec.min_attestation_inclusion_delay <= state.slot() - // && state.slot() <= att.data.slot + T::slots_per_epoch() - // }) - // .filter(validity_filter) - // .filter_map(move |att| { - // AttMaxCover::new(att, state, reward_cache, total_active_balance, spec) - // }) + ) -> Vec<(&CompactAttestationData, CompactIndexedAttestation)> { + let mut cliqued_atts: Vec<(&CompactAttestationData, CompactIndexedAttestation)> = vec![]; + if let Some(AttestationDataMap { aggregate_attestations, unaggregate_attestations }) + = all_attestations.get_attestation_map(checkpoint_key) { + for (data, aggregates) in aggregate_attestations { + if data.slot + spec.min_attestation_inclusion_delay <= state.slot() + && state.slot() <= data.slot + T::slots_per_epoch() { + let aggregates: Vec<&CompactIndexedAttestation> = aggregates + .iter() + .filter(|indexed| validity_filter(&AttestationRef { + checkpoint: checkpoint_key, + data: &data, + indexed + })) + .collect(); + *num_valid += aggregates.len() as i64; + + let cliques = bron_kerbosch(&aggregates, is_compatible); + + // This assumes that the values from bron_kerbosch are valid indices of + // aggregates. + let mut clique_aggregates = cliques + .iter() + .map(|clique| { + let mut res_att = aggregates[clique[0]].clone(); + for ind in clique.iter().skip(1) { + res_att.aggregate(&aggregates[*ind]); + } + res_att + }); + + if let Some(unaggregate_attestations) = unaggregate_attestations.get(&data) { + for attestation in unaggregate_attestations + .iter() + .filter(|indexed| validity_filter(&AttestationRef { + checkpoint: checkpoint_key, + data: &data, + indexed + })) + { + *num_valid += 1; + for mut clique_aggregate in &mut clique_aggregates { + if !clique_aggregate.attesting_indices.contains(&attestation.attesting_indices[0]) { + clique_aggregate.aggregate(attestation); + } + } + } + } + + cliqued_atts.extend( + clique_aggregates + .map(|indexed| (data, indexed)) + ); + } + } + for (data, attestations) in unaggregate_attestations { + if data.slot + spec.min_attestation_inclusion_delay <= state.slot() + && state.slot() <= data.slot + T::slots_per_epoch() { + if !aggregate_attestations.contains_key(&data) { + let mut valid_attestations = attestations + .iter() + .filter(|indexed| validity_filter(&AttestationRef { + checkpoint: checkpoint_key, + data: &data, + indexed + })); + + if let Some(first) = valid_attestations.next() { + let mut agg = first.clone(); + for attestation in valid_attestations { + agg.aggregate(&attestation); + } + cliqued_atts.push((data, agg)); + } + } + } + } + } + cliqued_atts } /// Get a list of attestations for inclusion in a block. @@ -274,8 +341,8 @@ impl OperationPool { pub fn get_attestations( &self, state: &BeaconState, - mut prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, - mut curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, + prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, + curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, spec: &ChainSpec, ) -> Result>, OpPoolError> { // Attestations for the current fork, which may be from the current or previous epoch. @@ -297,142 +364,50 @@ impl OperationPool { let mut num_prev_valid = 0_i64; let mut num_curr_valid = 0_i64; - let mut prev_cliqued_atts: Vec<(&CompactAttestationData, CompactIndexedAttestation)> = vec![]; - - if let Some(prev_attestation_map) = all_attestations.get_attestation_map(&prev_epoch_key) { - for (data, aggregates) in prev_attestation_map.aggregate_attestations.iter() { - let aggregates: Vec<&CompactIndexedAttestation> = aggregates - .iter() - .map(|indexed| AttestationRef { - checkpoint: &prev_epoch_key, - data: &data, - indexed, - }) - .filter(|att| { - att.data.slot + spec.min_attestation_inclusion_delay <= state.slot() - && state.slot() <= att.data.slot + T::slots_per_epoch() - }) - .filter(&mut prev_epoch_validity_filter) - .map(|att_ref| att_ref.indexed) - .collect(); - num_prev_valid += aggregates.len() as i64; - - - let cliques = bron_kerbosch(&aggregates, is_compatible); - - let mut clique_aggregates: Vec> = cliques - .iter() - .map(|clique| { - let mut res_att = aggregates[clique[0]].clone(); - for ind in clique.iter().skip(1) { - res_att.aggregate(&aggregates[*ind]); - } - res_att - }) - .collect(); - - // DONT ACTUALLY USE THIS - let empty_vec = vec![]; - let unaggregate_attestations: Vec<&CompactIndexedAttestation> = prev_attestation_map.unaggregate_attestations - .get(&data) - .or_else(|| Some(&empty_vec)) - .unwrap() - .iter() - .map(|indexed| AttestationRef { - checkpoint: &prev_epoch_key, - data: &data, - indexed, - }) - .filter(|att| { - att.data.slot + spec.min_attestation_inclusion_delay <= state.slot() - && state.slot() <= att.data.slot + T::slots_per_epoch() - }) - .filter(&mut prev_epoch_validity_filter) - .map(|attestation| attestation.indexed) - .collect(); - - num_prev_valid += unaggregate_attestations.len() as i64; - - for attestation in unaggregate_attestations { - for clique_aggregate in &mut clique_aggregates { - if !clique_aggregate.attesting_indices.contains(&attestation.attesting_indices[0]) { - clique_aggregate.aggregate(attestation); - } - } - } - - // let clique_aggregates: Vec> = clique_aggregates - // .iter() - // .map(|indexed| AttestationRef { - // checkpoint: &prev_epoch_key, - // data: &data, - // indexed, - // }) - // .filter_map(move |att| { - // AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) - // }) - // .collect(); - let clique_aggregates: Vec<(&CompactAttestationData, CompactIndexedAttestation)> = clique_aggregates - .into_iter() - .map(|indexed| (data, indexed)) - .collect(); - - - prev_cliqued_atts.extend(clique_aggregates); - } - - for (data, attestations) in &prev_attestation_map.unaggregate_attestations { - if !prev_attestation_map.aggregate_attestations.contains_key(&data) { - let valid_attestations: Vec<&CompactIndexedAttestation> = attestations - .iter() - .map(|indexed| AttestationRef { - checkpoint: &prev_epoch_key, - data: &data, - indexed, - }) - .filter(|att| { - att.data.slot + spec.min_attestation_inclusion_delay <= state.slot() - && state.slot() <= att.data.slot + T::slots_per_epoch() - }) - .filter(&mut prev_epoch_validity_filter) - .map(|attestation| attestation.indexed) - .collect(); - - let mut agg = valid_attestations[0].clone(); - for attestation in valid_attestations.iter().skip(1) { - agg.aggregate(&attestation); - } - prev_cliqued_atts.push((data, agg)); - } - } - } + let prev_cliqued_atts = if prev_epoch_key != curr_epoch_key { + self.get_clique_aggregate_attestations_for_epoch( + &prev_epoch_key, + &*all_attestations, + state, + prev_epoch_validity_filter, + &mut num_prev_valid, + spec + ) + } else { + vec![] + }; - let prev_cliqued_atts_max_cover: Vec> = prev_cliqued_atts - .iter() - .map(|(data, indexed)| AttestationRef { - checkpoint: &prev_epoch_key, - data, - indexed, - }) - .filter_map(|att| { - AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) - }) - .collect(); + let prev_epoch_cliqued_atts: Vec> = prev_cliqued_atts.iter() + .map(|(data, indexed)| AttestationRef { + checkpoint: &prev_epoch_key, + data, + indexed, + }) + .filter_map(|att| { + AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) + }) + .collect(); + + let curr_cliqued_atts = self.get_clique_aggregate_attestations_for_epoch( + &curr_epoch_key, + &*all_attestations, + state, + curr_epoch_validity_filter, + &mut num_curr_valid, + spec + ); + let curr_epoch_cliqued_atts: Vec> = curr_cliqued_atts.iter() + .map(|(data, indexed)| AttestationRef { + checkpoint: &prev_epoch_key, + data, + indexed, + }) + .filter_map(|att| { + AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) + }) + .collect(); - let curr_epoch_att: Vec> = self - .get_valid_attestations_for_epoch( - &curr_epoch_key, - &*all_attestations, - state, - &reward_cache, - total_active_balance, - curr_epoch_validity_filter, - spec, - ) - .collect(); - num_curr_valid += curr_epoch_att.len() as i64; - // .inspect(|_| num_curr_valid += 1); let prev_epoch_limit = if let BeaconState::Base(base_state) = state { std::cmp::min( @@ -451,13 +426,13 @@ impl OperationPool { if prev_epoch_key == curr_epoch_key { vec![] } else { - maximum_cover(prev_cliqued_atts_max_cover, prev_epoch_limit, "prev_epoch_attestations") + maximum_cover(prev_epoch_cliqued_atts, prev_epoch_limit, "prev_epoch_attestations") } }, move || { let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME); maximum_cover( - curr_epoch_att, + curr_epoch_cliqued_atts, T::MaxAttestations::to_usize(), "curr_epoch_attestations", )