Skip to content

Commit

Permalink
Implement block announces in full node (#2143)
Browse files Browse the repository at this point in the history
* Merge two blocks together

* Prepare the ground for the announcements code

* Implement the networking part

* Use correct list of sources for announcement

* Fix encode_block_announce

* Add try_add_known_block_to_source

* Docfix

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] committed Mar 18, 2022
1 parent 05d582a commit 8136f62
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 27 deletions.
102 changes: 78 additions & 24 deletions bin/full-node/src/run/consensus_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::run::{database_thread, jaeger_service, network_service};

use core::num::NonZeroU32;
use futures::{lock::Mutex, prelude::*};
use hashbrown::HashSet;
use smoldot::{
author,
chain::chain_information,
Expand Down Expand Up @@ -925,6 +926,7 @@ impl SyncBackground {
all::ProcessOne::VerifyBodyHeader(verify) => {
let hash_to_verify = verify.hash();
let height_to_verify = verify.height();
let scale_encoded_header_to_verify = verify.scale_encoded_header().to_owned(); // TODO: copy :-/

let span = tracing::debug_span!(
"block-verification",
Expand Down Expand Up @@ -957,40 +959,92 @@ impl SyncBackground {
break;
}
all::BlockVerification::Success {
is_new_best: true,
is_new_best,
sync: sync_out,
..
} => {
span.record("outcome", &"success");
span.record("is_new_best", &true);
span.record("is_new_best", &is_new_best);

// Processing has made a step forward.

// Update the networking.
let fut = self.network_service.set_local_best_block(
self.network_chain_index,
sync_out.best_block_hash(),
sync_out.best_block_number(),
);
fut.await;
if is_new_best {
// Update the networking.
let fut = self.network_service.set_local_best_block(
self.network_chain_index,
sync_out.best_block_hash(),
sync_out.best_block_number(),
);
fut.await;

// Reset the block authoring, in order to potentially build a
// block on top of this new best.
self.block_authoring = None;

// Update the externally visible best block state.
let mut lock = self.sync_state.lock().await;
lock.best_block_hash = sync_out.best_block_hash();
lock.best_block_number = sync_out.best_block_number();
drop(lock);
}

// Reset the block authoring, in order to potentially build a
// block on top of this new best.
self.block_authoring = None;
self.sync = sync_out;

// Update the externally visible best block state.
let mut lock = self.sync_state.lock().await;
lock.best_block_hash = sync_out.best_block_hash();
lock.best_block_number = sync_out.best_block_number();
drop(lock);
// Announce the newly-verified block to all the sources that might
// not be aware of it. We can never be guaranteed that a certain
// source does *not* know about a block, however it is not a big
// problem to send a block announce to a source that already knows
// about that block. For this reason, the list of sources we send
// the block announce to is `all_sources - sources_that_know_it`.
//
// Note that not sending block announces to sources that already
// know that block means that these sources might also miss the
// fact that our local best block has been updated. This is in
// practice not a problem either.
let sources_to_announce_to = {
let mut all_sources =
self.sync
.sources()
.collect::<HashSet<_, fnv::FnvBuildHasher>>();
for knows in self.sync.knows_non_finalized_block(
height_to_verify,
&hash_to_verify,
) {
all_sources.remove(&knows);
}
all_sources
};

for source_id in sources_to_announce_to {
let peer_id = match &self.sync[source_id] {
Some(pid) => pid,
None => continue,
};

if self
.network_service
.clone()
.send_block_announce(
peer_id,
0,
&scale_encoded_header_to_verify,
is_new_best,
)
.await
.is_ok()
{
// Note that `try_add_known_block_to_source` might have
// no effect, which is not a problem considering that this
// block tracking is mostly about optimizations and
// politeness.
self.sync.try_add_known_block_to_source(
source_id,
height_to_verify,
hash_to_verify,
);
}
}

self.sync = sync_out;
break;
}
all::BlockVerification::Success { sync: sync_out, .. } => {
span.record("outcome", &"success");
span.record("is_new_best", &false);
self.sync = sync_out;
break;
}

Expand Down
14 changes: 14 additions & 0 deletions bin/full-node/src/run/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use smoldot::{
async_std_connection, connection,
multiaddr::{Multiaddr, ProtocolRef},
peer_id::{self, PeerId},
peers,
},
network::{protocol, service},
};
Expand Down Expand Up @@ -705,6 +706,19 @@ impl NetworkService {
.await
}

pub async fn send_block_announce(
self: Arc<Self>,
target: &PeerId,
chain_index: usize,
scale_encoded_header: &[u8],
is_best: bool,
) -> Result<(), peers::QueueNotificationError> {
self.inner
.network
.send_block_announce(&target, chain_index, scale_encoded_header, is_best)
.await
}

/// Sends a blocks request to the given peer.
// TODO: more docs
// TODO: proper error type
Expand Down
2 changes: 2 additions & 0 deletions src/network/protocol/block_announces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl Role {
}

/// Decoded block announcement notification.
// TODO: this has both the scale_encoded and decoded header, which makes it weird if you want to build the struct manually
#[derive(Debug)]
pub struct BlockAnnounceRef<'a> {
/// SCALE-encoded header in the announce. Same as [`BlockAnnounceRef::header`].
Expand All @@ -84,6 +85,7 @@ pub fn encode_block_announce(
.scale_encoding()
.map(either::Left)
.chain(iter::once(either::Right(is_best)))
.chain(iter::once(either::Right([0u8])))
}

/// Decodes a block announcement.
Expand Down
28 changes: 28 additions & 0 deletions src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,34 @@ where
protocol::decode_call_proof_response(&response).map_err(CallProofRequestError::Decode)
}

// TODO: there this extra parameter in block announces that is unused on many chains but not always
pub async fn send_block_announce(
&self,
target: &peer_id::PeerId,
chain_index: usize,
scale_encoded_header: &[u8],
is_best: bool,
) -> Result<(), QueueNotificationError> {
let buffers_to_send = protocol::encode_block_announce(protocol::BlockAnnounceRef {
scale_encoded_header,
is_best,
header: header::decode(scale_encoded_header).unwrap(), // TODO: hack
});

let notification = buffers_to_send.fold(Vec::new(), |mut a, b| {
a.extend_from_slice(b.as_ref());
a
});

self.inner
.queue_notification(
target,
chain_index * NOTIFICATIONS_PROTOCOLS_PER_CHAIN,
notification,
)
.await
}

///
///
/// Must be passed the SCALE-encoded transaction.
Expand Down
41 changes: 41 additions & 0 deletions src/sync/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,40 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
}
}

