diff --git a/CHANGELOG.md b/CHANGELOG.md index 03e14123..bd2722c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,7 @@ TLDR: The new task state representation is more verbose but significantly cleane - Add `command` filter to `pueue status`. [#524](https://github.com/Nukesor/pueue/issues/524) [#560](https://github.com/Nukesor/pueue/pull/560) - Allow `pueue status` to order tasks by `enqueue_at`. [#554](https://github.com/Nukesor/pueue/issues/554) - Added Windows service on Windows to allow a true daemon experience. [#344](https://github.com/Nukesor/pueue/issues/344) [#567](https://github.com/Nukesor/pueue/pull/567) +- Add `queued_count` and `stashed_count` to callback template variables. This allows users to fire callbacks when whole groups are finished. [#578](https://github.com/Nukesor/pueue/issues/578) ### Fixed diff --git a/pueue/src/daemon/callbacks.rs b/pueue/src/daemon/callbacks.rs index 0e05513f..cdcd7765 100644 --- a/pueue/src/daemon/callbacks.rs +++ b/pueue/src/daemon/callbacks.rs @@ -15,13 +15,15 @@ use super::state_helper::LockedState; /// Users can specify a callback that's fired whenever a task finishes. /// The callback is performed by spawning a new subprocess. pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task) { + info!("Checking callback: {:?}", settings.daemon.callback); // Return early, if there's no callback specified let Some(template_string) = &settings.daemon.callback else { return; }; + info!("Found callback: {template_string}"); // Build the command to be called from the template string in the configuration file. - let callback_command = match build_callback_command(settings, task, template_string) { + let callback_command = match build_callback_command(settings, state, task, template_string) { Ok(callback_command) => callback_command, Err(err) => { error!("Failed to create callback command from template with error: {err}"); @@ -29,6 +31,8 @@ pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task) } }; + info!("Callback started for task "); + let mut command = compile_shell_command(settings, &callback_command); // Spawn the callback subprocess and log if it fails. @@ -49,6 +53,7 @@ pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task) /// finished task. pub fn build_callback_command( settings: &Settings, + state: &mut LockedState, task: &Task, template_string: &str, ) -> Result { @@ -62,7 +67,20 @@ pub fn build_callback_command( parameters.insert("id", task.id.to_string()); parameters.insert("command", task.command.clone()); parameters.insert("path", (*task.path.to_string_lossy()).to_owned()); + + // Add group information to template + // This includes how many stashed and queued tasks are left in the group. parameters.insert("group", task.group.clone()); + let queued_tasks = state + .filter_tasks_of_group(|task| task.is_queued(), &task.group) + .matching_ids + .len(); + parameters.insert("queued_count", queued_tasks.to_string()); + let stashed_tasks = state + .filter_tasks_of_group(|task| task.is_stashed(), &task.group) + .matching_ids + .len(); + parameters.insert("stashed_count", stashed_tasks.to_string()); // Result takes the TaskResult Enum strings, unless it didn't finish yet. if let TaskStatus::Done { result, .. } = &task.status { diff --git a/pueue/src/daemon/process_handler/finish.rs b/pueue/src/daemon/process_handler/finish.rs index a124f355..292a18a7 100644 --- a/pueue/src/daemon/process_handler/finish.rs +++ b/pueue/src/daemon/process_handler/finish.rs @@ -97,6 +97,8 @@ pub fn handle_finished_tasks(settings: &Settings, state: &mut LockedState) { None => TaskResult::Killed, }; + info!("Task {task_id} finished with result: {result:?}"); + // Update the tasks's state and return a clone for callback handling. let task = { let task = state @@ -147,7 +149,6 @@ fn get_finished(state: &mut LockedState) -> Vec<((usize, String, usize), Option< // Child process did not exit yet Ok(None) => continue, Ok(_exit_status) => { - info!("Task {task_id} just finished"); finished.push(((*task_id, group.clone(), *worker_id), None)); } } diff --git a/pueue/tests/daemon/integration/callback.rs b/pueue/tests/daemon/integration/callback.rs new file mode 100644 index 00000000..c5021d9e --- /dev/null +++ b/pueue/tests/daemon/integration/callback.rs @@ -0,0 +1,43 @@ +use std::fs::read_to_string; + +use anyhow::{Context, Result}; + +use crate::helper::*; + +/// Make sure that callback commands are executed while variables are +/// templated into the command as expected. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_callback_variables() -> Result<()> { + let (mut settings, tempdir) = daemon_base_setup()?; + + // Configure the daemon to use a callback command that echos some variables into a file + // that's located in the temporary runtime directory of the daemon. + let tempdir_path = tempdir.path().to_path_buf(); + let echo_command = + "echo '{{queued_count}}\n{{stashed_count}}\n{{command}}\n{{id}}\n{{result}}'"; + settings.daemon.callback = Some(format!( + "{echo_command} > {}/testfile", + tempdir_path.to_string_lossy() + )); + settings + .save(&Some(tempdir_path.join("pueue.yml"))) + .context("Couldn't write pueue config to temporary directory")?; + + // Create the daemon with the changed settings. + let daemon = daemon_with_settings(settings, tempdir).await?; + let shared = &daemon.settings.shared; + + // Create one stashed task. + assert_success(create_stashed_task(shared, "stashed", None).await?); + // Create a task that'll then trigger the callback + assert_success(add_task(shared, "ls").await?); + + // Give the callback command some time to be executed. + sleep_ms(3000).await; + + let callback_output = read_to_string(tempdir_path.join("testfile"))?; + + assert_eq!(callback_output, "0\n1\nls\n1\nSuccess\n"); + + Ok(()) +} diff --git a/pueue/tests/daemon/integration/mod.rs b/pueue/tests/daemon/integration/mod.rs index 65099ac7..509d6b5d 100644 --- a/pueue/tests/daemon/integration/mod.rs +++ b/pueue/tests/daemon/integration/mod.rs @@ -1,5 +1,6 @@ mod add; mod aliases; +mod callback; mod clean; mod edit; mod environment_variables; diff --git a/pueue/tests/daemon/integration/stashed.rs b/pueue/tests/daemon/integration/stashed.rs index beee041a..915e81d3 100644 --- a/pueue/tests/daemon/integration/stashed.rs +++ b/pueue/tests/daemon/integration/stashed.rs @@ -4,43 +4,22 @@ use pueue_lib::state::GroupStatus; use rstest::rstest; use pueue_lib::network::message::*; -use pueue_lib::settings::Shared; use pueue_lib::task::*; use crate::helper::*; -/// Helper to pause the whole daemon -pub async fn add_stashed_task( - shared: &Shared, - command: &str, - stashed: bool, - enqueue_at: Option>, -) -> Result { - let mut message = create_add_message(shared, command); - message.stashed = stashed; - message.enqueue_at = enqueue_at; - - send_message(shared, message) - .await - .context("Failed to to add task message") -} - /// Tasks can be stashed and scheduled for being enqueued at a specific point in time. /// /// Furthermore these stashed tasks can then be manually enqueued again. #[rstest] -#[case(true, None)] -#[case(true, Some(Local::now() + TimeDelta::try_minutes(2).unwrap()))] -#[case(false, Some(Local::now() + TimeDelta::try_minutes(2).unwrap()))] +#[case(None)] +#[case(Some(Local::now() + TimeDelta::try_minutes(2).unwrap()))] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_enqueued_tasks( - #[case] stashed: bool, - #[case] enqueue_at: Option>, -) -> Result<()> { +async fn test_enqueued_tasks(#[case] enqueue_at: Option>) -> Result<()> { let daemon = daemon().await?; let shared = &daemon.settings.shared; - assert_success(add_stashed_task(shared, "sleep 10", stashed, enqueue_at).await?); + assert_success(create_stashed_task(shared, "sleep 10", enqueue_at).await?); // The task should be added in stashed state. let task = wait_for_task_condition(shared, 0, |task| task.is_stashed()).await?; @@ -77,10 +56,9 @@ async fn test_delayed_tasks() -> Result<()> { let shared = &daemon.settings.shared; // The task will be stashed and automatically enqueued after about 1 second. - let response = add_stashed_task( + let response = create_stashed_task( shared, "sleep 10", - true, Some(Local::now() + TimeDelta::try_seconds(1).unwrap()), ) .await?; diff --git a/pueue/tests/helper/mod.rs b/pueue/tests/helper/mod.rs index bc0a96ec..85cfa053 100644 --- a/pueue/tests/helper/mod.rs +++ b/pueue/tests/helper/mod.rs @@ -1,5 +1,7 @@ //! This module contains helper functions, which are used by both, the client and daemon tests. +use ::log::{warn, LevelFilter}; use anyhow::Result; +use simplelog::{Config, ConfigBuilder, TermLogger, TerminalMode}; use tokio::io::{self, AsyncWriteExt}; pub use pueue_lib::state::PUEUE_DEFAULT_GROUP; @@ -27,6 +29,31 @@ pub use wait::*; // Global acceptable test timeout const TIMEOUT: u64 = 5000; +/// Use this function to enable log output for in-runtime daemon output. +#[allow(dead_code)] +pub fn enable_logger() { + let level = LevelFilter::Debug; + + // Try to initialize the logger with the timezone set to the Local time of the machine. + let mut builder = ConfigBuilder::new(); + let logger_config = match builder.set_time_offset_to_local() { + Err(_) => { + warn!("Failed to determine the local time of this machine. Fallback to UTC."); + Config::default() + } + Ok(builder) => builder.build(), + }; + + // Init a terminal logger + TermLogger::init( + level, + logger_config.clone(), + TerminalMode::Stderr, + simplelog::ColorChoice::Auto, + ) + .unwrap() +} + /// A helper function to sleep for ms time. /// Only used to avoid the biolerplate of importing the same stuff all over the place. pub async fn sleep_ms(ms: u64) { diff --git a/pueue/tests/helper/task.rs b/pueue/tests/helper/task.rs index b7e87735..4cca395a 100644 --- a/pueue/tests/helper/task.rs +++ b/pueue/tests/helper/task.rs @@ -3,6 +3,7 @@ use std::env::vars; use anyhow::{anyhow, Context, Result}; +use chrono::{DateTime, Local}; use pueue_lib::network::message::*; use pueue_lib::settings::*; use pueue_lib::task::{Task, TaskStatus}; @@ -27,6 +28,21 @@ pub fn create_add_message(shared: &Shared, command: &str) -> AddMessage { } } +/// Helper to create a stashed task +pub async fn create_stashed_task( + shared: &Shared, + command: &str, + enqueue_at: Option>, +) -> Result { + let mut message = create_add_message(shared, command); + message.stashed = true; + message.enqueue_at = enqueue_at; + + send_message(shared, message) + .await + .context("Failed to to add task message") +} + /// Helper to either continue the daemon or start specific tasks pub async fn start_tasks(shared: &Shared, tasks: TaskSelection) -> Result { let message = StartMessage { tasks };