Skip to content

Commit

Permalink
Rework pueued shutdown and signal handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Nukesor committed Oct 16, 2020
1 parent 517e082 commit babca0f
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async-trait = "0.1"
dirs = "3"
chrono = { version = "^0.4", features = ["serde"] }
chrono-english = "^0.1.0"
ctrlc = "3"
ctrlc = { version = "3", features = ["termination"] }
rand = "^0.7"
strum = "^0.19"
strum_macros = "^0.19"
Expand Down
29 changes: 29 additions & 0 deletions daemon/instructions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::BTreeMap;
use std::sync::mpsc::Sender;

use log::debug;

use pueue::log::{clean_log_handles, read_and_compress_log_files};
use pueue::message::*;
use pueue::state::SharedState;
Expand Down Expand Up @@ -32,6 +34,7 @@ pub fn handle_message(message: Message, sender: &Sender<Message>, state: &Shared
Message::Status => get_status(state),
Message::Log(message) => get_log(message, state),
Message::Parallel(message) => set_parallel_tasks(message, state),
Message::DaemonShutdown => shutdown(sender, state),
_ => create_failure_message("Not implemented yet"),
}
}
Expand Down Expand Up @@ -542,3 +545,29 @@ fn set_parallel_tasks(message: ParallelMessage, state: &SharedState) -> Message
group
))
}

/// Initialize the shutdown procedure.
///
/// At first, the unix socket is removed, but only if the daemon is configured to use one.
/// Next, the `DaemonShutdown` message is forwarded to the TaskHandler via `sender`.
/// The TaskHandler then kills all child processes and exits with `std::process::exit(0)`.
///
/// Returns the success message that is sent back to the client.
///
/// # Panics
///
/// Panics if the state lock cannot be acquired, if the socket file exists but cannot
/// be removed, or if the message cannot be sent to the TaskHandler.
fn shutdown(sender: &Sender<Message>, state: &SharedState) -> Message {
    // Copy the socket path out of the shared state, so the lock is released before
    // any filesystem access happens. A panic further down must never occur while
    // the guard is alive, as that would poison the state for every other thread.
    let socket_path = {
        let state = state.lock().unwrap();
        if state.settings.shared.use_unix_socket {
            Some(state.settings.shared.unix_socket_path.clone())
        } else {
            None
        }
    };

    // Remove the unix socket
    if let Some(path) = socket_path {
        debug!("Check if a unix socket exists.");
        // Remove the file directly instead of checking for its existence first.
        // The signal handler may remove the very same socket concurrently, so a
        // missing file simply means there's nothing left to clean up.
        match std::fs::remove_file(&path) {
            Ok(()) => debug!("Removed the unix socket."),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                debug!("No unix socket found, nothing to remove.")
            }
            Err(err) => panic!("Failed to remove unix socket on shutdown: {:?}", err),
        }
    }

    // Notify the task handler
    sender.send(Message::DaemonShutdown).expect(SENDER_ERR);

    create_success_message("Daemon is shutting down")
}
36 changes: 21 additions & 15 deletions daemon/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use anyhow::Result;
use simplelog::{Config, LevelFilter, SimpleLogger};
use structopt::StructOpt;

use pueue::message::Message;
use pueue::settings::Settings;
use pueue::state::State;

Expand All @@ -35,21 +36,6 @@ async fn main() -> Result<()> {
println!("{:?}", error);
}

// This is somewhat ugly. Right now this is the only place where we can
// do cleanup on shutdown. Thereby we do stuff in here, which doesn't have anything
// to do with the TaskHandler.
//
// 1. Remove any existing the unix sockets.
if settings.shared.use_unix_socket {
let unix_socket_path = settings.shared.unix_socket_path.clone();
ctrlc::set_handler(move || {
if std::path::PathBuf::from(&unix_socket_path).exists() {
std::fs::remove_file(&unix_socket_path)
.expect("Failed to remove unix socket on shutdown");
}
})?;
}

// Parse commandline options.
let opt = Opt::from_args();

