diff --git a/bin/full-node/src/run/consensus_service.rs b/bin/full-node/src/run/consensus_service.rs index 020cd16557..286c738ad4 100644 --- a/bin/full-node/src/run/consensus_service.rs +++ b/bin/full-node/src/run/consensus_service.rs @@ -28,6 +28,7 @@ use crate::run::{database_thread, jaeger_service, network_service}; use core::num::NonZeroU32; use futures::{lock::Mutex, prelude::*}; +use hashbrown::HashSet; use smoldot::{ author, chain::chain_information, @@ -925,6 +926,7 @@ impl SyncBackground { all::ProcessOne::VerifyBodyHeader(verify) => { let hash_to_verify = verify.hash(); let height_to_verify = verify.height(); + let scale_encoded_header_to_verify = verify.scale_encoded_header().to_owned(); // TODO: copy :-/ let span = tracing::debug_span!( "block-verification", @@ -957,40 +959,92 @@ impl SyncBackground { break; } all::BlockVerification::Success { - is_new_best: true, + is_new_best, sync: sync_out, .. } => { span.record("outcome", &"success"); - span.record("is_new_best", &true); + span.record("is_new_best", &is_new_best); // Processing has made a step forward. - // Update the networking. - let fut = self.network_service.set_local_best_block( - self.network_chain_index, - sync_out.best_block_hash(), - sync_out.best_block_number(), - ); - fut.await; + if is_new_best { + // Update the networking. + let fut = self.network_service.set_local_best_block( + self.network_chain_index, + sync_out.best_block_hash(), + sync_out.best_block_number(), + ); + fut.await; + + // Reset the block authoring, in order to potentially build a + // block on top of this new best. + self.block_authoring = None; + + // Update the externally visible best block state. + let mut lock = self.sync_state.lock().await; + lock.best_block_hash = sync_out.best_block_hash(); + lock.best_block_number = sync_out.best_block_number(); + drop(lock); + } - // Reset the block authoring, in order to potentially build a - // block on top of this new best. - self.block_authoring = None; + self.sync = sync_out; - // Update the externally visible best block state. - let mut lock = self.sync_state.lock().await; - lock.best_block_hash = sync_out.best_block_hash(); - lock.best_block_number = sync_out.best_block_number(); - drop(lock); + // Announce the newly-verified block to all the sources that might + // not be aware of it. We can never be guaranteed that a certain + // source does *not* know about a block, however it is not a big + // problem to send a block announce to a source that already knows + // about that block. For this reason, the list of sources we send + // the block announce to is `all_sources - sources_that_know_it`. + // + // Note that not sending block announces to sources that already + // know that block means that these sources might also miss the + // fact that our local best block has been updated. This is in + // practice not a problem either. + let sources_to_announce_to = { + let mut all_sources = + self.sync + .sources() + .collect::>(); + for knows in self.sync.knows_non_finalized_block( + height_to_verify, + &hash_to_verify, + ) { + all_sources.remove(&knows); + } + all_sources + }; + + for source_id in sources_to_announce_to { + let peer_id = match &self.sync[source_id] { + Some(pid) => pid, + None => continue, + }; + + if self + .network_service + .clone() + .send_block_announce( + peer_id, + 0, + &scale_encoded_header_to_verify, + is_new_best, + ) + .await + .is_ok() + { + // Note that `try_add_known_block_to_source` might have + // no effect, which is not a problem considering that this + // block tracking is mostly about optimizations and + // politeness. + self.sync.try_add_known_block_to_source( + source_id, + height_to_verify, + hash_to_verify, + ); + } + } - self.sync = sync_out; - break; - } - all::BlockVerification::Success { sync: sync_out, .. } => { - span.record("outcome", &"success"); - span.record("is_new_best", &false); - self.sync = sync_out; break; } diff --git a/bin/full-node/src/run/network_service.rs b/bin/full-node/src/run/network_service.rs index 5609f6d9d1..4d32887371 100644 --- a/bin/full-node/src/run/network_service.rs +++ b/bin/full-node/src/run/network_service.rs @@ -40,6 +40,7 @@ use smoldot::{ async_std_connection, connection, multiaddr::{Multiaddr, ProtocolRef}, peer_id::{self, PeerId}, + peers, }, network::{protocol, service}, }; @@ -705,6 +706,19 @@ impl NetworkService { .await } + pub async fn send_block_announce( + self: Arc, + target: &PeerId, + chain_index: usize, + scale_encoded_header: &[u8], + is_best: bool, + ) -> Result<(), peers::QueueNotificationError> { + self.inner + .network + .send_block_announce(&target, chain_index, scale_encoded_header, is_best) + .await + } + /// Sends a blocks request to the given peer. // TODO: more docs // TODO: proper error type diff --git a/src/network/protocol/block_announces.rs b/src/network/protocol/block_announces.rs index e87e7cc5e0..ef4484bb66 100644 --- a/src/network/protocol/block_announces.rs +++ b/src/network/protocol/block_announces.rs @@ -60,6 +60,7 @@ impl Role { } /// Decoded block announcement notification. +// TODO: this has both the scale_encoded and decoded header, which makes it weird if you want to build the struct manually #[derive(Debug)] pub struct BlockAnnounceRef<'a> { /// SCALE-encoded header in the announce. Same as [`BlockAnnounceRef::header`]. @@ -84,6 +85,7 @@ pub fn encode_block_announce( .scale_encoding() .map(either::Left) .chain(iter::once(either::Right(is_best))) + .chain(iter::once(either::Right([0u8]))) } /// Decodes a block announcement. diff --git a/src/network/service.rs b/src/network/service.rs index 63f1c39f35..7fca483170 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -796,6 +796,34 @@ where protocol::decode_call_proof_response(&response).map_err(CallProofRequestError::Decode) } + // TODO: there this extra parameter in block announces that is unused on many chains but not always + pub async fn send_block_announce( + &self, + target: &peer_id::PeerId, + chain_index: usize, + scale_encoded_header: &[u8], + is_best: bool, + ) -> Result<(), QueueNotificationError> { + let buffers_to_send = protocol::encode_block_announce(protocol::BlockAnnounceRef { + scale_encoded_header, + is_best, + header: header::decode(scale_encoded_header).unwrap(), // TODO: hack + }); + + let notification = buffers_to_send.fold(Vec::new(), |mut a, b| { + a.extend_from_slice(b.as_ref()); + a + }); + + self.inner + .queue_notification( + target, + chain_index * NOTIFICATIONS_PROTOCOLS_PER_CHAIN, + notification, + ) + .await + } + /// /// /// Must be passed the SCALE-encoded transaction. diff --git a/src/sync/all.rs b/src/sync/all.rs index 48d745b302..9a30088445 100644 --- a/src/sync/all.rs +++ b/src/sync/all.rs @@ -724,6 +724,40 @@ impl AllSync { } } + /// Try register a new block that the source is aware of. + /// + /// Some syncing strategies do not track blocks known to sources, in which case this function + /// has no effect + /// + /// Has no effect if `height` is inferior or equal to the finalized block height, or if the + /// source was already known to know this block. + /// + /// The block does not need to be known by the data structure. + /// + /// This is automatically done for the blocks added through block announces or block requests.. + /// + /// # Panic + /// + /// Panics if the [`SourceId`] is out of range. + /// + pub fn try_add_known_block_to_source( + &mut self, + source_id: SourceId, + height: u64, + hash: [u8; 32], + ) { + debug_assert!(self.shared.sources.contains(source_id.0)); + match ( + &mut self.inner, + self.shared.sources.get(source_id.0).unwrap(), + ) { + (AllSyncInner::AllForks(sync), SourceMapping::AllForks(src)) => { + sync.add_known_block_to_source(*src, height, hash) + } + _ => {} + } + } + /// Returns the details of a request to start towards a source. /// /// This method doesn't modify the state machine in any way. [`AllSync::add_request`] must be @@ -1985,6 +2019,13 @@ impl HeaderBodyVerify { } } + /// Returns the SCALE-encoded header of the block about to be verified. + pub fn scale_encoded_header(&self) -> &[u8] { + match &self.inner { + HeaderBodyVerifyInner::Optimistic(verify) => verify.scale_encoded_header(), + } + } + /// Start the verification process. pub fn start( self, diff --git a/src/sync/all_forks.rs b/src/sync/all_forks.rs index caaf7cdd84..2a94f8a7b7 100644 --- a/src/sync/all_forks.rs +++ b/src/sync/all_forks.rs @@ -398,6 +398,26 @@ impl AllForksSync { self.inner.blocks.knows_non_finalized_block(height, hash) } + /// Registers a new block that the source is aware of. + /// + /// Has no effect if `height` is inferior or equal to the finalized block height, or if the + /// source was already known to know this block. + /// + /// The block does not need to be known by the data structure. + /// + /// This is automatically done for the blocks added through [`AllForksSync::block_announce`], + /// [`AllForksSync::prepare_add_source`] or [`FinishAncestrySearch::add_block`]. + /// + /// # Panic + /// + /// Panics if the [`SourceId`] is out of range. + /// + pub fn add_known_block_to_source(&mut self, source_id: SourceId, height: u64, hash: [u8; 32]) { + self.inner + .blocks + .add_known_block_to_source(source_id, height, hash); + } + /// Returns the current best block of the given source. /// /// This corresponds either the latest call to [`AllForksSync::block_announce`] where diff --git a/src/sync/optimistic.rs b/src/sync/optimistic.rs index 701d078824..aaba9b4f3d 100644 --- a/src/sync/optimistic.rs +++ b/src/sync/optimistic.rs @@ -723,12 +723,12 @@ impl BlockVerify { /// Returns the height of the block about to be verified. pub fn height(&self) -> u64 { // TODO: unwrap? - header::decode(self.header()).unwrap().number + header::decode(self.scale_encoded_header()).unwrap().number } /// Returns the hash of the block about to be verified. pub fn hash(&self) -> [u8; 32] { - header::hash_from_scale_encoded_header(self.header()) + header::hash_from_scale_encoded_header(self.scale_encoded_header()) } /// Returns true if [`Config::full`] was `Some` at initialization. @@ -737,7 +737,7 @@ impl BlockVerify { } /// Returns the SCALE-encoded header of the block about to be verified. - fn header(&self) -> &[u8] { + pub fn scale_encoded_header(&self) -> &[u8] { &self .inner .verification_queue