Skip to content

Commit

Permalink
fix(sync): adds extra checks for sync stream termination (#3927)
Browse files Browse the repository at this point in the history
Description
---
- adds extra checks in block sync RPC service for premature stream termination 
- uses rolling average for slow peer detection in header and horizon sync
- prevents more than one UTXO sync session per peer

Motivation and Context
---
Peers may end a sync session and quickly initiate a new one. Because the previous session may still be sending its batch of data, the new session will be denied until that data has been fetched. This PR adds additional checks, after loading data from the database but before sending it, to detect whether the session has ended. However, the race condition still exists, as it is inherent to the rule of allowing only one sync session per peer. In my tests with a 600ms max permitted sync latency I did not manage to trigger the `Forbidden: Existing sync session found for this client. Only a single session is permitted` error, but this may still be an issue. These changes should nevertheless lessen the chance of it happening.

How Has This Been Tested?
---
Manually, existing tests
  • Loading branch information
sdbondi authored Mar 25, 2022
1 parent 3f28a93 commit dd544cb
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 43 deletions.
17 changes: 10 additions & 7 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
mut client: rpc::BaseNodeSyncRpcClient,
max_latency: Duration,
) -> Result<(), BlockSyncError> {
info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer);
self.hooks.call_on_starting_hook();

let tip_header = self.db.fetch_last_header().await?;
Expand Down Expand Up @@ -320,7 +321,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
.await?;

// Average time between receiving blocks from the peer - used to detect a slow sync peer
let last_avg_latency = avg_latency.calculate_average();
let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
if let Some(latency) = last_avg_latency {
sync_peer.set_latency(latency);
}
Expand All @@ -342,12 +343,14 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
block.accumulated_data().accumulated_sha_difficulty,
latency
);
if last_avg_latency.map(|avg| avg > max_latency).unwrap_or(false) {
return Err(BlockSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency,
});
if let Some(avg_latency) = last_avg_latency {
if avg_latency > max_latency {
return Err(BlockSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency: avg_latency,
max_latency,
});
}
}

