diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index d4f8b57add2..14636d974eb 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -4724,13 +4724,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         let attestation_packing_timer =
             metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);
 
-        let mut prev_filter_cache = HashMap::new();
+        let prev_chain = self.clone();
+        let prev_filter_cache_lock = Mutex::new(HashMap::new());
         let prev_attestation_filter = |att: &AttestationRef<T::EthSpec>| {
-            self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state)
+            let mut prev_filter_cache = prev_filter_cache_lock.lock();
+            prev_chain.filter_op_pool_attestation(&mut prev_filter_cache, att, &state)
         };
-        let mut curr_filter_cache = HashMap::new();
+        let curr_chain = self.clone();
+        let curr_filter_cache_lock = Mutex::new(HashMap::new());
         let curr_attestation_filter = |att: &AttestationRef<T::EthSpec>| {
-            self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state)
+            let mut curr_filter_cache = curr_filter_cache_lock.lock();
+            curr_chain.filter_op_pool_attestation(&mut curr_filter_cache, att, &state)
         };
 
         let mut attestations = self
diff --git a/beacon_node/operation_pool/src/attestation.rs b/beacon_node/operation_pool/src/attestation.rs
index 14edc32fe0d..9507368c632 100644
--- a/beacon_node/operation_pool/src/attestation.rs
+++ b/beacon_node/operation_pool/src/attestation.rs
@@ -193,6 +193,7 @@ pub fn earliest_attestation_validators<T: EthSpec>(
     } else if attestation.checkpoint.target_epoch == state.previous_epoch() {
         &base_state.previous_epoch_attestations
     } else {
+        #[allow(clippy::unwrap_used)]
        return BitList::with_capacity(0).unwrap();
     };
 
diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs
index fa01fd047fb..e077fd9e5e0 100644
--- a/beacon_node/operation_pool/src/attestation_storage.rs
+++ b/beacon_node/operation_pool/src/attestation_storage.rs
@@ -163,7 +163,7 @@ impl<T: EthSpec> AttestationMap<T> {
             attestation_map
                 .unaggregate_attestations
                 .entry(data)
-                .or_insert_with(Vec::new)
+                .or_default()
         };
         let mut observed = false;
         for existing_attestation in attestations.iter_mut() {
diff --git a/beacon_node/operation_pool/src/bron_kerbosch.rs b/beacon_node/operation_pool/src/bron_kerbosch.rs
index 3cb95a47afe..b66ecade037 100644
--- a/beacon_node/operation_pool/src/bron_kerbosch.rs
+++ b/beacon_node/operation_pool/src/bron_kerbosch.rs
@@ -1,37 +1,48 @@
+use crate::OpPoolError as Error;
 use rpds::HashTrieSet;
+
 /// Entry point for the Bron-Kerbosh algorithm. Takes a vector of `vertices` of type
 /// `T : Compatible`. Returns all the maximal cliques (as a matrix of indices) for the graph
 /// `G = (V,E)` where `V` is `vertices` and `E` encodes the `is_compatible` relationship.
 pub fn bron_kerbosch<T, F: Fn(&T, &T) -> bool>(
     vertices: &[T],
     is_compatible: F,
-) -> Vec<HashTrieSet<usize>> {
+) -> Result<Vec<HashTrieSet<usize>>, Error> {
     // create empty vector to store cliques
     let mut cliques: Vec<HashTrieSet<usize>> = vec![];
-    if vertices.len() > 0 {
+    if !vertices.is_empty() {
         // build neighbourhoods and degeneracy ordering, also move to index-based reasoning
-        let neighbourhoods = compute_neighbourhoods(vertices, is_compatible);
+        let neighbourhoods =
+            compute_neighbourhoods(vertices, is_compatible).ok_or(Error::BronKerboschLogicError)?;
         let ordering = degeneracy_order(vertices.len(), &neighbourhoods);
 
         let mut publish_clique = |c| cliques.push(c);
 
-        for i in 0..ordering.len() {
-            let vi = ordering[i];
-            let p = (i + 1..ordering.len())
-                .filter(|j| neighbourhoods[vi].contains(&ordering[*j]))
-                .map(|j| ordering[j])
+        for (i, &vi) in ordering.iter().enumerate() {
+            let vi_neighbourhood = neighbourhoods
+                .get(vi)
+                .ok_or(Error::BronKerboschLogicError)?;
+            let p = ordering
+                .get(i + 1..ordering.len())
+                .ok_or(Error::BronKerboschLogicError)?
+                .iter()
+                .filter(|pj| vi_neighbourhood.contains(pj))
+                .copied()
                 .collect();
             let r = HashTrieSet::default().insert(vi);
-            let x = (0..i)
-                .filter(|j| neighbourhoods[vi].contains(&ordering[*j]))
-                .map(|j| ordering[j])
+            let x = ordering
+                .get(0..i)
+                .ok_or(Error::BronKerboschLogicError)?
+                .iter()
+                .filter(|xj| vi_neighbourhood.contains(xj))
+                .copied()
                 .collect();
-            bron_kerbosch_aux(r, p, x, &neighbourhoods, &mut publish_clique)
+            bron_kerbosch_aux(r, p, x, &neighbourhoods, &mut publish_clique)?;
         }
     }
-    cliques
+    Ok(cliques)
 }
 
 /// A function to compute the neighbourhoods for all nodes in the list. The neighbourhood `N(a)` of a
@@ -42,24 +53,24 @@ pub fn bron_kerbosch<T, F: Fn(&T, &T) -> bool>(
 fn compute_neighbourhoods<T, F: Fn(&T, &T) -> bool>(
     vertices: &[T],
     is_compatible: F,
-) -> Vec<Vec<usize>> {
+) -> Option<Vec<Vec<usize>>> {
     let mut neighbourhoods = vec![];
     neighbourhoods.resize_with(vertices.len(), Vec::new);
-    for i in 0..vertices.len() - 1 {
-        for j in i + 1..vertices.len() {
-            if is_compatible(&vertices[i], &vertices[j]) {
-                neighbourhoods[i].push(j);
-                neighbourhoods[j].push(i);
+    for (i, vi) in vertices.get(0..vertices.len() - 1)?.iter().enumerate() {
+        for (j, vj) in vertices.iter().enumerate().skip(i + 1) {
+            if is_compatible(vi, vj) {
+                neighbourhoods.get_mut(i)?.push(j);
+                neighbourhoods.get_mut(j)?.push(i);
             }
         }
     }
-    neighbourhoods
+    Some(neighbourhoods)
 }
 
 /// Produces a degeneracy ordering of a set of vertices.
 fn degeneracy_order(num_vertices: usize, neighbourhoods: &[Vec<usize>]) -> Vec<usize> {
     let mut v: Vec<usize> = (0..num_vertices).collect();
-    v.sort_unstable_by_key(|i| neighbourhoods[*i].len());
+    v.sort_unstable_by_key(|i| neighbourhoods.get(*i).map(|n| n.len()).unwrap_or(0));
     v
 }
 
@@ -76,31 +87,37 @@ fn bron_kerbosch_aux<F>(
     mut x: HashTrieSet<usize>,
     neighbourhoods: &Vec<Vec<usize>>,
     publish_clique: &mut F,
-) where
+) -> Result<(), Error>
+where
     F: FnMut(HashTrieSet<usize>),
 {
     if p.is_empty() && x.is_empty() {
         publish_clique(r);
-        return;
+        return Ok(());
     }
 
-    let pivot = find_pivot(&p, &x, neighbourhoods);
-    let pivot_neighbours = &neighbourhoods[pivot];
+    let pivot = find_pivot(&p, &x, neighbourhoods)?;
+    let pivot_neighbours = neighbourhoods
+        .get(pivot)
+        .ok_or(Error::BronKerboschLogicError)?;
 
-    let ip = hash_set_filter(&p, |e| pivot_neighbours.contains(e));
+    let ip = hash_set_filter(&p, |e| !pivot_neighbours.contains(e));
 
     for v in ip.iter() {
-        let n_set = &neighbourhoods[*v];
+        let n_set = neighbourhoods
+            .get(*v)
+            .ok_or(Error::BronKerboschLogicError)?;
 
         let nr = r.insert(*v);
-        let np = hash_set_filter(&p, |e| !n_set.contains(e));
-        let nx = hash_set_filter(&x, |e| !n_set.contains(e));
+        let np = hash_set_filter(&p, |e| n_set.contains(e));
+        let nx = hash_set_filter(&x, |e| n_set.contains(e));
 
-        bron_kerbosch_aux(nr, np, nx, neighbourhoods, publish_clique);
+        bron_kerbosch_aux(nr, np, nx, neighbourhoods, publish_clique)?;
 
         p.remove_mut(v);
         x.insert_mut(*v);
     }
+
+    Ok(())
 }
 
 /// Identifies pivot for Bron-Kerbosh pivoting technique.
@@ -108,24 +125,31 @@ fn find_pivot(
     p: &HashTrieSet<usize>,
     x: &HashTrieSet<usize>,
     neighbourhoods: &[Vec<usize>],
-) -> usize {
-    *p.iter()
+) -> Result<usize, Error> {
+    p.iter()
         .chain(x.iter())
         .min_by_key(|&e| {
             p.iter()
-                .filter(|ee| neighbourhoods[*e].contains(ee))
+                .filter(|ee| {
+                    neighbourhoods
+                        .get(*e)
+                        .map(|n| n.contains(ee))
+                        .unwrap_or(false)
+                })
                 .count()
         })
-        .unwrap()
+        .copied()
+        .ok_or(Error::BronKerboschLogicError)
 }
 
+/// Store the members of `set` matching `predicate` in a new set.
 fn hash_set_filter<P>(set: &HashTrieSet<usize>, predicate: P) -> HashTrieSet<usize>
 where
     P: Fn(&usize) -> bool,
 {
     let mut new_set = set.clone();
     for e in set.iter() {
-        if predicate(e) {
+        if !predicate(e) {
             new_set.remove_mut(e);
         }
     }
@@ -140,7 +164,7 @@ mod tests {
     #[test]
     fn bron_kerbosch_small_test() {
         let vertices: Vec<usize> = (0..7).collect();
-        let edges = vec![
+        let edges = [
             (0, 1),
             (0, 2),
             (0, 3),
@@ -159,31 +183,31 @@ mod tests {
             edges.contains(&(*first, *second)) || edges.contains(&(*first, *second))
         };
 
-        println!("{:?}", bron_kerbosch(&vertices, is_compatible));
+        println!("{:?}", bron_kerbosch(&vertices, is_compatible).unwrap());
     }
 
     quickcheck! {
         fn no_panic(vertices: Vec<usize>, adjacencies: HashSet<(usize, usize)>) -> bool {
            let is_compatible = |i: &usize, j: &usize| adjacencies.contains(&(*i, *j)) || adjacencies.contains(&(*j, *i));
-            bron_kerbosch(&vertices, is_compatible);
+            bron_kerbosch(&vertices, is_compatible).unwrap();
            true
         }
 
         fn at_least_one_clique_returned(vertices: Vec<usize>, adjacencies: HashSet<(usize, usize)>) -> bool {
-            if vertices.len() == 0 {
+            if vertices.is_empty() {
                return true;
            }
            let is_compatible = |i: &usize, j: &usize| adjacencies.contains(&(*i, *j)) || adjacencies.contains(&(*j, *i));
-            let res = bron_kerbosch(&vertices, is_compatible);
-            res.len() >= 1
+            let res = bron_kerbosch(&vertices, is_compatible).unwrap();
+            !res.is_empty()
        }
 
        fn no_clique_is_empty(vertices: Vec<usize>, adjacencies: HashSet<(usize, usize)>) -> bool {
-            if vertices.len() == 0 {
+            if vertices.is_empty() {
                return true;
            }
            let is_compatible = |i: &usize, j: &usize| adjacencies.contains(&(*i, *j)) || adjacencies.contains(&(*j, *i));
-            let res = bron_kerbosch(&vertices, is_compatible);
+            let res = bron_kerbosch(&vertices, is_compatible).unwrap();
            for clique in res.iter() {
                if clique.is_empty() {
                    return false;
@@ -194,7 +218,7 @@ mod tests {
 
        fn all_claimed_cliques_are_cliques(vertices: Vec<usize>, adjacencies: HashSet<(usize, usize)>) -> bool {
            let is_compatible = |i: &usize, j: &usize| adjacencies.contains(&(*i, *j)) || adjacencies.contains(&(*j, *i));
-            let claimed_cliques = bron_kerbosch(&vertices, is_compatible);
+            let claimed_cliques = bron_kerbosch(&vertices, is_compatible).unwrap();
            for clique in claimed_cliques {
                for (ind1, vertex) in clique.iter().enumerate() {
                    for (ind2, other_vertex) in clique.iter().enumerate() {
@@ -212,10 +236,10 @@ mod tests {
 
        fn no_clique_is_a_subset_of_other_clique(vertices: Vec<usize>, adjacencies: HashSet<(usize, usize)>) -> bool {
            let is_compatible = |i: &usize, j: &usize| adjacencies.contains(&(*i, *j)) || adjacencies.contains(&(*j, *i));
-            let claimed_cliques = bron_kerbosch(&vertices, is_compatible);
+            let claimed_cliques = bron_kerbosch(&vertices, is_compatible).unwrap();
            let is_subset = |set1: HashTrieSet<usize>, set2: HashTrieSet<usize>| -> bool {
                for vertex in set1.iter() {
-                    if !set2.contains(&vertex) {
+                    if !set2.contains(vertex) {
                        return false;
                    }
                }
diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs
index 47a0ddeba49..2e64a412957 100644
--- a/beacon_node/operation_pool/src/lib.rs
+++ b/beacon_node/operation_pool/src/lib.rs
@@ -1,3 +1,16 @@
+// Clippy lint set up
+#![cfg_attr(
+    not(test),
+    deny(
+        clippy::unwrap_used,
+        clippy::expect_used,
+        clippy::panic,
+        clippy::unreachable,
+        clippy::todo,
+        clippy::indexing_slicing
+    )
+)]
+
 mod attestation;
 mod attestation_id;
 mod attestation_storage;
@@ -39,6 +52,7 @@ use state_processing::{SigVerifiedOp, VerifyOperation};
 use std::collections::{hash_map::Entry, HashMap, HashSet};
 use std::marker::PhantomData;
 use std::ptr;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use types::{
     sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, AbstractExecPayload,
     Attestation, AttestationData, AttesterSlashing, BeaconState, BeaconStateError, ChainSpec,
@@ -79,6 +93,7 @@ pub enum OpPoolError {
     RewardCacheValidatorUnknown(BeaconStateError),
     RewardCacheOutOfBounds,
     IncorrectOpPoolVariant,
+    BronKerboschLogicError,
 }
 
 #[derive(Default)]
@@ -223,145 +238,131 @@ impl<T: EthSpec> OperationPool<T> {
     /// with resepct to the graph with attestations as vertices and an edge encoding
     /// compatibility for aggregation.
     #[allow(clippy::too_many_arguments)]
+    #[allow(clippy::type_complexity)]
     fn get_clique_aggregate_attestations_for_epoch<'a>(
         &'a self,
         checkpoint_key: &'a CheckpointKey,
         all_attestations: &'a AttestationMap<T>,
         state: &'a BeaconState<T>,
-        mut validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send,
-        num_valid: &mut i64,
+        validity_filter: impl Fn(&AttestationRef<'a, T>) -> bool + Send + Sync,
+        num_valid: &AtomicUsize,
         max_vertices: usize,
         spec: &'a ChainSpec,
-    ) -> Vec<(&CompactAttestationData, Vec<CompactIndexedAttestation<T>>)> {
-        if let Some(AttestationDataMap {
+    ) -> Result<Vec<(&CompactAttestationData, Vec<CompactIndexedAttestation<T>>)>, OpPoolError>
+    {
+        let Some(AttestationDataMap {
             aggregate_attestations,
             unaggregate_attestations,
         }) = all_attestations.get_attestation_map(checkpoint_key)
-        {
-            let mut cliques_from_aggregates: Vec<_> = aggregate_attestations
-                .into_iter()
-                .filter(|(data, _)| {
-                    data.slot + spec.min_attestation_inclusion_delay <= state.slot()
-                        && state.slot() <= data.slot + T::slots_per_epoch()
-                })
-                .map(|(data, aggregates)| {
-                    let aggregates: Vec<&CompactIndexedAttestation<T>> = aggregates
-                        .iter()
-                        .filter(|indexed| {
-                            validity_filter(&AttestationRef {
-                                checkpoint: checkpoint_key,
-                                data: &data,
-                                indexed,
-                            })
+        else {
+            return Ok(vec![]);
+        };
+        let mut cliques_from_aggregates: Vec<_> = aggregate_attestations
+            .into_par_iter()
+            .filter(|(data, _)| {
+                data.slot + spec.min_attestation_inclusion_delay <= state.slot()
+                    && state.slot() <= data.slot + T::slots_per_epoch()
+            })
+            .map(|(data, aggregates)| {
+                let aggregates: Vec<&CompactIndexedAttestation<T>> = aggregates
+                    .iter()
+                    .filter(|indexed| {
+                        validity_filter(&AttestationRef {
+                            checkpoint: checkpoint_key,
+                            data,
+                            indexed,
                         })
-                        .collect();
-                    *num_valid += aggregates.len() as i64;
-                    (data, aggregates)
-                })
-                .collect::<Vec<(&CompactAttestationData, Vec<&CompactIndexedAttestation<T>>)>>()
-                .into_par_iter()
-                .map(|(data, mut aggregates)| {
-                    // Take the N aggregates with the highest number of set bits
-                    // This is needed to avoid the bron_kerbosch algorithm generating millions of
-                    // cliques
-                    let aggregates = if aggregates.len() > max_vertices {
-                        let (left, _, _) = aggregates.select_nth_unstable_by_key(max_vertices, |a| {
-                            std::cmp::Reverse(a.aggregation_bits.num_set_bits())
-                        });
-                        left
-                    } else {
-                        aggregates.as_slice()
-                    };
-                    // aggregate each cliques corresponding attestaiions
-                    let mut clique_aggregates: Vec<CompactIndexedAttestation<T>> =
-                        bron_kerbosch(&aggregates, is_compatible)
-                            .iter()
-                            .map(|clique| {
-                                clique.iter().skip(1).fold(
-                                    aggregates[*clique.iter().next().unwrap()].clone(),
-                                    |mut acc, &ind| {
-                                        acc.aggregate(&aggregates[ind]);
-                                        acc
-                                    },
-                                )
-                            })
-                            .collect();
-                    let mut indices_to_remove = Vec::new();
-                    clique_aggregates.sort_unstable_by(|a, b| {
-                        a.attesting_indices.len().cmp(&b.attesting_indices.len())
+                    })
+                    .collect();
+                num_valid.fetch_add(aggregates.len(), Ordering::Relaxed);
+                (data, aggregates)
+            })
+            .map(|(data, mut aggregates)| {
+                // Take the N aggregates with the highest number of set bits
+                // This is needed to avoid the bron_kerbosch algorithm generating millions of
+                // cliques
+                let aggregates = if aggregates.len() > max_vertices {
+                    let (left, _, _) = aggregates.select_nth_unstable_by_key(max_vertices, |a| {
+                        std::cmp::Reverse(a.aggregation_bits.num_set_bits())
                     });
-                    for (index, clique) in clique_aggregates.iter().enumerate() {
-                        for bigger_clique in clique_aggregates.iter().skip(index + 1) {
-                            if clique
-                                .aggregation_bits
-                                .is_subset(&bigger_clique.aggregation_bits)
-                            {
-                                indices_to_remove.push(index);
-                                break;
-                            }
+                    left
+                } else {
+                    aggregates.as_slice()
+                };
+                // aggregate each clique's corresponding attestations
+                let mut clique_aggregates: Vec<CompactIndexedAttestation<T>> =
+                    bron_kerbosch(aggregates, is_compatible)?
+                        .iter()
+                        .map(|clique| {
+                            aggregate_attestations_by_indices(aggregates, clique.iter().copied())
+                                .ok_or(OpPoolError::BronKerboschLogicError)
+                        })
+                        .collect::<Result<Vec<_>, _>>()?;
+                let mut indices_to_remove = Vec::new();
+                clique_aggregates.sort_unstable_by_key(|att| att.attesting_indices.len());
+                for (index, clique) in clique_aggregates.iter().enumerate() {
+                    for bigger_clique in clique_aggregates.iter().skip(index + 1) {
+                        if clique
+                            .aggregation_bits
+                            .is_subset(&bigger_clique.aggregation_bits)
+                        {
+                            indices_to_remove.push(index);
+                            break;
                         }
                     }
+                }
 
-                    for index in indices_to_remove.iter().rev() {
-                        clique_aggregates.swap_remove(*index);
-                    }
-                    (data, clique_aggregates)
-                })
-                .collect::<Vec<(&CompactAttestationData, Vec<CompactIndexedAttestation<T>>)>>()
-                .into_iter()
-                .map(|(data, mut clique_aggregates)| {
-                    // aggregate unaggregate attestations into the clique aggregates
-                    // if compatible
-                    if let Some(unaggregate_attestations) = unaggregate_attestations.get(&data) {
-                        for attestation in unaggregate_attestations.iter().filter(|indexed| {
-                            validity_filter(&AttestationRef {
-                                checkpoint: checkpoint_key,
-                                data: &data,
-                                indexed,
-                            })
-                        }) {
-                            *num_valid += 1;
-                            for clique_aggregate in &mut clique_aggregates {
-                                if !clique_aggregate
-                                    .attesting_indices
-                                    .contains(&attestation.attesting_indices[0])
-                                {
-                                    clique_aggregate.aggregate(attestation);
-                                }
-                            }
-                        }
-                    }
-                    (data, clique_aggregates)
-                })
-                .collect();
-
-            // include aggregated attestations from unaggregated attestations whose
-            // attestation data doesn't appear in aggregated_attestations
-            unaggregate_attestations
-                .iter()
-                .filter(|(data, _)| {
-                    data.slot + spec.min_attestation_inclusion_delay <= state.slot()
-                        && state.slot() <= data.slot + T::slots_per_epoch()
-                        && !aggregate_attestations.contains_key(&data)
-                })
-                .for_each(|(data, unaggregates)| {
-                    let mut valid_attestations = unaggregates.iter().filter(|indexed| {
+                for index in indices_to_remove.iter().rev() {
+                    clique_aggregates.swap_remove(*index);
+                }
+
+                // aggregate unaggregate attestations into the clique aggregates
+                // if compatible
+                if let Some(unaggregate_attestations) = unaggregate_attestations.get(data) {
+                    for attestation in unaggregate_attestations.iter().filter(|indexed| {
                         validity_filter(&AttestationRef {
                             checkpoint: checkpoint_key,
-                            data: &data,
+                            data,
                             indexed,
                         })
-                    });
-                    if let Some(att) = valid_attestations.next() {
-                        let mut att = att.clone();
-                        valid_attestations.for_each(|valid_att| att.aggregate(valid_att));
-                        cliques_from_aggregates.push((data, vec![att]))
+                    }) {
+                        num_valid.fetch_add(1, Ordering::Relaxed);
+                        for clique_aggregate in &mut clique_aggregates {
+                            if clique_aggregate.signers_disjoint_from(attestation) {
+                                clique_aggregate.aggregate(attestation);
+                            }
+                        }
                     }
+                }
+
+                Ok((data, clique_aggregates))
+            })
+            .collect::<Result<Vec<_>, OpPoolError>>()?;
+
+        // include aggregated attestations from unaggregated attestations whose
+        // attestation data doesn't appear in aggregated_attestations
+        unaggregate_attestations
+            .iter()
+            .filter(|(data, _)| {
+                data.slot + spec.min_attestation_inclusion_delay <= state.slot()
+                    && state.slot() <= data.slot + T::slots_per_epoch()
+                    && !aggregate_attestations.contains_key(data)
+            })
+            .for_each(|(data, unaggregates)| {
+                let mut valid_attestations = unaggregates.iter().filter(|indexed| {
+                    validity_filter(&AttestationRef {
+                        checkpoint: checkpoint_key,
+                        data,
+                        indexed,
+                    })
                 });
-            cliques_from_aggregates
-        } else {
-            vec![]
-        }
+                if let Some(att) = valid_attestations.next() {
+                    let mut att = att.clone();
+                    valid_attestations.for_each(|valid_att| att.aggregate(valid_att));
+                    cliques_from_aggregates.push((data, vec![att]))
+                }
+            });
+        Ok(cliques_from_aggregates)
     }
 
     /// Get a list of attestations for inclusion in a block.
@@ -373,8 +374,8 @@ impl<T: EthSpec> OperationPool<T> {
     pub fn get_attestations(
         &self,
         state: &BeaconState<T>,
-        prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send,
-        curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send,
+        prev_epoch_validity_filter: impl for<'a> Fn(&AttestationRef<'a, T>) -> bool + Send + Sync,
+        curr_epoch_validity_filter: impl for<'a> Fn(&AttestationRef<'a, T>) -> bool + Send + Sync,
         max_vertices: usize,
         spec: &ChainSpec,
     ) -> Result<Vec<Attestation<T>>, OpPoolError> {
@@ -394,8 +395,8 @@ impl<T: EthSpec> OperationPool<T> {
         // Split attestations for the previous & current epochs, so that we
         // can optimise them individually in parallel.
-        let mut num_prev_valid = 0_i64;
-        let mut num_curr_valid = 0_i64;
+        let num_prev_valid = AtomicUsize::new(0);
+        let num_curr_valid = AtomicUsize::new(0);
 
         // If we're in the genesis epoch, just use the current epoch attestations.
         let prev_epoch_cliqued_atts = if prev_epoch_key != curr_epoch_key {
@@ -404,53 +405,47 @@ impl<T: EthSpec> OperationPool<T> {
                 &*all_attestations,
                 state,
                 prev_epoch_validity_filter,
-                &mut num_prev_valid,
+                &num_prev_valid,
                 max_vertices,
                 spec,
-            )
+            )?
         } else {
             vec![]
         };
 
-        let prev_epoch_cliqued_atts = prev_epoch_cliqued_atts
-            .iter()
-            .map(|(data, atts)| {
-                atts.iter()
-                    .map(|indexed| AttestationRef {
-                        checkpoint: &prev_epoch_key,
-                        data,
-                        indexed,
-                    })
-                    .filter_map(|att| {
-                        AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
-                    })
-            })
-            .flat_map(|att_max_cover| att_max_cover);
+        let prev_epoch_cliqued_atts = prev_epoch_cliqued_atts.iter().flat_map(|(data, atts)| {
+            atts.iter()
+                .map(|indexed| AttestationRef {
+                    checkpoint: &prev_epoch_key,
+                    data,
+                    indexed,
+                })
+                .filter_map(|att| {
+                    AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
+                })
+        });
 
         let curr_epoch_cliqued_atts = self.get_clique_aggregate_attestations_for_epoch(
             &curr_epoch_key,
             &*all_attestations,
             state,
             curr_epoch_validity_filter,
-            &mut num_curr_valid,
+            &num_curr_valid,
             max_vertices,
             spec,
-        );
-
-        let curr_epoch_cliqued_atts = curr_epoch_cliqued_atts
-            .iter()
-            .map(|(data, atts)| {
-                atts.iter()
-                    .map(|indexed| AttestationRef {
-                        checkpoint: &curr_epoch_key,
-                        data,
-                        indexed,
-                    })
-                    .filter_map(|att| {
-                        AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
-                    })
-            })
-            .flat_map(|att_max_cover| att_max_cover);
+        )?;
+
+        let curr_epoch_cliqued_atts = curr_epoch_cliqued_atts.iter().flat_map(|(data, atts)| {
+            atts.iter()
+                .map(|indexed| AttestationRef {
+                    checkpoint: &curr_epoch_key,
+                    data,
+                    indexed,
+                })
+                .filter_map(|att| {
+                    AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
+                })
+        });
 
         let prev_epoch_limit = if let BeaconState::Base(base_state) = state {
             std::cmp::min(
@@ -486,8 +481,14 @@ impl<T: EthSpec> OperationPool<T> {
             },
         );
 
-        metrics::set_gauge(&metrics::NUM_PREV_EPOCH_ATTESTATIONS, num_prev_valid);
-        metrics::set_gauge(&metrics::NUM_CURR_EPOCH_ATTESTATIONS, num_curr_valid);
+        metrics::set_gauge(
+            &metrics::NUM_PREV_EPOCH_ATTESTATIONS,
+            num_prev_valid.load(Ordering::Relaxed) as i64,
+        );
+        metrics::set_gauge(
+            &metrics::NUM_CURR_EPOCH_ATTESTATIONS,
+            num_curr_valid.load(Ordering::Relaxed) as i64,
+        );
 
         Ok(max_cover::merge_solutions(
             curr_cover,
@@ -876,6 +877,18 @@ fn is_compatible<T: EthSpec>(
     x.signers_disjoint_from(y)
 }
 
+fn aggregate_attestations_by_indices<T: EthSpec>(
+    attestations: &[&CompactIndexedAttestation<T>],
+    mut indices: impl Iterator<Item = usize>,
+) -> Option<CompactIndexedAttestation<T>> {
+    let first = indices.next()?;
+    let mut attestation: CompactIndexedAttestation<T> = (*attestations.get(first)?).clone();
+    for i in indices {
+        attestation.aggregate(attestations.get(i)?);
+    }
+    Some(attestation)
+}
+
 /// Filter up to a maximum number of operations out of an iterator.
 fn filter_limit_operations<'a, T: 'a, V: 'a, I, F, G>(
     operations: I,
diff --git a/beacon_node/operation_pool/src/max_cover.rs b/beacon_node/operation_pool/src/max_cover.rs
index 293fffdf594..c7d52b60562 100644
--- a/beacon_node/operation_pool/src/max_cover.rs
+++ b/beacon_node/operation_pool/src/max_cover.rs
@@ -68,7 +68,7 @@ where
             .filter(|x| x.item.score() != 0)
             .map(|val| (val.item.key(), val))
             .fold(HashMap::new(), |mut acc, (key, val)| {
-                acc.entry(key).or_insert(Vec::new()).push(val);
+                acc.entry(key).or_default().push(val);
                 acc
             });
 
@@ -98,7 +98,7 @@ where
 
         // Update the covering sets of the other items, for the inclusion of the selected item.
         // Items covered by the selected item can't be re-covered.
-        all_items.get_mut(&best_key).map(|items| {
+        if let Some(items) = all_items.get_mut(&best_key) {
             items
                 .iter_mut()
                 .filter(|x| x.available && x.item.score() != 0)
@@ -106,7 +106,7 @@ where
                 x.item
                     .update_covering_set(best_item.intermediate(), best_item.covering_set())
             });
-        });
+        }
 
         result.push(best_item);
     }
@@ -165,7 +165,7 @@ mod test {
             self.len()
         }
 
-        fn key(&self) -> () {}
+        fn key(&self) {}
     }
 
     fn example_system() -> Vec<HashSet<usize>> {
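
---

Usage note (editor's sketch, not part of the patch): `bron_kerbosch` is now fallible, returning `Result<Vec<HashTrieSet<usize>>, OpPoolError>` instead of panicking on bad indices, so every caller must handle the `Result`. A minimal illustration of the new calling convention, written in the style of the module's own tests (the test name and the example graph are illustrative only):

    #[test]
    fn two_disjoint_edges_give_two_cliques() {
        // Four vertices and two compatible pairs: {0, 1} and {2, 3}.
        let vertices: Vec<usize> = (0..4).collect();
        let edges = [(0, 1), (2, 3)];
        let is_compatible =
            |a: &usize, b: &usize| edges.contains(&(*a, *b)) || edges.contains(&(*b, *a));

        // The Result must now be handled: tests unwrap, production code propagates with `?`.
        let cliques = bron_kerbosch(&vertices, is_compatible).unwrap();

        // The maximal cliques of this graph are exactly {0, 1} and {2, 3}.
        assert_eq!(cliques.len(), 2);
        assert!(cliques.iter().all(|clique| clique.size() == 2));
    }

Inside the crate the error surfaces as `OpPoolError::BronKerboschLogicError` and propagates out of `get_attestations` via `?`, in keeping with the new crate-level denial of `unwrap`/`indexing_slicing` outside tests.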