Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement block announces in full node #2143

Merged
merged 8 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 78 additions & 24 deletions bin/full-node/src/run/consensus_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::run::{database_thread, jaeger_service, network_service};

use core::num::NonZeroU32;
use futures::{lock::Mutex, prelude::*};
use hashbrown::HashSet;
use smoldot::{
author,
chain::chain_information,
Expand Down Expand Up @@ -925,6 +926,7 @@ impl SyncBackground {
all::ProcessOne::VerifyBodyHeader(verify) => {
let hash_to_verify = verify.hash();
let height_to_verify = verify.height();
let scale_encoded_header_to_verify = verify.scale_encoded_header().to_owned(); // TODO: copy :-/

let span = tracing::debug_span!(
"block-verification",
Expand Down Expand Up @@ -957,40 +959,92 @@ impl SyncBackground {
break;
}
all::BlockVerification::Success {
is_new_best: true,
is_new_best,
sync: sync_out,
..
} => {
span.record("outcome", &"success");
span.record("is_new_best", &true);
span.record("is_new_best", &is_new_best);

// Processing has made a step forward.

// Update the networking.
let fut = self.network_service.set_local_best_block(
self.network_chain_index,
sync_out.best_block_hash(),
sync_out.best_block_number(),
);
fut.await;
if is_new_best {
// Update the networking.
let fut = self.network_service.set_local_best_block(
self.network_chain_index,
sync_out.best_block_hash(),
sync_out.best_block_number(),
);
fut.await;

// Reset the block authoring, in order to potentially build a
// block on top of this new best.
self.block_authoring = None;

// Update the externally visible best block state.
let mut lock = self.sync_state.lock().await;
lock.best_block_hash = sync_out.best_block_hash();
lock.best_block_number = sync_out.best_block_number();
drop(lock);
}

// Reset the block authoring, in order to potentially build a
// block on top of this new best.
self.block_authoring = None;
self.sync = sync_out;

// Update the externally visible best block state.
let mut lock = self.sync_state.lock().await;
lock.best_block_hash = sync_out.best_block_hash();
lock.best_block_number = sync_out.best_block_number();
drop(lock);
// Announce the newly-verified block to all the sources that might
// not be aware of it. We can never be guaranteed that a certain
// source does *not* know about a block, however it is not a big
// problem to send a block announce to a source that already knows
// about that block. For this reason, the list of sources we send
// the block announce to is `all_sources - sources_that_know_it`.
//
// Note that not sending block announces to sources that already
// know that block means that these sources might also miss the
// fact that our local best block has been updated. This is in
// practice not a problem either.
let sources_to_announce_to = {
let mut all_sources =
self.sync
.sources()
.collect::<HashSet<_, fnv::FnvBuildHasher>>();
for knows in self.sync.knows_non_finalized_block(
height_to_verify,
&hash_to_verify,
) {
all_sources.remove(&knows);
melekes marked this conversation as resolved.
Show resolved Hide resolved
}
all_sources
};

for source_id in sources_to_announce_to {
let peer_id = match &self.sync[source_id] {
Some(pid) => pid,
None => continue,
};

if self
.network_service
.clone()
.send_block_announce(
peer_id,
0,
&scale_encoded_header_to_verify,
is_new_best,
)
.await
.is_ok()
{
// Note that `try_add_known_block_to_source` might have
// no effect, which is not a problem considering that this
// block tracking is mostly about optimizations and
// politeness.
self.sync.try_add_known_block_to_source(
source_id,
height_to_verify,
hash_to_verify,
);
}
}

self.sync = sync_out;
break;
}
all::BlockVerification::Success { sync: sync_out, .. } => {
span.record("outcome", &"success");
span.record("is_new_best", &false);
self.sync = sync_out;
break;
}

Expand Down
14 changes: 14 additions & 0 deletions bin/full-node/src/run/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use smoldot::{
async_std_connection, connection,
multiaddr::{Multiaddr, ProtocolRef},
peer_id::{self, PeerId},
peers,
},
network::{protocol, service},
};
Expand Down Expand Up @@ -705,6 +706,19 @@ impl NetworkService {
.await
}

pub async fn send_block_announce(
self: Arc<Self>,
target: &PeerId,
chain_index: usize,
scale_encoded_header: &[u8],
is_best: bool,
) -> Result<(), peers::QueueNotificationError> {
self.inner
.network
.send_block_announce(&target, chain_index, scale_encoded_header, is_best)
.await
}

/// Sends a blocks request to the given peer.
// TODO: more docs
// TODO: proper error type
Expand Down
2 changes: 2 additions & 0 deletions src/network/protocol/block_announces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl Role {
}

/// Decoded block announcement notification.
// TODO: this has both the scale_encoded and decoded header, which makes it weird if you want to build the struct manually
#[derive(Debug)]
pub struct BlockAnnounceRef<'a> {
/// SCALE-encoded header in the announce. Same as [`BlockAnnounceRef::header`].
Expand All @@ -84,6 +85,7 @@ pub fn encode_block_announce(
.scale_encoding()
.map(either::Left)
.chain(iter::once(either::Right(is_best)))
.chain(iter::once(either::Right([0u8])))
}

/// Decodes a block announcement.
Expand Down
28 changes: 28 additions & 0 deletions src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,34 @@ where
protocol::decode_call_proof_response(&response).map_err(CallProofRequestError::Decode)
}

// TODO: there this extra parameter in block announces that is unused on many chains but not always
pub async fn send_block_announce(
&self,
target: &peer_id::PeerId,
chain_index: usize,
scale_encoded_header: &[u8],
is_best: bool,
) -> Result<(), QueueNotificationError> {
let buffers_to_send = protocol::encode_block_announce(protocol::BlockAnnounceRef {
scale_encoded_header,
is_best,
header: header::decode(scale_encoded_header).unwrap(), // TODO: hack
melekes marked this conversation as resolved.
Show resolved Hide resolved
});

let notification = buffers_to_send.fold(Vec::new(), |mut a, b| {
a.extend_from_slice(b.as_ref());
a
});

self.inner
.queue_notification(
target,
chain_index * NOTIFICATIONS_PROTOCOLS_PER_CHAIN,
notification,
)
.await
}

///
///
/// Must be passed the SCALE-encoded transaction.
Expand Down
41 changes: 41 additions & 0 deletions src/sync/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,40 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
}
}

