Skip to content

Commit

Permalink
feat(maitake-sync): Add wait_for and wait_for_value to WaitCell a…
Browse files Browse the repository at this point in the history
…nd WaitQueue (#479)

Implementing the interface that @hawkw so wisely suggested
  • Loading branch information
jamesmunns authored Jul 11, 2024
1 parent e456145 commit 6dc5a84
Show file tree
Hide file tree
Showing 3 changed files with 348 additions and 0 deletions.
154 changes: 154 additions & 0 deletions maitake-sync/src/wait_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,160 @@ impl WaitCell {
}
}

/// Asynchronously poll the given function `f` until a condition occurs,
/// using the [`WaitCell`] to only re-poll when notified.
///
/// This can be used to implement a "wait loop", turning a "try" function
/// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
/// "recv" or "send").
///
/// In particular, this function correctly *registers* interest in the [`WaitCell`]
/// prior to polling the function, ensuring that there is not a chance of a race
/// where the condition occurs AFTER checking but BEFORE registering interest
/// in the [`WaitCell`], which could lead to deadlock.
///
/// This is intended to have similar behavior to `Condvar` in the standard library,
/// but asynchronous, and not requiring operating system intervention (or existence).
///
/// In particular, this can be used in cases where interrupts or events are used
/// to signify readiness or completion of some task, such as the completion of a
/// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
/// can wake the cell, allowing the polling function to check status fields for
/// partial progress or completion.
///
/// Consider using [`Self::wait_for_value()`] if your function does return a value.
///
/// Consider using [`WaitQueue::wait_for()`](super::wait_queue::WaitQueue::wait_for)
/// if you need multiple waiters.
///
/// # Returns
///
/// * [`Ok`]`(())` if the closure returns `true`.
/// * [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed.
///
/// # Examples
///
/// ```
/// # use tokio::task;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn test() {
/// use std::sync::Arc;
/// use maitake_sync::WaitCell;
/// use std::sync::atomic::{AtomicU8, Ordering};
///
/// let queue = Arc::new(WaitCell::new());
/// let num = Arc::new(AtomicU8::new(0));
///
/// let waiter = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await;
/// println!("received wakeup!");
/// }
/// });
///
/// println!("poking task...");
///
/// for i in 0..20 {
/// num.store(i, Ordering::Relaxed);
/// queue.wake();
/// }
///
/// waiter.await.unwrap();
/// # }
/// # test();
/// ```
pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Closed> {
loop {
let wait = self.subscribe().await;
if f() {
return Ok(());
}
wait.await?;
}
}

/// Asynchronously poll the given function `f` until a condition occurs,
/// using the [`WaitCell`] to only re-poll when notified.
///
/// This can be used to implement a "wait loop", turning a "try" function
/// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
/// "recv" or "send").
///
/// In particular, this function correctly *registers* interest in the [`WaitCell`]
/// prior to polling the function, ensuring that there is not a chance of a race
/// where the condition occurs AFTER checking but BEFORE registering interest
/// in the [`WaitCell`], which could lead to deadlock.
///
/// This is intended to have similar behavior to `Condvar` in the standard library,
/// but asynchronous, and not requiring operating system intervention (or existence).
///
/// In particular, this can be used in cases where interrupts or events are used
/// to signify readiness or completion of some task, such as the completion of a
/// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
/// can wake the cell, allowing the polling function to check status fields for
/// partial progress or completion, and also return the status flags at the same time.
///
/// Consider using [`Self::wait_for()`] if your function does not return a value.
///
/// Consider using [`WaitQueue::wait_for_value()`](super::wait_queue::WaitQueue::wait_for_value) if you need multiple waiters.
///
/// * [`Ok`]`(T)` if the closure returns [`Some`]`(T)`.
/// * [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed.
///
/// # Examples
///
/// ```
/// # use tokio::task;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn test() {
/// use std::sync::Arc;
/// use maitake_sync::WaitCell;
/// use std::sync::atomic::{AtomicU8, Ordering};
///
/// let queue = Arc::new(WaitCell::new());
/// let num = Arc::new(AtomicU8::new(0));
///
/// let waiter = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// let rxd = queue.wait_for_value(|| {
/// let val = num.load(Ordering::Relaxed);
/// if val > 5 {
/// return Some(val);
/// }
/// None
/// }).await.unwrap();
/// assert!(rxd > 5);
/// println!("received wakeup with value: {rxd}");
/// }
/// });
///
/// println!("poking task...");
///
/// for i in 0..20 {
/// num.store(i, Ordering::Relaxed);
/// queue.wake();
/// }
///
/// waiter.await.unwrap();
/// # }
/// # test();
/// ```
pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> Result<T, Closed> {
loop {
let wait = self.subscribe().await;
if let Some(t) = f() {
return Ok(t);
}
wait.await?;
}
}

