diff --git a/Cargo.lock b/Cargo.lock index af9d0a0ad88..35eb0478a2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,6 +215,15 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +[[package]] +name = "archery" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab7d8a6d00b222909638a01ddcc8c533219e9d5bfada1613afae43481f2fc699" +dependencies = [ + "static_assertions", +] + [[package]] name = "arrayref" version = "0.3.7" @@ -5415,8 +5424,10 @@ dependencies = [ "lighthouse_metrics", "maplit", "parking_lot 0.12.1", + "quickcheck", "rand", "rayon", + "rpds", "serde", "state_processing", "store", @@ -6457,6 +6468,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "rpds" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99334e9410cf4d16241bb88b27bc282e140327a4c4759be76f8a96e6d0cd0f35" +dependencies = [ + "archery", +] + [[package]] name = "rtnetlink" version = "0.10.1" diff --git a/Cargo.toml b/Cargo.toml index 9adb913ff5e..7a29e197c50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,6 +139,7 @@ rayon = "1.7" regex = "1" reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "stream", "rustls-tls", "native-tls-vendored"] } ring = "0.16" +rpds = "1.0.1" rusqlite = { version = "0.28", features = ["bundled"] } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f2378b4f9ed..7ed41c7752e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -83,7 +83,10 @@ use futures::channel::mpsc::Sender; use itertools::process_results; use itertools::Itertools; use kzg::Kzg; -use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella}; +use operation_pool::{ + CheckpointKey, CompactAttestationData, OperationPool, PersistedOperationPool, + ReceivedPreCapella, +}; use parking_lot::{Mutex, RwLock}; use proto_array::{DoNotReOrg, ProposerHeadError}; use safe_arith::SafeArith; @@ -92,7 +95,6 @@ use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; use state_processing::{ - common::get_attesting_indices_from_state, per_block_processing, per_block_processing::{ errors::AttestationValidationError, get_expected_withdrawals, @@ -167,6 +169,13 @@ const PREPARE_PROPOSER_HISTORIC_EPOCHS: u64 = 4; /// impact whilst having 8 epochs without a block is a comfortable grace period. const MAX_PER_SLOT_FORK_CHOICE_DISTANCE: u64 = 256; +/// The maximum number of aggregates per `AttestationData` to supply to Bron-Kerbosch (BK). +/// +/// This value is chosen to be sufficient for ~16 aggregators on mainnet, and in practice should +/// never be reached. Higher values *could* lead to exponential blow-up in the running time of BK +/// if an attacker found a way to generate a lot of distinct aggregates. +const MAX_AGGREGATES_PER_DATA_FOR_CLIQUES: usize = 20; + /// Reported to the user when the justified block has an invalid execution payload. 
pub const INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON: &str = "Justified block has an invalid execution payload."; @@ -2235,15 +2244,16 @@ impl BeaconChain { pub fn filter_op_pool_attestation( &self, filter_cache: &mut HashMap<(Hash256, Epoch), bool>, - att: &AttestationRef, + checkpoint: &CheckpointKey, + data: &CompactAttestationData, state: &BeaconState, ) -> bool { *filter_cache - .entry((att.data.beacon_block_root, att.checkpoint.target_epoch)) + .entry((data.beacon_block_root, checkpoint.target_epoch)) .or_insert_with(|| { self.shuffling_is_compatible( - &att.data.beacon_block_root, - att.checkpoint.target_epoch, + &data.beacon_block_root, + checkpoint.target_epoch, state, ) }) @@ -4753,27 +4763,6 @@ impl BeaconChain { .op_pool .get_bls_to_execution_changes(&state, &self.spec); - // Iterate through the naive aggregation pool and ensure all the attestations from there - // are included in the operation pool. - let unagg_import_timer = - metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES); - for attestation in self.naive_aggregation_pool.read().iter() { - let import = |attestation: &Attestation| { - let attesting_indices = get_attesting_indices_from_state(&state, attestation)?; - self.op_pool - .insert_attestation(attestation.clone(), attesting_indices) - }; - if let Err(e) = import(attestation) { - // Don't stop block production if there's an error, just create a log. - error!( - self.log, - "Attestation did not transfer to op pool"; - "reason" => ?e - ); - } - } - drop(unagg_import_timer); - // Override the beacon node's graffiti with graffiti from the validator, if present. let graffiti = match validator_graffiti { Some(graffiti) => graffiti, @@ -4783,14 +4772,30 @@ impl BeaconChain { let attestation_packing_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES); - let mut prev_filter_cache = HashMap::new(); - let prev_attestation_filter = |att: &AttestationRef| { - self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state) - }; - let mut curr_filter_cache = HashMap::new(); - let curr_attestation_filter = |att: &AttestationRef| { - self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state) - }; + let prev_chain = self.clone(); + let prev_filter_cache_lock = Mutex::new(HashMap::new()); + let prev_attestation_filter = + |checkpoint: &CheckpointKey, data: &CompactAttestationData| { + let mut prev_filter_cache = prev_filter_cache_lock.lock(); + prev_chain.filter_op_pool_attestation( + &mut prev_filter_cache, + checkpoint, + data, + &state, + ) + }; + let curr_chain = self.clone(); + let curr_filter_cache_lock = Mutex::new(HashMap::new()); + let curr_attestation_filter = + |checkpoint: &CheckpointKey, data: &CompactAttestationData| { + let mut curr_filter_cache = curr_filter_cache_lock.lock(); + curr_chain.filter_op_pool_attestation( + &mut curr_filter_cache, + checkpoint, + data, + &state, + ) + }; let mut attestations = self .op_pool @@ -4798,6 +4803,7 @@ impl BeaconChain { &state, prev_attestation_filter, curr_attestation_filter, + MAX_AGGREGATES_PER_DATA_FOR_CLIQUES, &self.spec, ) .map_err(BlockProductionError::OpPoolError)?; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index a23bcdc0b55..45857993342 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -121,10 +121,6 @@ lazy_static! 
{ "beacon_block_production_slot_process_seconds", "Time taken to advance the state to the block production slot" ); - pub static ref BLOCK_PRODUCTION_UNAGGREGATED_TIMES: Result = try_create_histogram( - "beacon_block_production_unaggregated_seconds", - "Time taken to import the naive aggregation pool for block production" - ); pub static ref BLOCK_PRODUCTION_ATTESTATION_TIMES: Result = try_create_histogram( "beacon_block_production_attestation_seconds", "Time taken to pack attestations into a block" diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 309db204ae2..70fc467cf6f 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1894,6 +1894,20 @@ pub fn serve( format!("Naive aggregation pool: {:?}", e), )); } + + if let Err(e) = chain.add_to_block_inclusion_pool(attestation) { + error!(log, + "Failure adding verified attestation to the operation pool"; + "error" => ?e, + "request_index" => index, + "committee_index" => committee_index, + "slot" => slot, + ); + failures.push(api_types::Failure::new( + index, + format!("Operation pool: {:?}", e), + )); + } } if num_already_known > 0 { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 82daf74efe0..24eb4d2a37c 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -376,6 +376,16 @@ impl NetworkBeaconProcessor { ) } + if let Err(e) = self.chain.add_to_block_inclusion_pool(verified_attestation) { + debug!( + self.log, + "Attestation invalid for op pool"; + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root, + ); + } + metrics::inc_counter( &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL, ); diff --git a/beacon_node/operation_pool/Cargo.toml b/beacon_node/operation_pool/Cargo.toml index 36595994f0a..9241b552239 100644 --- a/beacon_node/operation_pool/Cargo.toml +++ b/beacon_node/operation_pool/Cargo.toml @@ -19,11 +19,13 @@ serde = { workspace = true } store = { workspace = true } bitvec = { workspace = true } rand = { workspace = true } +rpds = { workspace = true } [dev-dependencies] beacon_chain = { workspace = true } tokio = { workspace = true } maplit = { workspace = true } +quickcheck = { workspace = true } [features] -portable = ["beacon_chain/portable"] \ No newline at end of file +portable = ["beacon_chain/portable"] diff --git a/beacon_node/operation_pool/src/attestation.rs b/beacon_node/operation_pool/src/attestation.rs index 97c291aa855..9507368c632 100644 --- a/beacon_node/operation_pool/src/attestation.rs +++ b/beacon_node/operation_pool/src/attestation.rs @@ -1,4 +1,4 @@ -use crate::attestation_storage::AttestationRef; +use crate::attestation_storage::{AttestationRef, CompactAttestationData}; use crate::max_cover::MaxCover; use crate::reward_cache::RewardCache; use state_processing::common::{ @@ -127,6 +127,7 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { type Object = Attestation; type Intermediate = AttestationRef<'a, T>; type Set = HashMap; + type Key = CompactAttestationData; fn intermediate(&self) -> &AttestationRef<'a, T> { &self.att @@ -164,6 +165,10 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { fn score(&self) -> usize { self.fresh_validators_rewards.values().sum::() as usize } + + fn key(&self) -> CompactAttestationData { + self.att.data.clone() + } } /// Extract the validators for 
which `attestation` would be their earliest in the epoch. @@ -188,6 +193,7 @@ pub fn earliest_attestation_validators( } else if attestation.checkpoint.target_epoch == state.previous_epoch() { &base_state.previous_epoch_attestations } else { + #[allow(clippy::unwrap_used)] return BitList::with_capacity(0).unwrap(); }; diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs index dac5e25b349..5a5209e5bb0 100644 --- a/beacon_node/operation_pool/src/attestation_storage.rs +++ b/beacon_node/operation_pool/src/attestation_storage.rs @@ -12,7 +12,7 @@ pub struct CheckpointKey { pub target_epoch: Epoch, } -#[derive(Debug, PartialEq, Eq, Hash)] +#[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct CompactAttestationData { pub slot: Slot, pub index: u64, @@ -20,7 +20,7 @@ pub struct CompactAttestationData { pub target_root: Hash256, } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct CompactIndexedAttestation<T: EthSpec> { pub attesting_indices: Vec<u64>, pub aggregation_bits: BitList<T::MaxValidatorsPerCommittee>, @@ -48,7 +48,9 @@ pub struct AttestationMap<T: EthSpec> { #[derive(Debug, Default, PartialEq)] pub struct AttestationDataMap<T: EthSpec> { - attestations: HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>, + pub aggregate_attestations: HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>, + pub unaggregate_attestations: + HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>, } impl<T: EthSpec> SplitAttestation<T> { @@ -152,37 +154,29 @@ impl<T: EthSpec> AttestationMap<T> { } = SplitAttestation::new(attestation, attesting_indices); let attestation_map = self.checkpoint_map.entry(checkpoint).or_default(); - let attestations = attestation_map.attestations.entry(data).or_default(); - - // Greedily aggregate the attestation with all existing attestations. - // NOTE: this is sub-optimal and in future we will remove this in favour of max-clique - // aggregation. - let mut aggregated = false; + let attestations = if indexed.attesting_indices.len() > 1 { + attestation_map + .aggregate_attestations + .entry(data) + .or_default() + } else { + attestation_map + .unaggregate_attestations + .entry(data) + .or_default() + }; + let mut observed = false; for existing_attestation in attestations.iter_mut() { - if existing_attestation.signers_disjoint_from(&indexed) { - existing_attestation.aggregate(&indexed); - aggregated = true; - } else if *existing_attestation == indexed { - aggregated = true; + if *existing_attestation == indexed { + observed = true; } } - if !aggregated { + if !observed { attestations.push(indexed); } } - /// Iterate all attestations matching the given `checkpoint_key`. - pub fn get_attestations<'a>( - &'a self, - checkpoint_key: &'a CheckpointKey, - ) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a { - self.checkpoint_map - .get(checkpoint_key) - .into_iter() - .flat_map(|attestation_map| attestation_map.iter(checkpoint_key)) - } - /// Iterate all attestations in the map. pub fn iter(&self) -> impl Iterator<Item = AttestationRef<T>> { self.checkpoint_map @@ -196,6 +190,13 @@ .retain(|checkpoint_key, _| current_epoch <= checkpoint_key.target_epoch + 1); } + pub fn get_attestation_map( + &self, + checkpoint_key: &CheckpointKey, + ) -> Option<&AttestationDataMap<T>> { + self.checkpoint_map.get(checkpoint_key) + } + /// Statistics about all attestations stored in the map.
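+ /// (Illustrative, editorial addition: a `CompactAttestationData` key holding three aggregates and two unaggregated attestations contributes five to `num_attestations` but only one to `num_attestation_data`.)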
pub fn stats(&self) -> AttestationStats { self.checkpoint_map @@ -216,24 +217,48 @@ impl<T: EthSpec> AttestationDataMap<T> { &'a self, checkpoint_key: &'a CheckpointKey, ) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a { - self.attestations.iter().flat_map(|(data, vec_indexed)| { - vec_indexed.iter().map(|indexed| AttestationRef { - checkpoint: checkpoint_key, - data, - indexed, - }) - }) + let aggregates = self + .aggregate_attestations + .iter() + .flat_map(|(data, vec_indexed)| { + vec_indexed.iter().map(|indexed| AttestationRef { + checkpoint: checkpoint_key, + data, + indexed, + }) + }); + + let unaggregates = self + .unaggregate_attestations + .iter() + .flat_map(|(data, vec_indexed)| { + vec_indexed.iter().map(|indexed| AttestationRef { + checkpoint: checkpoint_key, + data, + indexed, + }) + }); + + aggregates.chain(unaggregates) } pub fn stats(&self) -> AttestationStats { let mut stats = AttestationStats::default(); - for aggregates in self.attestations.values() { + for aggregates in self.aggregate_attestations.values() { stats.num_attestations += aggregates.len(); stats.num_attestation_data += 1; stats.max_aggregates_per_data = std::cmp::max(stats.max_aggregates_per_data, aggregates.len()); } + + for (data, unaggregates) in self.unaggregate_attestations.iter() { + stats.num_attestations += unaggregates.len(); + if !self.aggregate_attestations.contains_key(data) { + stats.num_attestation_data += 1; + } + } + stats } } diff --git a/beacon_node/operation_pool/src/attester_slashing.rs b/beacon_node/operation_pool/src/attester_slashing.rs index f5916384d4b..1793425d94a 100644 --- a/beacon_node/operation_pool/src/attester_slashing.rs +++ b/beacon_node/operation_pool/src/attester_slashing.rs @@ -42,6 +42,7 @@ impl<'a, T: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, T> { type Intermediate = AttesterSlashing<T>; /// The type used to represent sets. type Set = HashMap<u64, u64>; + type Key = (); fn intermediate(&self) -> &AttesterSlashing<T> { self.slashing @@ -69,4 +70,5 @@ impl<'a, T: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, T> { fn score(&self) -> usize { self.effective_balances.values().sum::<u64>() as usize } + fn key(&self) -> Self::Key {} } diff --git a/beacon_node/operation_pool/src/bron_kerbosch.rs b/beacon_node/operation_pool/src/bron_kerbosch.rs new file mode 100644 index 00000000000..28b8a6222e0 --- /dev/null +++ b/beacon_node/operation_pool/src/bron_kerbosch.rs @@ -0,0 +1,261 @@ +use crate::OpPoolError as Error; +use rpds::HashTrieSet; + +/// Entry point for the Bron-Kerbosch algorithm. Takes a slice of `vertices` of type `T` and a +/// symmetric predicate `is_compatible`. Returns all the maximal cliques (as a matrix of indices) +/// for the graph `G = (V,E)` where `V` is `vertices` and `E` encodes the `is_compatible` relationship. +pub fn bron_kerbosch<T, F: Fn(&T, &T) -> bool>( + vertices: &[T], + is_compatible: F, +) -> Result<Vec<HashTrieSet<usize>>, Error> { + // create empty vector to store cliques + let mut cliques: Vec<HashTrieSet<usize>> = vec![]; + + if !vertices.is_empty() { + // build neighbourhoods and degeneracy ordering, also move to index-based reasoning + let neighbourhoods = + compute_neighbourhoods(vertices, is_compatible).ok_or(Error::BronKerboschLogicError)?; + let ordering = degeneracy_order(vertices.len(), &neighbourhoods); + + let mut publish_clique = |c| cliques.push(c); + + for (i, &vi) in ordering.iter().enumerate() { + let vi_neighbourhood = neighbourhoods + .get(vi) + .ok_or(Error::BronKerboschLogicError)?; + let p = ordering + .get(i + 1..ordering.len()) + .ok_or(Error::BronKerboschLogicError)? + .iter() + .filter(|pj| vi_neighbourhood.contains(pj)) + .copied() + .collect(); + let r = HashTrieSet::default().insert(vi); + let x = ordering + .get(0..i) + .ok_or(Error::BronKerboschLogicError)? + .iter() + .filter(|xj| vi_neighbourhood.contains(xj)) + .copied() + .collect(); + bron_kerbosch_aux(r, p, x, &neighbourhoods, &mut publish_clique)?; + } + } + + Ok(cliques) +}
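To make the entry point above concrete, here is a minimal usage sketch (an editorial illustration, not part of the diff; it assumes only the `bron_kerbosch` function defined above): the maximal cliques of a 4-cycle are exactly its four edges.

fn demo_cycle_cliques() {
    // Vertices 0..4 joined in a cycle 0-1-2-3-0.
    let vertices: Vec<usize> = (0..4).collect();
    let edges = [(0, 1), (1, 2), (2, 3), (3, 0)];
    // Symmetrise the edge list, as the unit tests below do.
    let is_compatible =
        |a: &usize, b: &usize| edges.contains(&(*a, *b)) || edges.contains(&(*b, *a));
    let cliques = bron_kerbosch(&vertices, is_compatible).expect("no logic error");
    // A 4-cycle has four maximal cliques, one per edge.
    assert_eq!(cliques.len(), 4);
    assert!(cliques.iter().all(|c| c.size() == 2));
}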
+ +/// A function to compute the neighbourhoods for all nodes in the list. The neighbourhood `N(a)` of a +/// vertex `a` in `vertices` is the set of vertices `b` in `vertices` such that +/// `is_compatible(&a, &b) == true`. The function assumes that `is_compatible` is symmetric, +/// and returns a symmetric matrix (`Vec<Vec<usize>>`) of indices, where each index corresponds +/// to the relative vertex in `vertices`. +fn compute_neighbourhoods<T, F: Fn(&T, &T) -> bool>( + vertices: &[T], + is_compatible: F, +) -> Option<Vec<Vec<usize>>> { + let mut neighbourhoods = vec![]; + neighbourhoods.resize_with(vertices.len(), Vec::new); + for (i, vi) in vertices.get(0..vertices.len() - 1)?.iter().enumerate() { + for (j, vj) in vertices.iter().enumerate().skip(i + 1) { + if is_compatible(vi, vj) { + neighbourhoods.get_mut(i)?.push(j); + neighbourhoods.get_mut(j)?.push(i); + } + } + } + Some(neighbourhoods) +} + +/// Produces a degeneracy ordering of a set of vertices. +fn degeneracy_order(num_vertices: usize, neighbourhoods: &[Vec<usize>]) -> Vec<usize> { + let mut v: Vec<usize> = (0..num_vertices).collect(); + v.sort_unstable_by_key(|i| neighbourhoods.get(*i).map(|n| n.len()).unwrap_or(0)); + v +} + +/// Auxiliary function to be used in the recursive call of the Bron-Kerbosch algorithm. +/// Parameters +/// * `r` - a working clique that is being built +/// * `p` - a set of candidate vertices to be added to r +/// * `x` - a set of vertices that have been explored and shouldn't be added to r +/// * `neighbourhoods` - a data structure to hold the neighbourhoods of each vertex +/// * `publish_clique` - a callback function to call whenever a clique has been produced +fn bron_kerbosch_aux<F>( + r: HashTrieSet<usize>, + mut p: HashTrieSet<usize>, + mut x: HashTrieSet<usize>, + neighbourhoods: &Vec<Vec<usize>>, + publish_clique: &mut F, +) -> Result<(), Error> +where + F: FnMut(HashTrieSet<usize>), +{ + if p.is_empty() && x.is_empty() { + publish_clique(r); + return Ok(()); + } + + let pivot = find_pivot(&p, &x, neighbourhoods)?; + let pivot_neighbours = neighbourhoods + .get(pivot) + .ok_or(Error::BronKerboschLogicError)?; + + let ip = hash_set_filter(&p, |e| !pivot_neighbours.contains(e)); + + for v in ip.iter() { + let n_set = neighbourhoods + .get(*v) + .ok_or(Error::BronKerboschLogicError)?; + + let nr = r.insert(*v); + let np = hash_set_filter(&p, |e| n_set.contains(e)); + let nx = hash_set_filter(&x, |e| n_set.contains(e)); + + bron_kerbosch_aux(nr, np, nx, neighbourhoods, publish_clique)?; + + p.remove_mut(v); + x.insert_mut(*v); + } + Ok(()) +} + +/// Identifies the pivot for the Bron-Kerbosch pivoting technique. +fn find_pivot( + p: &HashTrieSet<usize>, + x: &HashTrieSet<usize>, + neighbourhoods: &[Vec<usize>], +) -> Result<usize, Error> { + p.iter() + .chain(x.iter()) + .min_by_key(|&e| { + p.iter() + .filter(|ee| { + neighbourhoods + .get(*e) + .map(|n| n.contains(ee)) + .unwrap_or(false) + }) + .count() + }) + .copied() + .ok_or(Error::BronKerboschLogicError) +} + +/// Store the members of `set` matching `predicate` in a new set. +fn hash_set_filter<P>(set: &HashTrieSet<usize>, predicate: P) -> HashTrieSet<usize> +where + P: Fn(&usize) -> bool, +{ + let mut new_set = set.clone(); + for e in set.iter() { + if !predicate(e) { + new_set.remove_mut(e); + } + } + new_set +} + +#[cfg(test)] +mod tests { + use super::*; + use quickcheck::quickcheck; + use std::collections::HashSet; + #[test] + fn bron_kerbosch_small_test() { + let vertices: Vec<usize> = (0..7).collect(); + let edges = [ + (0, 1), + (0, 2), + (0, 3), + (1, 2), + (1, 3), + (2, 3), + (0, 4), + (4, 5), + (4, 6), + (1, 6), + (0, 6), + (4, 6), + ]; + + let is_compatible = |first: &usize, second: &usize| -> bool { + edges.contains(&(*first, *second)) || edges.contains(&(*second, *first)) + }; + + println!("{:?}", bron_kerbosch(&vertices, is_compatible).unwrap()); + } + + quickcheck! { + fn no_panic(vertices: Vec<usize>, adjacencies: HashSet<(usize, usize)>) -> bool { + let is_compatible = |i: &usize, j: &usize| adjacencies.contains(&(*i, *j)) || adjacencies.contains(&(*j, *i)); + bron_kerbosch(&vertices, is_compatible).unwrap(); + true + } + + fn at_least_one_clique_returned(vertices: Vec<usize>, adjacencies: HashSet<(usize, usize)>) -> bool { + if vertices.is_empty() { + return true; + } + let is_compatible = |i: &usize, j: &usize| adjacencies.contains(&(*i, *j)) || adjacencies.contains(&(*j, *i)); + let res = bron_kerbosch(&vertices, is_compatible).unwrap(); + !res.is_empty() + } + + fn no_clique_is_empty(vertices: Vec<usize>, adjacencies: HashSet<(usize, usize)>) -> bool { + if vertices.is_empty() { + return true; + } + let is_compatible = |i: &usize, j: &usize| adjacencies.contains(&(*i, *j)) || adjacencies.contains(&(*j, *i)); + let res = bron_kerbosch(&vertices, is_compatible).unwrap(); + for clique in res.iter() { + if clique.is_empty() { + return false; + } + } + true + } + + fn all_claimed_cliques_are_cliques(vertices: Vec<usize>, adjacencies: HashSet<(usize, usize)>) -> bool { + let is_compatible = |i: &usize, j: &usize| adjacencies.contains(&(*i, *j)) || adjacencies.contains(&(*j, *i)); + let claimed_cliques = bron_kerbosch(&vertices, is_compatible).unwrap(); + for clique in claimed_cliques { + for (ind1, vertex) in clique.iter().enumerate() { + for (ind2, other_vertex) in clique.iter().enumerate() { + if ind1 == ind2 { + continue + } + if !is_compatible(&vertices[*vertex], &vertices[*other_vertex]) { + return false; + } + } + } + } + true + } + + fn no_clique_is_a_subset_of_other_clique(vertices: Vec<usize>, adjacencies: HashSet<(usize, usize)>) -> bool { + let is_compatible = |i: &usize, j: &usize| adjacencies.contains(&(*i, *j)) || adjacencies.contains(&(*j, *i)); + let claimed_cliques = bron_kerbosch(&vertices, is_compatible).unwrap(); + let is_subset = |set1: HashTrieSet<usize>, set2: HashTrieSet<usize>| -> bool { + for vertex in set1.iter() { + if !set2.contains(vertex) { + return false; + } + } + true + }; + for (ind1, clique) in claimed_cliques.iter().enumerate() { + for (ind2, other_clique) in claimed_cliques.iter().enumerate() { + if ind1 == ind2 { + continue; + } + if is_subset(clique.clone(), other_clique.clone()) { + return false; + } + } + } + true + } + } +}
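A brief sketch of the persistent-set behaviour that `hash_set_filter` above relies on (editorial; it uses only the `rpds::HashTrieSet` API imported by the new module): filtering clones the set and strips non-matching members, leaving the original untouched thanks to structural sharing.

fn demo_persistent_filter() {
    use rpds::HashTrieSet;
    let set: HashTrieSet<usize> = HashTrieSet::new().insert(1).insert(2).insert(3);
    // Clone-and-remove, exactly as `hash_set_filter` does, keeping even members.
    let mut evens = set.clone();
    for e in set.iter() {
        if *e % 2 != 0 {
            evens.remove_mut(e);
        }
    }
    assert!(evens.contains(&2) && !evens.contains(&1));
    // The original set is unchanged; the clone shares structure cheaply.
    assert_eq!(set.size(), 3);
}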
diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 7e1ddb1fd2f..c3c55c04b05 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -1,8 +1,22 @@ +// Clippy lint setup +#![cfg_attr( + not(test), + deny( + clippy::unwrap_used, + clippy::expect_used, + clippy::panic, + clippy::unreachable, + clippy::todo, + clippy::indexing_slicing + ) +)] + mod attestation; mod attestation_id; mod attestation_storage; mod attester_slashing; mod bls_to_execution_changes; +mod bron_kerbosch; mod max_cover; mod metrics; mod persistence; @@ -11,7 +25,11 @@ mod sync_aggregate_id; pub use crate::bls_to_execution_changes::ReceivedPreCapella; pub use attestation::{earliest_attestation_validators, AttMaxCover}; -pub use attestation_storage::{AttestationRef, SplitAttestation}; +use attestation_storage::{AttestationDataMap, AttestationMap, CompactIndexedAttestation}; +pub use attestation_storage::{ + AttestationRef, CheckpointKey, CompactAttestationData, SplitAttestation, +}; +use bron_kerbosch::bron_kerbosch; pub use max_cover::MaxCover; pub use persistence::{ PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14, @@ -19,7 +37,6 @@ pub use persistence::{ }; pub use reward_cache::RewardCache; -use crate::attestation_storage::{AttestationMap, CheckpointKey}; use crate::bls_to_execution_changes::BlsToExecutionChanges; use crate::sync_aggregate_id::SyncAggregateId; use attester_slashing::AttesterSlashingMaxCover; @@ -27,6 +44,7 @@ use max_cover::maximum_cover; use parking_lot::{RwLock, RwLockWriteGuard}; use rand::seq::SliceRandom; use rand::thread_rng; +use rayon::prelude::*; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::{ get_slashable_indices_modular, verify_exit, VerifySignatures, @@ -35,6 +53,7 @@ use state_processing::{SigVerifiedOp, VerifyOperation}; use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::marker::PhantomData; use std::ptr; +use std::sync::atomic::{AtomicUsize, Ordering}; use types::{ sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, AbstractExecPayload, Attestation, AttestationData, AttesterSlashing, BeaconState, BeaconStateError, ChainSpec, @@ -75,11 +94,12 @@ pub enum OpPoolError { RewardCacheValidatorUnknown(BeaconStateError), RewardCacheOutOfBounds, IncorrectOpPoolVariant, + BronKerboschLogicError, } #[derive(Default)] pub struct AttestationStats { - /// Total number of attestations for all committeees/indices/votes. + /// Total number of attestations for all committees/indices/votes. pub num_attestations: usize, /// Number of unique `AttestationData` attested to. pub num_attestation_data: usize, @@ -215,28 +235,121 @@ impl<T: EthSpec> OperationPool<T> { self.attestations.read().stats() } - /// Return all valid attestations for the given epoch, for use in max cover. + /// Return a vector of aggregate/unaggregate attestations which are maximal cliques + /// with respect to the graph with attestations as vertices and an edge encoding + /// compatibility for aggregation.
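+ /// (Illustrative, editorial addition: aggregates with aggregation bits `0b0011` and `0b1100` have disjoint signers, so an edge joins them and they may share a clique; `0b0011` and `0b0110` overlap and may not.)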
#[allow(clippy::too_many_arguments)] - fn get_valid_attestations_for_epoch<'a>( + #[allow(clippy::type_complexity)] + fn get_clique_aggregate_attestations_for_epoch<'a>( &'a self, checkpoint_key: &'a CheckpointKey, all_attestations: &'a AttestationMap<T>, state: &'a BeaconState<T>, - reward_cache: &'a RewardCache, - total_active_balance: u64, - validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send, + validity_filter: impl Fn(&CheckpointKey, &CompactAttestationData) -> bool + Send + Sync, + num_valid: &AtomicUsize, + max_vertices: usize, spec: &'a ChainSpec, - ) -> impl Iterator<Item = AttMaxCover<'a, T>> + Send { - all_attestations - .get_attestations(checkpoint_key) - .filter(|att| { - att.data.slot + spec.min_attestation_inclusion_delay <= state.slot() - && state.slot() <= att.data.slot + T::slots_per_epoch() + ) -> Result<Vec<(&'a CompactAttestationData, Vec<CompactIndexedAttestation<T>>)>, OpPoolError> + { + let Some(AttestationDataMap { + aggregate_attestations, + unaggregate_attestations, + }) = all_attestations.get_attestation_map(checkpoint_key) + else { + return Ok(vec![]); + }; + let mut cliques_from_aggregates: Vec<_> = aggregate_attestations + .iter() + .filter(|(data, _)| { + data.slot + spec.min_attestation_inclusion_delay <= state.slot() + && state.slot() <= data.slot + T::slots_per_epoch() + }) + .map(|(data, aggregates)| { + let aggregates: Vec<&CompactIndexedAttestation<T>> = aggregates + .iter() + .filter(|_| validity_filter(checkpoint_key, data)) + .collect(); + num_valid.fetch_add(aggregates.len(), Ordering::Relaxed); + (data, aggregates) + }) + .map(|(data, mut aggregates)| { + // Take the N aggregates with the highest number of set bits. + // This is needed to avoid the bron_kerbosch algorithm generating millions of + // cliques. + let aggregates = if aggregates.len() > max_vertices { + let (left, _, _) = aggregates.select_nth_unstable_by_key(max_vertices, |a| { + std::cmp::Reverse(a.aggregation_bits.num_set_bits()) + }); + left + } else { + aggregates.as_slice() + }; + // aggregate each clique's corresponding attestations + let mut clique_aggregates: Vec<CompactIndexedAttestation<T>> = + bron_kerbosch(aggregates, is_compatible)?
+ .iter() + .map(|clique| { + aggregate_attestations_by_indices(aggregates, clique.iter().copied()) + .ok_or(OpPoolError::BronKerboschLogicError) + }) + .collect::<Result<Vec<_>, _>>()?; + let mut indices_to_remove = Vec::new(); + clique_aggregates.sort_unstable_by_key(|att| att.attesting_indices.len()); + for (index, clique) in clique_aggregates.iter().enumerate() { + for bigger_clique in clique_aggregates.iter().skip(index + 1) { + if clique + .aggregation_bits + .is_subset(&bigger_clique.aggregation_bits) + { + indices_to_remove.push(index); + break; + } + } + } + + for index in indices_to_remove.iter().rev() { + clique_aggregates.swap_remove(*index); + } + + // aggregate unaggregate attestations into the clique aggregates + // if compatible + if let Some(unaggregate_attestations) = unaggregate_attestations.get(data) { + for attestation in unaggregate_attestations { + num_valid.fetch_add(1, Ordering::Relaxed); + for clique_aggregate in &mut clique_aggregates { + if clique_aggregate.signers_disjoint_from(attestation) { + clique_aggregate.aggregate(attestation); + } + } + } + } + + Ok((data, clique_aggregates)) }) - .filter(validity_filter) - .filter_map(move |att| { - AttMaxCover::new(att, state, reward_cache, total_active_balance, spec) + .collect::<Result<Vec<_>, OpPoolError>>()?; + + // include aggregated attestations from unaggregated attestations whose + // attestation data doesn't appear in `aggregate_attestations` + unaggregate_attestations + .iter() + .filter(|(data, _)| { + data.slot + spec.min_attestation_inclusion_delay <= state.slot() + && state.slot() <= data.slot + T::slots_per_epoch() + && !aggregate_attestations.contains_key(data) + }) + .for_each(|(data, unaggregates)| { + let mut unaggregates = unaggregates.iter(); + if let Some(att) = unaggregates.next() { + if !validity_filter(checkpoint_key, data) { + return; + } + + let mut att = att.clone(); + unaggregates.for_each(|valid_att| att.aggregate(valid_att)); + cliques_from_aggregates.push((data, vec![att])) + } + }); + Ok(cliques_from_aggregates) } /// Get a list of attestations for inclusion in a block. @@ -248,8 +361,13 @@ impl<T: EthSpec> OperationPool<T> { pub fn get_attestations( &self, state: &BeaconState<T>, - prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, - curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, + prev_epoch_validity_filter: impl for<'a> Fn(&CheckpointKey, &CompactAttestationData) -> bool + + Send + + Sync, + curr_epoch_validity_filter: impl for<'a> Fn(&CheckpointKey, &CompactAttestationData) -> bool + + Send + + Sync, + max_vertices: usize, spec: &ChainSpec, ) -> Result<Vec<Attestation<T>>, OpPoolError> { // Attestations for the current fork, which may be from the current or previous epoch. @@ -268,31 +386,57 @@ impl<T: EthSpec> OperationPool<T> { // Split attestations for the previous & current epochs, so that we // can optimise them individually in parallel. - let mut num_prev_valid = 0_i64; - let mut num_curr_valid = 0_i64; + let num_prev_valid = AtomicUsize::new(0); + let num_curr_valid = AtomicUsize::new(0); - let prev_epoch_att = self - .get_valid_attestations_for_epoch( + // If we're in the genesis epoch, just use the current epoch attestations.
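+ // (Illustrative note: `CheckpointKey::keys_for_state` yields equal previous/current keys in the genesis epoch, so running the previous-epoch pass would just repeat the current-epoch work.)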
+ let prev_epoch_cliqued_atts = if prev_epoch_key != curr_epoch_key { + self.get_clique_aggregate_attestations_for_epoch( &prev_epoch_key, &*all_attestations, state, - &reward_cache, - total_active_balance, prev_epoch_validity_filter, + &num_prev_valid, + max_vertices, spec, - ) - .inspect(|_| num_prev_valid += 1); - let curr_epoch_att = self - .get_valid_attestations_for_epoch( - &curr_epoch_key, - &*all_attestations, - state, - &reward_cache, - total_active_balance, - curr_epoch_validity_filter, - spec, - ) - .inspect(|_| num_curr_valid += 1); + )? + } else { + vec![] + }; + + let prev_epoch_cliqued_atts = prev_epoch_cliqued_atts.iter().flat_map(|(data, atts)| { + atts.iter() + .map(|indexed| AttestationRef { + checkpoint: &prev_epoch_key, + data, + indexed, + }) + .filter_map(|att| { + AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) + }) + }); + + let curr_epoch_cliqued_atts = self.get_clique_aggregate_attestations_for_epoch( + &curr_epoch_key, + &*all_attestations, + state, + curr_epoch_validity_filter, + &num_curr_valid, + max_vertices, + spec, + )?; + + let curr_epoch_cliqued_atts = curr_epoch_cliqued_atts.iter().flat_map(|(data, atts)| { + atts.iter() + .map(|indexed| AttestationRef { + checkpoint: &curr_epoch_key, + data, + indexed, + }) + .filter_map(|att| { + AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) + }) + }); let prev_epoch_limit = if let BeaconState::Base(base_state) = state { std::cmp::min( @@ -311,21 +455,31 @@ impl OperationPool { if prev_epoch_key == curr_epoch_key { vec![] } else { - maximum_cover(prev_epoch_att, prev_epoch_limit, "prev_epoch_attestations") + maximum_cover( + prev_epoch_cliqued_atts, + prev_epoch_limit, + "prev_epoch_attestations", + ) } }, move || { let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME); maximum_cover( - curr_epoch_att, + curr_epoch_cliqued_atts, T::MaxAttestations::to_usize(), "curr_epoch_attestations", ) }, ); - metrics::set_gauge(&metrics::NUM_PREV_EPOCH_ATTESTATIONS, num_prev_valid); - metrics::set_gauge(&metrics::NUM_CURR_EPOCH_ATTESTATIONS, num_curr_valid); + metrics::set_gauge( + &metrics::NUM_PREV_EPOCH_ATTESTATIONS, + num_prev_valid.load(Ordering::Relaxed) as i64, + ); + metrics::set_gauge( + &metrics::NUM_CURR_EPOCH_ATTESTATIONS, + num_curr_valid.load(Ordering::Relaxed) as i64, + ); Ok(max_cover::merge_solutions( curr_cover, @@ -707,6 +861,25 @@ impl OperationPool { } } +fn is_compatible( + x: &&CompactIndexedAttestation, + y: &&CompactIndexedAttestation, +) -> bool { + x.signers_disjoint_from(y) +} + +fn aggregate_attestations_by_indices( + attestations: &[&CompactIndexedAttestation], + mut indices: impl Iterator, +) -> Option> { + let first = indices.next()?; + let mut attestation: CompactIndexedAttestation = (*attestations.get(first)?).clone(); + for i in indices { + attestation.aggregate(attestations.get(i)?); + } + Some(attestation) +} + /// Filter up to a maximum number of operations out of an iterator. 
fn filter_limit_operations<'a, T: 'a, V: 'a, I, F, G>( operations: I, @@ -773,6 +946,7 @@ mod release_tests { }; use lazy_static::lazy_static; use maplit::hashset; + use state_processing::state_advance::complete_state_advance; use state_processing::{common::get_attesting_indices_from_state, VerifyOperation}; use std::collections::BTreeSet; use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT; @@ -963,12 +1137,12 @@ } } - assert_eq!(op_pool.num_attestations(), committees.len()); + assert_eq!(op_pool.num_attestations(), 128); // Before the min attestation inclusion delay, get_attestations shouldn't return anything. assert_eq!( op_pool - .get_attestations(&state, |_| true, |_| true, spec) + .get_attestations(&state, |_, _| true, |_, _| true, 16, spec) .expect("should have attestations") .len(), 0 @@ -978,7 +1152,7 @@ *state.slot_mut() += spec.min_attestation_inclusion_delay; let block_attestations = op_pool - .get_attestations(&state, |_| true, |_| true, spec) + .get_attestations(&state, |_, _| true, |_, _| true, 16, spec) .expect("Should have block attestations"); assert_eq!(block_attestations.len(), committees.len()); @@ -990,7 +1164,7 @@ // Prune attestations shouldn't do anything at this point. op_pool.prune_attestations(state.current_epoch()); - assert_eq!(op_pool.num_attestations(), committees.len()); + assert_eq!(op_pool.num_attestations(), 128); // But once we advance to more than an epoch after the attestation, it should prune it // out of existence. @@ -1044,7 +1218,7 @@ fn attestation_pairwise_overlapping() { let (harness, ref spec) = attestation_test_state::<MainnetEthSpec>(1); - let state = harness.get_current_state(); + let mut state = harness.get_current_state(); let op_pool = OperationPool::<MainnetEthSpec>::new(); @@ -1056,8 +1230,8 @@ .map(BeaconCommittee::into_owned) .collect::<Vec<_>>(); - let num_validators = - MainnetEthSpec::slots_per_epoch() as usize * spec.target_committee_size; + // NOTE: this test is VERY sensitive to the number of attestations + let num_validators = 16 * spec.target_committee_size; let attestations = harness.make_attestations( (0..num_validators).collect::<Vec<_>>().as_slice(), @@ -1071,6 +1245,11 @@ // Create attestations that overlap on `step_size` validators, like: // {0,1,2,3}, {2,3,4,5}, {4,5,6,7}, ... for (atts1, _) in attestations { + assert_eq!( + (atts1.len() - step_size) % (2 * step_size), + 0, + "to get two aggregates we need to be able to split by 2*step_size" + ); let atts2 = atts1.clone(); let aggs1 = atts1 .chunks_exact(step_size * 2) @@ -1121,12 +1300,38 @@ } } + let num_valid = AtomicUsize::new(0); + let (_, curr) = CheckpointKey::keys_for_state(&state); + let all_attestations = op_pool.attestations.read(); + + complete_state_advance(&mut state, None, Slot::new(1), spec).unwrap(); + let clique_attestations = op_pool + .get_clique_aggregate_attestations_for_epoch( + &curr, + &all_attestations, + &state, + |_, _| true, + &num_valid, + 32, + spec, + ) + .unwrap(); + let best_attestations = op_pool + .get_attestations(&state, |_, _| true, |_, _| true, 32, spec) + .unwrap(); + + // There should only be attestations for 1 attestation data. + let stats = op_pool.attestation_stats(); + assert_eq!(committees.len(), 1); + assert_eq!(stats.num_attestation_data, 1); + + // There should be multiple cliques for this single attestation data.
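+ // (Illustrative: the overlapping `2 * step_size` chunks from `aggs1` and `aggs2` cannot all be pairwise disjoint, so Bron-Kerbosch must publish more than one maximal clique here.)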
+ assert_eq!(clique_attestations.len(), 1); + assert!(clique_attestations[0].1.len() > 1); + // The attestations should get aggregated into two attestations that comprise all // validators. - let stats = op_pool.attestation_stats(); - assert_eq!(stats.num_attestation_data, committees.len()); - assert_eq!(stats.num_attestations, 2 * committees.len()); - assert_eq!(stats.max_aggregates_per_data, 2); + assert_eq!(best_attestations.len(), 2); } /// Create a bunch of attestations signed by a small number of validators, and another @@ -1214,7 +1419,7 @@ *state.slot_mut() += spec.min_attestation_inclusion_delay; let best_attestations = op_pool - .get_attestations(&state, |_| true, |_| true, spec) + .get_attestations(&state, |_, _| true, |_, _| true, 32, spec) .expect("should have best attestations"); assert_eq!(best_attestations.len(), max_attestations); @@ -1311,7 +1516,7 @@ *state.slot_mut() += spec.min_attestation_inclusion_delay; let best_attestations = op_pool - .get_attestations(&state, |_| true, |_| true, spec) + .get_attestations(&state, |_, _| true, |_, _| true, 32, spec) .expect("should have valid best attestations"); assert_eq!(best_attestations.len(), max_attestations); diff --git a/beacon_node/operation_pool/src/max_cover.rs b/beacon_node/operation_pool/src/max_cover.rs index 2e629f786b3..c7d52b60562 100644 --- a/beacon_node/operation_pool/src/max_cover.rs +++ b/beacon_node/operation_pool/src/max_cover.rs @@ -1,3 +1,5 @@ +use std::{collections::HashMap, hash::Hash}; + use crate::metrics; use itertools::Itertools; @@ -15,6 +17,8 @@ pub trait MaxCover: Clone { type Intermediate: Clone; /// The type used to represent sets. type Set: Clone; + /// The type used to represent keys. + type Key: Clone + PartialEq + Eq + Hash; /// Extract the intermediate object. fn intermediate(&self) -> &Self::Intermediate; @@ -28,6 +32,8 @@ pub trait MaxCover: Clone { fn update_covering_set(&mut self, max_obj: &Self::Intermediate, max_set: &Self::Set); /// The quality of this item's covering set, usually its cardinality. fn score(&self) -> usize; + /// Convert value to keyed value. + fn key(&self) -> Self::Key; } /// Helper struct to track which items of the input are still available for inclusion. @@ -56,11 +62,15 @@ where T: MaxCover, { // Construct an initial vec of all items, marked available. - let mut all_items: Vec<_> = items_iter + let mut all_items: HashMap<T::Key, Vec<MaxCoverItem<T>>> = items_iter .into_iter() .map(MaxCoverItem::new) .filter(|x| x.item.score() != 0) - .collect(); + .map(|val| (val.item.key(), val)) + .fold(HashMap::new(), |mut acc, (key, val)| { + acc.entry(key).or_default().push(val); + acc + }); metrics::set_int_gauge( &metrics::MAX_COVER_NON_ZERO_ITEMS, @@ -72,29 +82,33 @@ for _ in 0..limit { // Select the item with the maximum score. - let best = match all_items + let best = all_items .iter_mut() - .filter(|x| x.available && x.item.score() != 0) - .max_by_key(|x| x.item.score()) - { - Some(x) => { - x.available = false; - x.item.clone() + .flat_map(|(key, items)| items.iter_mut().map(move |x| (key, x))) + .filter(|(_, val)| val.available && val.item.score() != 0) + .max_by_key(|(_, val)| val.item.score()); + + let (best_key, best_item) = match best { + Some((key, val)) => { + val.available = false; + (key.clone(), val.item.clone()) } None => return result, }; // Update the covering sets of the other items, for the inclusion of the selected item. // Items covered by the selected item can't be re-covered.
- all_items - .iter_mut() - .filter(|x| x.available && x.item.score() != 0) - .for_each(|x| { - x.item - .update_covering_set(best.intermediate(), best.covering_set()) - }); + if let Some(items) = all_items.get_mut(&best_key) { + items + .iter_mut() + .filter(|x| x.available && x.item.score() != 0) + .for_each(|x| { + x.item + .update_covering_set(best_item.intermediate(), best_item.covering_set()) + }); + } - result.push(best); + result.push(best_item); } result @@ -128,6 +142,7 @@ mod test { type Object = Self; type Intermediate = Self; type Set = Self; + type Key = (); fn intermediate(&self) -> &Self { self @@ -149,6 +164,8 @@ fn score(&self) -> usize { self.len() } + + fn key(&self) {} } fn example_system() -> Vec<HashSet<usize>> {
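For intuition on the keyed max-cover above, a small sketch using the test-module `MaxCover` impl for `HashSet<usize>` (editorial; with `Key = ()` every item shares one bucket, so residual-score updates reach all remaining items and the greedy behaves like the old un-keyed version):

fn demo_keyed_max_cover() {
    use std::collections::HashSet;
    let sets: Vec<HashSet<usize>> = vec![
        HashSet::from([1, 2, 3]),
        HashSet::from([3, 4]),
        HashSet::from([4, 5]),
    ];
    // Greedy pass 1 picks {1, 2, 3}; the residual score of {3, 4} then drops
    // to 1, so pass 2 picks {4, 5}.
    let cover = maximum_cover(sets, 2, "example");
    assert_eq!(cover.len(), 2);
}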