/// Try register a new block that the source is aware of.
///
/// Some syncing strategies do not track blocks known to sources, in which case this function
/// has no effect
///
/// Has no effect if `height` is inferior or equal to the finalized block height, or if the
/// source was already known to know this block.
///
/// The block does not need to be known by the data structure.
///
/// This is automatically done for the blocks added through block announces or block requests..
///
/// # Panic
///
/// Panics if the [`SourceId`] is out of range.
///
pub fn try_add_known_block_to_source(
&mut self,
source_id: SourceId,
height: u64,
hash: [u8; 32],
) {
debug_assert!(self.shared.sources.contains(source_id.0));
match (
&mut self.inner,
self.shared.sources.get(source_id.0).unwrap(),
) {
(AllSyncInner::AllForks(sync), SourceMapping::AllForks(src)) => {
sync.add_known_block_to_source(*src, height, hash)
}
_ => {}
}
}

/// Returns the details of a request to start towards a source.
///
/// This method doesn't modify the state machine in any way. [`AllSync::add_request`] must be
Expand Down Expand Up @@ -1985,6 +2019,13 @@ impl<TRq, TSrc, TBl> HeaderBodyVerify<TRq, TSrc, TBl> {
}
}

/// Returns the SCALE-encoded header of the block about to be verified.
pub fn scale_encoded_header(&self) -> &[u8] {
match &self.inner {
HeaderBodyVerifyInner::Optimistic(verify) => verify.scale_encoded_header(),
}
}

/// Start the verification process.
pub fn start(
self,
Expand Down
20 changes: 20 additions & 0 deletions src/sync/all_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,26 @@ impl<TBl, TRq, TSrc> AllForksSync<TBl, TRq, TSrc> {
self.inner.blocks.knows_non_finalized_block(height, hash)
}

/// Registers a new block that the source is aware of.
///
/// Has no effect if `height` is inferior or equal to the finalized block height, or if the
/// source was already known to know this block.
///
/// The block does not need to be known by the data structure.
///
/// This is automatically done for the blocks added through [`AllForksSync::block_announce`],
/// [`AllForksSync::prepare_add_source`] or [`FinishAncestrySearch::add_block`].
///
/// # Panic
///
/// Panics if the [`SourceId`] is out of range.
///
pub fn add_known_block_to_source(&mut self, source_id: SourceId, height: u64, hash: [u8; 32]) {
self.inner
.blocks
.add_known_block_to_source(source_id, height, hash);
}

/// Returns the current best block of the given source.
///
/// This corresponds either the latest call to [`AllForksSync::block_announce`] where
Expand Down
6 changes: 3 additions & 3 deletions src/sync/optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,12 +723,12 @@ impl<TRq, TSrc, TBl> BlockVerify<TRq, TSrc, TBl> {
/// Returns the height of the block about to be verified.
pub fn height(&self) -> u64 {
// TODO: unwrap?
header::decode(self.header()).unwrap().number
header::decode(self.scale_encoded_header()).unwrap().number
}

/// Returns the hash of the block about to be verified.
pub fn hash(&self) -> [u8; 32] {
header::hash_from_scale_encoded_header(self.header())
header::hash_from_scale_encoded_header(self.scale_encoded_header())
}

/// Returns true if [`Config::full`] was `Some` at initialization.
Expand All @@ -737,7 +737,7 @@ impl<TRq, TSrc, TBl> BlockVerify<TRq, TSrc, TBl> {
}

/// Returns the SCALE-encoded header of the block about to be verified.
fn header(&self) -> &[u8] {
pub fn scale_encoded_header(&self) -> &[u8] {
&self
.inner
.verification_queue
Expand Down