Skip to content

Commit

Permalink
[LOGGING] - Log Errors or Panic When Critical Tasks exit (#3577)
Browse files Browse the repository at this point in the history
* add health check task

* add a periodic heart beat for main tasks

* rebase and format

* cleanup

* minor cleanup

* change task id to include task name and a number

* cleanup

* cleanup async-std and tokio scopes in loops

* cleanup

* remove unneeded helper functions to avoid duplication

* minor cleanup

* pin fused receive stream on task startup

* fix tests

* address comments
  • Loading branch information
lukeiannucci authored Aug 21, 2024
1 parent 6eb5357 commit 30bcd4d
Show file tree
Hide file tree
Showing 25 changed files with 490 additions and 80 deletions.
2 changes: 2 additions & 0 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use hotshot_types::{
traits::{network::BroadcastDelay, node_implementation::Versions},
};
use rand::Rng;
use tasks::add_health_check_task;
use url::Url;

/// Contains traits consumed by [`SystemContext`]
Expand Down Expand Up @@ -634,6 +635,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T

add_network_tasks::<TYPES, I, V>(&mut handle).await;
add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
add_health_check_task::<TYPES, I, V>(&mut handle).await;

handle
}
Expand Down
76 changes: 46 additions & 30 deletions crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

/// Provides trait to create task states from a `SystemContextHandle`
pub mod task_state;
use hotshot_task::task::{NetworkHandle, Task};
use std::{collections::HashSet, sync::Arc, time::Duration};

use async_broadcast::broadcast;
Expand All @@ -18,14 +19,14 @@ use futures::{
future::{BoxFuture, FutureExt},
stream, StreamExt,
};
use hotshot_task::task::Task;
#[cfg(feature = "rewind")]
use hotshot_task_impls::rewind::RewindTaskState;
use hotshot_task_impls::{
da::DaTaskState,
events::HotShotEvent,
network,
network::{NetworkEventTaskState, NetworkMessageTaskState},
health_check::HealthCheckTaskState,
helpers::broadcast_event,
network::{self, NetworkEventTaskState, NetworkMessageTaskState},
request::NetworkRequestState,
response::{run_response_task, NetworkResponseState},
transactions::TransactionTaskState,
Expand Down Expand Up @@ -68,14 +69,7 @@ pub async fn add_request_network_task<
>(
handle: &mut SystemContextHandle<TYPES, I, V>,
) {
let state = NetworkRequestState::<TYPES, I>::create_from(handle).await;

let task = Task::new(
state,
handle.internal_event_stream.0.clone(),
handle.internal_event_stream.1.activate_cloned(),
);
handle.consensus_registry.run_task(task);
handle.add_task(NetworkRequestState::<TYPES, I>::create_from(handle).await);
}

/// Add a task which responds to requests on the network.
Expand All @@ -91,9 +85,12 @@ pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
handle.private_key().clone(),
handle.hotshot.id,
);
let task_name = state.get_task_name();
handle.network_registry.register(run_response_task::<TYPES>(
state,
handle.internal_event_stream.0.clone(),
handle.internal_event_stream.1.activate_cloned(),
handle.generate_task_id(task_name),
));
}

Expand All @@ -117,9 +114,10 @@ pub fn add_network_message_task<
let network = Arc::clone(channel);
let mut state = network_state.clone();
let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
let stream = handle.internal_event_stream.0.clone();
let task_id = handle.generate_task_id(network_state.get_task_name());
let handle_task_id = task_id.clone();
let task_handle = async_spawn(async move {
futures::pin_mut!(shutdown_signal);

let recv_stream = stream::unfold((), |()| async {
let msgs = match network.recv_msgs().await {
Ok(msgs) => {
Expand All @@ -144,9 +142,10 @@ pub fn add_network_message_task<
Some((msgs, ()))
});

let heartbeat_interval =
Task::<HealthCheckTaskState<TYPES>>::get_periodic_interval_in_secs();
let fused_recv_stream = recv_stream.boxed().fuse();
futures::pin_mut!(fused_recv_stream);

futures::pin_mut!(fused_recv_stream, heartbeat_interval, shutdown_signal);
loop {
futures::select! {
() = shutdown_signal => {
Expand All @@ -168,10 +167,16 @@ pub fn add_network_message_task<
return;
}
}
_ = Task::<HealthCheckTaskState<TYPES>>::handle_periodic_delay(&mut heartbeat_interval) => {
broadcast_event(Arc::new(HotShotEvent::HeartBeat(handle_task_id.clone())), &stream).await;
}
}
}
});
handle.network_registry.register(task_handle);
handle.network_registry.register(NetworkHandle {
handle: task_handle,
task_id,
});
}

/// Add the network task to handle events and send messages.
Expand All @@ -194,12 +199,7 @@ pub fn add_network_event_task<
storage: Arc::clone(&handle.storage()),
upgrade_lock: handle.hotshot.upgrade_lock.clone(),
};
let task = Task::new(
network_state,
handle.internal_event_stream.0.clone(),
handle.internal_event_stream.1.activate_cloned(),
);
handle.consensus_registry.run_task(task);
handle.add_task(network_state);
}

