Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to Parker #563

Merged
merged 5 commits into from
Nov 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 57 additions & 45 deletions crossbeam-utils/src/sync/parker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ use std::marker::PhantomData;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};

/// A thread parking primitive.
///
/// Conceptually, each `Parker` has an associated token which is initially not present:
///
/// * The [`park`] method blocks the current thread unless or until the token is available, at
/// which point it automatically consumes the token. It may also return *spuriously*, without
/// consuming the token.
/// which point it automatically consumes the token.
///
/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum
/// time.
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
/// a specified maximum time.
///
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
Expand Down Expand Up @@ -43,13 +42,13 @@ use std::time::Duration;
/// u.unpark();
/// });
///
/// // Wakes up when `u.unpark()` provides the token, but may also wake up
/// // spuriously before that without consuming the token.
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
/// [`park_deadline`]: Parker::park_deadline
/// [`unpark`]: Unparker::unpark
pub struct Parker {
unparker: Unparker,
Expand Down Expand Up @@ -90,9 +89,6 @@ impl Parker {

/// Blocks the current thread until the token is made available.
///
/// A call to `park` may wake up spuriously without consuming the token, and callers should be
/// prepared for this possibility.
///
/// # Examples
///
/// ```
Expand All @@ -113,9 +109,6 @@ impl Parker {

/// Blocks the current thread until the token is made available, but only for a limited time.
///
/// A call to `park_timeout` may wake up spuriously without consuming the token, and callers
/// should be prepared for this possibility.
///
/// # Examples
///
/// ```
Expand All @@ -128,7 +121,25 @@ impl Parker {
/// p.park_timeout(Duration::from_millis(500));
/// ```
pub fn park_timeout(&self, timeout: Duration) {
self.unparker.inner.park(Some(timeout));
self.park_deadline(Instant::now() + timeout)
}

/// Blocks the current thread until the token is made available, or until a certain deadline.
///
/// # Examples
///
/// ```
/// use std::time::{Duration, Instant};
/// use crossbeam_utils::sync::Parker;
///
/// let p = Parker::new();
/// let deadline = Instant::now() + Duration::from_millis(500);
///
/// // Waits for the token to become available, but will not wait longer than 500 ms.
/// p.park_deadline(deadline);
/// ```
pub fn park_deadline(&self, deadline: Instant) {
self.unparker.inner.park(Some(deadline))
}

/// Returns a reference to an associated [`Unparker`].
Expand Down Expand Up @@ -227,8 +238,7 @@ impl Unparker {
/// u.unpark();
/// });
///
/// // Wakes up when `u.unpark()` provides the token, but may also wake up
/// // spuriously before that without consuming the token.
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// ```
///
Expand Down Expand Up @@ -302,7 +312,7 @@ struct Inner {
}

impl Inner {
fn park(&self, timeout: Option<Duration>) {
fn park(&self, deadline: Option<Instant>) {
// If we were previously notified then we consume this notification and return quickly.
if self
.state
Expand All @@ -313,8 +323,8 @@ impl Inner {
}

// If the timeout is zero, then there is no need to actually block.
if let Some(ref dur) = timeout {
if *dur == Duration::from_millis(0) {
if let Some(deadline) = deadline {
if deadline <= Instant::now() {
return;
}
}
Expand All @@ -338,36 +348,38 @@ impl Inner {
Err(n) => panic!("inconsistent park_timeout state: {}", n),
}

match timeout {
None => {
loop {
// Block the current thread on the conditional variable.
m = self.cvar.wait(m).unwrap();

if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
// got a notification
return;
loop {
// Block the current thread on the conditional variable.
m = match deadline {
None => self.cvar.wait(m).unwrap(),
Some(deadline) => {
let now = Instant::now();
if now < deadline {
// We could check for a timeout here, in the return value of wait_timeout,
// but in the case that a timeout and an unpark arrive simultaneously, we
// prefer to report the former.
self.cvar.wait_timeout(m, deadline - now).unwrap().0
} else {
// We've timed out; swap out the state back to empty on our way out
match self.state.swap(EMPTY, SeqCst) {
NOTIFIED | PARKED => return,
n => panic!("inconsistent park_timeout state: {}", n),
};
}

// spurious wakeup, go back to sleep
}
}
Some(timeout) => {
// Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
// notification we just want to unconditionally set `state` back to `EMPTY`, either
// consuming a notification or un-flagging ourselves as parked.
let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();
};

match self.state.swap(EMPTY, SeqCst) {
NOTIFIED => {} // got a notification
PARKED => {} // no notification
n => panic!("inconsistent park_timeout state: {}", n),
}
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
// got a notification
return;
}

// Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
// in the branch above, when we discover the deadline is in the past
}
}

Expand Down
4 changes: 2 additions & 2 deletions crossbeam-utils/tests/parker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn park_timeout_unpark_before() {
fn park_timeout_unpark_not_called() {
let p = Parker::new();
for _ in 0..10 {
p.park_timeout(Duration::from_millis(10));
p.park_timeout(Duration::from_millis(10))
}
}

Expand All @@ -34,7 +34,7 @@ fn park_timeout_unpark_called_other_thread() {
u.unpark();
});

p.park_timeout(Duration::from_millis(u32::MAX as u64));
p.park_timeout(Duration::from_millis(u32::MAX as u64))
})
.unwrap();
}
Expand Down