current_block = Some(block);
Expand Down
19 changes: 13 additions & 6 deletions base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::{
base_node::sync::{hooks::Hooks, rpc, BlockchainSyncConfig, SyncPeer},
blocks::{BlockHeader, ChainBlock, ChainHeader},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
common::rolling_avg::RollingAverageTime,
consensus::ConsensusManager,
proof_of_work::randomx_factory::RandomXFactory,
proto::{
Expand Down Expand Up @@ -553,6 +554,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
split_info: ChainSplitInfo,
max_latency: Duration,
) -> Result<(), BlockHeaderSyncError> {
info!(target: LOG_TARGET, "Starting header sync from peer {}", sync_peer);
const COMMIT_EVERY_N_HEADERS: usize = 1000;

let mut has_switched_to_new_chain = false;
Expand Down Expand Up @@ -626,8 +628,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
let mut last_sync_timer = Instant::now();

let mut last_total_accumulated_difficulty = 0;
let mut avg_latency = RollingAverageTime::new(20);
while let Some(header) = header_stream.next().await {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -672,12 +676,15 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
self.hooks
.call_on_progress_header_hooks(current_height, split_info.remote_tip_height, &sync_peer);

if latency > max_latency {
return Err(BlockHeaderSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency,
});
let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
if let Some(avg_latency) = last_avg_latency {
if avg_latency > max_latency {
return Err(BlockHeaderSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency: avg_latency,
max_latency,
});
}
}

last_sync_timer = Instant::now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use croaring::Bitmap;
use futures::{stream::FuturesUnordered, StreamExt};
use log::*;
use tari_common_types::types::{Commitment, HashDigest, RangeProofService};
use tari_comms::connectivity::ConnectivityRequester;
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
use tari_crypto::{
commitment::HomomorphicCommitment,
tari_utilities::{hex::Hex, Hashable},
Expand All @@ -58,6 +58,7 @@ use crate::{
MmrTree,
PrunedOutput,
},
common::rolling_avg::RollingAverageTime,
consensus::ConsensusManager,
proto::base_node::{
sync_utxo as proto_sync_utxo,
Expand Down Expand Up @@ -236,6 +237,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
info!(target: LOG_TARGET, "Starting kernel sync from peer {}", sync_peer);
let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;

let remote_num_kernels = to_header.kernel_mmr_size;
Expand Down Expand Up @@ -288,8 +290,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let mut mmr_position = local_num_kernels;
let end = remote_num_kernels;
let mut last_sync_timer = Instant::now();
let mut avg_latency = RollingAverageTime::new(20);
while let Some(kernel) = kernel_stream.next().await {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let kernel: TransactionKernel = kernel?.try_into().map_err(HorizonSyncError::ConversionError)?;
kernel
.verify_signature()
Expand Down Expand Up @@ -368,13 +372,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
self.hooks.call_on_progress_horizon_hooks(info);
}

if latency > self.max_latency {
return Err(HorizonSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency: self.max_latency,
});
}
self.check_latency(sync_peer.node_id(), &avg_latency)?;

last_sync_timer = Instant::now();
}
Expand All @@ -387,12 +385,27 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
Ok(())
}

/// Checks the rolling average latency of the sync peer against `self.max_latency`.
///
/// The average is requested with a minimum of 5 samples; while no average is available the
/// check passes. Returns `HorizonSyncError::MaxLatencyExceeded` (carrying the peer's node id,
/// the average latency and the configured maximum) when the average exceeds the maximum.
fn check_latency(&self, peer: &NodeId, avg_latency: &RollingAverageTime) -> Result<(), HorizonSyncError> {
    match avg_latency.calculate_average_with_min_samples(5) {
        // Enough samples were collected and the peer is, on average, too slow.
        Some(average) if average > self.max_latency => Err(HorizonSyncError::MaxLatencyExceeded {
            peer: peer.clone(),
            latency: average,
            max_latency: self.max_latency,
        }),
        // Either there is no average yet or it is within the permitted latency.
        _ => Ok(()),
    }
}

async fn synchronize_outputs(
&mut self,
mut sync_peer: SyncPeer,
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
info!(target: LOG_TARGET, "Starting output sync from peer {}", sync_peer);
let local_num_outputs = self.db().fetch_mmr_size(MmrTree::Utxo).await?;

let remote_num_outputs = to_header.output_mmr_size;
Expand Down Expand Up @@ -466,9 +479,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let mut witness_mmr = MerkleMountainRange::<HashDigest, _>::new(witness_pruned_set);
let mut constants = self.rules.consensus_constants(current_header.height()).clone();
let mut last_sync_timer = Instant::now();
let mut avg_latency = RollingAverageTime::new(20);

while let Some(response) = output_stream.next().await {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let res: SyncUtxosResponse = response?;

if res.mmr_index != 0 && res.mmr_index != mmr_position {
Expand Down Expand Up @@ -658,13 +673,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
self.hooks.call_on_progress_horizon_hooks(info);
}

if latency > self.max_latency {
return Err(HorizonSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency: self.max_latency,
});
}
self.check_latency(sync_peer.node_id(), &avg_latency)?;

last_sync_timer = Instant::now();
}
Expand Down
40 changes: 35 additions & 5 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Block sync session for peer '{}' terminated early", peer_node_id
);
break;
}

