Skip to content

Commit

Permalink
reset woken of outer future after polled
Browse files Browse the repository at this point in the history
  • Loading branch information
suikammd committed Oct 10, 2021
1 parent d1cc6af commit 2b80b42
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
7 changes: 4 additions & 3 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,11 @@ impl<P: Park> Inner<P> {
let _enter = crate::runtime::enter(false);
let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
let mut polled = false;

pin!(future);

'outer: loop {
if scheduler.spawner.reset_woken() || !polled {
polled = true;
if scheduler.spawner.reset_woken() {
scheduler.stats.incr_poll_count();
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
Expand Down Expand Up @@ -418,6 +416,9 @@ impl Spawner {
}

fn waker_ref(&self) -> WakerRef<'_> {
// Set woken to true when enter block_on, ensure outer future
// be polled for the first time when enter loop
self.shared.woken.store(true, Release);
waker_ref(&self.shared)
}

Expand Down
58 changes: 58 additions & 0 deletions tokio/src/runtime/tests/loom_basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,45 @@ fn block_on_num_polls() {
});
}

#[test]
fn assert_no_unnecessary_polls() {
loom::model(|| {
// // After we poll outer future, woken should reset to false
let rt = Builder::new_current_thread().build().unwrap();
let (tx, rx) = oneshot::channel();
let pending_cnt = Arc::new(AtomicUsize::new(0));

rt.spawn(async move {
for _ in 0..24 {
task::yield_now().await;
}
tx.send(()).unwrap();
});

let pending_cnt_clone = pending_cnt.clone();
rt.block_on(async move {
// use task::yield_now() to ensure woken set to true
// ResetFuture will be polled at most once
// Here comes two cases
// 1. recv no message from channel, ResetFuture will be polled
// but get Pending and we record ResetFuture.pending_cnt ++.
// Then when message arrive, ResetFuture returns Ready. So we
// expect ResetFuture.pending_cnt = 1
// 2. recv message from channel, ResetFuture returns Ready immediately.
// We expect ResetFuture.pending_cnt = 0
task::yield_now().await;
ResetFuture {
rx,
pending_cnt: pending_cnt_clone,
}
.await;
});

let pending_cnt = pending_cnt.load(Acquire);
assert!(pending_cnt <= 1);
});
}

struct BlockedFuture {
rx: Receiver<()>,
num_polls: Arc<AtomicUsize>,
Expand All @@ -80,3 +119,22 @@ impl Future for BlockedFuture {
}
}
}

struct ResetFuture {
rx: Receiver<()>,
pending_cnt: Arc<AtomicUsize>,
}

impl Future for ResetFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.rx).poll(cx) {
Poll::Pending => {
self.pending_cnt.fetch_add(1, Release);
Poll::Pending
}
_ => Poll::Ready(()),
}
}
}

0 comments on commit 2b80b42

Please sign in to comment.