Skip to content

Commit

Permalink
add unagg attestations to op_pool during processing. fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
GeemoCandama committed Jul 28, 2023
1 parent 023f290 commit a171c59
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 97 deletions.
21 changes: 0 additions & 21 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4344,27 +4344,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.op_pool
.get_bls_to_execution_changes(&state, &self.spec);

// Iterate through the naive aggregation pool and ensure all the attestations from there
// are included in the operation pool.
let unagg_import_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES);
for attestation in self.naive_aggregation_pool.read().iter() {
let import = |attestation: &Attestation<T::EthSpec>| {
let attesting_indices = get_attesting_indices_from_state(&state, attestation)?;
self.op_pool
.insert_attestation(attestation.clone(), attesting_indices)
};
if let Err(e) = import(attestation) {
// Don't stop block production if there's an error, just create a log.
error!(
self.log,
"Attestation did not transfer to op pool";
"reason" => ?e
);
}
}
drop(unagg_import_timer);

// Override the beacon node's graffiti with graffiti from the validator, if present.
let graffiti = match validator_graffiti {
Some(graffiti) => graffiti,
Expand Down
14 changes: 14 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,20 @@ pub fn serve<T: BeaconChainTypes>(
format!("Naive aggregation pool: {:?}", e),
));
}

if let Err(e) = chain.add_to_block_inclusion_pool(attestation) {
error!(log,
"Failure adding verified attestation to the operation pool";
"error" => ?e,
"request_index" => index,
"committee_index" => committee_index,
"slot" => slot,
);
failures.push(api_types::Failure::new(
index,
format!("Operation pool: {:?}", e),
));
}
}

if num_already_known > 0 {
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
)
}

if let Err(e) = self.chain.add_to_block_inclusion_pool(verified_attestation) {
debug!(
self.log,
"Attestation invalid for op pool";
"reason" => ?e,
"peer" => %peer_id,
"beacon_block_root" => ?beacon_block_root,
);
}

metrics::inc_counter(
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL,
);
Expand Down
7 changes: 4 additions & 3 deletions beacon_node/operation_pool/src/attestation_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ pub struct AttestationMap<T: EthSpec> {
#[derive(Debug, Default, PartialEq)]
pub struct AttestationDataMap<T: EthSpec> {
pub aggregate_attestations: HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>,
pub unaggregate_attestations: HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>,
pub unaggregate_attestations:
HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>,
}

impl<T: EthSpec> SplitAttestation<T> {
Expand Down Expand Up @@ -202,12 +203,12 @@ impl<T: EthSpec> AttestationMap<T> {
self.checkpoint_map
.retain(|checkpoint_key, _| current_epoch <= checkpoint_key.target_epoch + 1);
}

pub fn get_attestation_map(
&self,
checkpoint_key: &CheckpointKey,
) -> Option<&AttestationDataMap<T>> {
self.checkpoint_map.get(checkpoint_key)
self.checkpoint_map.get(checkpoint_key)
}

/// Statistics about all attestations stored in the map.
Expand Down
157 changes: 84 additions & 73 deletions beacon_node/operation_pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod sync_aggregate_id;

pub use crate::bls_to_execution_changes::ReceivedPreCapella;
pub use attestation::{earliest_attestation_validators, AttMaxCover};
use attestation_storage::{CompactIndexedAttestation, CompactAttestationData, AttestationDataMap};
use attestation_storage::{AttestationDataMap, CompactAttestationData, CompactIndexedAttestation};
pub use attestation_storage::{AttestationRef, SplitAttestation};
use bron_kerbosch::bron_kerbosch;
pub use max_cover::MaxCover;
Expand Down Expand Up @@ -241,7 +241,7 @@ impl<T: EthSpec> OperationPool<T> {
AttMaxCover::new(att, state, reward_cache, total_active_balance, spec)
})
}

