Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it possible to time out after a deadline #1

Merged
merged 1 commit into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
matrix:
rust:
- stable
- 1.56.1
- 1.77.1
steps:
- name: Checkout sources
uses: actions/checkout@v3
Expand Down
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ name = "async-event"
version = "0.1.0"
authors = ["Serge Barral <serge.barral@asynchronics.com>"]
edition = "2021"
rust-version = "1.56"
rust-version = "1.77"
license = "MIT OR Apache-2.0"
repository = "https://github.com/asynchronics/async-event"
readme = "README.md"
Expand All @@ -18,6 +18,9 @@ An efficient async condition variable for lock-free algorithms.
categories = ["asynchronous", "concurrency"]
keywords = ["async", "event", "atomic", "futures"]

[dependencies]
pin-project-lite = "0.2"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
futures-executor = "0.3"
Expand Down
76 changes: 75 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
//! });
//! ```

// Temporary workaround until the `async_event_loom` flag can be whitelisted
// without a `build.rs` [1].
//
// [1]: (https://github.com/rust-lang/rust/issues/124800).
#![allow(unexpected_cfgs)]

mod loom_exports;

use std::future::Future;
Expand All @@ -81,6 +87,7 @@ use std::task::{Context, Poll, Waker};
use loom_exports::cell::UnsafeCell;
use loom_exports::sync::atomic::{self, AtomicBool};
use loom_exports::sync::Mutex;
use pin_project_lite::pin_project;

/// An object that can receive or send notifications.
pub struct Event {
Expand Down Expand Up @@ -130,9 +137,29 @@ impl Event {

/// Returns a future that can be `await`ed until the provided predicate is
/// satisfied.
pub fn wait_until<F: FnMut() -> Option<T>, T>(&self, predicate: F) -> WaitUntil<F, T> {
pub fn wait_until<F, T>(&self, predicate: F) -> WaitUntil<F, T>
where
F: FnMut() -> Option<T>,
{
WaitUntil::new(&self.wait_set, predicate)
}

/// Returns a future that can be `await`ed until the provided predicate is
/// satisfied or until the provided future completes.
///
/// The deadline is specified as a `Future` that is expected to resolves to
/// `()` after some duration, such as a `tokio::time::Sleep` future.
pub fn wait_until_or_timeout<F, T, D>(
&self,
predicate: F,
deadline: D,
) -> WaitUntilOrTimeout<F, T, D>
where
F: FnMut() -> Option<T>,
D: Future<Output = ()>,
{
WaitUntilOrTimeout::new(&self.wait_set, predicate, deadline)
}
}

impl Default for Event {
Expand Down Expand Up @@ -378,6 +405,53 @@ enum WaitUntilState {
Completed,
}

pin_project! {
/// A future that can be `await`ed until a predicate is satisfied or until a
/// deadline elapses.
pub struct WaitUntilOrTimeout<'a, F: FnMut() -> Option<T>, T, D: Future<Output = ()>> {
wait_until: WaitUntil<'a, F, T>,
#[pin]
deadline: D,
}
}

impl<'a, F, T, D> WaitUntilOrTimeout<'a, F, T, D>
where
F: FnMut() -> Option<T>,
D: Future<Output = ()>,
{
/// Creates a future associated with the specified event sink that can be
/// `await`ed until the specified predicate is satisfied, or until the
/// specified timeout future completes.
fn new(wait_set: &'a WaitSet, predicate: F, deadline: D) -> Self {
Self {
wait_until: WaitUntil::new(wait_set, predicate),
deadline,
}
}
}

impl<'a, F, T, D> Future for WaitUntilOrTimeout<'a, F, T, D>
where
F: FnMut() -> Option<T>,
D: Future<Output = ()>,
{
type Output = Option<T>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

if let Poll::Ready(value) = Pin::new(this.wait_until).poll(cx) {
Poll::Ready(Some(value))
} else if this.deadline.poll(cx).is_ready() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}

/// A set of notifiers.
///
/// The set wraps a Mutex-protected list of notifiers and manages a flag for
Expand Down
Loading