Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix waker behavior when invoked before a poll finishes #54

Merged
merged 1 commit into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/asynch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@ pub fn block_on<F: Future>(future: F) -> F::Output {
match future.as_mut().poll(cx) {
Poll::Ready(result) => break result,
Poll::Pending => {
ExecutionState::with(|state| {
state.current_mut().block_unless_self_woken();
});
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
}
}

Expand Down
51 changes: 32 additions & 19 deletions src/runtime/execution.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::runtime::failure::{init_panic_hook, persist_failure, persist_task_failure};
use crate::runtime::storage::{StorageKey, StorageMap};
use crate::runtime::task::clock::VectorClock;
use crate::runtime::task::{Task, TaskId, TaskState, DEFAULT_INLINE_TASKS};
use crate::runtime::task::{Task, TaskId, DEFAULT_INLINE_TASKS};
use crate::runtime::thread::continuation::PooledContinuation;
use crate::scheduler::{Schedule, Scheduler};
use crate::{Config, MaxSteps};
Expand Down Expand Up @@ -62,7 +62,12 @@ impl Execution {

EXECUTION_STATE.set(&state, move || {
// Spawn `f` as the first task
ExecutionState::spawn_thread(f, config.stack_size, None, Some(VectorClock::new()));
ExecutionState::spawn_thread(
f,
config.stack_size,
Some("main-thread".to_string()),
Some(VectorClock::new()),
);

// Run the test to completion
while self.step(config) {}
Expand Down Expand Up @@ -93,21 +98,29 @@ impl Execution {
NextStep::Task(Rc::clone(&task.continuation))
}
ScheduledTask::Finished => {
let task_states = state
.tasks
.iter()
.map(|t| (t.id, t.state, t.detached))
.collect::<SmallVec<[_; DEFAULT_INLINE_TASKS]>>();
if task_states
.iter()
.any(|(_, s, detached)| !detached && *s == TaskState::Blocked)
{
// The scheduler decided we're finished, so there are either no runnable tasks,
// or all runnable tasks are detached and there are no unfinished attached
// tasks. Therefore, it's a deadlock if there are unfinished attached tasks.
if state.tasks.iter().any(|t| !t.finished() && !t.detached) {
let blocked_tasks = state
.tasks
.iter()
.filter(|t| !t.finished())
.map(|t| {
format!(
"{} (task {}{}{})",
t.name().unwrap_or_else(|| "<unknown>".to_string()),
t.id().0,
if t.detached { ", detached" } else { "" },
if t.sleeping() { ", pending future" } else { "" },
)
})
.collect::<Vec<_>>();
NextStep::Failure(
format!("deadlock! runnable tasks: {:?}", task_states),
format!("deadlock! blocked tasks: [{}]", blocked_tasks.join(", ")),
state.current_schedule.clone(),
)
} else {
debug_assert!(state.tasks.iter().all(|t| t.detached || t.finished()));
NextStep::Finished
}
}
Expand Down Expand Up @@ -502,21 +515,21 @@ impl ExecutionState {
_ => {}
}

let mut blocked_attached = false;
let mut unfinished_attached = false;
let runnable = self
.tasks
.iter()
.inspect(|t| blocked_attached = blocked_attached || (t.blocked() && !t.detached))
.inspect(|t| unfinished_attached = unfinished_attached || (!t.finished() && !t.detached))
.filter(|t| t.runnable())
.map(|t| t.id)
.collect::<SmallVec<[_; DEFAULT_INLINE_TASKS]>>();

// We should finish execution when either
// (1) There are no runnable tasks, or
// (2) All runnable tasks have been detached AND there are no blocked attached tasks
// If there are some blocked attached tasks and all runnable tasks are detached,
// we must run some detached task so that blocked attached tasks may become unblocked.
if runnable.is_empty() || (!blocked_attached && runnable.iter().all(|id| self.get(*id).detached)) {
// (2) All runnable tasks have been detached AND there are no unfinished attached tasks
// If there are some unfinished attached tasks and all runnable tasks are detached, we must
// run some detached task to give them a chance to unblock some unfinished attached task.
if runnable.is_empty() || (!unfinished_attached && runnable.iter().all(|id| self.get(*id).detached)) {
self.next_task = ScheduledTask::Finished;
return Ok(());
}
Expand Down
1 change: 1 addition & 0 deletions src/runtime/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub(crate) struct StorageKey(pub usize, pub usize); // (identifier, type)
/// Values are Option<_> because we need to be able to incrementally destruct them, as it's valid
/// for TLS destructors to initialize new TLS slots. When a slot is destructed, its key is removed
/// from `order` and its value is replaced with None.
#[derive(Debug)]
pub(crate) struct StorageMap {
locals: HashMap<StorageKey, Option<Box<dyn Any>>>,
order: VecDeque<StorageKey>,
Expand Down
54 changes: 34 additions & 20 deletions src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub(crate) const DEFAULT_INLINE_TASKS: usize = 16;

/// A `Task` represents a user-level unit of concurrency. Each task has an `id` that is unique within
/// the execution, and a `state` reflecting whether the task is runnable (enabled) or not.
#[derive(Debug)]
pub(crate) struct Task {
pub(super) id: TaskId,
pub(super) state: TaskState,
Expand All @@ -50,8 +51,8 @@ pub(crate) struct Task {
waiter: Option<TaskId>,

waker: Waker,
// Remember whether the waker was invoked while we were running so we don't re-block
woken_by_self: bool,
// Remember whether the waker was invoked while we were running
woken: bool,

name: Option<String>,

Expand All @@ -76,7 +77,7 @@ impl Task {
clock,
waiter: None,
waker,
woken_by_self: false,
woken: false,
detached: false,
name,
local_storage: StorageMap::new(),
Expand Down Expand Up @@ -106,10 +107,7 @@ impl Task {
let waker = ExecutionState::with(|state| state.current_mut().waker());
let cx = &mut Context::from_waker(&waker);
while future.as_mut().poll(cx).is_pending() {
ExecutionState::with(|state| {
// We need to block before thread::switch() unless we woke ourselves up
state.current_mut().block_unless_self_woken();
});
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
thread::switch();
}
},
Expand All @@ -132,6 +130,10 @@ impl Task {
self.state == TaskState::Blocked
}

/// Whether this task is currently in the `Sleeping` state.
pub(crate) fn sleeping(&self) -> bool {
    matches!(self.state, TaskState::Sleeping)
}

/// Whether this task is in the `Finished` state.
pub(crate) fn finished(&self) -> bool {
    matches!(self.state, TaskState::Finished)
}
Expand All @@ -149,6 +151,11 @@ impl Task {
self.state = TaskState::Blocked;
}

/// Move this task into the `Sleeping` state.
///
/// # Panics
///
/// Panics if the task is already `Finished`.
pub(crate) fn sleep(&mut self) {
// A finished task must never be put back to sleep.
assert!(self.state != TaskState::Finished);
self.state = TaskState::Sleeping;
}

pub(crate) fn unblock(&mut self) {
// Note we don't assert the task is blocked here. For example, a task invoking its own waker
// will not be blocked when this is called.
Expand All @@ -161,23 +168,25 @@ impl Task {
self.state = TaskState::Finished;
}

/// Potentially block this task after it was polled by the executor.
/// Potentially put this task to sleep after it was polled by the executor, unless someone has
/// called its waker first.
///
/// A synchronous Task should never call this, because we want threads to be
/// enabled-by-default to avoid bugs where Shuttle incorrectly omits a potential execution.
/// We also need to handle a special case where a task invoked its own waker, in which case
/// we should not block the task.
pub(crate) fn block_unless_self_woken(&mut self) {
let was_woken_by_self = std::mem::replace(&mut self.woken_by_self, false);
if !was_woken_by_self {
self.block();
/// A synchronous Task should never call this, because we want threads to be enabled-by-default
/// to avoid bugs where Shuttle incorrectly omits a potential execution.
pub(crate) fn sleep_unless_woken(&mut self) {
    // Consume the `woken` flag (resetting it to `false`); only go to sleep if no wakeup was
    // recorded since the flag was last consumed.
    if !std::mem::take(&mut self.woken) {
        self.sleep();
    }
}

/// Remember that we have been unblocked while we were currently running, and therefore should
/// not be blocked again by `block_unless_self_woken`.
pub(super) fn set_woken_by_self(&mut self) {
self.woken_by_self = true;
/// Record that this task's waker has been invoked, so that the next `sleep_unless_woken` call
/// leaves the task awake instead of putting it to sleep.
///
/// If the task is already `Sleeping`, it is unblocked right away. A task in any other state
/// only has its `woken` flag set; its state is left untouched.
pub(super) fn wake(&mut self) {
    self.woken = true;
    if matches!(self.state, TaskState::Sleeping) {
        self.unblock();
    }
}

/// Register a waiter for this thread to terminate. Returns a boolean indicating whether the
Expand Down Expand Up @@ -240,8 +249,13 @@ impl Task {

/// The scheduling state of a `Task`, stored in its `state` field.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub(crate) enum TaskState {
/// Available to be scheduled
Runnable,
/// Blocked in a synchronization operation
Blocked,
/// A `Future` that returned `Pending` is waiting to be woken up. Entered via `Task::sleep`;
/// `Task::wake` unblocks a task that is in this state.
Sleeping,
/// Task has finished
Finished,
}

Expand Down
7 changes: 1 addition & 6 deletions src/runtime/task/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ unsafe fn raw_waker_wake(data: *const ()) {
return;
}

waiter.unblock();

let current = state.current_mut();
if current.id() == task_id {
current.set_woken_by_self();
}
waiter.wake();
});
}

Expand Down
6 changes: 6 additions & 0 deletions src/runtime/thread/continuation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ impl DerefMut for PooledContinuation {
}
}

// Opaque `Debug` output: only the type name is printed, none of the continuation's contents.
impl std::fmt::Debug for PooledContinuation {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("PooledContinuation")
    }
}

// Safety: these aren't sent across real threads
unsafe impl Send for PooledContinuation {}

Expand Down
85 changes: 83 additions & 2 deletions tests/asynch/waker.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use futures::future::poll_fn;
use shuttle::sync::atomic::{AtomicBool, Ordering};
use shuttle::sync::Mutex;
use shuttle::{asynch, check_dfs, thread};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

use shuttle::{asynch, check_dfs};
use test_env_log::test;

#[test]
Expand Down Expand Up @@ -49,3 +52,81 @@ fn wake_after_finish() {
None,
)
}

// Test that we can pass wakers across threads and have them work correctly
//
// The polled future publishes its waker and then checks `signal`; the spawned thread sets
// `signal` and then invokes whatever waker has been published so far.
#[test]
fn wake_during_poll() {
check_dfs(
|| {
// Slot through which the polled future hands its waker to the spawned thread.
let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
let waker_clone = Arc::clone(&waker);
// Set by the spawned thread; the future completes once it observes `true`.
let signal = Arc::new(AtomicBool::new(false));
let signal_clone = Arc::clone(&signal);

// This thread might invoke `wake` before the other task finishes running a single
// invocation of `poll`. If that happens, that task must not be blocked.
thread::spawn(move || {
signal_clone.store(true, Ordering::SeqCst);

// `take()` empties the slot, so each published waker is invoked at most once.
if let Some(waker) = waker_clone.lock().unwrap().take() {
waker.wake();
}
});

asynch::block_on(poll_fn(move |cx| {
// Publish the waker before checking the signal.
*waker.lock().unwrap() = Some(cx.waker().clone());

if signal.load(Ordering::SeqCst) {
Poll::Ready(())
} else {
Poll::Pending
}
}));
},
None,
);
}

// Test that a waker invocation doesn't unblock a task that is blocked due to synchronization
// operations
#[test]
fn wake_during_blocked_poll() {
    // A plain `std` atomic declared as a `static`, checked once after `check_dfs` returns to
    // confirm that the waker path was actually exercised.
    static RAN_WAKER: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
    check_dfs(
        || {
            // Slot through which the polled future hands its waker to the waking thread.
            let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
            let waker_clone = Arc::clone(&waker);
            let counter = Arc::new(Mutex::new(0));
            let counter_clone = Arc::clone(&counter);

            // Holds the `counter` lock across a yield so that the `block_on` task below can
            // become blocked trying to acquire it.
            thread::spawn(move || {
                let mut counter = counter_clone.lock().unwrap();
                thread::yield_now();
                *counter += 1;
            });

            // If this `wake()` invocation happens while the thread above holds the `counter` lock
            // and the `block_on` task below is blocked waiting to acquire that same lock, then
            // `wake` must not unblock the `block_on` task. That is, `wake` should prevent the task
            // from being blocked *the next time it returns Pending*, not just any time it is
            // blocked.
            thread::spawn(move || {
                if let Some(waker) = waker_clone.lock().unwrap().take() {
                    RAN_WAKER.store(true, Ordering::SeqCst);
                    waker.wake();
                }
            });

            asynch::block_on(poll_fn(move |cx| {
                // Publish the waker first, then take the contended `counter` lock.
                *waker.lock().unwrap() = Some(cx.waker().clone());

                let mut counter = counter.lock().unwrap();
                *counter += 1;

                Poll::Ready(())
            }));
        },
        None,
    );
    assert!(RAN_WAKER.load(Ordering::SeqCst), "waker was not invoked by any test");
}