Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tokio02-style mpsc sender #3105

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ codec = ["tokio/stream"]
time = ["tokio/time","slab"]
io = []
rt = ["tokio/rt"]
sync = ["tokio/sync"]

[dependencies]
tokio = { version = "0.3.0", path = "../tokio" }
Expand Down
9 changes: 9 additions & 0 deletions tokio-util/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,12 @@ macro_rules! cfg_rt {
)*
}
}
macro_rules! cfg_sync {
($($item:item)*) => {
$(
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
$item
)*
}
}
2 changes: 2 additions & 0 deletions tokio-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub mod sync;

pub mod either;

pub mod pollify;

#[cfg(feature = "time")]
pub mod time;

Expand Down
192 changes: 192 additions & 0 deletions tokio-util/src/pollify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
//! Low-level utilities for `poll`-ifying `async fn`-based apis.

use std::{
fmt::{Debug, Formatter, Result as FmtResult},
future::Future,
marker::PhantomPinned,
pin::Pin,
task::{Context, Poll},
};

/// Type that has `async fn`.
/// `Pollify` can wrap instance of such type and
/// provide `poll`-based api.
pub trait AsyncOp {
/// Future returned by this async fn.
/// Can be selected as `Box<dyn std::future::Future<Output=/*return type*/>>`
type Fut: Future;

/// Creates a future.
/// # Safety
/// This function is unsafe to call
/// Calling `op` exclusively borrows for the whole lifetime
/// of returned future. This will be expressed using generic associated
/// types which are not stable yet.
unsafe fn start_operation(&mut self) -> Self::Fut;
}

enum State<F: Future> {
/// We have neither future nor its result.
Empty,
/// We have a ready output.
// ALERT: this is self-reference to the `inner`.
Ready(F::Output),
/// We have a future in progress
// ALERT: contained future references both itself and `inner`.
Progress(F),
}

enum StateKind {
Empty,
Ready,
}

impl<F: Future> State<F> {
fn is_in_progress(&self) -> bool {
matches!(self, State::Progress(_))
}

fn pinned_take(self: Pin<&mut Self>) -> Result<Self, Pin<&mut F>> {
if self.is_in_progress() {
unsafe {
Err(self.map_unchecked_mut(|this| match this {
State::Progress(f) => f,
_ => unreachable!(),
}))
}
} else {
let this = unsafe { Pin::into_inner_unchecked(self) };
Ok(std::mem::replace(this, Self::Empty))
}
}

fn reset(mut self: Pin<&mut Self>) -> StateKind {
if self.is_in_progress() {
self.set(State::Empty);
StateKind::Empty
} else {
let this = unsafe { Pin::into_inner_unchecked(self.as_mut()) };
let kind = match this {
State::Empty => StateKind::Empty,
State::Ready(_) => StateKind::Ready,
State::Progress(_) => unreachable!(),
};
self.set(State::Empty);
kind
}
}
}

impl<F: Future> Debug for State<F> {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
State::Empty => f.debug_tuple("Empty").finish(),
State::Ready(_) => f.debug_tuple("Ready").field(&"_").finish(),
State::Progress(_) => f.debug_tuple("Progress").field(&"_").finish(),
}
}
}

/// Wraps a type with `async fn` and exposes `poll` for it.
pub struct Pollify<A: AsyncOp> {
inner: A,
state: State<A::Fut>,
_pinned: PhantomPinned,
}

impl<A: AsyncOp + Debug> Debug for Pollify<A> {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
f.debug_struct("Pollify")
.field("inner", &self.inner)
.field("state", &self.state)
.finish()
}
}

impl<A: AsyncOp> Pollify<A> {
/// State must be pinned when it is in `Progress` state.
fn pin_project_state(self: Pin<&mut Self>) -> Pin<&mut State<A::Fut>> {
unsafe { self.map_unchecked_mut(|this| &mut this.state) }
}

/// Inner must be pinned because state can contain references to it
fn pin_project_inner(self: Pin<&mut Self>) -> Pin<&mut A> {
unsafe { self.map_unchecked_mut(|this| &mut this.inner) }
}
}

impl<A: AsyncOp> Pollify<A> {
/// Wraps a [tokio sender](tokio::sync::mpsc::Sender).
pub fn new(inner: A) -> Self {
Pollify {
inner,
state: State::Empty,
_pinned: PhantomPinned,
}
}

/// Returns sender readiness state
pub fn is_ready(&self) -> bool {
matches!(self.state, State::Ready(_))
}

/// Extracts operation result. Pollify is now in empty state
/// and can be polled again.
/// # Panics
/// This method panics if the `Sender` is not ready.
pub fn extract(self: Pin<&mut Self>) -> <A::Fut as Future>::Output {
match self.pin_project_state().pinned_take() {
Ok(State::Ready(out)) => out,
_ => panic!("extract() called, but no output available"),
}
}

/// Returns reference to future result.
pub fn output_ref(&self) -> &<A::Fut as Future>::Output {
match &self.state {
State::Ready(output) => output,
_ => panic!("output_ref() called, but no output available"),
}
}

/// Cancels operation. Pollify is now in empty state
/// and can be polled again.
///
/// Returns true if before the call operation was completed.
pub fn cancel(mut self: Pin<&mut Self>) -> bool {
let was_ready = matches!(self.as_mut().pin_project_state().reset(), StateKind::Ready);
was_ready
}

/// Polls current async operation, starting one if necessary.
///
/// This function can not be called when the `Pollify` is ready.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// let state = unsafe { Pin::into_inner_unchecked(self.as_mut().pin_project_state()) };

let fut = match self.as_mut().pin_project_state().pinned_take() {
Ok(State::Ready(_)) => panic!("poll_ready() must not be called on ready sender"),
Ok(State::Progress(_)) => unreachable!(),
Ok(State::Empty) => {
let inner = self.as_mut().pin_project_inner();
let fut = unsafe { Pin::into_inner_unchecked(inner).start_operation() };
self.as_mut().pin_project_state().set(State::Progress(fut));
self.as_mut().pin_project_state().pinned_take().unwrap_err()
}
Err(fut) => fut,
};
let poll = fut.poll(cx);
match poll {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => {
self.pin_project_state().set(State::Ready(output));
Poll::Ready(())
}
}
}