match blocks {
Ok(blocks) if blocks.is_empty() => {
break;
Expand All @@ -226,6 +234,10 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ

// Ensure task stops if the peer prematurely stops their RPC session
if utils::mpsc::send_all(&tx, blocks).await.is_err() {
debug!(
target: LOG_TARGET,
"Block sync session for peer '{}' terminated early", peer_node_id
);
break;
}
},
Expand Down Expand Up @@ -288,7 +300,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
task::spawn(
async move {
// Move token into this task
let session_token = session_token;
let peer_node_id = session_token;
let iter = NonOverlappingIntegerPairIter::new(
start_header.height + 1,
start_header.height.saturating_add(count).saturating_add(1),
Expand All @@ -304,6 +316,13 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Header sync session for peer '{}' terminated early", peer_node_id
);
break;
}
match headers {
Ok(headers) if headers.is_empty() => {
break;
Expand All @@ -325,7 +344,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Header sync round complete for peer `{}`.", session_token,
"Header sync round complete for peer `{}`.", peer_node_id,
);
}
.instrument(span),
Expand Down Expand Up @@ -453,6 +472,8 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ

let session_token = self.try_add_exclusive_session(peer_node_id).await?;
task::spawn(async move {
// Move session token into task
let peer_node_id = session_token;
while current_height <= end_height {
if tx.is_closed() {
break;
Expand All @@ -461,6 +482,15 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.fetch_kernels_in_block(current_header_hash.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Kernel sync session for peer '{}' terminated early", peer_node_id
);
break;
}

match res {
Ok(kernels) if kernels.is_empty() => {
let _ = tx
Expand Down Expand Up @@ -525,7 +555,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Kernel sync round complete for peer `{}`.", session_token,
"Kernel sync round complete for peer `{}`.", peer_node_id,
);
});
Ok(Streaming::new(rx))
Expand All @@ -546,9 +576,9 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
req.include_deleted_bitmaps
);

let _session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
let (tx, rx) = mpsc::channel(200);
let task = SyncUtxosTask::new(self.db());
let task = SyncUtxosTask::new(self.db(), session_token);
task.run(request, tx).await?;

Ok(Streaming::new(rx))
Expand Down
35 changes: 28 additions & 7 deletions base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{sync::Arc, time::Instant};

use log::*;
use tari_comms::{
peer_manager::NodeId,
protocol::rpc::{Request, RpcStatus, RpcStatusResultExt},
utils,
};
Expand All @@ -42,21 +43,21 @@ const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_task";

/// Task that streams the response to a UTXO sync request back to a single peer.
pub(crate) struct SyncUtxosTask<B> {
/// Async handle to the blockchain database.
db: AsyncBlockchainDb<B>,
/// Node ID of the peer being served; used in this task's log messages.
peer_node_id: Arc<NodeId>,
}

impl<B> SyncUtxosTask<B>
where B: BlockchainBackend + 'static
{
pub(crate) fn new(db: AsyncBlockchainDb<B>) -> Self {
Self { db }
pub(crate) fn new(db: AsyncBlockchainDb<B>, peer_node_id: Arc<NodeId>) -> Self {
Self { db, peer_node_id }
}

pub(crate) async fn run(
self,
request: Request<SyncUtxosRequest>,
mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
) -> Result<(), RpcStatus> {
let peer = request.context().peer_node_id().clone();
let msg = request.into_message();
let start_header = self
.db
Expand Down Expand Up @@ -105,7 +106,10 @@ where B: BlockchainBackend + 'static
let include_pruned_utxos = msg.include_pruned_utxos;
let include_deleted_bitmaps = msg.include_deleted_bitmaps;
task::spawn(async move {
debug!(target: LOG_TARGET, "Starting UTXO stream for peer '{}'", peer);
debug!(
target: LOG_TARGET,
"Starting UTXO stream for peer '{}'", self.peer_node_id
);
if let Err(err) = self
.start_streaming(
&mut tx,
Expand All @@ -118,10 +122,16 @@ where B: BlockchainBackend + 'static
)
.await
{
debug!(target: LOG_TARGET, "UTXO stream errored for peer '{}': {}", peer, err);
debug!(
target: LOG_TARGET,
"UTXO stream errored for peer '{}': {}", self.peer_node_id, err
);
let _ = tx.send(Err(err)).await;
}
debug!(target: LOG_TARGET, "UTXO stream completed for peer '{}'", peer);
debug!(
target: LOG_TARGET,
"UTXO stream completed for peer '{}'", self.peer_node_id
);
metrics::active_sync_peers().dec();
});

Expand Down Expand Up @@ -178,7 +188,10 @@ where B: BlockchainBackend + 'static
let end = current_header.output_mmr_size;

if tx.is_closed() {
debug!(target: LOG_TARGET, "Exiting sync_utxos early because client has gone",);
debug!(
target: LOG_TARGET,
"Peer '{}' exited UTXO sync session early", self.peer_node_id
);
break;
}

Expand All @@ -196,6 +209,14 @@ where B: BlockchainBackend + 'static
current_header.height,
deleted_diff.cardinality(),
);
if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Peer '{}' exited UTXO sync session early", self.peer_node_id
);
break;
}

let utxos = utxos
.into_iter()
.skip(skip_outputs as usize)
Expand Down
7 changes: 7 additions & 0 deletions base_layer/core/src/common/rolling_avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,11 @@ impl RollingAverageTime {
u64::try_from(total_time.as_nanos()).unwrap_or(u64::MAX) / self.samples.len() as u64,
))
}

/// Returns the result of `calculate_average`, but only once at least `min_samples` samples
/// have been recorded; with fewer samples this returns `None`.
pub fn calculate_average_with_min_samples(&self, min_samples: usize) -> Option<Duration> {
    if self.samples.len() >= min_samples {
        self.calculate_average()
    } else {
        // Too few samples collected so far for the average to be used.
        None
    }
}
}
Loading

0 comments on commit dd544cb

Please sign in to comment.