rt: fix deadlock in shutdown (#3228)
Previously, the runtime shutdown logic would first hand control over all cores
to a single thread, which would sequentially shut down all tasks on each core
and then wait for them to complete.

This could deadlock when one task is waiting for a later core's task to
complete. For example, in the newly added test, we have a `block_in_place` task
that is waiting for another task to be dropped. If the latter task adds its
core to the shutdown list later than the former, we end up waiting forever for
the `block_in_place` task to complete.
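
As a standalone illustration (not part of the commit), the scenario exercised by the newly added `hang_on_shutdown` test can be reproduced roughly as follows; this sketch assumes a tokio build with the `rt-multi-thread` and `time` features enabled:

```rust
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    // Two worker threads, mirroring the `hang_on_shutdown` test added below.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_time()
        .build()
        .unwrap();

    rt.block_on(async {
        let (sync_tx, sync_rx) = mpsc::channel::<()>();

        // This task ties up a worker thread until the channel is closed.
        tokio::spawn(async move {
            tokio::task::block_in_place(|| sync_rx.recv().ok());
        });

        // This task would close the channel, but it is still sleeping when
        // shutdown begins; only dropping it releases `sync_tx`.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(2)).await;
            drop(sync_tx);
        });

        tokio::time::sleep(Duration::from_secs(1)).await;
    });

    // Dropping the runtime here triggers shutdown. With the old ordering, the
    // single shutdown thread could wait on the `block_in_place` task before
    // the sleeping task's core had been processed, hanging forever.
}
```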

There was also a bug wherein we would attempt to park on the parker after
shutting it down; this was fixed as part of the restructuring described below.

To avoid this deadlock, this change restructures the code to bring all tasks
to a halt (and do any parking needed) before collapsing to a single thread.
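
A rough, simplified model of the new ordering follows. This is an illustrative sketch only: toy types and plain `std` threads stand in for tokio's actual `Core`/`Worker`/parker machinery. Every worker halts its own tasks concurrently (cf. `pre_shutdown`), then pushes its core onto `shutdown_cores`; only the last worker to arrive runs the single-threaded cleanup (cf. `Shared::shutdown`):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Toy stand-in for a worker's core: just the tasks it still owns.
struct Core {
    id: usize,
    tasks: Vec<u32>,
}

struct Shared {
    // Cores that have observed the shutdown signal; mirrors `shutdown_cores`.
    shutdown_cores: Mutex<Vec<Box<Core>>>,
    num_workers: usize,
}

impl Core {
    // Phase 1: signal this core's tasks and wait for them to stop.
    // Runs concurrently on every worker thread.
    fn pre_shutdown(&mut self) {
        println!("core {}: signalling {} task(s) to shut down", self.id, self.tasks.len());
        self.tasks.clear(); // stand-in for "all tasks have completed"
    }

    // Phase 2: single-threaded cleanup, only after every core has halted.
    fn shutdown(&mut self) {
        assert!(self.tasks.is_empty());
        println!("core {}: final single-threaded cleanup", self.id);
    }
}

impl Shared {
    // Collect cores; the last worker to arrive performs the cleanup.
    fn shutdown(&self, core: Box<Core>) {
        let mut cores = self.shutdown_cores.lock().unwrap();
        cores.push(core);

        if cores.len() != self.num_workers {
            return;
        }

        for mut core in cores.drain(..) {
            core.shutdown();
        }
        println!("all cores halted; shutdown complete");
    }
}

fn main() {
    let shared = Arc::new(Shared {
        shutdown_cores: Mutex::new(Vec::new()),
        num_workers: 4,
    });

    let handles: Vec<_> = (0..4)
        .map(|id| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let mut core = Box::new(Core { id, tasks: vec![1, 2, 3] });
                // Bring this core's tasks to a halt *before* collapsing to a
                // single thread; no core waits on another core's tasks here.
                core.pre_shutdown();
                shared.shutdown(core);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
```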

There was also an issue in which canceled tasks would not unpark the
originating thread, due to what appears to be some sort of optimization gone
wrong. This has been fixed to be much more conservative in selecting when not
to unpark the source thread (this may be too conservative; please take a look
at the changes to `release()`).
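
The unpark change can be sketched with a toy model (again plain `std` parking rather than tokio's parker; `pending_drop` mirrors the field in the diff, but everything else here is invented for illustration): after pushing a remotely released task onto the owning worker's queue, the releasing thread now unparks the owner unconditionally instead of trying to guess when the wakeup can be skipped.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

struct Remote {
    // Tasks that some other thread released and that this worker must drop.
    pending_drop: Mutex<VecDeque<&'static str>>,
}

fn main() {
    let remote = Arc::new(Remote {
        pending_drop: Mutex::new(VecDeque::new()),
    });

    // The "owning" worker: during shutdown it parks until something lands in
    // its pending-drop queue, then drains it.
    let owner = {
        let remote = Arc::clone(&remote);
        thread::spawn(move || loop {
            if let Some(task) = remote.pending_drop.lock().unwrap().pop_front() {
                println!("owning worker dropped: {}", task);
                return;
            }
            // Nothing to do yet; rely on the releasing thread to unpark us.
            thread::park();
        })
    };

    thread::sleep(Duration::from_millis(100));

    // A thread that does not own the task releases it: push onto the owner's
    // queue and then unconditionally unpark the owner. Skipping this wakeup
    // (as the old `worker_has_core` shortcut sometimes did) could leave the
    // owner parked forever during shutdown.
    remote.pending_drop.lock().unwrap().push_back("cancelled task");
    owner.thread().unpark();

    owner.join().unwrap();
}
```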

Fixes: #2789
bdonlan authored Dec 8, 2020
1 parent 62023df commit 57dffb9
Showing 3 changed files with 52 additions and 25 deletions.
61 changes: 36 additions & 25 deletions tokio/src/runtime/thread_pool/worker.rs
@@ -78,11 +78,12 @@ pub(super) struct Shared {
/// Coordinates idle workers
idle: Idle,

/// Workers have have observed the shutdown signal
/// Cores that have observed the shutdown signal
///
/// The core is **not** placed back in the worker to avoid it from being
/// stolen by a thread that was spawned as part of `block_in_place`.
shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>,
#[allow(clippy::vec_box)] // we're moving an already-boxed value
shutdown_cores: Mutex<Vec<Box<Core>>>,
}

/// Used to communicate with a worker from other threads.
@@ -157,7 +158,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
remotes: remotes.into_boxed_slice(),
inject: queue::Inject::new(),
idle: Idle::new(size),
shutdown_workers: Mutex::new(vec![]),
shutdown_cores: Mutex::new(vec![]),
});

let mut launch = Launch(vec![]);
@@ -328,8 +329,10 @@ impl Context {
}
}

core.pre_shutdown(&self.worker);

// Signal shutdown
self.worker.shared.shutdown(core, self.worker.clone());
self.worker.shared.shutdown(core);
Err(())
}

@@ -546,11 +549,9 @@ impl Core {
}
}

// Shutdown the core
fn shutdown(&mut self, worker: &Worker) {
// Take the core
let mut park = self.park.take().expect("park missing");

// Signals all tasks to shut down, and waits for them to complete. Must run
// before we enter the single-threaded phase of shutdown processing.
fn pre_shutdown(&mut self, worker: &Worker) {
// Signal to all tasks to shut down.
for header in self.tasks.iter() {
header.shutdown();
@@ -564,8 +565,17 @@
}

// Wait until signalled
let park = self.park.as_mut().expect("park missing");
park.park().expect("park failed");
}
}

// Shutdown the core
fn shutdown(&mut self) {
assert!(self.tasks.is_empty());

// Take the core
let mut park = self.park.take().expect("park missing");

// Drain the queue
while self.next_local_task().is_some() {}
@@ -630,18 +640,23 @@ impl task::Schedule for Arc<Worker> {
use std::ptr::NonNull;

enum Immediate {
// Task has been synchronously removed from the Core owned by the
// current thread
Removed(Option<Task>),
Core(bool),
// Task is owned by another thread, so we need to notify it to clean
// up the task later.
MaybeRemote,
}

let immediate = CURRENT.with(|maybe_cx| {
let cx = match maybe_cx {
Some(cx) => cx,
None => return Immediate::Core(false),
None => return Immediate::MaybeRemote,
};

if !self.eq(&cx.worker) {
return Immediate::Core(cx.core.borrow().is_some());
// Task owned by another core, so we need to notify it.
return Immediate::MaybeRemote;
}

let mut maybe_core = cx.core.borrow_mut();
@@ -656,15 +671,15 @@ impl task::Schedule for Arc<Worker> {
}
}

Immediate::Core(false)
Immediate::MaybeRemote
});

// Checks if we were called from within a worker, allowing for immediate
// removal of a scheduled task. Else we have to go through the slower
// process below where we remotely mark a task as dropped.
let worker_has_core = match immediate {
match immediate {
Immediate::Removed(task) => return task,
Immediate::Core(worker_has_core) => worker_has_core,
Immediate::MaybeRemote => (),
};

// Track the task to be released by the worker that owns it
@@ -682,10 +697,6 @@

self.remote().pending_drop.push(task);

if worker_has_core {
return None;
}

// The worker core has been handed off to another thread. In the
// event that the scheduler is currently shutting down, the thread
// that owns the task may be waiting on the release to complete
@@ -799,16 +810,16 @@ impl Shared {
/// its core back into its handle.
///
/// If all workers have reached this point, the final cleanup is performed.
fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
let mut workers = self.shutdown_workers.lock();
workers.push((core, worker));
fn shutdown(&self, core: Box<Core>) {
let mut cores = self.shutdown_cores.lock();
cores.push(core);

if workers.len() != self.remotes.len() {
if cores.len() != self.remotes.len() {
return;
}

for (mut core, worker) in workers.drain(..) {
core.shutdown(&worker);
for mut core in cores.drain(..) {
core.shutdown();
}

// Drain the injection queue
2 changes: 2 additions & 0 deletions tokio/src/time/driver/mod.rs
@@ -189,6 +189,8 @@ where

let mut lock = self.inner.lock();

assert!(!lock.is_shutdown);

let next_wake = lock.wheel.next_expiration_time();
lock.next_wake =
next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
14 changes: 14 additions & 0 deletions tokio/tests/rt_threaded.rs
@@ -382,6 +382,20 @@ fn max_threads() {
.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hang_on_shutdown() {
let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
tokio::spawn(async move {
tokio::task::block_in_place(|| sync_rx.recv().ok());
});

tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
drop(sync_tx);
});
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

fn rt() -> Runtime {
Runtime::new().unwrap()
}