/// Adds consensus-related tasks to a `SystemContextHandle`.
Expand Down Expand Up @@ -331,13 +331,15 @@ where

add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
self.add_network_tasks(&mut handle).await;
add_health_check_task(&mut handle).await;

handle
}

/// Add byzantine network tasks with the trait
#[allow(clippy::too_many_lines)]
async fn add_network_tasks(&'static mut self, handle: &mut SystemContextHandle<TYPES, I, V>) {
let task_id = self.get_task_name();
let state_in = Arc::new(RwLock::new(self));
let state_out = Arc::clone(&state_in);
// channels between the task spawned in this function and the network tasks.
Expand Down Expand Up @@ -376,8 +378,6 @@ where
// and broadcast the transformed events to the replacement event stream we just created.
let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
let send_handle = async_spawn(async move {
futures::pin_mut!(shutdown_signal);

let recv_stream = stream::unfold(original_receiver, |mut recv| async move {
match recv.recv().await {
Ok(event) => Some((Ok(event), recv)),
Expand All @@ -388,7 +388,7 @@ where
.boxed();

let fused_recv_stream = recv_stream.fuse();
futures::pin_mut!(fused_recv_stream);
futures::pin_mut!(fused_recv_stream, shutdown_signal);

loop {
futures::select! {
Expand Down Expand Up @@ -424,8 +424,6 @@ where
// and broadcast the transformed events to the original internal event stream
let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
let recv_handle = async_spawn(async move {
futures::pin_mut!(shutdown_signal);

let network_recv_stream =
stream::unfold(receiver_from_network, |mut recv| async move {
match recv.recv().await {
Expand All @@ -436,7 +434,7 @@ where
});

let fused_network_recv_stream = network_recv_stream.boxed().fuse();
futures::pin_mut!(fused_network_recv_stream);
futures::pin_mut!(fused_network_recv_stream, shutdown_signal);

loop {
futures::select! {
Expand Down Expand Up @@ -467,8 +465,19 @@ where
}
});

handle.network_registry.register(send_handle);
handle.network_registry.register(recv_handle);
handle.network_registry.register(NetworkHandle {
handle: send_handle,
task_id: handle.generate_task_id(task_id),
});
handle.network_registry.register(NetworkHandle {
handle: recv_handle,
task_id: handle.generate_task_id(task_id),
});
}

/// Reports the name used to identify this task.
///
/// The name is the type name of the `EventTransformerState` trait object, not of the
/// concrete implementor, so every implementor reports the same task name.
fn get_task_name(&self) -> &'static str {
    use std::any::type_name;

    type_name::<dyn EventTransformerState<TYPES, I, V>>()
}
}

Expand Down Expand Up @@ -659,3 +668,10 @@ pub async fn add_network_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V:
network::vid_filter,
);
}

/// Builds the health check task state from `handle` and registers it as a task on that
/// same handle.
pub async fn add_health_check_task<TYPES, I, V>(handle: &mut SystemContextHandle<TYPES, I, V>)
where
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
{
    // Create the state first (shared borrow of the handle), then hand it to `add_task`.
    let health_check_state = HealthCheckTaskState::<TYPES>::create_from(handle).await;
    handle.add_task(health_check_state);
}
16 changes: 15 additions & 1 deletion crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use async_trait::async_trait;
use chrono::Utc;
use hotshot_task_impls::{
builder::BuilderClient, consensus::ConsensusTaskState, consensus2::Consensus2TaskState,
da::DaTaskState, quorum_proposal::QuorumProposalTaskState,
da::DaTaskState, health_check::HealthCheckTaskState, quorum_proposal::QuorumProposalTaskState,
quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState,
request::NetworkRequestState, rewind::RewindTaskState, transactions::TransactionTaskState,
upgrade::UpgradeTaskState, vid::VidTaskState, view_sync::ViewSyncTaskState,
Expand Down Expand Up @@ -383,3 +383,17 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
}
}
}

#[async_trait]
impl<TYPES, I, V> CreateTaskState<TYPES, I, V> for HealthCheckTaskState<TYPES>
where
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
{
    /// Creates the health check state from the node id stored on `handle`, the task ids
    /// the handle currently reports, and a fixed heartbeat timeout.
    async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
        // Hard-coded heartbeat timeout; the name indicates the unit is seconds.
        let heartbeat_timeout_duration_in_secs = 30;
        let node_id = handle.hotshot.id;
        let known_task_ids = handle.get_task_ids();

        Self::new(node_id, known_task_ids, heartbeat_timeout_duration_in_secs)
    }
}
20 changes: 20 additions & 0 deletions crates/hotshot/src/types/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use hotshot_types::{
error::HotShotError,
traits::{election::Membership, network::ConnectedNetwork, node_implementation::NodeType},
};
use rand::Rng;
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::instrument;
Expand Down Expand Up @@ -68,15 +69,34 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
{
/// Adds a hotshot consensus-related task to the `SystemContextHandle`.
///
/// The task is built from `task_state`, connected to the internal event stream, given a
/// task id generated from the state's task name, and then run on the consensus registry.
pub fn add_task<S>(&mut self, task_state: S)
where
    S: TaskState<Event = HotShotEvent<TYPES>> + 'static,
{
    // Read the name before `task_state` is moved into the task.
    let name = task_state.get_task_name();
    let event_sender = self.internal_event_stream.0.clone();
    let event_receiver = self.internal_event_stream.1.activate_cloned();
    let id = self.generate_task_id(name);

    self.consensus_registry
        .run_task(Task::new(task_state, event_sender, event_receiver, id));
}

/// Generates an id for a task.
///
/// The id has the form `<task_name>_<n>_<r>`, where `n` is the number of handles held by
/// the consensus and network registries at the time of the call and `r` is a random
/// number in `0..=9999`.
#[must_use]
pub fn generate_task_id(&self, task_name: &str) -> String {
    let random = rand::thread_rng().gen_range(0..=9999);
    let consensus_count = self.consensus_registry.task_handles.len();
    let network_count = self.network_registry.handles.len();
    let tasks_spawned = consensus_count + network_count;
    format!("{task_name}_{tasks_spawned}_{random}")
}

/// Collects the task ids reported by the consensus registry followed by those reported
/// by the network registry.
#[must_use]
pub fn get_task_ids(&self) -> Vec<String> {
    self.consensus_registry
        .get_task_ids()
        .into_iter()
        .chain(self.network_registry.get_task_ids())
        .collect()
}

/// obtains a stream to expose to the user
pub fn event_stream(&self) -> impl Stream<Item = Event<TYPES>> {
self.output_event_stream.1.activate_cloned()
Expand Down
4 changes: 4 additions & 0 deletions crates/task-impls/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,4 +773,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
}
}
}

/// Returns the type name of `ConsensusTaskState`, as reported by `std::any::type_name`,
/// for use as this task's name.
fn get_task_name(&self) -> &'static str {
    use std::any::type_name;

    type_name::<ConsensusTaskState<TYPES, I, V>>()
}
}
4 changes: 4 additions & 0 deletions crates/task-impls/src/consensus2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState

