diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs index 4edd5cd641f..adc2bc5180a 100644 --- a/tokio-util/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -36,14 +36,14 @@ use std::task::{self, Poll, Waker}; /// # `Stream` implementation /// /// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have -/// expired, no items are returned. In this case, `NotReady` is returned and the +/// expired, no items are returned. In this case, `Pending` is returned and the /// current task is registered to be notified once the next item's delay has /// expired. /// /// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll` /// returns `Ready(None)`. This indicates that the stream has reached an end. /// However, if a new item is inserted *after*, `poll` will once again start -/// returning items or `NotReady. +/// returning items or `Pending. /// /// Items are returned ordered by their expirations. Items that are configured /// to expire first will be returned first. There are no ordering guarantees @@ -538,7 +538,7 @@ impl DelayQueue { /// /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); /// - /// // "foo"is now scheduled to be returned in 10 seconds + /// // "foo" is now scheduled to be returned in 10 seconds /// # } /// ``` pub fn reset_at(&mut self, key: &Key, when: Instant) { @@ -548,6 +548,8 @@ impl DelayQueue { let when = self.normalize_deadline(when); self.slab[key.index].when = when; + self.slab[key.index].expired = false; + self.insert_idx(when, key.index); let next_deadline = self.next_deadline(); @@ -711,7 +713,7 @@ impl DelayQueue { /// Returns `true` if there are no items in the queue. /// /// Note that this function returns `false` even if all items have not yet - /// expired and a call to `poll` will return `NotReady`. + /// expired and a call to `poll` will return `Pending`. /// /// # Examples /// diff --git a/tokio-util/src/time/wheel/level.rs b/tokio-util/src/time/wheel/level.rs index 49f9bfb9cf0..e24bd4fbf39 100644 --- a/tokio-util/src/time/wheel/level.rs +++ b/tokio-util/src/time/wheel/level.rs @@ -233,14 +233,13 @@ fn slot_for(duration: u64, level: usize) -> usize { ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize } -/* #[cfg(all(test, not(loom)))] mod test { use super::*; #[test] fn test_slot_for() { - for pos in 1..64 { + for pos in 0..64 { assert_eq!(pos as usize, slot_for(pos, 0)); } @@ -252,4 +251,3 @@ mod test { } } } -*/ diff --git a/tokio-util/src/time/wheel/mod.rs b/tokio-util/src/time/wheel/mod.rs index 478037a3098..8fed0bf431d 100644 --- a/tokio-util/src/time/wheel/mod.rs +++ b/tokio-util/src/time/wheel/mod.rs @@ -116,9 +116,17 @@ where Ok(()) } - /// Remove `item` from thee timing wheel. + /// Remove `item` from the timing wheel. pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) { let when = T::when(item, store); + + assert!( + self.elapsed <= when, + "elapsed={}; when={}", + self.elapsed, + when + ); + let level = self.level_for(when); self.levels[level].remove_entry(when, item, store); @@ -240,9 +248,11 @@ where } fn level_for(elapsed: u64, when: u64) -> usize { - let masked = elapsed ^ when; + const SLOT_MASK: u64 = (1 << 6) - 1; - assert!(masked != 0, "elapsed={}; when={}", elapsed, when); + // Mask in the trailing bits ignored by the level calculation in order to cap + // the possible leading zeros + let masked = elapsed ^ when | SLOT_MASK; let leading_zeros = masked.leading_zeros() as usize; let significant = 63 - leading_zeros; @@ -255,7 +265,7 @@ mod test { #[test] fn test_level_for() { - for pos in 1..64 { + for pos in 0..64 { assert_eq!( 0, level_for(0, pos), diff --git a/tokio-util/tests/time_delay_queue.rs b/tokio-util/tests/time_delay_queue.rs index d42dca87dab..2758a22d5f2 100644 --- a/tokio-util/tests/time_delay_queue.rs +++ b/tokio-util/tests/time_delay_queue.rs @@ -245,6 +245,35 @@ async fn reset_twice() { assert!(queue.is_woken()); } +/// Regression test: Given an entry inserted with a deadline in the past, so +/// that it is placed directly on the expired queue, reset the entry to a +/// deadline in the future. Validate that this leaves the entry and queue in an +/// internally consistent state by running an additional reset on the entry +/// before polling it to completion. +#[tokio::test] +async fn repeatedly_reset_entry_inserted_as_expired() { + time::pause(); + let mut queue = task::spawn(DelayQueue::new()); + let now = Instant::now(); + + let key = queue.insert_at("foo", now - ms(100)); + + queue.reset_at(&key, now + ms(100)); + queue.reset_at(&key, now + ms(50)); + + assert_pending!(poll!(queue)); + + time::sleep_until(now + ms(60)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_ok!(poll!(queue)).into_inner(); + assert_eq!(entry, "foo"); + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()); +} + #[tokio::test] async fn remove_expired_item() { time::pause(); @@ -261,6 +290,38 @@ async fn remove_expired_item() { assert_eq!(entry.into_inner(), "foo"); } +/// Regression test: it should be possible to remove entries which fall in the +/// 0th slot of the internal timer wheel — that is, entries whose expiration +/// (a) falls at the beginning of one of the wheel's hierarchical levels and (b) +/// is equal to the wheel's current elapsed time. +#[tokio::test] +async fn remove_at_timer_wheel_threshold() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let key1 = queue.insert_at("foo", now + ms(64)); + let key2 = queue.insert_at("bar", now + ms(64)); + + sleep(ms(80)).await; + + let entry = assert_ready_ok!(poll!(queue)).into_inner(); + + match entry { + "foo" => { + let entry = queue.remove(&key2).into_inner(); + assert_eq!(entry, "bar"); + } + "bar" => { + let entry = queue.remove(&key1).into_inner(); + assert_eq!(entry, "foo"); + } + other => panic!("other: {:?}", other), + } +} + #[tokio::test] async fn expires_before_last_insert() { time::pause(); diff --git a/tokio/src/time/driver/wheel/level.rs b/tokio/src/time/driver/wheel/level.rs index 58280b10a6e..81d6b58c71f 100644 --- a/tokio/src/time/driver/wheel/level.rs +++ b/tokio/src/time/driver/wheel/level.rs @@ -255,14 +255,13 @@ fn slot_for(duration: u64, level: usize) -> usize { ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize } -/* #[cfg(all(test, not(loom)))] mod test { use super::*; #[test] fn test_slot_for() { - for pos in 1..64 { + for pos in 0..64 { assert_eq!(pos as usize, slot_for(pos, 0)); } @@ -274,4 +273,3 @@ mod test { } } } -*/ diff --git a/tokio/src/time/driver/wheel/mod.rs b/tokio/src/time/driver/wheel/mod.rs index 164cac4679a..24bf517c740 100644 --- a/tokio/src/time/driver/wheel/mod.rs +++ b/tokio/src/time/driver/wheel/mod.rs @@ -122,6 +122,13 @@ impl Wheel { if when == u64::max_value() { self.pending.remove(item); } else { + debug_assert!( + self.elapsed <= when, + "elapsed={}; when={}", + self.elapsed, + when + ); + let level = self.level_for(when); self.levels[level].remove_entry(item); @@ -281,15 +288,17 @@ impl Wheel { } fn level_for(elapsed: u64, when: u64) -> usize { - let mut masked = elapsed ^ when; + const SLOT_MASK: u64 = (1 << 6) - 1; + + // Mask in the trailing bits ignored by the level calculation in order to cap + // the possible leading zeros + let mut masked = elapsed ^ when | SLOT_MASK; if masked >= MAX_DURATION { // Fudge the timer into the top level masked = MAX_DURATION - 1; } - assert!(masked != 0, "elapsed={}; when={}", elapsed, when); - let leading_zeros = masked.leading_zeros() as usize; let significant = 63 - leading_zeros; @@ -302,7 +311,7 @@ mod test { #[test] fn test_level_for() { - for pos in 1..64 { + for pos in 0..64 { assert_eq!( 0, level_for(0, pos),