Skip to content

Commit

Permalink
Notify lookup sync of gossip processing results (#5722)
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit e6c8955
Author: dapplion <35266934+dapplion@users.noreply.github.com>
Date:   Mon May 6 22:52:48 2024 +0900

    Notify lookup sync of gossip processing results
  • Loading branch information
michaelsproul committed May 7, 2024
1 parent 01f63ef commit 833ce42
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 123 deletions.
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Checks if the block root is currenlty in the availability cache awaiting processing because
/// Checks if the block root is currenlty in the availability cache awaiting import because
/// of missing components.
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.availability_cache.has_block(block_root)
Expand Down
26 changes: 17 additions & 9 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use crate::{
metrics,
network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor},
service::NetworkMessage,
sync::SyncMessage,
sync::{
manager::{BlockProcessSource, BlockProcessType},
SyncMessage,
},
};
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
Expand Down Expand Up @@ -1187,19 +1190,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"block_root" => %block_root,
);
}
Err(BlockError::ParentUnknown(block)) => {
// Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block`
Err(BlockError::ParentUnknown(_)) => {
// This should not occur. It should be checked by `should_forward_block`.
// Do not send sync message UnknownParentBlock to prevent conflicts with the
// BlockComponentProcessed message below. If this error ever happens, lookup sync
// can recover by receiving another block / blob / attestation referencing the
// chain that includes this block.
error!(
self.log,
"Block with unknown parent attempted to be processed";
"block_root" => %block_root,
"peer_id" => %peer_id
);
self.send_sync_message(SyncMessage::UnknownParentBlock(
peer_id,
block.clone(),
block_root,
));
}
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!(
Expand Down Expand Up @@ -1263,6 +1265,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&self.log,
);
}

self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type: BlockProcessType::SingleBlock,
source: BlockProcessSource::Gossip(block_root),
result: result.into(),
});
}

pub fn process_gossip_voluntary_exit(
Expand Down
17 changes: 7 additions & 10 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
service::NetworkMessage,
sync::{manager::BlockProcessType, SyncMessage},
sync::{manager::BlockProcessSource, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
Expand Down Expand Up @@ -407,13 +407,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) -> Result<(), Error<T::EthSpec>> {
let process_fn = self.clone().generate_rpc_beacon_block_process_fn(
block_root,
block,
seen_timestamp,
process_type,
source,
);
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
Expand All @@ -428,18 +428,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) -> Result<(), Error<T::EthSpec>> {
let blob_count = blobs.iter().filter(|b| b.is_some()).count();
if blob_count == 0 {
return Ok(());
}
let process_fn = self.clone().generate_rpc_blobs_process_fn(
block_root,
blobs,
seen_timestamp,
process_type,
);
let process_fn =
self.clone()
.generate_rpc_blobs_process_fn(block_root, blobs, seen_timestamp, source);
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::RpcBlobs { process_fn },
Expand Down
50 changes: 26 additions & 24 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::metrics;
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
use crate::sync::manager::BlockProcessSource;
use crate::sync::BatchProcessResult;
use crate::sync::{
manager::{BlockProcessType, SyncMessage},
Expand Down Expand Up @@ -53,7 +54,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) -> AsyncFn {
let process_fn = async move {
let reprocess_tx = self.reprocess_tx.clone();
Expand All @@ -62,7 +63,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root,
block,
seen_timestamp,
process_type,
source,
reprocess_tx,
duplicate_cache,
)
Expand All @@ -77,20 +78,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) -> (AsyncFn, BlockingFn) {
// An async closure which will import the block.
let process_fn = self.clone().generate_rpc_beacon_block_process_fn(
block_root,
block,
seen_timestamp,
process_type.clone(),
source.clone(),
);
// A closure which will ignore the block.
let ignore_fn = move || {
// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
process_type: BlockProcessType::SingleBlock,
source,
result: crate::sync::manager::BlockProcessingResult::Ignored,
});
};
Expand All @@ -104,7 +106,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
block: RpcBlock<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
duplicate_cache: DuplicateCache,
) {
Expand All @@ -115,15 +117,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Gossip block is being processed";
"action" => "sending rpc block to reprocessing queue",
"block_root" => %block_root,
"process_type" => ?process_type,
"source" => ?source,
);

// Send message to work reprocess queue to retry the block
let (process_fn, ignore_fn) = self.clone().generate_rpc_beacon_block_fns(
block_root,
block,
seen_timestamp,
process_type,
source,
);
let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock {
beacon_block_root: block_root,
Expand All @@ -148,7 +150,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"proposer" => block.message().proposer_index(),
"slot" => block.slot(),
"commitments" => commitments_formatted,
"process_type" => ?process_type,
"source" => ?source,
);

let result = self
Expand All @@ -170,21 +172,20 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if reprocess_tx.try_send(reprocess_msg).is_err() {
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
};
if matches!(process_type, BlockProcessType::SingleBlock { .. }) {
self.chain.block_times_cache.write().set_time_observed(
hash,
slot,
seen_timestamp,
None,
None,
);
self.chain.block_times_cache.write().set_time_observed(
hash,
slot,
seen_timestamp,
None,
None,
);

self.chain.recompute_head_at_current_slot().await;
}
self.chain.recompute_head_at_current_slot().await;
}
// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
process_type: BlockProcessType::SingleBlock,
source,
result: result.into(),
});

Expand All @@ -201,11 +202,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) -> AsyncFn {
let process_fn = async move {
self.clone()
.process_rpc_blobs(block_root, blobs, seen_timestamp, process_type)
.process_rpc_blobs(block_root, blobs, seen_timestamp, source)
.await;
};
Box::pin(process_fn)
Expand All @@ -217,7 +218,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
source: BlockProcessSource,
) {
let Some(slot) = blobs
.iter()
Expand Down Expand Up @@ -298,7 +299,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
process_type: BlockProcessType::SingleBlob,
source,
result: result.into(),
});
}
Expand Down
21 changes: 8 additions & 13 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::manager::{Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
Expand Down Expand Up @@ -45,7 +45,7 @@ pub trait RequestState<T: BeaconChainTypes> {
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError>;
) -> Result<LookupRequestResult, LookupRequestError>;

/* Response handling methods */

Expand Down Expand Up @@ -80,7 +80,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
peer_id: PeerId,
_: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root)
.map_err(LookupRequestError::SendFailed)
}
Expand All @@ -97,10 +97,10 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
peer_id: _,
} = download_result;
cx.send_block_for_processing(
id,
block_root,
RpcBlock::new_without_blobs(Some(block_root), value),
seen_timestamp,
BlockProcessType::SingleBlock { id },
)
.map_err(LookupRequestError::SendFailed)
}
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(
id,
peer_id,
Expand All @@ -149,13 +149,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
seen_timestamp,
peer_id: _,
} = download_result;
cx.send_blobs_for_processing(
block_root,
value,
seen_timestamp,
BlockProcessType::SingleBlob { id },
)
.map_err(LookupRequestError::SendFailed)
cx.send_blobs_for_processing(id, block_root, value, seen_timestamp)
.map_err(LookupRequestError::SendFailed)
}

fn response_type() -> ResponseType {
Expand Down
Loading

0 comments on commit 833ce42

Please sign in to comment.