Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sender::closed future #102

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ struct Channel<T> {
/// Stream operations while the channel is empty and not closed.
stream_ops: Event,

/// Closed operations while the channel is not closed.
closed_ops: Event,

/// The number of currently active `Sender`s.
sender_count: AtomicUsize,

Expand All @@ -89,6 +92,7 @@ impl<T> Channel<T> {
// Notify all receive and stream operations.
self.recv_ops.notify(usize::MAX);
self.stream_ops.notify(usize::MAX);
self.closed_ops.notify(usize::MAX);

true
} else {
Expand Down Expand Up @@ -128,6 +132,7 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
send_ops: Event::new(),
recv_ops: Event::new(),
stream_ops: Event::new(),
closed_ops: Event::new(),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
});
Expand Down Expand Up @@ -169,6 +174,7 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
send_ops: Event::new(),
recv_ops: Event::new(),
stream_ops: Event::new(),
closed_ops: Event::new(),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
});
Expand Down Expand Up @@ -258,6 +264,29 @@ impl<T> Sender<T> {
})
}

/// Completes when all receiver have dropped.
///
/// This allows the producers to get notified when interest in the produced values is canceled and immediately stop doing work.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{unbounded, SendError};
///
/// let (s, r) = unbounded::<i32>();
/// drop(r);
/// s.closed().await;
/// # });
/// ```
pub fn closed(&self) -> Closed<'_, T> {
Closed::_new(ClosedInner {
sender: self,
listener: None,
_pin: PhantomPinned,
})
}

/// Sends a message into this channel using the blocking strategy.
///
/// If the channel is full, this method will block until there is room.
Expand Down Expand Up @@ -1280,6 +1309,54 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
}
}

easy_wrapper! {
/// A future returned by [`Receiver::recv()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Closed<'a, T>(ClosedInner<'a, T> => ());
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}

pin_project! {
#[derive(Debug)]
#[project(!Unpin)]
struct ClosedInner<'a, T> {
// Reference to the sender.
sender: &'a Sender<T>,

// Listener waiting on the channel.
listener: Option<EventListener>,

// Keeping this type `!Unpin` enables future optimizations.
#[pin]
_pin: PhantomPinned
}
}

impl<'a, T> EventListenerFuture for ClosedInner<'a, T> {
type Output = ();

/// Run this future with the given `Strategy`.
fn poll_with_strategy<'x, S: Strategy<'x>>(
self: Pin<&mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<()> {
let this = self.project();

// Check if the channel is closed.
if !this.sender.is_closed() {
// Channel is not closed yet - now start listening for notifications.
*this.listener = Some(this.sender.channel.closed_ops.listen());

// Poll using the given strategy
ready!(S::poll(strategy, &mut *this.listener, cx));
}
Poll::Ready(())
}
}

#[cfg(feature = "std")]
use std::process::abort;

Expand Down
23 changes: 23 additions & 0 deletions tests/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,29 @@ fn send() {
.run();
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn closed() {
let (s, r) = bounded(1);

Parallel::new()
.add(|| {
future::block_on(s.send(7)).unwrap();
let before = s.closed();
let mut before = std::pin::pin!(before);
assert!(future::block_on(future::poll_once(&mut before)).is_none());
sleep(ms(1000));
assert_eq!(future::block_on(future::poll_once(before)), Some(()));
assert_eq!(future::block_on(future::poll_once(s.closed())), Some(()));
})
.add(|| {
assert_eq!(future::block_on(r.recv()), Ok(7));
sleep(ms(500));
drop(r);
})
.run();
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn force_send() {
Expand Down