Skip to content

Commit

Permalink
Lenient duplicate checks on HTTP API for block publication (sigp#5574)
Browse files Browse the repository at this point in the history
* start splitting gossip verification

* WIP

* Gossip verify separate (#7)

* save

* save

* make ProvenancedBlock concrete

* delete into gossip verified block contents

* get rid of IntoBlobSidecar trait

* remove IntoGossipVerified trait

* get tests compiling

* don't check sidecar slashability in publish

* remove second publish closure

* drop blob bool. also prefer using message index over index of position in list

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Fix low-hanging tests

* Fix tests and clean up

* Clean up imports

* more cleanup

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Further refine behaviour and add tests

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Remove empty line

* Fix test (block is not fully imported just gossip verified)

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Update for unstable & use empty blob list

* Update comment

* Add test for duplicate block case

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Clarify unreachable case

* Fix another publish_block case

* Remove unreachable case in filter chain segment

* Revert unrelated blob optimisation

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Fix merge conflicts

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Fix some compilation issues. Impl is fucked though

* Support peerDAS

* Fix tests

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Fix conflict

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate

* Address review comments

* Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate
  • Loading branch information
michaelsproul authored Sep 24, 2024
1 parent 1447eeb commit 2792705
Show file tree
Hide file tree
Showing 21 changed files with 1,064 additions and 509 deletions.
47 changes: 33 additions & 14 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2740,7 +2740,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the block is relevant, add it to the filtered chain segment.
Ok(_) => filtered_chain_segment.push((block_root, block)),
// If the block is already known, simply ignore this block.
Err(BlockError::BlockIsAlreadyKnown(_)) => continue,
//
// Note that `check_block_relevancy` is incapable of returning
// `DuplicateImportStatusUnknown` so we don't need to handle that case here.
Err(BlockError::DuplicateFullyImported(_)) => continue,
// If the block is the genesis block, simply ignore this block.
Err(BlockError::GenesisBlock) => continue,
// If the block is for a finalized slot, simply ignore this block.
Expand Down Expand Up @@ -2886,7 +2889,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
}
Err(BlockError::BlockIsAlreadyKnown(block_root)) => {
Err(BlockError::DuplicateFullyImported(block_root)) => {
debug!(self.log,
"Ignoring already known blocks while processing chain segment";
"block_root" => ?block_root);
Expand Down Expand Up @@ -2977,6 +2980,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_blob(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let block_root = blob.block_root();

Expand All @@ -2987,7 +2991,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(blob.block_root()));
return Err(BlockError::DuplicateFullyImported(blob.block_root()));
}

// No need to process and import blobs beyond the PeerDAS epoch.
Expand All @@ -3003,7 +3007,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

let r = self.check_gossip_blob_availability_and_import(blob).await;
let r = self
.check_gossip_blob_availability_and_import(blob, publish_fn)
.await;
self.remove_notified(&block_root, r)
}

Expand All @@ -3012,6 +3018,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_data_columns(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<
(
AvailabilityProcessingStatus,
Expand All @@ -3037,11 +3044,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
return Err(BlockError::DuplicateFullyImported(block_root));
}

let r = self
.check_gossip_data_columns_availability_and_import(slot, block_root, data_columns)
.check_gossip_data_columns_availability_and_import(
slot,
block_root,
data_columns,
publish_fn,
)
.await;
self.remove_notified_custody_columns(&block_root, r)
}
Expand All @@ -3061,7 +3073,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
return Err(BlockError::DuplicateFullyImported(block_root));
}

// Reject RPC blobs referencing unknown parents. Otherwise we allow potentially invalid data
Expand Down Expand Up @@ -3127,7 +3139,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
return Err(BlockError::DuplicateFullyImported(block_root));
}

// Reject RPC columns referencing unknown parents. Otherwise we allow potentially invalid data
Expand Down Expand Up @@ -3225,7 +3237,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
block_source: BlockImportSource,
publish_fn: impl FnOnce() -> Result<(), BlockError> + Send + 'static,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
Expand Down Expand Up @@ -3407,22 +3419,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
self.process_availability(slot, availability).await
self.process_availability(slot, availability, || Ok(()))
.await
}

/// Checks if the provided blob can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
async fn check_gossip_blob_availability_and_import(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let slot = blob.slot();
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(blob.signed_block_header());
}
let availability = self.data_availability_checker.put_gossip_blob(blob)?;

self.process_availability(slot, availability).await
self.process_availability(slot, availability, publish_fn)
.await
}

/// Checks if the provided data column can make any cached blocks available, and imports immediately
Expand All @@ -3432,6 +3447,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<
(
AvailabilityProcessingStatus,
Expand All @@ -3449,7 +3465,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_gossip_data_columns(slot, block_root, data_columns)?;

self.process_availability(slot, availability)
self.process_availability(slot, availability, publish_fn)
.await
.map(|result| (result, data_columns_to_publish))
}
Expand Down Expand Up @@ -3490,7 +3506,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_blobs(block_root, epoch, blobs)?;

self.process_availability(slot, availability).await
self.process_availability(slot, availability, || Ok(()))
.await
}

/// Checks if the provided columns can make any cached blocks available, and imports immediately
Expand Down Expand Up @@ -3538,7 +3555,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
custody_columns,
)?;

self.process_availability(slot, availability)
self.process_availability(slot, availability, || Ok(()))
.await
.map(|result| (result, data_columns_to_publish))
}
Expand All @@ -3551,9 +3568,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
availability: Availability<T::EthSpec>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
match availability {
Availability::Available(block) => {
publish_fn()?;
// Block is fully available, import into fork choice
self.import_available_block(block).await
}
Expand Down
Loading

0 comments on commit 2792705

Please sign in to comment.