From 23236b5835b6eb64628830238b562140f641ad94 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Thu, 8 Sep 2022 17:43:29 +0400 Subject: [PATCH] fix(core/sync): handle deadline timeouts by changing peer --- .../src/base_node/sync/block_sync/synchronizer.rs | 13 +++++++++++-- base_layer/core/src/base_node/sync/config.rs | 6 +++++- .../src/base_node/sync/header_sync/synchronizer.rs | 10 ++++++++-- .../sync/horizon_state_sync/synchronizer.rs | 12 ++++++++++-- 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index b578e6ac08..8a5542b893 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -29,7 +29,12 @@ use std::{ use futures::StreamExt; use log::*; use num_format::{Locale, ToFormattedString}; -use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId, PeerConnection}; +use tari_comms::{ + connectivity::ConnectivityRequester, + peer_manager::NodeId, + protocol::rpc::{RpcClient, RpcError}, + PeerConnection, +}; use tari_utilities::hex::Hex; use tracing; @@ -119,8 +124,11 @@ impl BlockSynchronizer { let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::>(); for (i, node_id) in sync_peer_node_ids.iter().enumerate() { let mut conn = self.connect_to_sync_peer(node_id.clone()).await?; + let config = RpcClient::builder() + .with_deadline(self.config.rpc_deadline) + .with_deadline_grace_period(Duration::from_secs(5)); let mut client = conn - .connect_rpc_using_builder(rpc::BaseNodeSyncRpcClient::builder().with_deadline(Duration::from_secs(60))) + .connect_rpc_using_builder::(config) .await?; let latency = client .get_last_request_latency() @@ -158,6 +166,7 @@ impl BlockSynchronizer { self.ban_peer(node_id, &err).await?; return Err(err.into()); }, + Err(err @ BlockSyncError::RpcError(RpcError::ReplyTimeout)) | Err(err @ BlockSyncError::MaxLatencyExceeded { .. }) => { warn!(target: LOG_TARGET, "{}", err); if i == self.sync_peers.len() - 1 { diff --git a/base_layer/core/src/base_node/sync/config.rs b/base_layer/core/src/base_node/sync/config.rs index ec26d2f918..5d3a331aae 100644 --- a/base_layer/core/src/base_node/sync/config.rs +++ b/base_layer/core/src/base_node/sync/config.rs @@ -48,17 +48,21 @@ pub struct BlockchainSyncConfig { pub forced_sync_peers: Vec, /// Number of threads to use for validation pub validation_concurrency: usize, + /// The RPC deadline to set on sync clients. If this deadline is reached, a new sync peer will be selected for + /// sync. + pub rpc_deadline: Duration, } impl Default for BlockchainSyncConfig { fn default() -> Self { Self { - initial_max_sync_latency: Duration::from_secs(10), + initial_max_sync_latency: Duration::from_secs(20), max_latency_increase: Duration::from_secs(2), ban_period: Duration::from_secs(30 * 60), short_ban_period: Duration::from_secs(60), forced_sync_peers: Default::default(), validation_concurrency: 6, + rpc_deadline: Duration::from_secs(10), } } } diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index 28b3be7e46..889a3568c0 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -31,7 +31,7 @@ use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput}; use tari_comms::{ connectivity::ConnectivityRequester, peer_manager::NodeId, - protocol::rpc::{RpcError, RpcHandshakeError}, + protocol::rpc::{RpcClient, RpcError, RpcHandshakeError}, PeerConnection, }; use tari_utilities::hex::Hex; @@ -136,7 +136,12 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { "Attempting to synchronize headers with `{}`", node_id ); - let mut client = conn.connect_rpc::().await?; + let config = RpcClient::builder() + .with_deadline(self.config.rpc_deadline) + .with_deadline_grace_period(Duration::from_secs(5)); + let mut client = conn + .connect_rpc_using_builder::(config) + .await?; let latency = client .get_last_request_latency() @@ -208,6 +213,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { self.ban_peer_long(node_id, BanReason::GeneralHeaderSyncFailure(err)) .await?; }, + Err(err @ BlockHeaderSyncError::RpcError(RpcError::ReplyTimeout)) | Err(err @ BlockHeaderSyncError::MaxLatencyExceeded { .. }) => { warn!(target: LOG_TARGET, "{}", err); if i == self.sync_peers.len() - 1 { diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs index 0b35d8d6eb..3e01e96910 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs @@ -32,7 +32,11 @@ use croaring::Bitmap; use futures::{stream::FuturesUnordered, StreamExt}; use log::*; use tari_common_types::types::{Commitment, RangeProofService}; -use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId}; +use tari_comms::{ + connectivity::ConnectivityRequester, + peer_manager::NodeId, + protocol::rpc::{RpcClient, RpcError}, +}; use tari_crypto::{commitment::HomomorphicCommitment, tari_utilities::hex::Hex}; use tokio::task; @@ -178,7 +182,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { async fn sync(&mut self, header: &BlockHeader) -> Result<(), HorizonSyncError> { for (i, sync_peer) in self.sync_peers.iter().enumerate() { let mut connection = self.connectivity.dial_peer(sync_peer.node_id().clone()).await?; - let mut client = connection.connect_rpc::().await?; + let config = RpcClient::builder() + .with_deadline(self.config.rpc_deadline) + .with_deadline_grace_period(Duration::from_secs(3)); + let mut client = connection.connect_rpc_using_builder(config).await?; match self.begin_sync(sync_peer.clone(), &mut client, header).await { Ok(_) => match self.finalize_horizon_sync(sync_peer).await { @@ -188,6 +195,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { return Err(err); }, }, + Err(err @ HorizonSyncError::RpcError(RpcError::ReplyTimeout)) | Err(err @ HorizonSyncError::MaxLatencyExceeded { .. }) => { warn!(target: LOG_TARGET, "{}", err); if i == self.sync_peers.len() - 1 {