diff --git a/Cargo.toml b/Cargo.toml index 4c1cc38ef..ae2f2aaec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ async-trait = "0.1" dirs = "3" chrono = { version = "^0.4", features = ["serde"] } chrono-english = "^0.1.0" -ctrlc = "3" +ctrlc = { version = "3", features = ["termination"] } rand = "^0.7" strum = "^0.19" strum_macros = "^0.19" diff --git a/daemon/instructions.rs b/daemon/instructions.rs index c92342572..007b7b69e 100644 --- a/daemon/instructions.rs +++ b/daemon/instructions.rs @@ -1,6 +1,8 @@ use std::collections::BTreeMap; use std::sync::mpsc::Sender; +use log::debug; + use pueue::log::{clean_log_handles, read_and_compress_log_files}; use pueue::message::*; use pueue::state::SharedState; @@ -32,6 +34,7 @@ pub fn handle_message(message: Message, sender: &Sender, state: &Shared Message::Status => get_status(state), Message::Log(message) => get_log(message, state), Message::Parallel(message) => set_parallel_tasks(message, state), + Message::DaemonShutdown => shutdown(sender, state), _ => create_failure_message("Not implemented yet"), } } @@ -542,3 +545,29 @@ fn set_parallel_tasks(message: ParallelMessage, state: &SharedState) -> Message group )) } + +/// Initialize the shutdown procedure. +/// At first, the unix socket will be removed. +/// +/// Next, the DaemonShutdown Message will be forwarded to the TaskHandler. +/// The TaskHandler then gracefully shuts down all child processes +/// and exits with std::proces::exit(0). +fn shutdown(sender: &Sender, state: &SharedState) -> Message { + // Remove the unix socket + { + let state = state.lock().unwrap(); + if state.settings.shared.use_unix_socket { + let path = &state.settings.shared.unix_socket_path; + debug!("Check if a unit socket exists."); + if std::path::PathBuf::from(&path).exists() { + std::fs::remove_file(&path).expect("Failed to remove unix socket on shutdown"); + } + debug!("Removed the unix socket."); + } + } + + // Notify the task handler + sender.send(Message::DaemonShutdown).expect(SENDER_ERR); + + create_success_message("Daemon is shutting down") +} diff --git a/daemon/main.rs b/daemon/main.rs index 349704709..b9176dc5d 100644 --- a/daemon/main.rs +++ b/daemon/main.rs @@ -9,6 +9,7 @@ use anyhow::Result; use simplelog::{Config, LevelFilter, SimpleLogger}; use structopt::StructOpt; +use pueue::message::Message; use pueue::settings::Settings; use pueue::state::State; @@ -35,21 +36,6 @@ async fn main() -> Result<()> { println!("{:?}", error); } - // This is somewhat ugly. Right now this is the only place where we can - // do cleanup on shutdown. Thereby we do stuff in here, which doesn't have anything - // to do with the TaskHandler. - // - // 1. Remove any existing the unix sockets. - if settings.shared.use_unix_socket { - let unix_socket_path = settings.shared.unix_socket_path.clone(); - ctrlc::set_handler(move || { - if std::path::PathBuf::from(&unix_socket_path).exists() { - std::fs::remove_file(&unix_socket_path) - .expect("Failed to remove unix socket on shutdown"); - } - })?; - } - // Parse commandline options. let opt = Opt::from_args(); @@ -76,6 +62,28 @@ async fn main() -> Result<()> { let (sender, receiver) = channel(); let mut task_handler = TaskHandler::new(state.clone(), receiver); + // This section handles Shutdown via SigTerm/SigInt process signals + // 1. Remove the unix socket (if it exists). + // 2. Notify the TaskHandler, so it can shutdown gracefully. + // + // The actual program exit will be done via the TaskHandler. + let unix_socket_path = settings.shared.unix_socket_path.clone(); + let sender_clone = sender.clone(); + ctrlc::set_handler(move || { + // Clean up the unix socket + if settings.shared.use_unix_socket { + if std::path::PathBuf::from(&unix_socket_path).exists() { + std::fs::remove_file(&unix_socket_path) + .expect("Failed to remove unix socket on shutdown"); + } + } + + // Notify the task handler + sender_clone + .send(Message::DaemonShutdown) + .expect("Failed to send Message to TaskHandler on Shutdown"); + })?; + thread::spawn(move || { task_handler.run(); }); diff --git a/daemon/socket.rs b/daemon/socket.rs index 14213f7e9..9b89f6198 100644 --- a/daemon/socket.rs +++ b/daemon/socket.rs @@ -94,11 +94,6 @@ async fn handle_incoming( // The client requested the output of a task. // Since we allow streaming, this needs to be handled seperately. handle_follow(&pueue_directory, &mut socket, &state, message).await? - } else if let Message::DaemonShutdown = message { - // Simply shut down the daemon right after sending a success response. - let response = create_success_message("Daemon is shutting down"); - send_message(response, &mut socket).await?; - std::process::exit(0); } else { // Process a normal message. handle_message(message, &sender, &state) diff --git a/daemon/task_handler.rs b/daemon/task_handler.rs index 22ac2480d..b5fd283d0 100644 --- a/daemon/task_handler.rs +++ b/daemon/task_handler.rs @@ -53,6 +53,7 @@ impl TaskHandler { state.settings.daemon.pause_on_failure, ) }; + TaskHandler { state, receiver, @@ -66,22 +67,6 @@ impl TaskHandler { } } -/// The task handler needs to kill all child processes as soon, as the program exits. -/// This is needed to prevent detached processes. -impl Drop for TaskHandler { - fn drop(&mut self) { - let task_ids: Vec = self.children.keys().cloned().collect(); - for task_id in task_ids { - let mut child = self - .children - .remove(&task_id) - .expect("Failed killing children"); - info!("Killing child {}", task_id); - kill_child(task_id, &mut child, true); - } - } -} - impl TaskHandler { /// Main loop of the task handler. /// In here a few things happen: @@ -434,7 +419,7 @@ impl TaskHandler { /// This function is also responsible for waiting fn receive_commands(&mut self) { // Sleep for a few milliseconds. We don't want to hurt the CPU. - let timeout = Duration::from_millis(100); + let timeout = Duration::from_millis(200); // Don't use recv_timeout for now, until this bug get's fixed. // https://github.com/rust-lang/rust/issues/39364 //match self.receiver.recv_timeout(timeout) { @@ -452,6 +437,7 @@ impl TaskHandler { Message::Kill(message) => self.kill(message), Message::Send(message) => self.send(message), Message::Reset(children) => self.reset(children), + Message::DaemonShutdown => self.shutdown(), _ => info!("Received unhandled message {:?}", message), } } @@ -800,4 +786,27 @@ impl TaskHandler { self.callbacks.remove(*id); } } + + /// Gracefully shutdown the task handler. + /// This includes killing all child processes + /// + /// Afterwards we can actually exit the program + fn shutdown(&mut self) { + info!("Killing all children due to shutdown."); + + let task_ids: Vec = self.children.keys().cloned().collect(); + for task_id in task_ids { + let child = self.children.remove(&task_id); + + if let Some(mut child) = child { + info!("Killing child {}", &task_id); + kill_child(task_id, &mut child, true); + } else { + error!("Fail to get child {} for killing", &task_id); + } + } + + // Exit pueued + std::process::exit(0) + } } diff --git a/shared/settings.rs b/shared/settings.rs index d3a4c1992..58eac5a9f 100644 --- a/shared/settings.rs +++ b/shared/settings.rs @@ -102,12 +102,12 @@ impl Settings { /// configuration files at those locations. /// All configs will be merged by importance. fn parse_config(settings: &mut Config) -> Result<()> { - println!("Parsing config files"); + //println!("Parsing config files"); for directory in get_config_directories()?.into_iter() { let path = directory.join("pueue.yml"); - println!("Checking path: {:?}", &path); + //println!("Checking path: {:?}", &path); if path.exists() { - println!("Parsing config file at: {:?}", path); + //println!("Parsing config file at: {:?}", path); let config_file = config::File::with_name(path.to_str().unwrap()); settings.merge(config_file)?; }