/// Joins all subtasks.
async fn cancel_subtasks(&mut self) {}

/// Returns the type name of `Consensus2TaskState`, as reported by `std::any::type_name`,
/// for use as this task's name.
fn get_task_name(&self) -> &'static str {
    use std::any::type_name;

    type_name::<Consensus2TaskState<TYPES, I, V>>()
}
}
4 changes: 4 additions & 0 deletions crates/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,4 +379,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for DaTaskState<TY
}

async fn cancel_subtasks(&mut self) {}

/// Returns the type name of `DaTaskState`, as reported by `std::any::type_name`,
/// for use as this task's name.
fn get_task_name(&self) -> &'static str {
    use std::any::type_name;

    type_name::<DaTaskState<TYPES, I>>()
}
}
10 changes: 10 additions & 0 deletions crates/task-impls/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ impl<TYPES: NodeType> TaskEvent for HotShotEvent<TYPES> {
fn shutdown_event() -> Self {
HotShotEvent::Shutdown
}

fn heartbeat_event(task_id: String) -> Self {
HotShotEvent::HeartBeat(task_id)
}
}

/// Wrapper type for the event to notify tasks that a proposal for a view is missing
Expand Down Expand Up @@ -216,6 +220,9 @@ pub enum HotShotEvent<TYPES: NodeType> {
/// 2. The proposal has been correctly signed by the leader of the current view
/// 3. The justify QC is valid
QuorumProposalPreliminarilyValidated(Proposal<TYPES, QuorumProposal<TYPES>>),

/// Periodic heart beat event for health checking
HeartBeat(String),
}

impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
Expand Down Expand Up @@ -463,6 +470,9 @@ impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
proposal.data.view_number()
)
}
// Render the heartbeat with its task id; the closing parenthesis keeps the output
// balanced, e.g. `HeartBeat(task_id="name_3_42")`.
HotShotEvent::HeartBeat(task_id) => {
    write!(f, "HeartBeat(task_id={task_id:?})")
}
}
}
}
7 changes: 6 additions & 1 deletion crates/task-impls/src/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ pub async fn run_harness<TYPES, S: TaskState<Event = HotShotEvent<TYPES>> + Send
allow_extra_output,
};

let task = Task::new(state, to_test.clone(), from_test.clone());
let task = Task::new(
state,
to_test.clone(),
from_test.clone(),
"task_0".to_string(),
);

let handle = task.run();
let test_future = async move {
Expand Down
Loading

0 comments on commit 30bcd4d

Please sign in to comment.