Skip to content

Commit

Permalink
feat(core/sync): adds connecting sync status (#4698)
Browse files Browse the repository at this point in the history
Description
---
- Adds connecting sync status

Motivation and Context
---
Statuses like "starting block sync" did not show which peer is being synced from.
This PR changes the status to connecting and includes the sync peer that is being connected to.

How Has This Been Tested?
---
Manually
  • Loading branch information
sdbondi authored Sep 20, 2022
1 parent 80af7fa commit abde8e8
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 36 deletions.
2 changes: 1 addition & 1 deletion applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ enum BaseNodeState{
START_UP = 0;
HEADER_SYNC = 1;
HORIZON_SYNC = 2;
BLOCK_SYNC_STARTING= 3;
CONNECTING = 3;
BLOCK_SYNC = 4;
LISTENING = 5;
}
Expand Down
6 changes: 3 additions & 3 deletions applications/tari_app_grpc/src/conversions/base_node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use tari_core::base_node::state_machine_service::states::{
StateInfo,
StateInfo::{BlockSync, BlockSyncStarting, HeaderSync, HorizonSync, Listening, StartUp},
StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp},
};

use crate::tari_rpc as grpc;
Expand All @@ -33,7 +33,7 @@ impl From<StateInfo> for grpc::BaseNodeState {
StartUp => grpc::BaseNodeState::HeaderSync,
HeaderSync(_) => grpc::BaseNodeState::HeaderSync,
HorizonSync(_) => grpc::BaseNodeState::HorizonSync,
BlockSyncStarting => grpc::BaseNodeState::BlockSyncStarting,
Connecting(_) => grpc::BaseNodeState::Connecting,
BlockSync(_) => grpc::BaseNodeState::BlockSync,
Listening(_) => grpc::BaseNodeState::Listening,
}
Expand All @@ -46,7 +46,7 @@ impl From<&StateInfo> for grpc::BaseNodeState {
StartUp => grpc::BaseNodeState::HeaderSync,
HeaderSync(_) => grpc::BaseNodeState::HeaderSync,
HorizonSync(_) => grpc::BaseNodeState::HorizonSync,
BlockSyncStarting => grpc::BaseNodeState::BlockSyncStarting,
Connecting(_) => grpc::BaseNodeState::Connecting,
BlockSync(_) => grpc::BaseNodeState::BlockSync,
Listening(_) => grpc::BaseNodeState::Listening,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
local_height: info.local_height,
state: tari_rpc::SyncState::Header.into(),
},
StateInfo::BlockSyncStarting => tari_rpc::SyncProgressResponse {
StateInfo::Connecting(_) => tari_rpc::SyncProgressResponse {
tip_height: 0,
local_height: 0,
state: tari_rpc::SyncState::BlockStarting.into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
use std::{mem, time::Instant};

use log::*;
use randomx_rs::RandomXFlag;

use crate::{
base_node::{
Expand Down Expand Up @@ -59,16 +58,20 @@ impl BlockSync {

let status_event_sender = shared.status_event_sender.clone();
let bootstrapped = shared.is_bootstrapped();
let _result = status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::BlockSyncStarting,
randomx_vm_cnt: 0,
randomx_vm_flags: RandomXFlag::FLAG_DEFAULT,
});
let local_nci = shared.local_node_interface.clone();
let randomx_vm_cnt = shared.get_randomx_vm_cnt();
let randomx_vm_flags = shared.get_randomx_vm_flags();
let tip_height_metric = metrics::tip_height();
synchronizer.on_starting(move |sync_peer| {
let _result = status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::Connecting(sync_peer.clone()),
randomx_vm_cnt,
randomx_vm_flags,
});
});

let status_event_sender = shared.status_event_sender.clone();
synchronizer.on_progress(move |block, remote_tip_height, sync_peer| {
let local_height = block.height();
local_nci.publish_block_event(BlockEvent::ValidBlockAdded(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,25 +166,32 @@ impl Display for BaseNodeState {
#[derive(Debug, Clone, PartialEq)]
pub enum StateInfo {
StartUp,
Connecting(SyncPeer),
HeaderSync(Option<BlockSyncInfo>),
HorizonSync(HorizonSyncInfo),
BlockSyncStarting,
BlockSync(BlockSyncInfo),
Listening(ListeningInfo),
}

impl StateInfo {
pub fn short_desc(&self) -> String {
use StateInfo::{BlockSync, BlockSyncStarting, HeaderSync, HorizonSync, Listening, StartUp};
use StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp};
match self {
StartUp => "Starting up".to_string(),
Connecting(sync_peer) => format!(
"Connecting to {}{}",
sync_peer.node_id().short_str(),
sync_peer
.latency()
.map(|l| format!(", Latency: {:.2?}", l))
.unwrap_or_else(|| "".to_string())
),
HeaderSync(None) => "Starting header sync".to_string(),
HeaderSync(Some(info)) => format!("Syncing headers: {}", info.sync_progress_string()),
HorizonSync(info) => info.to_progress_string(),

BlockSync(info) => format!("Syncing blocks: {}", info.sync_progress_string()),
Listening(_) => "Listening".to_string(),
BlockSyncStarting => "Starting block sync".to_string(),
}
}

Expand All @@ -196,25 +203,26 @@ impl StateInfo {
}

pub fn is_synced(&self) -> bool {
use StateInfo::{BlockSync, BlockSyncStarting, HeaderSync, HorizonSync, Listening, StartUp};
use StateInfo::{BlockSync, Connecting, HeaderSync, HorizonSync, Listening, StartUp};
match self {
StartUp | HeaderSync(_) | HorizonSync(_) | BlockSync(_) | BlockSyncStarting => false,
StartUp | Connecting(_) | HeaderSync(_) | HorizonSync(_) | BlockSync(_) => false,
Listening(info) => info.is_synced(),
}
}
}

impl Display for StateInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
use StateInfo::{BlockSync, BlockSyncStarting, HeaderSync, HorizonSync, Listening, StartUp};
#[allow(clippy::enum_glob_use)]
use StateInfo::*;
match self {
StartUp => write!(f, "Node starting up"),
Connecting(sync_peer) => write!(f, "Connecting to {}", sync_peer),
HeaderSync(Some(info)) => write!(f, "Synchronizing block headers: {}", info),
HeaderSync(None) => write!(f, "Synchronizing block headers: Starting"),
HorizonSync(info) => write!(f, "Synchronizing horizon state: {}", info),
BlockSync(info) => write!(f, "Synchronizing blocks: {}", info),
Listening(info) => write!(f, "Listening: {}", info),
BlockSyncStarting => write!(f, "Synchronizing blocks: Starting"),
}
}
}
Expand Down Expand Up @@ -275,7 +283,7 @@ impl BlockSyncInfo {
self.sync_peer.node_id().short_str(),
self.local_height,
self.tip_height,
(self.local_height as f64 / self.tip_height as f64 * 100.0),
(self.local_height as f64 / self.tip_height as f64 * 100.0).floor(),
self.sync_peer
.items_per_second()
.map(|bps| format!(" {:.2?} blks/s", bps))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ impl HeaderSyncState {
let bootstrapped = shared.is_bootstrapped();
let randomx_vm_cnt = shared.get_randomx_vm_cnt();
let randomx_vm_flags = shared.get_randomx_vm_flags();
synchronizer.on_starting(move || {
synchronizer.on_starting(move |sync_peer| {
let _result = status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::HeaderSync(None),
state_info: StateInfo::Connecting(sync_peer.clone()),
randomx_vm_cnt,
randomx_vm_flags,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use super::{StateEvent, StateInfo};
use crate::{
base_node::{
state_machine_service::states::StatusInfo,
sync::{HorizonStateSynchronization, HorizonSyncInfo, HorizonSyncStatus, SyncPeer},
sync::{HorizonStateSynchronization, SyncPeer},
BaseNodeStateMachine,
},
chain_storage::BlockchainBackend,
Expand Down Expand Up @@ -101,12 +101,10 @@ impl HorizonStateSync {
let bootstrapped = shared.is_bootstrapped();
let randomx_vm_cnt = shared.get_randomx_vm_cnt();
let randomx_vm_flags = shared.get_randomx_vm_flags();
let sync_peers_node_id = sync_peers.iter().map(|p| p.node_id()).cloned().collect();
horizon_sync.on_starting(move || {
let info = HorizonSyncInfo::new(sync_peers_node_id, HorizonSyncStatus::Starting);
horizon_sync.on_starting(move |sync_peer| {
let _result = status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::HorizonSync(info),
state_info: StateInfo::Connecting(sync_peer.clone()),
randomx_vm_cnt,
randomx_vm_flags,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
}
}

pub fn on_starting<H>(&mut self, hook: H)
where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
self.hooks.add_on_starting_hook(hook);
}

pub fn on_progress<H>(&mut self, hook: H)
where H: Fn(Arc<ChainBlock>, u64, &SyncPeer) + Send + Sync + 'static {
self.hooks.add_on_progress_block_hook(hook);
Expand Down Expand Up @@ -123,6 +128,8 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
async fn attempt_block_sync(&mut self, max_latency: Duration) -> Result<(), BlockSyncError> {
let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
for (i, node_id) in sync_peer_node_ids.iter().enumerate() {
let sync_peer = &self.sync_peers[i];
self.hooks.call_on_starting_hook(sync_peer);
let mut conn = self.connect_to_sync_peer(node_id.clone()).await?;
let config = RpcClient::builder()
.with_deadline(self.config.rpc_deadline)
Expand Down Expand Up @@ -199,10 +206,10 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
max_latency: Duration,
) -> Result<(), BlockSyncError> {
info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer);
self.hooks.call_on_starting_hook();

let tip_header = self.db.fetch_last_header().await?;
let local_metadata = self.db.get_chain_metadata().await?;

if tip_header.height <= local_metadata.height_of_longest_chain() {
debug!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
}

pub fn on_starting<H>(&mut self, hook: H)
where for<'r> H: FnOnce() + Send + Sync + 'static {
where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
self.hooks.add_on_starting_hook(hook);
}

Expand All @@ -104,7 +104,6 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {

pub async fn synchronize(&mut self) -> Result<SyncPeer, BlockHeaderSyncError> {
debug!(target: LOG_TARGET, "Starting header sync.",);
self.hooks.call_on_starting_hook();

info!(
target: LOG_TARGET,
Expand All @@ -127,9 +126,14 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
}
}

#[allow(clippy::too_many_lines)]
pub async fn try_sync_from_all_peers(&mut self, max_latency: Duration) -> Result<SyncPeer, BlockHeaderSyncError> {
let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
for (i, node_id) in sync_peer_node_ids.iter().enumerate() {
{
let sync_peer = &self.sync_peers[i];
self.hooks.call_on_starting_hook(sync_peer);
}
let mut conn = self.dial_sync_peer(node_id).await?;
debug!(
target: LOG_TARGET,
Expand Down
8 changes: 4 additions & 4 deletions base_layer/core/src/base_node/sync/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{

#[derive(Default)]
pub(super) struct Hooks {
on_starting: Vec<Box<dyn FnOnce() + Send + Sync>>,
on_starting: Vec<Box<dyn FnOnce(&SyncPeer) + Send + Sync>>,
on_progress_header: Vec<Box<dyn Fn(u64, u64, &SyncPeer) + Send + Sync>>,
on_progress_block: Vec<Box<dyn Fn(Arc<ChainBlock>, u64, &SyncPeer) + Send + Sync>>,
on_progress_horizon_sync: Vec<Box<dyn Fn(HorizonSyncInfo) + Send + Sync>>,
Expand All @@ -41,12 +41,12 @@ pub(super) struct Hooks {

impl Hooks {
pub fn add_on_starting_hook<H>(&mut self, hook: H)
where H: FnOnce() + Send + Sync + 'static {
where H: FnOnce(&SyncPeer) + Send + Sync + 'static {
self.on_starting.push(Box::new(hook));
}

pub fn call_on_starting_hook(&mut self) {
self.on_starting.drain(..).for_each(|f| (f)());
pub fn call_on_starting_hook(&mut self, sync_peer: &SyncPeer) {
self.on_starting.drain(..).for_each(|f| (f)(sync_peer));
}

pub fn add_on_progress_header_hook<H>(&mut self, hook: H)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}

pub fn on_starting<H>(&mut self, hook: H)
where for<'r> H: FnOnce() + Send + Sync + 'static {
where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
self.hooks.add_on_starting_hook(hook);
}

Expand Down Expand Up @@ -181,6 +181,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {

async fn sync(&mut self, header: &BlockHeader) -> Result<(), HorizonSyncError> {
for (i, sync_peer) in self.sync_peers.iter().enumerate() {
self.hooks.call_on_starting_hook(sync_peer);
let mut connection = self.connectivity.dial_peer(sync_peer.node_id().clone()).await?;
let config = RpcClient::builder()
.with_deadline(self.config.rpc_deadline)
Expand Down Expand Up @@ -220,7 +221,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
) -> Result<(), HorizonSyncError> {
debug!(target: LOG_TARGET, "Initializing");
self.initialize().await?;
self.hooks.call_on_starting_hook();

debug!(target: LOG_TARGET, "Synchronizing kernels");
self.synchronize_kernels(sync_peer.clone(), client, to_header).await?;
debug!(target: LOG_TARGET, "Synchronizing outputs");
Expand Down

0 comments on commit abde8e8

Please sign in to comment.