Skip to content

Commit

Permalink
[CLI] Use JoinSet rather than a vec of JoinHandles
Browse files Browse the repository at this point in the history
  • Loading branch information
banool committed Oct 11, 2023
1 parent 2112044 commit 4c10535
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 34 deletions.
33 changes: 22 additions & 11 deletions crates/aptos/src/node/local_testnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::{
path::{Path, PathBuf},
pin::Pin,
};
use tokio::task::JoinHandle;
use tokio::task::JoinSet;
use tracing::{info, warn};
use tracing_subscriber::fmt::MakeWriter;

Expand Down Expand Up @@ -302,11 +302,11 @@ impl CliCommand<()> for RunLocalTestnet {
.flat_map(|m| m.get_post_healthy_steps())
.collect();

let mut tasks: Vec<JoinHandle<()>> = Vec::new();
let mut join_set = JoinSet::new();

// Start each of the services.
for manager in managers.into_iter() {
tasks.push(manager.run());
join_set.spawn(manager.run());
}

// Wait for all the services to start up.
Expand All @@ -329,21 +329,32 @@ impl CliCommand<()> for RunLocalTestnet {
// see `ShutdownStep` for more info. In particular, to speak to how "best effort"
// this really is, to make sure ctrl-c happens more or less instantly, we only
// register this handler after all the services have started.
tasks.push(tokio::spawn(async move {
let abort_handle = join_set.spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("Failed to register ctrl-c hook");
}));

// Wait for all of the tasks. We should never get past this point unless
// something goes goes wrong or the user signals for the process to end.
let num_tasks = tasks.len();
let (_, finished_future_index, _) = futures::future::select_all(tasks).await;
Ok(())
});
let ctrl_c_task_id = abort_handle.id();

// Wait for one of the tasks to end. We should never get past this point unless
// something goes goes wrong or the user signals for the process to end. We
// unwrap once because we know for certain the set is not empty and that's the
// only condition in which this can return `None`.
let result = join_set.join_next_with_id().await.unwrap();

// We want to print a different message depending on which task ended. We can
// determine if the task that ended was the ctrl-c task based on the ID of the
// task.
let finished_task_id = match &result {
Ok((id, _)) => *id,
Err(err) => err.id(),
};

// Because we added the ctrl-c task last, we can figure out if that was the one
// that ended based on `finished_future_index`. We modify our messaging and the
// return value based on this.
let was_ctrl_c = finished_future_index == num_tasks - 1;
let was_ctrl_c = finished_task_id == ctrl_c_task_id;
if was_ctrl_c {
eprintln!("\nReceived ctrl-c, running shutdown steps...");
} else {
Expand Down
40 changes: 17 additions & 23 deletions crates/aptos/src/node/local_testnet/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
use super::health_checker::HealthChecker;
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::FutureExt;
use std::{collections::HashSet, fmt::Debug};
use tokio::task::JoinHandle;
use tracing::warn;

#[async_trait]
Expand Down Expand Up @@ -40,32 +38,28 @@ pub trait ServiceManager: Debug + Send + Sync + 'static {
fn get_prerequisite_health_checkers(&self) -> HashSet<&HealthChecker>;

/// This is the function we use from the outside to start the service. It makes
/// sure all the prerequisite services have started and then spawns a tokio task to
/// run the service. The user should never need to override this implementation.
fn run(self: Box<Self>) -> JoinHandle<()> {
/// sure all the prerequisite services have started and then calls the inner
/// function to run the service. The user should never need to override this
/// implementation.
async fn run(self: Box<Self>) -> Result<()> {
// We make a new function here so that each task waits for its prereqs within
// its own run function. This way we can start each service in any order.
let name = self.get_name();
let name_clone = name.to_string();
let future = async move {
for health_checker in self.get_prerequisite_health_checkers() {
health_checker
.wait(Some(&self.get_name()))
.await
.context("Prerequisite service did not start up successfully")?;
}
self.run_service()
for health_checker in self.get_prerequisite_health_checkers() {
health_checker
.wait(Some(&self.get_name()))
.await
.context("Service ended with an error")?;
warn!(
"Service {} ended unexpectedly without any error",
name_clone
);
Ok(())
};
tokio::spawn(future.map(move |result: Result<()>| {
warn!("{} stopped unexpectedly {:#?}", name, result);
}))
.context("Prerequisite service did not start up successfully")?;
}
self.run_service()
.await
.context("Service ended with an error")?;
warn!(
"Service {} ended unexpectedly without any error",
name_clone
);
Ok(())
}

/// The ServiceManager may return PostHealthySteps. The tool will run these after
Expand Down

0 comments on commit 4c10535

Please sign in to comment.