Skip to content

Commit

Permalink
Fixed flaky worker_steal_count test
Browse files Browse the repository at this point in the history
  • Loading branch information
jofas committed Oct 23, 2024
1 parent da745ff commit a47cc64
Showing 1 changed file with 42 additions and 21 deletions.
63 changes: 42 additions & 21 deletions tokio/tests/rt_unstable_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
))]

use std::future::Future;
use std::sync::{Arc, Mutex};
use std::sync::{mpsc, Arc, Mutex};
use std::task::Poll;
use std::thread;
use tokio::macros::support::poll_fn;
Expand Down Expand Up @@ -295,37 +295,34 @@ fn worker_noop_count() {
}

#[test]
#[ignore] // this test is flaky, see https://github.com/tokio-rs/tokio/issues/6470
fn worker_steal_count() {
// This metric only applies to the multi-threaded runtime.
//
// We use a blocking channel to backup one worker thread.
use std::sync::mpsc::channel;

let rt = threaded_no_lifo();
let metrics = rt.metrics();

rt.block_on(async {
let (tx, rx) = channel();

// Move to the runtime.
tokio::spawn(async move {
// Spawn the task that sends to the channel
//
// Since the lifo slot is disabled, this task is stealable.
tokio::spawn(async move {
tx.send(()).unwrap();
});
let mut successfully_spawned_stealable_task = false;

// Blocking receive on the channel.
rx.recv().unwrap();
})
.await
.unwrap();
rt.block_on(async {
for _ in 0..10 {
// The call to `try_spawn_stealable_task` may time out, which means
// that the sending task couldn't be scheduled due to a deadlock in
// the runtime.
// This is expected behaviour, we just retry until we succeed or
// exhaust all tries, the latter causing this test to fail.
if try_spawn_stealable_task().await.is_ok() {
successfully_spawned_stealable_task = true;
break;
}
}
});

drop(rt);

if !successfully_spawned_stealable_task {
panic!("exhausted every try to schedule the stealable task");
}

let n: u64 = (0..metrics.num_workers())
.map(|i| metrics.worker_steal_count(i))
.sum();
Expand Down Expand Up @@ -838,6 +835,30 @@ fn io_driver_ready_count() {
assert_eq!(metrics.io_driver_ready_count(), 1);
}

/// Makes a single attempt at getting a task scheduled while its spawning
/// worker thread is blocked.
///
/// An outer task is spawned onto the runtime. It spawns an inner task that
/// sends on a blocking `std::sync::mpsc` channel, then blocks its own thread
/// on the receiving end for up to one second.
///
/// Returns `Ok(())` if the message arrived in time, otherwise the
/// `mpsc::RecvTimeoutError` produced by the blocking receive.
///
/// # Panics
///
/// Panics if the outer task cannot be joined.
async fn try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError> {
    // We use a blocking channel to synchronize the tasks.
    let (tx, rx) = mpsc::channel();

    // Make sure we are in the context of the runtime.
    tokio::spawn(async move {
        // Spawn the task that sends to the channel.
        //
        // Note that the runtime needs to have the lifo slot disabled to make
        // this task stealable.
        tokio::spawn(async move {
            // If the receive below has already timed out, `rx` is gone and
            // `send` returns an error. That is the expected retry path, so
            // the result is ignored on purpose instead of panicking inside a
            // detached task.
            let _ = tx.send(());
        });

        // Blocking receive on the channel, timing out if the sending task
        // wasn't scheduled in time.
        rx.recv_timeout(Duration::from_secs(1))
    })
    .await
    .expect("the task performing the blocking receive could not be joined")
}

fn current_thread() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand Down

0 comments on commit a47cc64

Please sign in to comment.