fix(tari-pulse): await shutdown signal in main loop #6696

Merged
Changes from 3 commits
1 change: 1 addition & 0 deletions applications/minotari_node/src/config.rs
@@ -143,6 +143,7 @@ pub struct BaseNodeConfig {
/// Obscure GRPC error responses
pub report_grpc_error: bool,
// Interval to check if the base node is still in sync with the network
#[serde(with = "serializers::seconds")]
pub tari_pulse_interval: Duration,
}

62 changes: 45 additions & 17 deletions base_layer/core/src/base_node/tari_pulse_service/mod.rs
@@ -33,7 +33,7 @@ use hickory_client::{
rr::{DNSClass, Name, RData, Record, RecordType},
tcp::TcpClientStream,
};
use log::{error, info};
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use tari_p2p::Network;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
@@ -75,13 +75,18 @@ fn get_network_dns_name(network: Network) -> Name {
pub struct TariPulseService {
dns_name: Name,
config: TariPulseConfig,
shutdown_signal: ShutdownSignal,
}

impl TariPulseService {
pub async fn new(config: TariPulseConfig) -> Result<Self, anyhow::Error> {
pub async fn new(config: TariPulseConfig, shutdown_signal: ShutdownSignal) -> Result<Self, anyhow::Error> {
let dns_name: Name = get_network_dns_name(config.clone().network);
info!(target: LOG_TARGET, "Tari Pulse Service initialized with DNS name: {}", dns_name);
Ok(Self { dns_name, config })
Ok(Self {
dns_name,
config,
shutdown_signal,
})
}

pub fn default_trust_anchor() -> TrustAnchor {
@@ -116,21 +121,44 @@ impl TariPulseService {
notify_passed_checkpoints: watch::Sender<bool>,
) {
let mut interval = time::interval(self.config.check_interval);
let mut interval_failed = time::interval(Duration::from_millis(100));
let mut shutdown_signal = self.shutdown_signal.clone();

loop {
let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => passed,
Err(err) => {
error!(target: LOG_TARGET, "Error checking if node passed checkpoints: {:?}", err);
interval_failed.tick().await;
continue;
},
};
tokio::select! {
_ = interval.tick() => {
let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => {
interval = time::interval(self.config.check_interval); // reset interval if back to healthy
interval.tick().await;
passed
},
Err(err) => {
warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
let interval_in_secs = interval.period().as_secs();
if interval_in_secs > (60 * 30) {
warn!(target: LOG_TARGET, "Reached maximum retry interval of 30 minutes. Exiting");
break;
Collaborator: I would not break here; just leave the interval at a maximum of 30 minutes.

Contributor Author: Done

Contributor Author: Also, after removing the tick().await that follows assigning the new interval, the loop runs up to the max value right away.
[Screenshot from 2024-11-22 11-59-15]

Contributor Author: It looks like after assigning a new interval we have to call tick().await once, so that the next iteration of the loop actually waits.

}
info!(target: LOG_TARGET, "Retrying in {} seconds", interval_in_secs);
// increase interval if node repeatly fails to fetch checkpoints
interval = time::interval(Duration::from_secs(interval_in_secs * 2));
interval.tick().await;
continue;
Contributor: Ditto, no need to wait again.

Suggested change
- interval = time::interval(Duration::from_secs(interval_in_secs * 2));
- interval.tick().await;
- continue;
+ interval = time::interval(Duration::from_secs(interval_in_secs * 2));
+ continue;

Contributor Author: Same as above: we need to .await first for the next invocation to actually sleep.


},
};

notify_passed_checkpoints
.send(!passed_checkpoints)
.expect("Channel should be open");
interval.tick().await;
notify_passed_checkpoints
.send(!passed_checkpoints)
.expect("Channel should be open");
},
_ = shutdown_signal.wait() => {
info!(
target: LOG_TARGET,
"Tari Pulse shutting down because the shutdown signal was received"
);
break;
},
}
}
}
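
The review thread above asks for the retry interval to stay capped at 30 minutes instead of ending the loop. A minimal, std-only sketch of that doubling-with-a-cap rule is shown here. `next_retry_interval` and `MAX_RETRY_INTERVAL` are hypothetical names introduced for illustration, not identifiers from this PR, and the merged code may differ.

```rust
use std::time::Duration;

/// Hypothetical cap, taken from the "max 30 mins" suggestion in the review.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(60 * 30);

/// Doubles the current retry interval, but never returns more than the cap.
/// `saturating_mul` avoids a panic if `current` is already very large.
fn next_retry_interval(current: Duration) -> Duration {
    current.saturating_mul(2).min(MAX_RETRY_INTERVAL)
}
```

In the loop, the result would presumably be passed to `time::interval(...)` in place of `Duration::from_secs(interval_in_secs * 2)`. The tokio documentation states that the first tick of a new interval completes immediately, which is consistent with the author's observation that one `tick().await` is needed after re-creating the interval if the next iteration should actually wait.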

@@ -231,7 +259,7 @@ impl ServiceInitializer for TariPulseServiceInitializer {

context.spawn_when_ready(move |handles| async move {
let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
let mut tari_pulse_service = TariPulseService::new(config)
let mut tari_pulse_service = TariPulseService::new(config, shutdown_signal.clone())
.await
.expect("Should be able to get the service");
let tari_pulse_service = tari_pulse_service.run(base_node_service, sender);
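
The core of this change is that the main loop now waits on either the next interval tick or the shutdown signal, whichever comes first. As a rough std-only analogy (this is not the tokio code in the PR), the same shape can be sketched with a channel and `recv_timeout`. `run_until_shutdown` and its parameters are hypothetical names introduced for illustration.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Calls `check` once per `interval` until a shutdown message arrives
/// or the sending side is dropped. Returns how many checks were run.
fn run_until_shutdown(
    shutdown_rx: Receiver<()>,
    interval: Duration,
    mut check: impl FnMut(),
) -> u32 {
    let mut checks = 0;
    loop {
        match shutdown_rx.recv_timeout(interval) {
            // No shutdown within `interval`: treat this as a tick.
            Err(RecvTimeoutError::Timeout) => {
                check();
                checks += 1;
            },
            // Shutdown requested, or nobody is left to request it: stop.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    checks
}
```

The point of the pattern is that a pending shutdown is noticed while the loop is idle between checks, so the task does not have to sit out a full interval before it can exit.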