Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: refactor current-thread scheduler (take 2) #4395

Merged
merged 2 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
420 changes: 224 additions & 196 deletions tokio/src/runtime/basic_scheduler.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ cfg_rt! {
#[derive(Debug)]
enum Kind {
/// Execute all tasks on the current-thread.
CurrentThread(BasicScheduler<driver::Driver>),
CurrentThread(BasicScheduler),

/// Execute tasks across multiple threads.
#[cfg(feature = "rt-multi-thread")]
Expand Down
20 changes: 11 additions & 9 deletions tokio/src/runtime/tests/loom_basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@ fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
#[test]
fn block_on_num_polls() {
loom::model(|| {
// we expect at most 3 number of polls because there are
// three points at which we poll the future. At any of these
// points it can be ready:
// we expect at most 4 number of polls because there are three points at
// which we poll the future and an opportunity for a false-positive.. At
// any of these points it can be ready:
//
// - when we fail to steal the parker and we block on a
// notification that it is available.
// - when we fail to steal the parker and we block on a notification
// that it is available.
//
// - when we steal the parker and we schedule the future
//
// - when the future is woken up and we have ran the max
// number of tasks for the current tick or there are no
// more tasks to run.
// - when the future is woken up and we have ran the max number of tasks
// for the current tick or there are no more tasks to run.
//
let at_most = 3;
// - a thread is notified that the parker is available but a third
// thread acquires it before the notified thread can.
//
let at_most = 4;

let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
let rt2 = rt1.clone();
Expand Down
3 changes: 0 additions & 3 deletions tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! Threadpool

mod atomic_cell;
use atomic_cell::AtomicCell;

mod idle;
use self::idle::Idle;

Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::thread_pool::Idle;
use crate::runtime::{queue, task, Callback};
use crate::util::atomic_cell::AtomicCell;
use crate::util::FastRand;

use std::cell::RefCell;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,30 @@ use crate::loom::sync::atomic::AtomicPtr;
use std::ptr;
use std::sync::atomic::Ordering::AcqRel;

pub(super) struct AtomicCell<T> {
pub(crate) struct AtomicCell<T> {
data: AtomicPtr<T>,
}

unsafe impl<T: Send> Send for AtomicCell<T> {}
unsafe impl<T: Send> Sync for AtomicCell<T> {}

impl<T> AtomicCell<T> {
pub(super) fn new(data: Option<Box<T>>) -> AtomicCell<T> {
pub(crate) fn new(data: Option<Box<T>>) -> AtomicCell<T> {
AtomicCell {
data: AtomicPtr::new(to_raw(data)),
}
}

pub(super) fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
pub(crate) fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
let old = self.data.swap(to_raw(val), AcqRel);
from_raw(old)
}

pub(super) fn set(&self, val: Box<T>) {
pub(crate) fn set(&self, val: Box<T>) {
let _ = self.swap(Some(val));
}

pub(super) fn take(&self) -> Option<Box<T>> {
pub(crate) fn take(&self) -> Option<Box<T>> {
self.swap(None)
}
}
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ cfg_io_driver! {
pub(crate) mod slab;
}

#[cfg(feature = "rt")]
pub(crate) mod atomic_cell;

#[cfg(any(
// io driver uses `WakeList` directly
feature = "net",
Expand Down
29 changes: 29 additions & 0 deletions tokio/tests/rt_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,35 @@ fn drop_tasks_in_context() {
assert!(SUCCESS.load(Ordering::SeqCst));
}

#[test]
#[should_panic(expected = "boom")]
fn wake_in_drop_after_panic() {
let (tx, rx) = oneshot::channel::<()>();

struct WakeOnDrop(Option<oneshot::Sender<()>>);

impl Drop for WakeOnDrop {
fn drop(&mut self) {
self.0.take().unwrap().send(()).unwrap();
}
}

let rt = rt();

rt.spawn(async move {
let _wake_on_drop = WakeOnDrop(Some(tx));
// wait forever
futures::future::pending::<()>().await;
});

let _join = rt.spawn(async move { rx.await });

rt.block_on(async {
tokio::task::yield_now().await;
panic!("boom");
});
}

#[test]
#[should_panic(
expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
Expand Down