Expand All @@ -76,6 +62,26 @@ async fn main() -> Result<()> {
let (sender, receiver) = channel();
let mut task_handler = TaskHandler::new(state.clone(), receiver);

// This section handles Shutdown via SigTerm/SigInt process signals
// 1. Remove the unix socket (if it exists).
// 2. Notify the TaskHandler, so it can shutdown gracefully.
//
// The actual program exit will be done via the TaskHandler.
let unix_socket_path = settings.shared.unix_socket_path.clone();
let sender_clone = sender.clone();
ctrlc::set_handler(move || {
// Clean up the unix socket if we're using it and it exists.
if settings.shared.use_unix_socket && std::path::PathBuf::from(&unix_socket_path).exists() {
std::fs::remove_file(&unix_socket_path)
.expect("Failed to remove unix socket on shutdown");
}

// Notify the task handler
sender_clone
.send(Message::DaemonShutdown)
.expect("Failed to send Message to TaskHandler on Shutdown");
})?;

thread::spawn(move || {
task_handler.run();
});
Expand Down
5 changes: 0 additions & 5 deletions daemon/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ async fn handle_incoming(
// The client requested the output of a task.
// Since we allow streaming, this needs to be handled separately.
handle_follow(&pueue_directory, &mut socket, &state, message).await?
} else if let Message::DaemonShutdown = message {
// Simply shut down the daemon right after sending a success response.
let response = create_success_message("Daemon is shutting down");
send_message(response, &mut socket).await?;
std::process::exit(0);
} else {
// Process a normal message.
handle_message(message, &sender, &state)
Expand Down
43 changes: 26 additions & 17 deletions daemon/task_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl TaskHandler {
state.settings.daemon.pause_on_failure,
)
};

TaskHandler {
state,
receiver,
Expand All @@ -66,22 +67,6 @@ impl TaskHandler {
}
}

/// Kill every child process that is still tracked once the task handler is dropped.
/// Without this, children would keep running as detached processes.
impl Drop for TaskHandler {
    fn drop(&mut self) {
        // Snapshot the ids first, since entries are removed from the map while looping.
        let remaining: Vec<usize> = self.children.keys().copied().collect();
        for id in remaining {
            // The id was just read from the map, so the entry is expected to exist.
            let mut process = self.children.remove(&id).expect("Failed killing children");
            info!("Killing child {}", id);
            kill_child(id, &mut process, true);
        }
    }
}

impl TaskHandler {
/// Main loop of the task handler.
/// In here a few things happen:
Expand Down Expand Up @@ -434,7 +419,7 @@ impl TaskHandler {
/// This function is also responsible for waiting
fn receive_commands(&mut self) {
// Sleep for a few milliseconds. We don't want to hurt the CPU.
let timeout = Duration::from_millis(100);
let timeout = Duration::from_millis(200);
// Don't use recv_timeout for now, until this bug gets fixed.
// https://github.com/rust-lang/rust/issues/39364
//match self.receiver.recv_timeout(timeout) {
Expand All @@ -452,6 +437,7 @@ impl TaskHandler {
Message::Kill(message) => self.kill(message),
Message::Send(message) => self.send(message),
Message::Reset(children) => self.reset(children),
Message::DaemonShutdown => self.shutdown(),
_ => info!("Received unhandled message {:?}", message),
}
}
Expand Down Expand Up @@ -800,4 +786,27 @@ impl TaskHandler {
self.callbacks.remove(*id);
}
}

/// Gracefully shutdown the task handler.
/// This includes killing all child processes
///
/// Afterwards we can actually exit the program
fn shutdown(&mut self) {
info!("Killing all children due to shutdown.");

let task_ids: Vec<usize> = self.children.keys().cloned().collect();
for task_id in task_ids {
let child = self.children.remove(&task_id);

if let Some(mut child) = child {
info!("Killing child {}", &task_id);
kill_child(task_id, &mut child, true);
} else {
error!("Fail to get child {} for killing", &task_id);
}
}

// Exit pueued
std::process::exit(0)
}
}
6 changes: 3 additions & 3 deletions shared/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ impl Settings {
/// configuration files at those locations.
/// All configs will be merged by importance.
fn parse_config(settings: &mut Config) -> Result<()> {
println!("Parsing config files");
//println!("Parsing config files");
for directory in get_config_directories()?.into_iter() {
let path = directory.join("pueue.yml");
println!("Checking path: {:?}", &path);
//println!("Checking path: {:?}", &path);
if path.exists() {
println!("Parsing config file at: {:?}", path);
//println!("Parsing config file at: {:?}", path);
let config_file = config::File::with_name(path.to_str().unwrap());
settings.merge(config_file)?;
}
Expand Down

0 comments on commit babca0f

Please sign in to comment.