Bron-Kerbosch attestation aggregation #4507

Open: wants to merge 41 commits into base: unstable

Changes shown from 39 of 41 commits.
59d9c0a
Add bron_kerbosch from the satalia and sigp implementation
GeemoCandama Jul 14, 2023
889da77
I needed to rebase and fmt
GeemoCandama Jul 14, 2023
eb5d6d2
op-pool maintains unaggregated_attestations and calculates bron kerbo…
GeemoCandama Jul 28, 2023
7bac0dd
add unagg attestations to op_pool during processing. fmt
GeemoCandama Jul 28, 2023
3fa1dd5
added a couple of comments
GeemoCandama Jul 28, 2023
2aeb8c6
delete unused
GeemoCandama Aug 1, 2023
22d0d35
fix using wrong checkpoint key
GeemoCandama Aug 15, 2023
673851a
avoid underflow on zero length vector
GeemoCandama Aug 22, 2023
3f0011b
This test is no longer the correct logic since we do not aggregate th…
GeemoCandama Aug 22, 2023
2d211be
remove collect
GeemoCandama Sep 1, 2023
649c4bd
using u64 instead of &u64 I think this might have been the issue but …
GeemoCandama Sep 2, 2023
4e81596
remove subset attestations
GeemoCandama Sep 28, 2023
f14b56f
sproul patch: fixes iterator issue
GeemoCandama Oct 4, 2023
77bbfcc
use swap_remove
GeemoCandama Oct 7, 2023
b02301a
sort unstable by
GeemoCandama Oct 7, 2023
f77c5c5
minor
GeemoCandama Oct 7, 2023
d2bd6af
bron_kerbosch done in parallel iter without changing the validity clo…
GeemoCandama Oct 13, 2023
5d2c4ba
using keyed version of max cover
GeemoCandama Oct 19, 2023
f7404d4
fmt
GeemoCandama Oct 19, 2023
9e781e1
Remove allocations from `find_pivot`
michaelsproul Oct 20, 2023
06aed57
Fix bug in max cover and optimise
michaelsproul Oct 20, 2023
b23449f
add quickcheck tests
GeemoCandama Oct 20, 2023
01d2a13
corrected test
GeemoCandama Oct 22, 2023
a4fa4cc
added test and fixed other stuff. Test is failing
GeemoCandama Oct 25, 2023
2dba127
HashTrieSet
GeemoCandama Oct 29, 2023
b93979a
Better use of HashTrieSet
GeemoCandama Oct 30, 2023
e6f87ef
fmt
GeemoCandama Oct 30, 2023
e1a01e3
Fix degeneracy order and remove more clones
michaelsproul Oct 31, 2023
c53d236
Back to vecs
michaelsproul Oct 31, 2023
74d0a15
Merge pull request #1 from michaelsproul/bron_kerbosch_attestation_ag…
GeemoCandama Oct 31, 2023
7a8da73
adding limit for BK vertices
GeemoCandama Nov 2, 2023
41fc685
Allocation optimisations & Clippy cleanup
michaelsproul Nov 2, 2023
50ab6c8
Merge pull request #2 from michaelsproul/bron_kerbosch_attestation_ag…
GeemoCandama Nov 3, 2023
bca7b9f
Add const for aggregates-per-data
michaelsproul Nov 2, 2023
498c2c1
Fix compute_neighbourhood and tests!
michaelsproul Nov 3, 2023
c6b9938
Simplify stats
michaelsproul Nov 3, 2023
9ad61b9
Merge pull request #3 from michaelsproul/bron_kerbosch_attestation_ag…
GeemoCandama Nov 3, 2023
fdee0a0
Merge remote-tracking branch 'origin/unstable' into bron_kerbosch_att…
michaelsproul Nov 3, 2023
0e2f24e
change validity filter and use it only when necessary
GeemoCandama Nov 3, 2023
376c408
avoid deadlock by sequential iter and collecting then par iter
GeemoCandama Nov 14, 2023
32dd188
remove rayon from bron-kerbosch processing
GeemoCandama Jan 23, 2024
20 changes: 20 additions & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions Cargo.toml
@@ -139,6 +139,7 @@ rayon = "1.7"
regex = "1"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "stream", "rustls-tls", "native-tls-vendored"] }
ring = "0.16"
rpds = "1.0.1"
rusqlite = { version = "0.28", features = ["bundled"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
76 changes: 41 additions & 35 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -83,7 +83,10 @@ use futures::channel::mpsc::Sender;
use itertools::process_results;
use itertools::Itertools;
use kzg::Kzg;
use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella};
use operation_pool::{
CheckpointKey, CompactAttestationData, OperationPool, PersistedOperationPool,
ReceivedPreCapella,
};
use parking_lot::{Mutex, RwLock};
use proto_array::{DoNotReOrg, ProposerHeadError};
use safe_arith::SafeArith;
@@ -92,7 +95,6 @@ use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use state_processing::{
common::get_attesting_indices_from_state,
per_block_processing,
per_block_processing::{
errors::AttestationValidationError, get_expected_withdrawals,
@@ -167,6 +169,13 @@ const PREPARE_PROPOSER_HISTORIC_EPOCHS: u64 = 4;
/// impact whilst having 8 epochs without a block is a comfortable grace period.
const MAX_PER_SLOT_FORK_CHOICE_DISTANCE: u64 = 256;

/// The maximum number of aggregates per `AttestationData` to supply to Bron-Kerbosch (BK).
///
/// This value is chosen to be sufficient for ~16 aggregators on mainnet, and in practice should
/// never be reached. Higher values *could* lead to exponential blow-up in the running time of BK
/// if an attacker found a way to generate a lot of distinct aggregates.
const MAX_AGGREGATES_PER_DATA_FOR_CLIQUES: usize = 20;

/// Reported to the user when the justified block has an invalid execution payload.
pub const INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON: &str =
"Justified block has an invalid execution payload.";
@@ -2235,15 +2244,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn filter_op_pool_attestation(
&self,
filter_cache: &mut HashMap<(Hash256, Epoch), bool>,
att: &AttestationRef<T::EthSpec>,
checkpoint: &CheckpointKey,
data: &CompactAttestationData,
state: &BeaconState<T::EthSpec>,
) -> bool {
*filter_cache
.entry((att.data.beacon_block_root, att.checkpoint.target_epoch))
.entry((data.beacon_block_root, checkpoint.target_epoch))
.or_insert_with(|| {
self.shuffling_is_compatible(
&att.data.beacon_block_root,
att.checkpoint.target_epoch,
&data.beacon_block_root,
checkpoint.target_epoch,
state,
)
})
@@ -4753,27 +4763,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.op_pool
.get_bls_to_execution_changes(&state, &self.spec);

// Iterate through the naive aggregation pool and ensure all the attestations from there
// are included in the operation pool.
let unagg_import_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES);
for attestation in self.naive_aggregation_pool.read().iter() {
let import = |attestation: &Attestation<T::EthSpec>| {
let attesting_indices = get_attesting_indices_from_state(&state, attestation)?;
self.op_pool
.insert_attestation(attestation.clone(), attesting_indices)
};
if let Err(e) = import(attestation) {
// Don't stop block production if there's an error, just create a log.
error!(
self.log,
"Attestation did not transfer to op pool";
"reason" => ?e
);
}
}
drop(unagg_import_timer);

// Override the beacon node's graffiti with graffiti from the validator, if present.
let graffiti = match validator_graffiti {
Some(graffiti) => graffiti,
@@ -4783,21 +4772,38 @@
let attestation_packing_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);

let mut prev_filter_cache = HashMap::new();
let prev_attestation_filter = |att: &AttestationRef<T::EthSpec>| {
self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state)
};
let mut curr_filter_cache = HashMap::new();
let curr_attestation_filter = |att: &AttestationRef<T::EthSpec>| {
self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state)
};
let prev_chain = self.clone();
let prev_filter_cache_lock = Mutex::new(HashMap::new());
let prev_attestation_filter =
|checkpoint: &CheckpointKey, data: &CompactAttestationData| {
let mut prev_filter_cache = prev_filter_cache_lock.lock();
prev_chain.filter_op_pool_attestation(
&mut prev_filter_cache,
checkpoint,
data,
&state,
)
};
let curr_chain = self.clone();
let curr_filter_cache_lock = Mutex::new(HashMap::new());
let curr_attestation_filter =
|checkpoint: &CheckpointKey, data: &CompactAttestationData| {
let mut curr_filter_cache = curr_filter_cache_lock.lock();
curr_chain.filter_op_pool_attestation(
&mut curr_filter_cache,
checkpoint,
data,
&state,
)
};

let mut attestations = self
.op_pool
.get_attestations(
&state,
prev_attestation_filter,
curr_attestation_filter,
MAX_AGGREGATES_PER_DATA_FOR_CLIQUES,
&self.spec,
)
.map_err(BlockProductionError::OpPoolError)?;
4 changes: 0 additions & 4 deletions beacon_node/beacon_chain/src/metrics.rs
@@ -121,10 +121,6 @@ lazy_static! {
"beacon_block_production_slot_process_seconds",
"Time taken to advance the state to the block production slot"
);
pub static ref BLOCK_PRODUCTION_UNAGGREGATED_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_unaggregated_seconds",
"Time taken to import the naive aggregation pool for block production"
);
pub static ref BLOCK_PRODUCTION_ATTESTATION_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_attestation_seconds",
"Time taken to pack attestations into a block"
14 changes: 14 additions & 0 deletions beacon_node/http_api/src/lib.rs
@@ -1894,6 +1894,20 @@ pub fn serve<T: BeaconChainTypes>(
format!("Naive aggregation pool: {:?}", e),
));
}

