Skip to content

Commit

Permalink
runtime: reset woken of outer future after polled (tokio-rs#4157)
Browse files Browse the repository at this point in the history
  • Loading branch information
suikammd authored and Oliver Giersch committed Oct 28, 2021
1 parent 92a6d5d commit 8581ce2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 8 deletions.
16 changes: 8 additions & 8 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::time::Duration;
Expand Down Expand Up @@ -219,13 +219,11 @@ impl<P: Park> Inner<P> {
let _enter = crate::runtime::enter(false);
let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
let mut polled = false;

pin!(future);

'outer: loop {
if scheduler.spawner.was_woken() || !polled {
polled = true;
if scheduler.spawner.reset_woken() {
scheduler.stats.incr_poll_count();
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
Expand Down Expand Up @@ -418,13 +416,15 @@ impl Spawner {
}

fn waker_ref(&self) -> WakerRef<'_> {
// clear the woken bit
self.shared.woken.swap(false, AcqRel);
// Set woken to true when enter block_on, ensure outer future
// be polled for the first time when enter loop
self.shared.woken.store(true, Release);
waker_ref(&self.shared)
}

fn was_woken(&self) -> bool {
self.shared.woken.load(Acquire)
// reset woken to false and return original value
pub(crate) fn reset_woken(&self) -> bool {
self.shared.woken.swap(false, AcqRel)
}
}

Expand Down
58 changes: 58 additions & 0 deletions tokio/src/runtime/tests/loom_basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,45 @@ fn block_on_num_polls() {
});
}

#[test]
fn assert_no_unnecessary_polls() {
loom::model(|| {
// // After we poll outer future, woken should reset to false
let rt = Builder::new_current_thread().build().unwrap();
let (tx, rx) = oneshot::channel();
let pending_cnt = Arc::new(AtomicUsize::new(0));

rt.spawn(async move {
for _ in 0..24 {
task::yield_now().await;
}
tx.send(()).unwrap();
});

let pending_cnt_clone = pending_cnt.clone();
rt.block_on(async move {
// use task::yield_now() to ensure woken set to true
// ResetFuture will be polled at most once
// Here comes two cases
// 1. recv no message from channel, ResetFuture will be polled
// but get Pending and we record ResetFuture.pending_cnt ++.
// Then when message arrive, ResetFuture returns Ready. So we
// expect ResetFuture.pending_cnt = 1
// 2. recv message from channel, ResetFuture returns Ready immediately.
// We expect ResetFuture.pending_cnt = 0
task::yield_now().await;
ResetFuture {
rx,
pending_cnt: pending_cnt_clone,
}
.await;
});

let pending_cnt = pending_cnt.load(Acquire);
assert!(pending_cnt <= 1);
});
}

struct BlockedFuture {
rx: Receiver<()>,
num_polls: Arc<AtomicUsize>,
Expand All @@ -80,3 +119,22 @@ impl Future for BlockedFuture {
}
}
}

struct ResetFuture {
rx: Receiver<()>,
pending_cnt: Arc<AtomicUsize>,
}

impl Future for ResetFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.rx).poll(cx) {
Poll::Pending => {
self.pending_cnt.fetch_add(1, Release);
Poll::Pending
}
_ => Poll::Ready(()),
}
}
}

0 comments on commit 8581ce2

Please sign in to comment.