Commit

Merge pull request #368 from Nukesor/fix-flaky-unix-test
fix: Flaky tests with parallel tasks
Nukesor authored Oct 1, 2022
2 parents df2d29a + 4566a29 commit b0e6c35
Showing 3 changed files with 13 additions and 7 deletions.
4 changes: 4 additions & 0 deletions daemon/task_handler/mod.rs
@@ -114,6 +114,10 @@ impl TaskHandler {
/// - Whether we should perform a shutdown.
/// - If the client requested a reset: reset the state if all children have been killed and handled.
/// - Check whether we can spawn new tasks.
+///
+/// This first step waits for 200ms while receiving new messages.
+/// This prevents this loop from running hot, but also means that we only check if a new task
+/// can be scheduled or if tasks are finished, every 200ms.
pub fn run(&mut self) {
loop {
self.receive_messages();
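Note: the doc comment added above describes a receive loop that blocks for up to 200ms per iteration. A minimal sketch of that pattern, assuming a std::sync::mpsc channel and a hypothetical Message type rather than the daemon's actual internals:

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

struct Message; // hypothetical stand-in for the daemon's message type

fn run(receiver: &Receiver<Message>) {
    loop {
        // Block for up to 200ms waiting for a message. This keeps the loop
        // from spinning hot while still bounding the scheduling latency.
        match receiver.recv_timeout(Duration::from_millis(200)) {
            Ok(_message) => { /* handle the message */ }
            Err(RecvTimeoutError::Timeout) => { /* no message this tick */ }
            Err(RecvTimeoutError::Disconnected) => break,
        }
        // At most every 200ms we fall through to the periodic checks:
        // finished tasks, pending resets, and whether new tasks can spawn.
    }
}
```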
14 changes: 8 additions & 6 deletions tests/daemon/worker_environment_variables.rs
@@ -12,7 +12,7 @@ async fn test_single_worker() -> Result<()> {
let daemon = daemon().await?;
let shared = &daemon.settings.shared;

-// Add some tasks that instantly finish.
+// Add some tasks that finish instantly.
for _ in 0..3 {
assert_success(add_env_task(shared, "sleep 0.1").await?);
}
@@ -42,21 +42,23 @@ async fn test_multiple_worker() -> Result<()> {
let daemon = daemon().await?;
let shared = &daemon.settings.shared;

-// Spawn three tasks and wait for them
+// Spawn three tasks that run in parallel and wait for them.
for _ in 0..3 {
-assert_success(add_env_task_to_group(shared, "sleep 0.1", "test_3").await?);
+assert_success(add_env_task_to_group(shared, "sleep 0.3", "test_3").await?);
}
wait_for_task_condition(shared, 2, |task| task.is_done()).await?;

// The first three tasks should have the same worker id's as the task ids.
+// They ran in parallel and each should have their own worker id assigned.
let state = get_state(shared).await?;
for task_id in 0..3 {
assert_worker_envs(shared, &state, task_id, task_id, "test_3").await?;
}

-// Spawn two more tasks and wait for them
+// Spawn two more tasks and wait for them.
+// They should now get worker0 and worker1, as there aren't any other running tasks.
for _ in 0..2 {
-assert_success(add_env_task_to_group(shared, "sleep 0.5", "test_3").await?);
+assert_success(add_env_task_to_group(shared, "sleep 0.3", "test_3").await?);
}
wait_for_task_condition(shared, 4, |task| task.is_done()).await?;
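Note: the comments above rely on per-group worker ids being handed out to running tasks and reused once tasks finish. A simplified sketch of such an allocation scheme, as an illustration rather than the daemon's actual data structure:

```rust
use std::collections::BTreeSet;

/// Hands out the smallest worker id not currently in use, so ids are
/// reused as soon as running tasks finish (illustrative only).
struct WorkerPool {
    in_use: BTreeSet<usize>,
}

impl WorkerPool {
    fn acquire(&mut self) -> usize {
        let id = (0..).find(|id| !self.in_use.contains(id)).unwrap();
        self.in_use.insert(id);
        id
    }

    fn release(&mut self, id: usize) {
        self.in_use.remove(&id);
    }
}
```

With three tasks running in parallel this yields worker ids 0, 1, and 2; once they finish, two new tasks get 0 and 1 again, which is what the test asserts.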

@@ -78,7 +80,7 @@ async fn test_worker_for_new_pool() -> Result<()> {
// Add a new group
add_group_with_slots(shared, "testgroup", 1).await?;

-// Add some tasks that instantly finish.
+// Add a task that finishes instantly.
assert_success(add_env_task_to_group(shared, "sleep 0.1", "testgroup").await?);
wait_for_task_condition(shared, 0, |task| task.is_done()).await?;

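Note: these tests rely on polling helpers like wait_for_task_condition instead of fixed sleeps. A synchronous sketch of that polling pattern (the real helper is async and inspects the daemon's task state; the timeout and poll interval here are assumptions):

```rust
use std::time::{Duration, Instant};

// Poll a condition until it holds or a timeout elapses (illustrative only).
fn wait_for(condition: impl Fn() -> bool, timeout: Duration) -> Result<(), String> {
    let start = Instant::now();
    while start.elapsed() < timeout {
        if condition() {
            return Ok(());
        }
        std::thread::sleep(Duration::from_millis(50));
    }
    Err("Condition wasn't met in time".into())
}
```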
2 changes: 1 addition & 1 deletion tests/helper/env.rs
@@ -26,7 +26,7 @@ pub async fn assert_worker_envs(
assert_eq!(
task.envs.get("PUEUE_WORKER_ID"),
Some(&worker.to_string()),
"Worker id hasn't been correctly for task {task_id}",
"Worker id hasn't been correctly set for task {task_id}",
);

// Get the log output for the task.
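Note: the assertion checks the PUEUE_WORKER_ID variable that the daemon injects into each task's environment. For illustration, a task command could read it like this (the PUEUE_GROUP variable is an assumption here, based on the group parameter the tests pass):

```rust
fn main() {
    // PUEUE_WORKER_ID is the variable asserted on above; PUEUE_GROUP is
    // assumed to be injected alongside it for grouped tasks.
    let worker_id = std::env::var("PUEUE_WORKER_ID").unwrap_or_default();
    let group = std::env::var("PUEUE_GROUP").unwrap_or_default();
    println!("worker: {worker_id}, group: {group}");
}
```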