/// Returns reference to wrapped value
pub fn inner(&self) -> &A {
&self.inner
}
}
9 changes: 9 additions & 0 deletions tokio-util/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
//! Synchronization primitives

cfg_sync! {
mod mpsc;
}
#[cfg(feature = "sync")]
pub use mpsc::Sender;

mod cancellation_token;
pub use cancellation_token::{CancellationToken, WaitForCancellationFuture};

mod intrusive_double_linked_list;

#[cfg(test)]
mod tests;
142 changes: 142 additions & 0 deletions tokio-util/src/sync/mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//! Tokio-02 style MPSC channel.
use crate::pollify::Pollify;
use std::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::mpsc::{error::SendError, Permit};

/// Wrapper for Box<dyn Future> conditionally implementing Send and Sync
struct FutureBox<'a, T, M>(Pin<Box<dyn Future<Output = T> + 'a>>, PhantomData<Box<M>>);

impl<'a, T, M> Future for FutureBox<'a, T, M> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut Pin::into_inner(self).0).as_mut().poll(cx)
}
}

unsafe impl<'a, T, M> Send for FutureBox<'a, T, M> where M: Send {}
unsafe impl<'a, T, M> Sync for FutureBox<'a, T, M> where M: Sync {}

#[derive(Debug)]
struct Chan<'a, T>(tokio::sync::mpsc::Sender<T>, PhantomData<&'a ()>);
impl<'a, T: 'a> crate::pollify::AsyncOp for Chan<'a, T> {
type Fut = FutureBox<'a, AcquireFutOutput<'a, T>, T>;

unsafe fn start_operation(&mut self) -> Self::Fut {
// SAFETY: guaranteed by AsyncOp invariant.
let long_lived_sender = std::mem::transmute::<
&mut tokio::sync::mpsc::Sender<T>,
&'a mut tokio::sync::mpsc::Sender<T>,
>(&mut self.0);
FutureBox(Box::pin(long_lived_sender.reserve()), PhantomData)
}
}

/// Fat sender with `poll_ready` support.
#[derive(Debug)]
pub struct Sender<'a, T: Send + 'a> {
used_in_poll_ready: PhantomData<&'a ()>,
pollify: Pollify<Chan<'a, T>>,
}

impl<'a, T: Send + 'a> Clone for Sender<'a, T> {
fn clone(&self) -> Self {
Self::new(self.pollify.inner().0.clone())
}
}

type AcquireFutOutput<'a, T> = Result<Permit<'a, T>, SendError<()>>;

impl<'a, T: Send + 'a> Sender<'a, T> {
/// Wraps a [tokio sender](tokio::sync::mpsc::Sender).
pub fn new(inner: tokio::sync::mpsc::Sender<T>) -> Self {
let chan = Chan(inner, PhantomData);
Sender {
pollify: Pollify::new(chan),
used_in_poll_ready: PhantomData,
}
}

fn pin_project_inner(self: Pin<&mut Self>) -> Pin<&mut Pollify<Chan<'a, T>>> {
unsafe { self.map_unchecked_mut(|this| &mut this.pollify) }
}

/// Returns sender readiness state
pub fn is_ready(&self) -> bool {
self.pollify.is_ready()
}

/// Sends a message.
///
/// This method panics if the `Sender` is not ready.
pub fn send(self: Pin<&mut Self>, value: T) {
if !self.pollify.is_ready() {
panic!("called send() on non-ready Sender")
}
let permit = self
.pin_project_inner()
.extract()
.expect("poll_ready would handle Err");
permit.send(value);
}

/// Disarm permit. This releases the reserved slot in the bounded channel.
/// If acquire was in progress, it is aborted.
///
/// Returns true if before the call this sender owned a permit.
pub fn disarm(self: Pin<&mut Self>) -> bool {
self.pin_project_inner().cancel()
}

/// Tries to acquire a permit.
///
/// This function can not be called when the `Sender` is ready.
pub fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), SendError<()>>> {
let mut inner = self.pin_project_inner();
match inner.as_mut().poll(cx) {
Poll::Ready(()) => {
if inner.as_mut().output_ref().is_ok() {
Poll::Ready(Ok(()))
} else {
let output = inner.extract();
Poll::Ready(Err(output.unwrap_err()))
}
}
Poll::Pending => Poll::Pending,
}
}
}

#[cfg(test)]
mod _props {
use super::*;
fn _verify_not_unpin<U: Send>(x: Sender<'_, U>) {
trait Foo {
fn is_ready(&self) -> bool;
}

impl<T: Unpin> Foo for T {
fn is_ready(&self) -> bool {
false
}
}

assert!(x.is_ready());
}

fn _verify_send<U: Send>(x: Sender<'_, U>) {
fn inner(_x: impl Send) {}
inner(x)
}
fn _verify_sync<U: Send + Sync>(x: Sender<'_, U>) {
fn inner(_x: impl Sync) {}
inner(x)
}
}
3 changes: 2 additions & 1 deletion tokio-util/src/sync/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@

#[cfg(feature = "sync")]
mod mpsc;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that cancellation token tests are just ignored currently.

Loading