// TODO(eliza): is this an API we want to have?
/*
/// Returns `true` if this `WaitCell` is [closed](Self::close).
Expand Down
191 changes: 191 additions & 0 deletions maitake-sync/src/wait_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,197 @@ impl WaitQueue {
}
}

/// Asynchronously poll the given function `f` until a condition occurs,
/// using the [`WaitQueue`] to only re-poll when notified.
///
/// This can be used to implement a "wait loop", turning a "try" function
/// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
/// "recv" or "send").
///
/// In particular, this function correctly *registers* interest in the [`WaitQueue`]
/// prior to polling the function, ensuring that there is not a chance of a race
/// where the condition occurs AFTER checking but BEFORE registering interest
/// in the [`WaitQueue`], which could lead to deadlock.
///
/// This is intended to have similar behavior to `Condvar` in the standard library,
/// but asynchronous, and not requiring operating system intervention (or existence).
///
/// In particular, this can be used in cases where interrupts or events are used
/// to signify readiness or completion of some task, such as the completion of a
/// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
/// can wake the queue, allowing the polling function to check status fields for
/// partial progress or completion.
///
/// Consider using [`Self::wait_for_value()`] if your function does return a value.
///
/// Consider using [`WaitCell::wait_for()`](super::wait_cell::WaitCell::wait_for)
/// if you do not need multiple waiters.
///
/// # Returns
///
/// * [`Ok`]`(())` if the closure returns `true`.
/// * [`Err`]`(`[`Closed`](crate::Closed)`)` if the [`WaitQueue`] is closed.
///
/// # Examples
///
/// ```
/// # use tokio::task;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn test() {
/// use std::sync::Arc;
/// use maitake_sync::WaitQueue;
/// use std::sync::atomic::{AtomicU8, Ordering};
///
/// let queue = Arc::new(WaitQueue::new());
/// let num = Arc::new(AtomicU8::new(0));
///
/// let waiter1 = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await;
/// println!("received wakeup!");
/// }
/// });
///
/// let waiter2 = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// queue.wait_for(|| num.load(Ordering::Relaxed) > 10).await;
/// println!("received wakeup!");
/// }
/// });
///
/// println!("poking task...");
///
/// for i in 0..20 {
/// num.store(i, Ordering::Relaxed);
/// queue.wake();
/// }
///
/// waiter1.await.unwrap();
/// waiter2.await.unwrap();
/// # }
/// # test();
/// ```
pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> WaitResult<()> {
loop {
let wait = self.wait();
let mut wait = core::pin::pin!(wait);
let _ = wait.as_mut().subscribe()?;
if f() {
return Ok(());
}
wait.await?;
}
}

/// Asynchronously poll the given function `f` until a condition occurs,
/// using the [`WaitQueue`] to only re-poll when notified.
///
/// This can be used to implement a "wait loop", turning a "try" function
/// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
/// "recv" or "send").
///
/// In particular, this function correctly *registers* interest in the [`WaitQueue`]
/// prior to polling the function, ensuring that there is not a chance of a race
/// where the condition occurs AFTER checking but BEFORE registering interest
/// in the [`WaitQueue`], which could lead to deadlock.
///
/// This is intended to have similar behavior to `Condvar` in the standard library,
/// but asynchronous, and not requiring operating system intervention (or existence).
///
/// In particular, this can be used in cases where interrupts or events are used
/// to signify readiness or completion of some task, such as the completion of a
/// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
/// can wake the queue, allowing the polling function to check status fields for
/// partial progress or completion, and also return the status flags at the same time.
///
/// Consider using [`Self::wait_for()`] if your function does not return a value.
///
/// Consider using [`WaitCell::wait_for_value()`](super::wait_cell::WaitCell::wait_for_value)
/// if you do not need multiple waiters.
///
/// * [`Ok`]`(T)` if the closure returns [`Some`]`(T)`.
/// * [`Err`]`(`[`Closed`](crate::Closed)`)` if the [`WaitQueue`] is closed.
///
/// # Examples
///
/// ```
/// # use tokio::task;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn test() {
/// use std::sync::Arc;
/// use maitake_sync::WaitQueue;
/// use std::sync::atomic::{AtomicU8, Ordering};
///
/// let queue = Arc::new(WaitQueue::new());
/// let num = Arc::new(AtomicU8::new(0));
///
/// let waiter1 = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// let rxd = queue.wait_for_value(|| {
/// let val = num.load(Ordering::Relaxed);
/// if val > 5 {
/// return Some(val);
/// }
/// None
/// }).await.unwrap();
/// assert!(rxd > 5);
/// println!("received wakeup with value: {rxd}");
/// }
/// });
///
/// let waiter2 = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// let rxd = queue.wait_for_value(|| {
/// let val = num.load(Ordering::Relaxed);
/// if val > 10 {
/// return Some(val);
/// }
/// None
/// }).await.unwrap();
/// assert!(rxd > 10);
/// println!("received wakeup with value: {rxd}");
/// }
/// });
///
/// println!("poking task...");
///
/// for i in 0..20 {
/// num.store(i, Ordering::Relaxed);
/// queue.wake();
/// }
///
/// waiter1.await.unwrap();
/// waiter2.await.unwrap();
/// # }
/// # test();
/// ```
pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> WaitResult<T> {
loop {
let wait = self.wait();
let mut wait = core::pin::pin!(wait);
match wait.as_mut().subscribe() {
Poll::Ready(wr) => wr?,
Poll::Pending => {}
}
if let Some(t) = f() {
return Ok(t);
}
wait.await?;
}
}

/// Returns a [`Waiter`] entry in this queue.
///
/// This is factored out into a separate function because it's used by both
Expand Down
3 changes: 3 additions & 0 deletions maitake/src/time/timer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use core::cell::RefCell;
use core::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use crate::time::{Clock, timer::Ticks};
use std::time::Duration;

crate::loom::thread_local! {
static CLOCK: RefCell<Option<Arc<TestClockState>>> = RefCell::new(None);
}
Expand Down

0 comments on commit 6dc5a84

Please sign in to comment.