Skip to content

Commit

Permalink
time: fix wake-up with interval on Ready (#5553)
Browse files Browse the repository at this point in the history
When `tokio::time::Interval::poll_tick()` returns `Poll::Pending`, it
schedules itself for being woken up again through the waker of the
passed context, which is correct behavior.

However when `Poll::Ready(_)` is returned, the interval timer should be
reset but not scheduled to be woken up again as this is up to the
caller.

This commit fixes the bug by introducing a `reset_without_reregister`
method on `TimerEntry` which is called by `Intervall::poll_tick(cx)` in
case the delay poll returns `Poll::Ready(_)`.

Co-authored-by: Simon B. Gasse <sgasse@users.noreply.github.com>
  • Loading branch information
sgasse and sgasse authored Mar 27, 2023
1 parent 822af18 commit 68b02db
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 12 deletions.
14 changes: 8 additions & 6 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,19 +527,21 @@ impl TimerEntry {
unsafe { self.driver().clear_entry(NonNull::from(self.inner())) };
}

pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) {
pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time;
unsafe { self.as_mut().get_unchecked_mut() }.registered = true;
unsafe { self.as_mut().get_unchecked_mut() }.registered = reregister;

let tick = self.driver().time_source().deadline_to_tick(new_time);

if self.inner().extend_expiration(tick).is_ok() {
return;
}

unsafe {
self.driver()
.reregister(&self.driver.driver().io, tick, self.inner().into());
if reregister {
unsafe {
self.driver()
.reregister(&self.driver.driver().io, tick, self.inner().into());
}
}
}

Expand All @@ -553,7 +555,7 @@ impl TimerEntry {

if !self.registered {
let deadline = self.deadline;
self.as_mut().reset(deadline);
self.as_mut().reset(deadline, true);
}

let this = unsafe { self.get_unchecked_mut() };
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/time/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ fn reset_future() {
.as_mut()
.poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));

entry.as_mut().reset(start + Duration::from_secs(2));
entry.as_mut().reset(start + Duration::from_secs(2), true);

// shouldn't complete before 2s
block_on(futures::future::poll_fn(|cx| {
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/time/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,10 @@ impl Interval {
timeout + self.period
};

self.delay.as_mut().reset(next);
// When we arrive here, the internal delay returned `Poll::Ready`.
// Reset the delay but do not register it. It should be registered with
// the next call to [`poll_tick`].
self.delay.as_mut().reset_without_reregister(next);

// Return the time when we were scheduled to tick
Poll::Ready(timeout)
Expand Down
15 changes: 14 additions & 1 deletion tokio/src/time/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,22 @@ impl Sleep {
self.reset_inner(deadline)
}

/// Resets the `Sleep` instance to a new deadline without reregistering it
/// to be woken up.
///
/// Calling this function allows changing the instant at which the `Sleep`
/// future completes without having to create new associated state and
/// without having it registered. This is required in e.g. the
/// [crate::time::Interval] where we want to reset the internal [Sleep]
/// without having it wake up the last task that polled it.
pub(crate) fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) {
let mut me = self.project();
me.entry.as_mut().reset(deadline, false);
}

fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let mut me = self.project();
me.entry.as_mut().reset(deadline);
me.entry.as_mut().reset(deadline, true);

#[cfg(all(tokio_unstable, feature = "tracing"))]
{
Expand Down
118 changes: 115 additions & 3 deletions tokio/tests/time_interval.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use tokio::time::{self, Duration, Instant, MissedTickBehavior};
use tokio_test::{assert_pending, assert_ready_eq, task};
use std::pin::Pin;
use std::task::{Context, Poll};

use std::task::Poll;
use futures::{Stream, StreamExt};
use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior};
use tokio_test::{assert_pending, assert_ready_eq, task};

// Takes the `Interval` task, `start` variable, and optional time deltas
// For each time delta, it polls the `Interval` and asserts that the result is
Expand Down Expand Up @@ -209,3 +211,113 @@ fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

/// Helper struct to test the [tokio::time::Interval::poll_tick()] method.
///
/// `poll_tick()` should register the waker in the context only if it returns
/// `Poll::Pending`, not when returning `Poll::Ready`. This struct contains an
/// interval timer and counts up on every tick when used as stream. When the
/// counter is a multiple of four, it yields the current counter value.
/// Depending on the value for `wake_on_pending`, it will reschedule itself when
/// it returns `Poll::Pending` or not. When used with `wake_on_pending=false`,
/// we expect that the stream stalls because the timer will **not** reschedule
/// the next wake-up itself once it returned `Poll::Ready`.
struct IntervalStreamer {
counter: u32,
timer: Interval,
wake_on_pending: bool,
}

impl Stream for IntervalStreamer {
type Item = u32;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);

if this.counter > 12 {
return Poll::Ready(None);
}

match this.timer.poll_tick(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => {
this.counter += 1;
if this.counter % 4 == 0 {
Poll::Ready(Some(this.counter))
} else {
if this.wake_on_pending {
// Schedule this task for wake-up
cx.waker().wake_by_ref();
}
Poll::Pending
}
}
}
}
}

#[tokio::test(start_paused = true)]
async fn stream_with_interval_poll_tick_self_waking() {
let stream = IntervalStreamer {
counter: 0,
timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
wake_on_pending: true,
};

let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);

// Wrap task in timeout so that it will finish eventually even if the stream
// stalls.
tokio::spawn(tokio::time::timeout(
tokio::time::Duration::from_millis(150),
async move {
tokio::pin!(stream);

while let Some(item) = stream.next().await {
res_tx.send(item).await.ok();
}
},
));

let mut items = Vec::with_capacity(3);
while let Some(result) = res_rx.recv().await {
items.push(result);
}

// We expect the stream to yield normally and thus three items.
assert_eq!(items, vec![4, 8, 12]);
}

#[tokio::test(start_paused = true)]
async fn stream_with_interval_poll_tick_no_waking() {
let stream = IntervalStreamer {
counter: 0,
timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
wake_on_pending: false,
};

let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);

// Wrap task in timeout so that it will finish eventually even if the stream
// stalls.
tokio::spawn(tokio::time::timeout(
tokio::time::Duration::from_millis(150),
async move {
tokio::pin!(stream);

while let Some(item) = stream.next().await {
res_tx.send(item).await.ok();
}
},
));

let mut items = Vec::with_capacity(0);
while let Some(result) = res_rx.recv().await {
items.push(result);
}

// We expect the stream to stall because it does not reschedule itself on
// `Poll::Pending` and neither does [tokio::time::Interval] reschedule the
// task when returning `Poll::Ready`.
assert_eq!(items, vec![]);
}

0 comments on commit 68b02db

Please sign in to comment.