From 6b845cb70f865216827bce35c47b0e13db0f44da Mon Sep 17 00:00:00 2001 From: Arne Beer <contact@arne.beer> Date: Thu, 15 Feb 2024 18:03:42 +0100 Subject: [PATCH] add: Allow 0 in group limits for unlimited parallel tasks --- CHANGELOG.md | 3 +- pueue/src/client/cli.rs | 18 ++-------- pueue/src/daemon/task_handler/spawn_task.rs | 6 ++++ .../daemon/integration/parallel_tasks.rs | 33 ++++++++++++++++++- 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b676f79..7ee698c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,8 +8,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added -- Support modification of task priorities via `pueue edit --priority/-o` and `pueue restart --edit-priority/-o` [#449](https://github.com/Nukesor/pueue/issues/449) +- Support modification of task priorities via `pueue edit --priority/-o` and `pueue restart --edit-priority/-o` [#449](https://github.com/Nukesor/pueue/issues/449). - If no output directory is provided in `completions`, the generated file is printed to `stdout` [#489](https://github.com/Nukesor/pueue/issues/489). +- Allow setting the `parallel_tasks` value of groups to `0`. Setting this value allows unlimited tasks for that group [#500](https://github.com/Nukesor/pueue/issues/500). ### Fixed diff --git a/pueue/src/client/cli.rs b/pueue/src/client/cli.rs index fbf21cbf..8a74d05a 100644 --- a/pueue/src/client/cli.rs +++ b/pueue/src/client/cli.rs @@ -477,7 +477,7 @@ https://github.com/Nukesor/pueue/issues/350#issue-1359083118" This limit is only considered when tasks are scheduled.")] Parallel { /// The amount of allowed parallel tasks. - #[arg(value_parser = min_one)] + /// Setting this to 0 means an unlimited amount of parallel tasks. parallel_tasks: Option<usize>, /// Set the amount for a specific group. @@ -504,7 +504,8 @@ pub enum GroupCommand { name: String, /// Set the amount of parallel tasks this group can have. - #[arg(short, long, value_parser = min_one)] + /// Setting this to 0 means an unlimited amount of parallel tasks. + #[arg(short, long)] parallel: Option<usize>, }, @@ -573,16 +574,3 @@ fn parse_delay_until(src: &str) -> Result<DateTime<Local>, String> { "could not parse as seconds or date expression", )) } - -/// Validator function. The input string has to be parsable as int and bigger than 0 -fn min_one(value: &str) -> Result<usize, String> { - match value.parse::<usize>() { - Ok(value) => { - if value < 1 { - return Err("You must provide a value that's bigger than 0".into()); - } - Ok(value) - } - Err(_) => Err("Failed to parse integer".into()), - } -} diff --git a/pueue/src/daemon/task_handler/spawn_task.rs b/pueue/src/daemon/task_handler/spawn_task.rs index dc94dc72..c5563394 100644 --- a/pueue/src/daemon/task_handler/spawn_task.rs +++ b/pueue/src/daemon/task_handler/spawn_task.rs @@ -49,6 +49,12 @@ impl TaskHandler { return false; } + // If parallel tasks are set to `0`, this means an unlimited amount of tasks may + // run at any given time. + if group.parallel_tasks == 0 { + return true; + } + // Get the currently running tasks by looking at the actually running processes. // They're sorted by group, which makes this quite convenient. let running_tasks = match self.children.0.get(&task.group) { diff --git a/pueue/tests/daemon/integration/parallel_tasks.rs b/pueue/tests/daemon/integration/parallel_tasks.rs index 96d6cd29..68e53100 100644 --- a/pueue/tests/daemon/integration/parallel_tasks.rs +++ b/pueue/tests/daemon/integration/parallel_tasks.rs @@ -1,7 +1,7 @@ use anyhow::Result; use pretty_assertions::assert_eq; -use pueue_lib::task::*; +use pueue_lib::{network::message::ParallelMessage, task::*}; use crate::helper::*; @@ -56,3 +56,34 @@ async fn test_parallel_tasks() -> Result<()> { } Ok(()) } + +/// Test that a group with a parallel limit of `0` has an unlimited amount of tasks. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_unlimited_parallel_tasks() -> Result<()> { + let daemon = daemon().await?; + let shared = &daemon.settings.shared; + + // Add a new group with 1 slot + add_group_with_slots(shared, "testgroup", 1).await?; + + // Add 10 long running tasks to this group, only 1 should be immediately started. + for _ in 0..10 { + assert_success(add_task_to_group(shared, "sleep 600", "testgroup").await?); + } + // Ensure the first tasks is started. + wait_for_task_condition(shared, 0, |task| task.is_running()).await?; + + // Update the parallel limit of the group to 0 + let message = ParallelMessage { + group: "testgroup".to_string(), + parallel_tasks: 0, + }; + assert_success(send_message(shared, message).await?); + + // Make sure all other tasks are started as well in quick succession. + for task_id in 1..10 { + wait_for_task_condition(shared, task_id, |task| task.is_running()).await?; + } + + Ok(()) +}