if let Err(e) = chain.add_to_block_inclusion_pool(attestation) {
error!(log,
"Failure adding verified attestation to the operation pool";
"error" => ?e,
"request_index" => index,
"committee_index" => committee_index,
"slot" => slot,
);
failures.push(api_types::Failure::new(
index,
format!("Operation pool: {:?}", e),
));
}
}

if num_already_known > 0 {
10 changes: 10 additions & 0 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
@@ -376,6 +376,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
)
}

if let Err(e) = self.chain.add_to_block_inclusion_pool(verified_attestation) {
debug!(
self.log,
"Attestation invalid for op pool";
"reason" => ?e,
"peer" => %peer_id,
"beacon_block_root" => ?beacon_block_root,
);
}

metrics::inc_counter(
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL,
);
4 changes: 3 additions & 1 deletion beacon_node/operation_pool/Cargo.toml
@@ -19,11 +19,13 @@ serde = { workspace = true }
store = { workspace = true }
bitvec = { workspace = true }
rand = { workspace = true }
rpds = { workspace = true }

[dev-dependencies]
beacon_chain = { workspace = true }
tokio = { workspace = true }
maplit = { workspace = true }
quickcheck = { workspace = true }

[features]
portable = ["beacon_chain/portable"]
portable = ["beacon_chain/portable"]
8 changes: 7 additions & 1 deletion beacon_node/operation_pool/src/attestation.rs
@@ -1,4 +1,4 @@
use crate::attestation_storage::AttestationRef;
use crate::attestation_storage::{AttestationRef, CompactAttestationData};
use crate::max_cover::MaxCover;
use crate::reward_cache::RewardCache;
use state_processing::common::{
@@ -127,6 +127,7 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> {
type Object = Attestation<T>;
type Intermediate = AttestationRef<'a, T>;
type Set = HashMap<u64, u64>;
type Key = CompactAttestationData;

fn intermediate(&self) -> &AttestationRef<'a, T> {
&self.att
@@ -164,6 +165,10 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> {
fn score(&self) -> usize {
self.fresh_validators_rewards.values().sum::<u64>() as usize
}

fn key(&self) -> CompactAttestationData {
self.att.data.clone()
}
}

/// Extract the validators for which `attestation` would be their earliest in the epoch.
@@ -188,6 +193,7 @@ pub fn earliest_attestation_validators<T: EthSpec>(
} else if attestation.checkpoint.target_epoch == state.previous_epoch() {
&base_state.previous_epoch_attestations
} else {
#[allow(clippy::unwrap_used)]
return BitList::with_capacity(0).unwrap();
};
