diff --git a/crossbeam-utils/src/sync/parker.rs b/crossbeam-utils/src/sync/parker.rs index fc13d2e96..bf9d6f347 100644 --- a/crossbeam-utils/src/sync/parker.rs +++ b/crossbeam-utils/src/sync/parker.rs @@ -3,18 +3,17 @@ use std::marker::PhantomData; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Condvar, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; /// A thread parking primitive. /// /// Conceptually, each `Parker` has an associated token which is initially not present: /// /// * The [`park`] method blocks the current thread unless or until the token is available, at -/// which point it automatically consumes the token. It may also return *spuriously*, without -/// consuming the token. +/// which point it automatically consumes the token. /// -/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum -/// time. +/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for +/// a specified maximum time. /// /// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the /// token is initially absent, [`unpark`] followed by [`park`] will result in the second call @@ -43,13 +42,13 @@ use std::time::Duration; /// u.unpark(); /// }); /// -/// // Wakes up when `u.unpark()` provides the token, but may also wake up -/// // spuriously before that without consuming the token. +/// // Wakes up when `u.unpark()` provides the token. /// p.park(); /// ``` /// /// [`park`]: Parker::park /// [`park_timeout`]: Parker::park_timeout +/// [`park_deadline`]: Parker::park_deadline /// [`unpark`]: Unparker::unpark pub struct Parker { unparker: Unparker, @@ -90,9 +89,6 @@ impl Parker { /// Blocks the current thread until the token is made available. /// - /// A call to `park` may wake up spuriously without consuming the token, and callers should be - /// prepared for this possibility. - /// /// # Examples /// /// ``` @@ -113,9 +109,6 @@ impl Parker { /// Blocks the current thread until the token is made available, but only for a limited time. /// - /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers - /// should be prepared for this possibility. - /// /// # Examples /// /// ``` @@ -128,7 +121,25 @@ impl Parker { /// p.park_timeout(Duration::from_millis(500)); /// ``` pub fn park_timeout(&self, timeout: Duration) { - self.unparker.inner.park(Some(timeout)); + self.park_deadline(Instant::now() + timeout) + } + + /// Blocks the current thread until the token is made available, or until a certain deadline. + /// + /// # Examples + /// + /// ``` + /// use std::time::{Duration, Instant}; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let deadline = Instant::now() + Duration::from_millis(500); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_deadline(deadline); + /// ``` + pub fn park_deadline(&self, deadline: Instant) { + self.unparker.inner.park(Some(deadline)) } /// Returns a reference to an associated [`Unparker`]. @@ -227,8 +238,7 @@ impl Unparker { /// u.unpark(); /// }); /// - /// // Wakes up when `u.unpark()` provides the token, but may also wake up - /// // spuriously before that without consuming the token. + /// // Wakes up when `u.unpark()` provides the token. /// p.park(); /// ``` /// @@ -302,7 +312,7 @@ struct Inner { } impl Inner { - fn park(&self, timeout: Option) { + fn park(&self, deadline: Option) { // If we were previously notified then we consume this notification and return quickly. if self .state @@ -313,8 +323,8 @@ impl Inner { } // If the timeout is zero, then there is no need to actually block. - if let Some(ref dur) = timeout { - if *dur == Duration::from_millis(0) { + if let Some(deadline) = deadline { + if deadline <= Instant::now() { return; } } @@ -338,36 +348,38 @@ impl Inner { Err(n) => panic!("inconsistent park_timeout state: {}", n), } - match timeout { - None => { - loop { - // Block the current thread on the conditional variable. - m = self.cvar.wait(m).unwrap(); - - if self - .state - .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) - .is_ok() - { - // got a notification - return; + loop { + // Block the current thread on the conditional variable. + m = match deadline { + None => self.cvar.wait(m).unwrap(), + Some(deadline) => { + let now = Instant::now(); + if now < deadline { + // We could check for a timeout here, in the return value of wait_timeout, + // but in the case that a timeout and an unpark arrive simultaneously, we + // prefer to report the former. + self.cvar.wait_timeout(m, deadline - now).unwrap().0 + } else { + // We've timed out; swap out the state back to empty on our way out + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED | PARKED => return, + n => panic!("inconsistent park_timeout state: {}", n), + }; } - - // spurious wakeup, go back to sleep } - } - Some(timeout) => { - // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a - // notification we just want to unconditionally set `state` back to `EMPTY`, either - // consuming a notification or un-flagging ourselves as parked. - let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap(); + }; - match self.state.swap(EMPTY, SeqCst) { - NOTIFIED => {} // got a notification - PARKED => {} // no notification - n => panic!("inconsistent park_timeout state: {}", n), - } + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + // got a notification + return; } + + // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught + // in the branch above, when we discover the deadline is in the past } } diff --git a/crossbeam-utils/tests/parker.rs b/crossbeam-utils/tests/parker.rs index f657eb1cf..2bf9c37d4 100644 --- a/crossbeam-utils/tests/parker.rs +++ b/crossbeam-utils/tests/parker.rs @@ -18,7 +18,7 @@ fn park_timeout_unpark_before() { fn park_timeout_unpark_not_called() { let p = Parker::new(); for _ in 0..10 { - p.park_timeout(Duration::from_millis(10)); + p.park_timeout(Duration::from_millis(10)) } } @@ -34,7 +34,7 @@ fn park_timeout_unpark_called_other_thread() { u.unpark(); }); - p.park_timeout(Duration::from_millis(u32::MAX as u64)); + p.park_timeout(Duration::from_millis(u32::MAX as u64)) }) .unwrap(); }