/// Try register a new block that the source is aware of.
///
/// Some syncing strategies do not track blocks known to sources, in which case this function
/// has no effect
///
/// Has no effect if `height` is inferior or equal to the finalized block height, or if the
/// source was already known to know this block.
///
/// The block does not need to be known by the data structure.
///
/// This is automatically done for the blocks added through block announces or block requests..
///
/// # Panic
///
/// Panics if the [`SourceId`] is out of range.
///
pub fn try_add_known_block_to_source(
&mut self,
source_id: SourceId,
height: u64,
hash: [u8; 32],
) {
debug_assert!(self.shared.sources.contains(source_id.0));
match (
&mut self.inner,
self.shared.sources.get(source_id.0).unwrap(),
) {
(AllSyncInner::AllForks(sync), SourceMapping::AllForks(src)) => {
sync.add_known_block_to_source(*src, height, hash)
}
_ => {}
}
}

/// Returns the details of a request to start towards a source.
///
/// This method doesn't modify the state machine in any way. [`AllSync::add_request`] must be
Expand Down Expand Up @@ -1985,6 +2019,13 @@ impl<TRq, TSrc, TBl> HeaderBodyVerify<TRq, TSrc, TBl> {
}
}

/// Returns the SCALE-encoded header of the block about to be verified.
pub fn scale_encoded_header(&self) -> &[u8] {
match &self.inner {
HeaderBodyVerifyInner::Optimistic(verify) => verify.scale_encoded_header(),
}
}

/// Start the verification process.
pub fn start(
self,
Expand Down
20 changes: 20 additions & 0 deletions src/sync/all_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,26 @@ impl<TBl, TRq, TSrc> AllForksSync<TBl, TRq, TSrc> {
self.inner.blocks.knows_non_finalized_block(height, hash)
}

/// Registers a new block that the source is aware of.
///
/// Has no effect if `height` is inferior or equal to the finalized block height, or if the
/// source was already known to know this block.
///
/// The block does not need to be known by the data structure.
///
/// This is automatically done for the blocks added through [`AllForksSync::block_announce`],
/// [`AllForksSync::prepare_add_source`] or [`FinishAncestrySearch::add_block`].
///
/// # Panic
///
/// Panics if the [`SourceId`] is out of range.
///
pub fn add_known_block_to_source(&mut self, source_id: SourceId, height: u64, hash: [u8; 32]) {
self.inner
.blocks
.add_known_block_to_source(source_id, height, hash);
}

/// Returns the current best block of the given source.
///
/// This corresponds either the latest call to [`AllForksSync::block_announce`] where
Expand Down
6 changes: 3 additions & 3 deletions src/sync/optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,12 +723,12 @@ impl<TRq, TSrc, TBl> BlockVerify<TRq, TSrc, TBl> {
/// Returns the height of the block about to be verified.
pub fn height(&self) -> u64 {
// TODO: unwrap?
header::decode(self.header()).unwrap().number
header::decode(self.scale_encoded_header()).unwrap().number
}

/// Returns the hash of the block about to be verified.
pub fn hash(&self) -> [u8; 32] {
header::hash_from_scale_encoded_header(self.header())
header::hash_from_scale_encoded_header(self.scale_encoded_header())
}

/// Returns true if [`Config::full`] was `Some` at initialization.
Expand All @@ -737,7 +737,7 @@ impl<TRq, TSrc, TBl> BlockVerify<TRq, TSrc, TBl> {
}

/// Returns the SCALE-encoded header of the block about to be verified.
fn header(&self) -> &[u8] {
pub fn scale_encoded_header(&self) -> &[u8] {
&self
.inner
.verification_queue
Expand Down

0 comments on commit 8136f62

Please sign in to comment.