Skip to content

Commit

Permalink
Add consistency to header sync
Browse files Browse the repository at this point in the history
This PR fixes an edge case for header sync where the local node has a higher chain
header, but the remote node has a higher actual blockchain height. It also simplifies
the attempted header sync logic and error handling when a peer does not return headers
and introduces consistent naming, variable re-use and use of information.

When doing header sync, and determining from which peer to sync, we need to consider
our actual blockchain state when comparing chains. It is possible that our node has
valid block headers that will translate into a higher proof of work when fully synced
than the remote syncing peer, but lacking the blocks to back the block headers, thus
the remote chain has an actual higher proof of work blockchain.

With the current implementation, race conditions can exist when determining if the
peer is misbehaving, i.e. lying about their proof-of-work or not wanting to supply
block headers, due to a mismatch in tip height used for the check.

This PR fixes the race conditions by always using the latest local chain metadata
and block headers, ignoring all sync peers that do not exceed our own accumulated
difficulty and using consistent information in all the checks.

Add a header sync tests integration unit test.
  • Loading branch information
SWvheerden authored and hansieodendaal committed Oct 17, 2023
1 parent 9725fbd commit b5adc65
Show file tree
Hide file tree
Showing 15 changed files with 569 additions and 146 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
db.set_disable_add_block_flag();
HeaderSync(HeaderSyncState::new(sync_peers, local_metadata))
},
(HeaderSync(s), HeaderSyncFailed) => {
(HeaderSync(s), HeaderSyncFailed(_err)) => {
db.clear_disable_add_block_flag();
Waiting(s.into())
},
(HeaderSync(s), Continue | NetworkSilence) => {
db.clear_disable_add_block_flag();
Listening(s.into())
},
(HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()),
(HeaderSync(s), HeadersSynchronized(..)) => DecideNextSync(s.into()),

(DecideNextSync(_), ProceedToHorizonSync(peers)) => HorizonStateSync(peers.into()),
(DecideNextSync(s), Continue) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::base_node::{
Starting,
Waiting,
},
sync::{HorizonSyncInfo, SyncPeer},
sync::{AttemptSyncResult, HorizonSyncInfo, SyncPeer},
};

#[derive(Debug)]
Expand All @@ -57,8 +57,8 @@ pub enum BaseNodeState {
#[derive(Debug, Clone, PartialEq)]
pub enum StateEvent {
Initialized,
HeadersSynchronized(SyncPeer),
HeaderSyncFailed,
HeadersSynchronized(SyncPeer, AttemptSyncResult),
HeaderSyncFailed(String),
ProceedToHorizonSync(Vec<SyncPeer>),
ProceedToBlockSync(Vec<SyncPeer>),
HorizonStateSynchronized,
Expand Down Expand Up @@ -145,8 +145,8 @@ impl Display for StateEvent {
match self {
Initialized => write!(f, "Initialized"),
BlocksSynchronized => write!(f, "Synchronised Blocks"),
HeadersSynchronized(peer) => write!(f, "Headers Synchronized from peer `{}`", peer),
HeaderSyncFailed => write!(f, "Header Synchronization Failed"),
HeadersSynchronized(peer, result) => write!(f, "Headers Synchronized from peer `{}` ({:?})", peer, result),
HeaderSyncFailed(err) => write!(f, "Header Synchronization Failed ({})", err),
ProceedToHorizonSync(_) => write!(f, "Proceed to horizon sync"),
ProceedToBlockSync(_) => write!(f, "Proceed to block sync"),
HorizonStateSynchronized => write!(f, "Horizon State Synchronized"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{cmp::Ordering, time::Instant};

use log::*;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::NodeId;

use crate::{
base_node::{
Expand All @@ -35,7 +36,6 @@ use crate::{
},
chain_storage::BlockchainBackend,
};

const LOG_TARGET: &str = "c::bn::header_sync";

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -70,12 +70,42 @@ impl HeaderSyncState {
self.sync_peers
}

fn remove_sync_peer(&mut self, node_id: &NodeId) {
if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
self.sync_peers.remove(pos);
}
}

// converting u64 to i64 is okay as the future time limit is the hundreds so way below u32 even
#[allow(clippy::too_many_lines)]
#[allow(clippy::cast_possible_wrap)]
pub async fn next_event<B: BlockchainBackend + 'static>(
&mut self,
shared: &mut BaseNodeStateMachine<B>,
) -> StateEvent {
// Only sync to peers with better claimed accumulated difficulty than the local chain: this may be possible
// at this stage due to read-write lock race conditions in the database
match shared.db.get_chain_metadata().await {
Ok(best_block_metadata) => {
let mut remove = Vec::new();
for sync_peer in &self.sync_peers {
if sync_peer.claimed_chain_metadata().accumulated_difficulty() <=
best_block_metadata.accumulated_difficulty()
{
remove.push(sync_peer.node_id().clone());
}
}
for node_id in remove {
self.remove_sync_peer(&node_id);
}
if self.sync_peers.is_empty() {
// Go back to Listening state
return StateEvent::Continue;
}
},
Err(e) => return StateEvent::FatalError(format!("{}", e)),
}

let mut synchronizer = HeaderSynchronizer::new(
shared.config.blockchain_sync_config.clone(),
shared.db.clone(),
Expand Down Expand Up @@ -128,7 +158,7 @@ impl HeaderSyncState {
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
match synchronizer.synchronize().await {
Ok(sync_peer) => {
Ok((sync_peer, sync_result)) => {
log_mdc::extend(mdc);
info!(
target: LOG_TARGET,
Expand All @@ -144,9 +174,10 @@ impl HeaderSyncState {
}
}
self.is_synced = true;
StateEvent::HeadersSynchronized(sync_peer)
StateEvent::HeadersSynchronized(sync_peer, sync_result)
},
Err(err) => {
println!("HeaderSyncState::next_event - {}", err);
let _ignore = shared.status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::SyncFailed("HeaderSyncFailed".to_string()),
Expand All @@ -163,7 +194,7 @@ impl HeaderSyncState {
_ => {
log_mdc::extend(mdc);
debug!(target: LOG_TARGET, "Header sync failed: {}", err);
StateEvent::HeaderSyncFailed
StateEvent::HeaderSyncFailed(err.to_string())
},
}
},
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/header_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ pub use error::BlockHeaderSyncError;
mod validator;

mod synchronizer;
pub use synchronizer::HeaderSynchronizer;
pub use synchronizer::{AttemptSyncResult, HeaderSyncStatus, HeaderSynchronizer};
Loading

0 comments on commit b5adc65

Please sign in to comment.