diff --git a/src/asynch.rs b/src/asynch.rs
index d607b7a9..5457dcca 100644
--- a/src/asynch.rs
+++ b/src/asynch.rs
@@ -148,9 +148,7 @@ pub fn block_on<F: Future>(future: F) -> F::Output {
         match future.as_mut().poll(cx) {
             Poll::Ready(result) => break result,
             Poll::Pending => {
-                ExecutionState::with(|state| {
-                    state.current_mut().block_unless_self_woken();
-                });
+                ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
             }
         }
 
diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs
index 23aa12b5..52ac5c42 100644
--- a/src/runtime/execution.rs
+++ b/src/runtime/execution.rs
@@ -1,6 +1,6 @@
 use crate::runtime::failure::{init_panic_hook, persist_failure, persist_task_failure};
 use crate::runtime::task::clock::VectorClock;
-use crate::runtime::task::{Task, TaskId, TaskState, DEFAULT_INLINE_TASKS};
+use crate::runtime::task::{Task, TaskId, DEFAULT_INLINE_TASKS};
 use crate::runtime::thread::continuation::PooledContinuation;
 use crate::scheduler::{Schedule, Scheduler};
 use crate::{Config, MaxSteps};
@@ -61,7 +61,12 @@ impl Execution {
 
         EXECUTION_STATE.set(&state, move || {
             // Spawn `f` as the first task
-            ExecutionState::spawn_thread(f, config.stack_size, None, Some(VectorClock::new()));
+            ExecutionState::spawn_thread(
+                f,
+                config.stack_size,
+                Some("main-thread".to_string()),
+                Some(VectorClock::new()),
+            );
 
             // Run the test to completion
             while self.step(config) {}
@@ -92,21 +97,29 @@ impl Execution {
                 NextStep::Task(Rc::clone(&task.continuation))
             }
             ScheduledTask::Finished => {
-                let task_states = state
-                    .tasks
-                    .iter()
-                    .map(|t| (t.id, t.state, t.detached))
-                    .collect::<Vec<_>>();
-                if task_states
-                    .iter()
-                    .any(|(_, s, detached)| !detached && *s == TaskState::Blocked)
-                {
+                // The scheduler decided we're finished, so there are either no runnable tasks,
+                // or all runnable tasks are detached and there are no unfinished attached
+                // tasks. Therefore, it's a deadlock if there are unfinished attached tasks.
+                if state.tasks.iter().any(|t| !t.finished() && !t.detached) {
+                    let blocked_tasks = state
+                        .tasks
+                        .iter()
+                        .filter(|t| !t.finished())
+                        .map(|t| {
+                            format!(
+                                "{} (task {}{}{})",
+                                t.name().unwrap_or_else(|| "<unknown>".to_string()),
+                                t.id().0,
+                                if t.detached { ", detached" } else { "" },
+                                if t.sleeping() { ", pending future" } else { "" },
+                            )
+                        })
+                        .collect::<Vec<_>>();
                     NextStep::Failure(
-                        format!("deadlock! runnable tasks: {:?}", task_states),
+                        format!("deadlock! blocked tasks: [{}]", blocked_tasks.join(", ")),
                         state.current_schedule.clone(),
                     )
                 } else {
-                    debug_assert!(state.tasks.iter().all(|t| t.detached || t.finished()));
                     NextStep::Finished
                 }
             }
@@ -485,21 +498,21 @@ impl ExecutionState {
             _ => {}
         }
 
-        let mut blocked_attached = false;
+        let mut unfinished_attached = false;
         let runnable = self
             .tasks
             .iter()
-            .inspect(|t| blocked_attached = blocked_attached || (t.blocked() && !t.detached))
+            .inspect(|t| unfinished_attached = unfinished_attached || (!t.finished() && !t.detached))
             .filter(|t| t.runnable())
             .map(|t| t.id)
             .collect::<SmallVec<[TaskId; DEFAULT_INLINE_TASKS]>>();
 
         // We should finish execution when either
         // (1) There are no runnable tasks, or
-        // (2) All runnable tasks have been detached AND there are no blocked attached tasks
-        // If there are some blocked attached tasks and all runnable tasks are detached,
-        // we must run some detached task so that blocked attached tasks may become unblocked.
-        if runnable.is_empty() || (!blocked_attached && runnable.iter().all(|id| self.get(*id).detached)) {
+        // (2) All runnable tasks have been detached AND there are no unfinished attached tasks
+        // If there are some unfinished attached tasks and all runnable tasks are detached, we must
+        // run some detached task to give them a chance to unblock some unfinished attached task.
+        if runnable.is_empty() || (!unfinished_attached && runnable.iter().all(|id| self.get(*id).detached)) {
             self.next_task = ScheduledTask::Finished;
             return Ok(());
         }
diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs
index d0323adb..50c655c9 100644
--- a/src/runtime/task/mod.rs
+++ b/src/runtime/task/mod.rs
@@ -39,6 +39,7 @@ pub(crate) const DEFAULT_INLINE_TASKS: usize = 16;
 
 /// A `Task` represents a user-level unit of concurrency. Each task has an `id` that is unique within
 /// the execution, and a `state` reflecting whether the task is runnable (enabled) or not.
+#[derive(Debug)]
 pub(crate) struct Task {
     pub(super) id: TaskId,
     pub(super) state: TaskState,
@@ -51,8 +52,8 @@ pub(crate) struct Task {
     waiter: Option<TaskId>,
 
     waker: Waker,
-    // Remember whether the waker was invoked while we were running so we don't re-block
-    woken_by_self: bool,
+    // Remember whether the waker was invoked while we were running
+    woken: bool,
 
     name: Option<String>,
 
@@ -77,7 +78,7 @@ impl Task {
             clock,
             waiter: None,
             waker,
-            woken_by_self: false,
+            woken: false,
             detached: false,
             name,
             local_storage: LocalMap::new(),
@@ -107,10 +108,7 @@ impl Task {
                 let waker = ExecutionState::with(|state| state.current_mut().waker());
                 let cx = &mut Context::from_waker(&waker);
                 while future.as_mut().poll(cx).is_pending() {
-                    ExecutionState::with(|state| {
-                        // We need to block before thread::switch() unless we woke ourselves up
-                        state.current_mut().block_unless_self_woken();
-                    });
+                    ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
                     thread::switch();
                 }
             },
@@ -133,6 +131,10 @@ impl Task {
         self.state == TaskState::Blocked
     }
 
+    pub(crate) fn sleeping(&self) -> bool {
+        self.state == TaskState::Sleeping
+    }
+
     pub(crate) fn finished(&self) -> bool {
         self.state == TaskState::Finished
     }
@@ -150,6 +152,11 @@ impl Task {
         self.state = TaskState::Blocked;
     }
 
+    pub(crate) fn sleep(&mut self) {
+        assert!(self.state != TaskState::Finished);
+        self.state = TaskState::Sleeping;
+    }
+
     pub(crate) fn unblock(&mut self) {
         // Note we don't assert the task is blocked here. For example, a task invoking its own waker
         // will not be blocked when this is called.
@@ -162,23 +169,25 @@ impl Task {
         self.state = TaskState::Finished;
     }
 
-    /// Potentially block this task after it was polled by the executor.
+    /// Potentially put this task to sleep after it was polled by the executor, unless someone has
+    /// called its waker first.
     ///
-    /// A synchronous Task should never call this, because we want threads to be
-    /// enabled-by-default to avoid bugs where Shuttle incorrectly omits a potential execution.
-    /// We also need to handle a special case where a task invoked its own waker, in which case
-    /// we should not block the task.
-    pub(crate) fn block_unless_self_woken(&mut self) {
-        let was_woken_by_self = std::mem::replace(&mut self.woken_by_self, false);
-        if !was_woken_by_self {
-            self.block();
+    /// A synchronous Task should never call this, because we want threads to be enabled-by-default
+    /// to avoid bugs where Shuttle incorrectly omits a potential execution.
+    pub(crate) fn sleep_unless_woken(&mut self) {
+        let was_woken = std::mem::replace(&mut self.woken, false);
+        if !was_woken {
+            self.sleep();
         }
     }
 
-    /// Remember that we have been unblocked while we were currently running, and therefore should
-    /// not be blocked again by `block_unless_self_woken`.
-    pub(super) fn set_woken_by_self(&mut self) {
-        self.woken_by_self = true;
+    /// Remember that our waker has been called, and so we should not block the next time the
+    /// executor tries to put us to sleep.
+    pub(super) fn wake(&mut self) {
+        self.woken = true;
+        if self.state == TaskState::Sleeping {
+            self.unblock();
+        }
     }
 
     /// Register a waiter for this thread to terminate. Returns a boolean indicating whether the
@@ -241,8 +250,13 @@ impl Task {
 
 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
 pub(crate) enum TaskState {
+    /// Available to be scheduled
     Runnable,
+    /// Blocked in a synchronization operation
     Blocked,
+    /// A `Future` that returned `Pending` is waiting to be woken up
+    Sleeping,
+    /// Task has finished
     Finished,
 }
 
@@ -332,6 +346,7 @@ impl LocalKeyId {
 /// Values are Option<_> because we need to be able to incrementally destruct them, as it's valid
 /// for TLS destructors to initialize new TLS slots. When a slot is destructed, its key is removed
 /// from `order` and its value is replaced with None.
+#[derive(Debug)]
 struct LocalMap {
     locals: HashMap<LocalKeyId, Option<Box<dyn Any>>>,
     order: VecDeque<LocalKeyId>,
diff --git a/src/runtime/task/waker.rs b/src/runtime/task/waker.rs
index 6bccc1a5..ee80bf3d 100644
--- a/src/runtime/task/waker.rs
+++ b/src/runtime/task/waker.rs
@@ -38,12 +38,7 @@ unsafe fn raw_waker_wake(data: *const ()) {
             return;
         }
 
-        waiter.unblock();
-
-        let current = state.current_mut();
-        if current.id() == task_id {
-            current.set_woken_by_self();
-        }
+        waiter.wake();
     });
 }
 
diff --git a/src/runtime/thread/continuation.rs b/src/runtime/thread/continuation.rs
index 0cab1124..819c94cc 100644
--- a/src/runtime/thread/continuation.rs
+++ b/src/runtime/thread/continuation.rs
@@ -247,6 +247,12 @@ impl DerefMut for PooledContinuation {
     }
 }
 
+impl std::fmt::Debug for PooledContinuation {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("PooledContinuation").finish()
+    }
+}
+
 // Safety: these aren't sent across real threads
 unsafe impl Send for PooledContinuation {}
 
diff --git a/tests/asynch/waker.rs b/tests/asynch/waker.rs
index fcfcfd02..c0e5bf8e 100644
--- a/tests/asynch/waker.rs
+++ b/tests/asynch/waker.rs
@@ -1,8 +1,11 @@
+use futures::future::poll_fn;
+use shuttle::sync::atomic::{AtomicBool, Ordering};
+use shuttle::sync::Mutex;
+use shuttle::{asynch, check_dfs, thread};
 use std::future::Future;
 use std::pin::Pin;
+use std::sync::Arc;
 use std::task::{Context, Poll, Waker};
-
-use shuttle::{asynch, check_dfs};
 use test_env_log::test;
 
 #[test]
@@ -49,3 +52,78 @@ fn wake_after_finish() {
         None,
     )
 }
+
+// Test that we can pass wakers across threads and have them work correctly
+#[test]
+fn wake_during_poll() {
+    check_dfs(
+        || {
+            let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
+            let waker_clone = Arc::clone(&waker);
+            let signal = Arc::new(AtomicBool::new(false));
+            let signal_clone = Arc::clone(&signal);
+
+            // This thread might invoke `wake` before the other task finishes running a single
+            // invocation of `poll`. If that happens, that task must not be blocked.
+            thread::spawn(move || {
+                signal_clone.store(true, Ordering::SeqCst);
+
+                if let Some(waker) = waker_clone.lock().unwrap().take() {
+                    waker.wake();
+                }
+            });
+
+            asynch::block_on(poll_fn(move |cx| {
+                *waker.lock().unwrap() = Some(cx.waker().clone());
+
+                if signal.load(Ordering::SeqCst) {
+                    Poll::Ready(())
+                } else {
+                    Poll::Pending
+                }
+            }));
+        },
+        None,
+    );
+}
+
+// Test that a waker invocation doesn't unblock a task that is blocked due to synchronization
+// operations
+#[test]
+fn wake_during_blocked_poll() {
+    check_dfs(
+        || {
+            let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
+            let waker_clone = Arc::clone(&waker);
+            let counter = Arc::new(Mutex::new(0));
+            let counter_clone = Arc::clone(&counter);
+
+            thread::spawn(move || {
+                let mut counter = counter_clone.lock().unwrap();
+                thread::yield_now();
+                *counter += 1;
+            });
+
+            // If this `wake()` invocation happens while the thread above holds the `counter` lock
+            // and the `block_on` task below is blocked waiting to acquire that same lock, then
+            // `wake` must not unblock the `block_on` task. That is, `wake` should prevent the task
+            // from being blocked *the next time it returns Pending*, not just any time it is
+            // blocked.
+            thread::spawn(move || {
+                if let Some(waker) = waker_clone.lock().unwrap().take() {
+                    waker.wake();
+                }
+            });
+
+            asynch::block_on(poll_fn(move |cx| {
+                *waker.lock().unwrap() = Some(cx.waker().clone());
+
+                let mut counter = counter.lock().unwrap();
+                *counter += 1;
+
+                Poll::Ready(())
+            }));
+        },
+        None,
+    );
+}
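
The core of this change is the handshake between `sleep_unless_woken` and `wake`: a wakeup that arrives while the task is still inside `poll` must cancel the upcoming sleep, a wakeup that arrives while the task is `Sleeping` must make it runnable again, and a wakeup must never disturb a task that is `Blocked` on a synchronization operation. The sketch below is a minimal, self-contained model of that state machine; it re-declares simplified `Task`/`TaskState` types for illustration and is not Shuttle's actual implementation.

// Standalone model (assumed simplification, not Shuttle's real types) of the
// wake/sleep handshake introduced by this diff.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum TaskState {
    Runnable,
    Blocked,
    Sleeping,
    Finished,
}

struct Task {
    state: TaskState,
    woken: bool,
}

impl Task {
    fn new() -> Self {
        Self { state: TaskState::Runnable, woken: false }
    }

    // Called after `poll` returns `Pending`: go to sleep unless the waker already fired.
    fn sleep_unless_woken(&mut self) {
        let was_woken = std::mem::replace(&mut self.woken, false);
        if !was_woken {
            assert!(self.state != TaskState::Finished);
            self.state = TaskState::Sleeping;
        }
    }

    // Called by the waker: remember the wakeup, and only unblock a *sleeping* task.
    fn wake(&mut self) {
        self.woken = true;
        if self.state == TaskState::Sleeping {
            self.state = TaskState::Runnable;
        }
    }
}

fn main() {
    // Wake lands while the task is still running its `poll`: it must not go to sleep.
    let mut t = Task::new();
    t.wake();
    t.sleep_unless_woken();
    assert_eq!(t.state, TaskState::Runnable);

    // Wake lands after the task went to sleep: it becomes runnable again.
    let mut t = Task::new();
    t.sleep_unless_woken();
    assert_eq!(t.state, TaskState::Sleeping);
    t.wake();
    assert_eq!(t.state, TaskState::Runnable);

    // Wake lands while the task is blocked on synchronization: it stays blocked,
    // but the remembered wakeup prevents the *next* Pending from putting it to sleep.
    let mut t = Task::new();
    t.state = TaskState::Blocked;
    t.wake();
    assert_eq!(t.state, TaskState::Blocked);
}

This mirrors the behavior exercised by the `wake_during_poll` and `wake_during_blocked_poll` tests above: the `woken` flag absorbs early wakeups, while `wake` leaves synchronization-blocked tasks alone.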