#[allow(clippy::too_many_arguments)]
fn get_clique_aggregate_attestations_for_epoch<'a>(
&'a self,
Expand All @@ -251,72 +251,76 @@ impl<T: EthSpec> OperationPool<T> {
mut validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send,
num_valid: &mut i64,
spec: &'a ChainSpec,
) -> Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> {
) -> Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> {
let mut cliqued_atts: Vec<(&CompactAttestationData, CompactIndexedAttestation<T>)> = vec![];
if let Some(AttestationDataMap { aggregate_attestations, unaggregate_attestations })
= all_attestations.get_attestation_map(checkpoint_key) {
if let Some(AttestationDataMap {
aggregate_attestations,
unaggregate_attestations,
}) = all_attestations.get_attestation_map(checkpoint_key)
{
for (data, aggregates) in aggregate_attestations {
if data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= data.slot + T::slots_per_epoch() {
&& state.slot() <= data.slot + T::slots_per_epoch()
{
let aggregates: Vec<&CompactIndexedAttestation<T>> = aggregates
.iter()
.filter(|indexed| validity_filter(&AttestationRef {
.filter(|indexed| {
validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed
}))
indexed,
})
})
.collect();
*num_valid += aggregates.len() as i64;

let cliques = bron_kerbosch(&aggregates, is_compatible);

// This assumes that the values from bron_kerbosch are valid indices of
// aggregates.
let mut clique_aggregates = cliques
.iter()
.map(|clique| {
let mut res_att = aggregates[clique[0]].clone();
for ind in clique.iter().skip(1) {
res_att.aggregate(&aggregates[*ind]);
}
res_att
});
let mut clique_aggregates = cliques.iter().map(|clique| {
let mut res_att = aggregates[clique[0]].clone();
for ind in clique.iter().skip(1) {
res_att.aggregate(&aggregates[*ind]);
}
res_att
});

if let Some(unaggregate_attestations) = unaggregate_attestations.get(&data) {
for attestation in unaggregate_attestations
.iter()
.filter(|indexed| validity_filter(&AttestationRef {
for attestation in unaggregate_attestations.iter().filter(|indexed| {
validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed
}))
{
indexed,
})
}) {
*num_valid += 1;
for mut clique_aggregate in &mut clique_aggregates {
if !clique_aggregate.attesting_indices.contains(&attestation.attesting_indices[0]) {
if !clique_aggregate
.attesting_indices
.contains(&attestation.attesting_indices[0])
{
clique_aggregate.aggregate(attestation);
}
}
}
}

cliqued_atts.extend(
clique_aggregates
.map(|indexed| (data, indexed))
);
cliqued_atts.extend(clique_aggregates.map(|indexed| (data, indexed)));
}
}
for (data, attestations) in unaggregate_attestations {
if data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= data.slot + T::slots_per_epoch() {
&& state.slot() <= data.slot + T::slots_per_epoch()
{
if !aggregate_attestations.contains_key(&data) {
let mut valid_attestations = attestations
.iter()
.filter(|indexed| validity_filter(&AttestationRef {
let mut valid_attestations = attestations.iter().filter(|indexed| {
validity_filter(&AttestationRef {
checkpoint: checkpoint_key,
data: &data,
indexed
}));
indexed,
})
});

if let Some(first) = valid_attestations.next() {
let mut agg = first.clone();
Expand Down Expand Up @@ -364,50 +368,51 @@ impl<T: EthSpec> OperationPool<T> {
let mut num_prev_valid = 0_i64;
let mut num_curr_valid = 0_i64;

let prev_cliqued_atts = if prev_epoch_key != curr_epoch_key {
let prev_cliqued_atts = if prev_epoch_key != curr_epoch_key {
self.get_clique_aggregate_attestations_for_epoch(
&prev_epoch_key,
&*all_attestations,
state,
prev_epoch_validity_filter,
&mut num_prev_valid,
spec
&prev_epoch_key,
&*all_attestations,
state,
prev_epoch_validity_filter,
&mut num_prev_valid,
spec,
)
} else {
vec![]
};

let prev_epoch_cliqued_atts: Vec<AttMaxCover<T>> = prev_cliqued_atts.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
.collect();
let prev_epoch_cliqued_atts: Vec<AttMaxCover<T>> = prev_cliqued_atts
.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
.collect();

let curr_cliqued_atts = self.get_clique_aggregate_attestations_for_epoch(
&curr_epoch_key,
&*all_attestations,
state,
curr_epoch_validity_filter,
&mut num_curr_valid,
spec
&curr_epoch_key,
&*all_attestations,
state,
curr_epoch_validity_filter,
&mut num_curr_valid,
spec,
);

let curr_epoch_cliqued_atts: Vec<AttMaxCover<T>> = curr_cliqued_atts.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
.collect();

let curr_epoch_cliqued_atts: Vec<AttMaxCover<T>> = curr_cliqued_atts
.iter()
.map(|(data, indexed)| AttestationRef {
checkpoint: &prev_epoch_key,
data,
indexed,
})
.filter_map(|att| {
AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec)
})
.collect();

let prev_epoch_limit = if let BeaconState::Base(base_state) = state {
std::cmp::min(
Expand All @@ -426,7 +431,11 @@ impl<T: EthSpec> OperationPool<T> {
if prev_epoch_key == curr_epoch_key {
vec![]
} else {
maximum_cover(prev_epoch_cliqued_atts, prev_epoch_limit, "prev_epoch_attestations")
maximum_cover(
prev_epoch_cliqued_atts,
prev_epoch_limit,
"prev_epoch_attestations",
)
}
},
move || {
Expand Down Expand Up @@ -822,13 +831,15 @@ impl<T: EthSpec> OperationPool<T> {
}
}

fn is_compatible<T: EthSpec>(x: &&CompactIndexedAttestation<T>, y: &&CompactIndexedAttestation<T>) -> bool {
fn is_compatible<T: EthSpec>(
x: &&CompactIndexedAttestation<T>,
y: &&CompactIndexedAttestation<T>,
) -> bool {
let x_attester_set: HashSet<_> = x.attesting_indices.iter().collect();
let y_attester_set: HashSet<_> = y.attesting_indices.iter().collect();
x_attester_set.is_disjoint(&y_attester_set)
}


/// Filter up to a maximum number of operations out of an iterator.
fn filter_limit_operations<'a, T: 'a, V: 'a, I, F, G>(
operations: I,
Expand Down

0 comments on commit a171c59

Please sign in to comment.