Fix waker behavior when invoked before a poll finishes
It's possible for a task's waker to be invoked while the executor is still
in the middle of a call to that task's `poll`. We had accounted for that
possibility when the task invokes *its own* waker, but that's not good
enough: the waker can escape to other threads, which can invoke it before
`poll` finishes (e.g., if the task blocks to acquire a lock).

This change fixes the waker behavior by clarifying the semantics of a
call to `wake`: a task whose waker is invoked should not be blocked
*when it next returns Pending to the executor*, and should be woken if
that has already happened. To do this, we introduce a new `Sleeping`
state for tasks, which has the same semantics as `Blocked` but is
recognized by waker invocations: a waker will only unblock a task in
the `Sleeping` state. This also removes the special-case "woken by
self" behavior -- being woken by *any* thread is now enough to trigger
the sleep logic.
jamesbornholt committed Sep 17, 2021
1 parent b37e984 commit 06a74d0
Showing 7 changed files with 158 additions and 50 deletions.
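
To make the new semantics concrete, here is a minimal, self-contained sketch of the wake/sleep protocol this commit introduces. It mirrors the diff below (names like `TaskState`, `wake`, and `sleep_unless_woken` come from the changed files), but it is a simplified illustration, not the actual Shuttle implementation:

    #[derive(PartialEq, Eq, Clone, Copy, Debug)]
    enum TaskState {
        Runnable, // available to be scheduled
        Blocked,  // blocked in a synchronization operation
        Sleeping, // returned Pending and is waiting for its waker
        Finished,
    }

    struct Task {
        state: TaskState,
        woken: bool, // set if the waker fired while the task was still running
    }

    impl Task {
        // May be called from any thread, possibly while the task is still inside `poll`.
        fn wake(&mut self) {
            self.woken = true;
            // Only a task waiting on its waker is unblocked; a task in `Blocked`
            // (e.g., waiting to acquire a lock) is left alone.
            if self.state == TaskState::Sleeping {
                self.state = TaskState::Runnable;
            }
        }

        // Called by the executor after the task's `poll` returns `Pending`.
        fn sleep_unless_woken(&mut self) {
            // If the waker already fired during this poll, stay runnable so the
            // wakeup is not lost; otherwise sleep until `wake` is called.
            let was_woken = std::mem::replace(&mut self.woken, false);
            if !was_woken {
                self.state = TaskState::Sleeping;
            }
        }
    }

The key property is that `wake` may run before, during, or after the `poll` that registered it, and in all three cases the task ends up runnable exactly when it should be: a wake that races with `poll` is remembered in `woken` rather than lost.
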
4 changes: 1 addition & 3 deletions src/asynch.rs
@@ -148,9 +148,7 @@ pub fn block_on<F: Future>(future: F) -> F::Output {
match future.as_mut().poll(cx) {
Poll::Ready(result) => break result,
Poll::Pending => {
ExecutionState::with(|state| {
state.current_mut().block_unless_self_woken();
});
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
}
}

51 changes: 32 additions & 19 deletions src/runtime/execution.rs
@@ -1,7 +1,7 @@
use crate::runtime::failure::{init_panic_hook, persist_failure, persist_task_failure};
use crate::runtime::storage::{StorageKey, StorageMap};
use crate::runtime::task::clock::VectorClock;
use crate::runtime::task::{Task, TaskId, TaskState, DEFAULT_INLINE_TASKS};
use crate::runtime::task::{Task, TaskId, DEFAULT_INLINE_TASKS};
use crate::runtime::thread::continuation::PooledContinuation;
use crate::scheduler::{Schedule, Scheduler};
use crate::{Config, MaxSteps};
@@ -62,7 +62,12 @@ impl Execution {

EXECUTION_STATE.set(&state, move || {
// Spawn `f` as the first task
ExecutionState::spawn_thread(f, config.stack_size, None, Some(VectorClock::new()));
ExecutionState::spawn_thread(
f,
config.stack_size,
Some("main-thread".to_string()),
Some(VectorClock::new()),
);

// Run the test to completion
while self.step(config) {}
@@ -93,21 +98,29 @@ impl Execution {
NextStep::Task(Rc::clone(&task.continuation))
}
ScheduledTask::Finished => {
let task_states = state
.tasks
.iter()
.map(|t| (t.id, t.state, t.detached))
.collect::<SmallVec<[_; DEFAULT_INLINE_TASKS]>>();
if task_states
.iter()
.any(|(_, s, detached)| !detached && *s == TaskState::Blocked)
{
// The scheduler decided we're finished, so there are either no runnable tasks,
// or all runnable tasks are detached and there are no unfinished attached
// tasks. Therefore, it's a deadlock if there are unfinished attached tasks.
if state.tasks.iter().any(|t| !t.finished() && !t.detached) {
let blocked_tasks = state
.tasks
.iter()
.filter(|t| !t.finished())
.map(|t| {
format!(
"{} (task {}{}{})",
t.name().unwrap_or_else(|| "<unknown>".to_string()),
t.id().0,
if t.detached { ", detached" } else { "" },
if t.sleeping() { ", pending future" } else { "" },
)
})
.collect::<Vec<_>>();
NextStep::Failure(
format!("deadlock! runnable tasks: {:?}", task_states),
format!("deadlock! blocked tasks: [{}]", blocked_tasks.join(", ")),
state.current_schedule.clone(),
)
} else {
debug_assert!(state.tasks.iter().all(|t| t.detached || t.finished()));
NextStep::Finished
}
}
@@ -502,21 +515,21 @@ impl ExecutionState {
_ => {}
}

let mut blocked_attached = false;
let mut unfinished_attached = false;
let runnable = self
.tasks
.iter()
.inspect(|t| blocked_attached = blocked_attached || (t.blocked() && !t.detached))
.inspect(|t| unfinished_attached = unfinished_attached || (!t.finished() && !t.detached))
.filter(|t| t.runnable())
.map(|t| t.id)
.collect::<SmallVec<[_; DEFAULT_INLINE_TASKS]>>();

// We should finish execution when either
// (1) There are no runnable tasks, or
// (2) All runnable tasks have been detached AND there are no blocked attached tasks
// If there are some blocked attached tasks and all runnable tasks are detached,
// we must run some detached task so that blocked attached tasks may become unblocked.
if runnable.is_empty() || (!blocked_attached && runnable.iter().all(|id| self.get(*id).detached)) {
// (2) All runnable tasks have been detached AND there are no unfinished attached tasks
// If there are some unfinished attached tasks and all runnable tasks are detached, we must
// run some detached task to give them a chance to unblock some unfinished attached task.
if runnable.is_empty() || (!unfinished_attached && runnable.iter().all(|id| self.get(*id).detached)) {
self.next_task = ScheduledTask::Finished;
return Ok(());
}
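
For reference, the deadlock message built above would render roughly like the following for a run where the (newly named) main thread is blocked on a lock and a detached task's future is asleep; this is hypothetical output, with invented task names and ids:

    deadlock! blocked tasks: [main-thread (task 0), <unknown> (task 2, detached, pending future)]
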
1 change: 1 addition & 0 deletions src/runtime/storage.rs
@@ -11,6 +11,7 @@ pub(crate) struct StorageKey(pub usize, pub usize); // (identifier, type)
/// Values are Option<_> because we need to be able to incrementally destruct them, as it's valid
/// for TLS destructors to initialize new TLS slots. When a slot is destructed, its key is removed
/// from `order` and its value is replaced with None.
#[derive(Debug)]
pub(crate) struct StorageMap {
locals: HashMap<StorageKey, Option<Box<dyn Any>>>,
order: VecDeque<StorageKey>,
54 changes: 34 additions & 20 deletions src/runtime/task/mod.rs
@@ -38,6 +38,7 @@ pub(crate) const DEFAULT_INLINE_TASKS: usize = 16;

/// A `Task` represents a user-level unit of concurrency. Each task has an `id` that is unique within
/// the execution, and a `state` reflecting whether the task is runnable (enabled) or not.
#[derive(Debug)]
pub(crate) struct Task {
pub(super) id: TaskId,
pub(super) state: TaskState,
@@ -50,8 +51,8 @@ pub(crate) struct Task {
waiter: Option<TaskId>,

waker: Waker,
// Remember whether the waker was invoked while we were running so we don't re-block
woken_by_self: bool,
// Remember whether the waker was invoked while we were running
woken: bool,

name: Option<String>,

@@ -76,7 +77,7 @@ impl Task {
clock,
waiter: None,
waker,
woken_by_self: false,
woken: false,
detached: false,
name,
local_storage: StorageMap::new(),
@@ -106,10 +107,7 @@ impl Task {
let waker = ExecutionState::with(|state| state.current_mut().waker());
let cx = &mut Context::from_waker(&waker);
while future.as_mut().poll(cx).is_pending() {
ExecutionState::with(|state| {
// We need to block before thread::switch() unless we woke ourselves up
state.current_mut().block_unless_self_woken();
});
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
thread::switch();
}
},
@@ -132,6 +130,10 @@
self.state == TaskState::Blocked
}

pub(crate) fn sleeping(&self) -> bool {
self.state == TaskState::Sleeping
}

pub(crate) fn finished(&self) -> bool {
self.state == TaskState::Finished
}
@@ -149,6 +151,11 @@
self.state = TaskState::Blocked;
}

pub(crate) fn sleep(&mut self) {
assert!(self.state != TaskState::Finished);
self.state = TaskState::Sleeping;
}

pub(crate) fn unblock(&mut self) {
// Note we don't assert the task is blocked here. For example, a task invoking its own waker
// will not be blocked when this is called.
@@ -161,23 +168,25 @@
self.state = TaskState::Finished;
}

/// Potentially block this task after it was polled by the executor.
/// Potentially put this task to sleep after it was polled by the executor, unless someone has
/// called its waker first.
///
/// A synchronous Task should never call this, because we want threads to be
/// enabled-by-default to avoid bugs where Shuttle incorrectly omits a potential execution.
/// We also need to handle a special case where a task invoked its own waker, in which case
/// we should not block the task.
pub(crate) fn block_unless_self_woken(&mut self) {
let was_woken_by_self = std::mem::replace(&mut self.woken_by_self, false);
if !was_woken_by_self {
self.block();
/// A synchronous Task should never call this, because we want threads to be enabled-by-default
/// to avoid bugs where Shuttle incorrectly omits a potential execution.
pub(crate) fn sleep_unless_woken(&mut self) {
let was_woken = std::mem::replace(&mut self.woken, false);
if !was_woken {
self.sleep();
}
}

/// Remember that we have been unblocked while we were currently running, and therefore should
/// not be blocked again by `block_unless_self_woken`.
pub(super) fn set_woken_by_self(&mut self) {
self.woken_by_self = true;
/// Remember that our waker has been called, and so we should not block the next time the
/// executor tries to put us to sleep.
pub(super) fn wake(&mut self) {
self.woken = true;
if self.state == TaskState::Sleeping {
self.unblock();
}
}

/// Register a waiter for this thread to terminate. Returns a boolean indicating whether the
@@ -240,8 +249,13 @@

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub(crate) enum TaskState {
/// Available to be scheduled
Runnable,
/// Blocked in a synchronization operation
Blocked,
/// A `Future` that returned `Pending` is waiting to be woken up
Sleeping,
/// Task has finished
Finished,
}

7 changes: 1 addition & 6 deletions src/runtime/task/waker.rs
@@ -38,12 +38,7 @@ unsafe fn raw_waker_wake(data: *const ()) {
return;
}

waiter.unblock();

let current = state.current_mut();
if current.id() == task_id {
current.set_woken_by_self();
}
waiter.wake();
});
}

6 changes: 6 additions & 0 deletions src/runtime/thread/continuation.rs
@@ -247,6 +247,12 @@ impl DerefMut for PooledContinuation {
}
}

impl std::fmt::Debug for PooledContinuation {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("PooledContinuation").finish()
}
}

// Safety: these aren't sent across real threads
unsafe impl Send for PooledContinuation {}

85 changes: 83 additions & 2 deletions tests/asynch/waker.rs
@@ -1,8 +1,11 @@
use futures::future::poll_fn;
use shuttle::sync::atomic::{AtomicBool, Ordering};
use shuttle::sync::Mutex;
use shuttle::{asynch, check_dfs, thread};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

use shuttle::{asynch, check_dfs};
use test_env_log::test;

#[test]
@@ -49,3 +52,81 @@ fn wake_after_finish() {
None,
)
}

// Test that we can pass wakers across threads and have them work correctly
#[test]
fn wake_during_poll() {
check_dfs(
|| {
let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
let waker_clone = Arc::clone(&waker);
let signal = Arc::new(AtomicBool::new(false));
let signal_clone = Arc::clone(&signal);

// This thread might invoke `wake` before the other task finishes running a single
// invocation of `poll`. If that happens, that task must not be blocked.
thread::spawn(move || {
signal_clone.store(true, Ordering::SeqCst);

if let Some(waker) = waker_clone.lock().unwrap().take() {
waker.wake();
}
});

asynch::block_on(poll_fn(move |cx| {
*waker.lock().unwrap() = Some(cx.waker().clone());

if signal.load(Ordering::SeqCst) {
Poll::Ready(())
} else {
Poll::Pending
}
}));
},
None,
);
}

// Test that a waker invocation doesn't unblock a task that is blocked due to synchronization
// operations
#[test]
fn wake_during_blocked_poll() {
static RAN_WAKER: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
check_dfs(
|| {
let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
let waker_clone = Arc::clone(&waker);
let counter = Arc::new(Mutex::new(0));
let counter_clone = Arc::clone(&counter);

thread::spawn(move || {
let mut counter = counter_clone.lock().unwrap();
thread::yield_now();
*counter += 1;
});

// If this `wake()` invocation happens while the thread above holds the `counter` lock
// and the `block_on` task below is blocked waiting to acquire that same lock, then
// `wake` must not unblock the `block_on` task. That is, `wake` should prevent the task
// from being blocked *the next time it returns Pending*, not just any time it is
// blocked.
thread::spawn(move || {
if let Some(waker) = waker_clone.lock().unwrap().take() {
RAN_WAKER.store(true, Ordering::SeqCst);
waker.wake();
}
});

asynch::block_on(poll_fn(move |cx| {
*waker.lock().unwrap() = Some(cx.waker().clone());

let mut counter = counter.lock().unwrap();
*counter += 1;

Poll::Ready(())
}));
},
None,
);
assert!(RAN_WAKER.load(Ordering::SeqCst), "waker was not invoked by any test");
}
