Skip to content

Commit

Permalink
more simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Oct 23, 2023
1 parent 7591bda commit 4eeb873
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 77 deletions.
131 changes: 76 additions & 55 deletions base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,12 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
// Fetch best local data at the beginning of the sync process
let best_block_metadata = self.db.get_chain_metadata().await?;
let best_header = self.db.fetch_last_chain_header().await?;
let best_block = self
let best_block_header = self
.db
.fetch_chain_header(best_block_metadata.height_of_longest_chain())
.await?;
let best_header_height = best_header.height();
let best_block_height = best_block.height();
let best_block_height = best_block_header.height();

if best_header_height < best_block_height ||
best_block_height < self.local_cached_metadata.height_of_longest_chain()
Expand All @@ -268,34 +268,13 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {

// - At this point we may have more (InSyncOrAhead), equal (InSyncOrAhead), or less headers (Lagging) than the
// peer, but they claimed better POW before we attempted sync.
// - If the peer responds with a lesser POW than what they previously claimed, this method will return a
// ban-able error.
// - If the peer lied about the start hash index or sent an invalid remote tip height, this method will return a
// ban-able error.
// - This method will return ban-able errors for certain offenses.
let (header_sync_status, peer_response) = self
.determine_sync_status(sync_peer, best_block.clone(), &mut client)
.determine_sync_status(sync_peer, best_header.clone(), best_block_header.clone(), &mut client)
.await?;

match header_sync_status.clone() {
HeaderSyncStatus::InSyncOrAhead => {
// Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
// peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
// we can ban the peer.
if best_header.height() == best_block.height() {
warn!(
target: LOG_TARGET,
"Peer `{}` did not provide any headers although they have a better chain and more headers: their \
difficulty: {}, our difficulty: {}. Peer will be banned.",
sync_peer.node_id(),
sync_peer.claimed_chain_metadata().accumulated_difficulty(),
best_block.accumulated_data().total_accumulated_difficulty,
);
return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
actual: None,
local: best_block.accumulated_data().total_accumulated_difficulty,
});
}
debug!(
target: LOG_TARGET,
"Headers are in sync at height {} but tip is {}. Proceeding to archival/pruned block sync",
Expand Down Expand Up @@ -330,9 +309,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
}
}

#[allow(clippy::too_many_lines)]
async fn find_chain_split(
&mut self,
peer: &NodeId,
peer_node_id: &NodeId,
client: &mut rpc::BaseNodeSyncRpcClient,
header_count: u64,
) -> Result<FindChainSplitResult, BlockHeaderSyncError> {
Expand All @@ -347,7 +327,13 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
loop {
iter_count += 1;
if iter_count > MAX_CHAIN_SPLIT_ITERS {
return Err(BlockHeaderSyncError::ChainSplitNotFound(peer.clone()));
warn!(
target: LOG_TARGET,
"Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
peer_node_id,
NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
);
return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
}

let block_hashes = self
Expand All @@ -359,13 +345,19 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
"Determining if chain splits between {} and {} headers back from the tip (peer: `{}`, {} hashes sent)",
offset,
offset + NUM_CHAIN_SPLIT_HEADERS,
peer,
peer_node_id,
block_hashes.len()
);

// No further hashes to send.
if block_hashes.is_empty() {
return Err(BlockHeaderSyncError::ChainSplitNotFound(peer.clone()));
warn!(
target: LOG_TARGET,
"Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
peer_node_id,
NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
);
return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
}

let request = FindChainSplitRequest {
Expand All @@ -379,7 +371,13 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
// This round we sent less hashes than the max, so the next round will not have any more hashes to
// send. Exit early in this case.
if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
return Err(BlockHeaderSyncError::ChainSplitNotFound(peer.clone()));
warn!(
target: LOG_TARGET,
"Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
peer_node_id,
NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
);
return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
}
// Chain split not found, let's go further back
offset = NUM_CHAIN_SPLIT_HEADERS * iter_count;
Expand All @@ -390,26 +388,48 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
},
};
if resp.headers.len() > NUM_INITIAL_HEADERS_TO_REQUEST {
warn!(
target: LOG_TARGET,
"Peer `{}` sent too many headers {}, only requested {}. Peer will be banned.",
peer_node_id,
resp.headers.len(),
NUM_INITIAL_HEADERS_TO_REQUEST,
);
return Err(BlockHeaderSyncError::PeerSentTooManyHeaders(resp.headers.len()));
}
if resp.fork_hash_index >= block_hashes.len() as u64 {
warn!(
target: LOG_TARGET,
"Peer `{}` sent hash index {} out of range {}. Peer will be banned.",
peer_node_id,
resp.fork_hash_index,
block_hashes.len(),
);
return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
block_hashes.len() as u64,
resp.fork_hash_index,
));
}
#[allow(clippy::cast_possible_truncation)]
if !resp.headers.is_empty() && resp.headers[0].prev_hash != block_hashes[resp.fork_hash_index as usize] {
warn!(
target: LOG_TARGET,
"Peer `{}` sent hash an invalid protocol response, incorrect fork hash index {}. Peer will be banned.",
peer_node_id,
resp.fork_hash_index,
);
return Err(BlockHeaderSyncError::InvalidProtocolResponse(
"Peer sent incorrect fork hash index".into(),
));
}
#[allow(clippy::cast_possible_truncation)]
let chain_split_hash = block_hashes[resp.fork_hash_index as usize];

