Skip to content

Commit

Permalink
add(callbacks): queued_count and stashed_count callback vars
Browse files Browse the repository at this point in the history
  • Loading branch information
Nukesor committed Dec 1, 2024
1 parent d43beef commit 87d405f
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ TLDR: The new task state representation is more verbose but significantly cleane
- Add `command` filter to `pueue status`. [#524](https://github.com/Nukesor/pueue/issues/524) [#560](https://github.com/Nukesor/pueue/pull/560)
- Allow `pueue status` to order tasks by `enqueue_at`. [#554](https://github.com/Nukesor/pueue/issues/554)
- Added Windows service on Windows to allow a true daemon experience. [#344](https://github.com/Nukesor/pueue/issues/344) [#567](https://github.com/Nukesor/pueue/pull/567)
- Add `queued_count` and `stashed_count` to callback template variables. This allows users to fire callbacks when whole groups are finished. [#578](https://github.com/Nukesor/pueue/issues/578)

### Fixed

Expand Down
20 changes: 19 additions & 1 deletion pueue/src/daemon/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@ use super::state_helper::LockedState;
/// Users can specify a callback that's fired whenever a task finishes.
/// The callback is performed by spawning a new subprocess.
pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task) {
info!("Checking callback: {:?}", settings.daemon.callback);
// Return early, if there's no callback specified
let Some(template_string) = &settings.daemon.callback else {
return;
};
info!("Found callback: {template_string}");

// Build the command to be called from the template string in the configuration file.
let callback_command = match build_callback_command(settings, task, template_string) {
let callback_command = match build_callback_command(settings, state, task, template_string) {
Ok(callback_command) => callback_command,
Err(err) => {
error!("Failed to create callback command from template with error: {err}");
return;
}
};

info!("Callback started for task ");

let mut command = compile_shell_command(settings, &callback_command);

// Spawn the callback subprocess and log if it fails.
Expand All @@ -49,6 +53,7 @@ pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task)
/// finished task.
pub fn build_callback_command(
settings: &Settings,
state: &mut LockedState,
task: &Task,
template_string: &str,
) -> Result<String, RenderError> {
Expand All @@ -62,7 +67,20 @@ pub fn build_callback_command(
parameters.insert("id", task.id.to_string());
parameters.insert("command", task.command.clone());
parameters.insert("path", (*task.path.to_string_lossy()).to_owned());

// Add group information to template
// This includes how many stashed and queued tasks are left in the group.
parameters.insert("group", task.group.clone());
let queued_tasks = state
.filter_tasks_of_group(|task| task.is_queued(), &task.group)
.matching_ids
.len();
parameters.insert("queued_count", queued_tasks.to_string());
let stashed_tasks = state
.filter_tasks_of_group(|task| task.is_stashed(), &task.group)
.matching_ids
.len();
parameters.insert("stashed_count", stashed_tasks.to_string());

// Result takes the TaskResult Enum strings, unless it didn't finish yet.
if let TaskStatus::Done { result, .. } = &task.status {
Expand Down
3 changes: 2 additions & 1 deletion pueue/src/daemon/process_handler/finish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub fn handle_finished_tasks(settings: &Settings, state: &mut LockedState) {
None => TaskResult::Killed,
};

info!("Task {task_id} finished with result: {result:?}");

// Update the tasks's state and return a clone for callback handling.
let task = {
let task = state
Expand Down Expand Up @@ -147,7 +149,6 @@ fn get_finished(state: &mut LockedState) -> Vec<((usize, String, usize), Option<
// Child process did not exit yet
Ok(None) => continue,
Ok(_exit_status) => {
info!("Task {task_id} just finished");
finished.push(((*task_id, group.clone(), *worker_id), None));
}
}
Expand Down
43 changes: 43 additions & 0 deletions pueue/tests/daemon/integration/callback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::fs::read_to_string;

use anyhow::{Context, Result};

use crate::helper::*;

/// Make sure that callback commands are executed while variables are
/// templated into the command as expected.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_callback_variables() -> Result<()> {
let (mut settings, tempdir) = daemon_base_setup()?;

// Configure the daemon to use a callback command that echos some variables into a file
// that's located in the temporary runtime directory of the daemon.
let tempdir_path = tempdir.path().to_path_buf();
let echo_command =
"echo '{{queued_count}}\n{{stashed_count}}\n{{command}}\n{{id}}\n{{result}}'";
settings.daemon.callback = Some(format!(
"{echo_command} > {}/testfile",
tempdir_path.to_string_lossy()
));
settings
.save(&Some(tempdir_path.join("pueue.yml")))
.context("Couldn't write pueue config to temporary directory")?;

// Create the daemon with the changed settings.
let daemon = daemon_with_settings(settings, tempdir).await?;
let shared = &daemon.settings.shared;

// Create one stashed task.
assert_success(create_stashed_task(shared, "stashed", None).await?);
// Create a task that'll then trigger the callback
assert_success(add_task(shared, "ls").await?);

// Give the callback command some time to be executed.
sleep_ms(3000).await;

let callback_output = read_to_string(tempdir_path.join("testfile"))?;

assert_eq!(callback_output, "0\n1\nls\n1\nSuccess\n");

Ok(())
}
1 change: 1 addition & 0 deletions pueue/tests/daemon/integration/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod add;
mod aliases;
mod callback;
mod clean;
mod edit;
mod environment_variables;
Expand Down
32 changes: 5 additions & 27 deletions pueue/tests/daemon/integration/stashed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,22 @@ use pueue_lib::state::GroupStatus;
use rstest::rstest;

use pueue_lib::network::message::*;
use pueue_lib::settings::Shared;
use pueue_lib::task::*;

use crate::helper::*;

/// Helper to pause the whole daemon
pub async fn add_stashed_task(
shared: &Shared,
command: &str,
stashed: bool,
enqueue_at: Option<DateTime<Local>>,
) -> Result<Message> {
let mut message = create_add_message(shared, command);
message.stashed = stashed;
message.enqueue_at = enqueue_at;

send_message(shared, message)
.await
.context("Failed to to add task message")
}

/// Tasks can be stashed and scheduled for being enqueued at a specific point in time.
///
/// Furthermore these stashed tasks can then be manually enqueued again.
#[rstest]
#[case(true, None)]
#[case(true, Some(Local::now() + TimeDelta::try_minutes(2).unwrap()))]
#[case(false, Some(Local::now() + TimeDelta::try_minutes(2).unwrap()))]
#[case(None)]
#[case(Some(Local::now() + TimeDelta::try_minutes(2).unwrap()))]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_enqueued_tasks(
#[case] stashed: bool,
#[case] enqueue_at: Option<DateTime<Local>>,
) -> Result<()> {
async fn test_enqueued_tasks(#[case] enqueue_at: Option<DateTime<Local>>) -> Result<()> {
let daemon = daemon().await?;
let shared = &daemon.settings.shared;

assert_success(add_stashed_task(shared, "sleep 10", stashed, enqueue_at).await?);
assert_success(create_stashed_task(shared, "sleep 10", enqueue_at).await?);

// The task should be added in stashed state.
let task = wait_for_task_condition(shared, 0, |task| task.is_stashed()).await?;
Expand Down Expand Up @@ -77,10 +56,9 @@ async fn test_delayed_tasks() -> Result<()> {
let shared = &daemon.settings.shared;

// The task will be stashed and automatically enqueued after about 1 second.
let response = add_stashed_task(
let response = create_stashed_task(
shared,
"sleep 10",
true,
Some(Local::now() + TimeDelta::try_seconds(1).unwrap()),
)
.await?;
Expand Down
27 changes: 27 additions & 0 deletions pueue/tests/helper/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! This module contains helper functions, which are used by both, the client and daemon tests.
use ::log::{warn, LevelFilter};
use anyhow::Result;
use simplelog::{Config, ConfigBuilder, TermLogger, TerminalMode};
use tokio::io::{self, AsyncWriteExt};

pub use pueue_lib::state::PUEUE_DEFAULT_GROUP;
Expand Down Expand Up @@ -27,6 +29,31 @@ pub use wait::*;
// Global acceptable test timeout
const TIMEOUT: u64 = 5000;

/// Use this function to enable log output for in-runtime daemon output.
#[allow(dead_code)]
pub fn enable_logger() {
let level = LevelFilter::Debug;

// Try to initialize the logger with the timezone set to the Local time of the machine.
let mut builder = ConfigBuilder::new();
let logger_config = match builder.set_time_offset_to_local() {
Err(_) => {
warn!("Failed to determine the local time of this machine. Fallback to UTC.");
Config::default()
}
Ok(builder) => builder.build(),
};

// Init a terminal logger
TermLogger::init(
level,
logger_config.clone(),
TerminalMode::Stderr,
simplelog::ColorChoice::Auto,
)
.unwrap()
}

/// A helper function to sleep for ms time.
/// Only used to avoid the biolerplate of importing the same stuff all over the place.
pub async fn sleep_ms(ms: u64) {
Expand Down
16 changes: 16 additions & 0 deletions pueue/tests/helper/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::env::vars;

use anyhow::{anyhow, Context, Result};

use chrono::{DateTime, Local};
use pueue_lib::network::message::*;
use pueue_lib::settings::*;
use pueue_lib::task::{Task, TaskStatus};
Expand All @@ -27,6 +28,21 @@ pub fn create_add_message(shared: &Shared, command: &str) -> AddMessage {
}
}

/// Helper to create a stashed task
pub async fn create_stashed_task(
shared: &Shared,
command: &str,
enqueue_at: Option<DateTime<Local>>,
) -> Result<Message> {
let mut message = create_add_message(shared, command);
message.stashed = true;
message.enqueue_at = enqueue_at;

send_message(shared, message)
.await
.context("Failed to to add task message")
}

/// Helper to either continue the daemon or start specific tasks
pub async fn start_tasks(shared: &Shared, tasks: TaskSelection) -> Result<Message> {
let message = StartMessage { tasks };
Expand Down

0 comments on commit 87d405f

Please sign in to comment.