return Ok(FindChainSplitResult {
block_hashes,
reorg_steps_back: resp.fork_hash_index.saturating_add(offset as u64),
peer_headers: resp.headers,
peer_fork_hash_index: resp.fork_hash_index,
chain_split_hash,
});
}
}
Expand All @@ -422,9 +442,11 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
async fn determine_sync_status(
&mut self,
sync_peer: &SyncPeer,
best_header: ChainHeader,
best_block_header: ChainHeader,
client: &mut rpc::BaseNodeSyncRpcClient,
) -> Result<(HeaderSyncStatus, FindChainSplitResult), BlockHeaderSyncError> {
// This method will return ban-able errors for certain offenses.
let chain_split_result = self
.find_chain_split(sync_peer.node_id(), client, NUM_INITIAL_HEADERS_TO_REQUEST as u64)
.await?;
Expand All @@ -441,6 +463,24 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
// If the peer returned no new headers, they may still have more blocks than we have, thus have a higher
// accumulated difficulty.
if chain_split_result.peer_headers.is_empty() {
// Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
// peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
// we can ban the peer.
if best_header.height() == best_block_header.height() {
warn!(
target: LOG_TARGET,
"Peer `{}` did not provide any headers although they have a better chain and more headers: their \
difficulty: {}, our difficulty: {}. Peer will be banned.",
sync_peer.node_id(),
sync_peer.claimed_chain_metadata().accumulated_difficulty(),
best_block_header.accumulated_data().total_accumulated_difficulty,
);
return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
actual: None,
local: best_block_header.accumulated_data().total_accumulated_difficulty,
});
}
debug!(target: LOG_TARGET, "Peer `{}` sent no headers; headers already in sync with peer.", sync_peer.node_id());
return Ok((HeaderSyncStatus::InSyncOrAhead, chain_split_result));
}
Expand All @@ -454,28 +494,9 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
.map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
let num_new_headers = headers.len();

// NOTE: We can trust that the header associated with this hash exists because `block_hashes` was supplied by
// this node. Bounds checking for fork_hash_index has been done above in 'find_chain_split'.
#[allow(clippy::cast_possible_truncation)]
let chain_split_hash = match chain_split_result
.block_hashes
.get(chain_split_result.peer_fork_hash_index as usize)
{
Some(val) => val,
None => {
error!(
target: LOG_TARGET,
"Internal database inconsistency found - hash index `{}` does not exist",
chain_split_result.peer_fork_hash_index
);
return Err(BlockHeaderSyncError::InternalError(format!(
"Has index {} does not exist",
chain_split_result.peer_fork_hash_index
)));
},
};

self.header_validator.initialize_state(chain_split_hash).await?;
self.header_validator
.initialize_state(&chain_split_result.chain_split_hash)
.await?;
for header in headers {
debug!(
target: LOG_TARGET,
Expand All @@ -495,7 +516,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
let chain_split_info = ChainSplitInfo {
best_block_header,
reorg_steps_back: chain_split_result.reorg_steps_back,
chain_split_hash: *chain_split_hash,
chain_split_hash: chain_split_result.chain_split_hash,
};
Ok((
HeaderSyncStatus::Lagging(Box::new(chain_split_info)),
Expand Down Expand Up @@ -810,10 +831,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {

#[derive(Debug, Clone)]
struct FindChainSplitResult {
block_hashes: Vec<HashOutput>,
reorg_steps_back: u64,
peer_headers: Vec<ProtoBlockHeader>,
peer_fork_hash_index: u64,
chain_split_hash: HashOutput,
}

/// Information about the chain split from the remote node.
Expand Down
10 changes: 2 additions & 8 deletions base_layer/core/tests/helpers/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,10 @@ use crate::helpers::{

static EMISSION: [u64; 2] = [10, 10];

pub async fn sync_headers(
alice_state_machine: &mut BaseNodeStateMachine<TempDatabase>,
pub fn sync_headers_initialize_with_ping_pong_data(
alice_node: &NodeInterfaces,
bob_node: &NodeInterfaces,
) -> StateEvent {
let mut header_sync = sync_headers_initialize(alice_node, bob_node);
sync_headers_execute(alice_state_machine, &mut header_sync).await
}

pub fn sync_headers_initialize(alice_node: &NodeInterfaces, bob_node: &NodeInterfaces) -> HeaderSyncState {
) -> HeaderSyncState {
HeaderSyncState::new(
vec![SyncPeer::from(PeerChainMetadata::new(
bob_node.node_identity.node_id().clone(),
Expand Down
Loading

0 comments on commit 4eeb873

Please sign in to comment.