Skip to content

Commit

Permalink
executor: add TypedExecutor (#993)
Browse files Browse the repository at this point in the history
Adds a `TypedExecutor` trait that describes how to spawn futures of a specific
type. This is useful for implementing functions that are generic over an executor
and wish to support both `Send` and `!Send` cases.
  • Loading branch information
carllerche authored Mar 21, 2019
1 parent cdde2e7 commit b1172f8
Show file tree
Hide file tree
Showing 19 changed files with 499 additions and 214 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ members = [
"tokio-udp",
"tokio-uds",
]

[patch.crates-io]
tokio-executor = { path = "tokio-executor" }
16 changes: 16 additions & 0 deletions ci/azure-patch-crates.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
steps:
- script: |
set -e
# Remove any existing patch statements
mv Cargo.toml Cargo.toml.bck
sed -n '/\[patch.crates-io\]/q;p' Cargo.toml.bck > Cargo.toml
# Patch all crates
cat ci/patch.toml >> Cargo.toml
# Print `Cargo.toml` for debugging
echo "~~~~ Cargo.toml ~~~~"
cat Cargo.toml
echo "~~~~~~~~~~~~~~~~~~~~"
displayName: Patch Cargo.toml
8 changes: 1 addition & 7 deletions ci/azure-test-stable.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ jobs:
displayName: cargo test -p ${{ crate }}
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}

- script: |
set -e
cat ci/patch.toml >> Cargo.toml
echo "~~~~ Cargo.toml ~~~~"
cat Cargo.toml
echo "~~~~~~~~~~~~~~~~~~~~"
displayName: Patch Cargo.toml
- template: azure-patch-crates.yml

- ${{ each crate in parameters.crates }}:
- script: cargo test
Expand Down
6 changes: 1 addition & 5 deletions ci/azure-tsan.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@ jobs:
parameters:
rust_version: nightly-2018-11-18

- template: azure-patch-crates.yml
- script: |
set -e
cat ci/patch.toml >> Cargo.toml
echo "~~~~ Cargo.toml ~~~~"
cat Cargo.toml
echo "~~~~~~~~~~~~~~~~~~~~"
# Make sure the benchmarks compile
export ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0"
export TSAN_OPTIONS="suppressions=`pwd`/ci/tsan"
Expand Down
2 changes: 1 addition & 1 deletion tokio-current-thread/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ keywords = ["futures", "tokio"]
categories = ["concurrency", "asynchronous"]

[dependencies]
tokio-executor = "0.1.5"
tokio-executor = { version = "0.1.5", path = "../tokio-executor" }
futures = "0.1.19"
19 changes: 19 additions & 0 deletions tokio-current-thread/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,16 @@ impl tokio_executor::Executor for CurrentThread {
}
}

impl<T> tokio_executor::TypedExecutor<T> for CurrentThread
where
T: Future<Item = (), Error = ()> + 'static,
{
fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
self.borrow().spawn_local(Box::new(future), false);
Ok(())
}
}

impl<P: Park> fmt::Debug for CurrentThread<P> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("CurrentThread")
Expand Down Expand Up @@ -742,6 +752,15 @@ impl tokio_executor::Executor for TaskExecutor {
}
}

impl<F> tokio_executor::TypedExecutor<F> for TaskExecutor
where
F: Future<Item = (), Error = ()> + 'static,
{
fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
self.spawn_local(Box::new(future))
}
}

impl<F> Executor<F> for TaskExecutor
where
F: Future<Item = (), Error = ()> + 'static,
Expand Down
3 changes: 3 additions & 0 deletions tokio-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ categories = ["concurrency", "asynchronous"]
[dependencies]
crossbeam-utils = "0.6.2"
futures = "0.1.19"

[dev-dependencies]
tokio = { version = "0.1.17", path = "../tokio" }
50 changes: 50 additions & 0 deletions tokio-executor/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::error::Error;
use std::fmt;

/// Errors returned by `Executor::spawn`.
///
/// Spawn errors should represent relatively rare scenarios. Currently, the two
/// scenarios represented by `SpawnError` are:
///
/// * An executor being at capacity or full. As such, the executor is not able
/// to accept a new future. This error state is expected to be transient.
/// * An executor has been shutdown and can no longer accept new futures. This
/// error state is expected to be permanent.
#[derive(Debug)]
pub struct SpawnError {
is_shutdown: bool,
}

impl SpawnError {
/// Return a new `SpawnError` reflecting a shutdown executor failure.
pub fn shutdown() -> Self {
SpawnError { is_shutdown: true }
}

/// Return a new `SpawnError` reflecting an executor at capacity failure.
pub fn at_capacity() -> Self {
SpawnError { is_shutdown: false }
}

/// Returns `true` if the error reflects a shutdown executor failure.
pub fn is_shutdown(&self) -> bool {
self.is_shutdown
}

/// Returns `true` if the error reflects an executor at capacity failure.
pub fn is_at_capacity(&self) -> bool {
!self.is_shutdown
}
}

impl fmt::Display for SpawnError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.description())
}
}

impl Error for SpawnError {
fn description(&self) -> &str {
"attempted to spawn task while the executor is at capacity or shut down"
}
}
151 changes: 151 additions & 0 deletions tokio-executor/src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use futures::Future;
use SpawnError;

/// A value that executes futures.
///
/// The [`spawn`] function is used to submit a future to an executor. Once
/// submitted, the executor takes ownership of the future and becomes
/// responsible for driving the future to completion.
///
/// The strategy employed by the executor to handle the future is less defined
/// and is left up to the `Executor` implementation. The `Executor` instance is
/// expected to call [`poll`] on the future once it has been notified, however
/// the "when" and "how" can vary greatly.
///
/// For example, the executor might be a thread pool, in which case a set of
/// threads have already been spawned up and the future is inserted into a
/// queue. A thread will acquire the future and poll it.
///
/// The `Executor` trait is only for futures that **are** `Send`. These are most
/// common. There currently is no trait that describes executors that operate
/// entirely on the current thread (i.e., are able to spawn futures that are not
/// `Send`). Note that single threaded executors can still implement `Executor`,
/// but only futures that are `Send` can be spawned via the trait.
///
/// This trait is primarily intended to implemented by executors and used to
/// back `tokio::spawn`. Libraries and applications **may** use this trait to
/// bound generics, but doing so will limit usage to futures that implement
/// `Send`. Instead, libraries and applications are recommended to use
/// [`TypedExecutor`] as a bound.
///
/// # Errors
///
/// The [`spawn`] function returns `Result` with an error type of `SpawnError`.
/// This error type represents the reason that the executor was unable to spawn
/// the future. The two current represented scenarios are:
///
/// * An executor being at capacity or full. As such, the executor is not able
/// to accept a new future. This error state is expected to be transient.
/// * An executor has been shutdown and can no longer accept new futures. This
/// error state is expected to be permanent.
///
/// If a caller encounters an at capacity error, the caller should try to shed
/// load. This can be as simple as dropping the future that was spawned.
///
/// If the caller encounters a shutdown error, the caller should attempt to
/// gracefully shutdown.
///
/// # Examples
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_executor;
/// # use tokio_executor::Executor;
/// # fn docs(my_executor: &mut Executor) {
/// use futures::future::lazy;
/// my_executor.spawn(Box::new(lazy(|| {
/// println!("running on the executor");
/// Ok(())
/// }))).unwrap();
/// # }
/// # fn main() {}
/// ```
///
/// [`spawn`]: #tymethod.spawn
/// [`poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll
/// [`TypedExecutor`]: ../trait.TypedExecutor.html
pub trait Executor {
/// Spawns a future object to run on this executor.
///
/// `future` is passed to the executor, which will begin running it. The
/// future may run on the current thread or another thread at the discretion
/// of the `Executor` implementation.
///
/// # Panics
///
/// Implementations are encouraged to avoid panics. However, panics are
/// permitted and the caller should check the implementation specific
/// documentation for more details on possible panics.
///
/// # Examples
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_executor;
/// # use tokio_executor::Executor;
/// # fn docs(my_executor: &mut Executor) {
/// use futures::future::lazy;
/// my_executor.spawn(Box::new(lazy(|| {
/// println!("running on the executor");
/// Ok(())
/// }))).unwrap();
/// # }
/// # fn main() {}
/// ```
fn spawn(
&mut self,
future: Box<Future<Item = (), Error = ()> + Send>,
) -> Result<(), SpawnError>;

/// Provides a best effort **hint** to whether or not `spawn` will succeed.
///
/// This function may return both false positives **and** false negatives.
/// If `status` returns `Ok`, then a call to `spawn` will *probably*
/// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
/// *probably* fail, but may succeed.
///
/// This allows a caller to avoid creating the task if the call to `spawn`
/// has a high likelihood of failing.
///
/// # Panics
///
/// This function must not panic. Implementers must ensure that panics do
/// not happen.
///
/// # Examples
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_executor;
/// # use tokio_executor::Executor;
/// # fn docs(my_executor: &mut Executor) {
/// use futures::future::lazy;
///
/// if my_executor.status().is_ok() {
/// my_executor.spawn(Box::new(lazy(|| {
/// println!("running on the executor");
/// Ok(())
/// }))).unwrap();
/// } else {
/// println!("the executor is not in a good state");
/// }
/// # }
/// # fn main() {}
/// ```
fn status(&self) -> Result<(), SpawnError> {
Ok(())
}
}

impl<E: Executor + ?Sized> Executor for Box<E> {
fn spawn(
&mut self,
future: Box<Future<Item = (), Error = ()> + Send>,
) -> Result<(), SpawnError> {
(**self).spawn(future)
}

fn status(&self) -> Result<(), SpawnError> {
(**self).status()
}
}
13 changes: 13 additions & 0 deletions tokio-executor/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ impl super::Executor for DefaultExecutor {
}
}

impl<T> super::TypedExecutor<T> for DefaultExecutor
where
T: Future<Item = (), Error = ()> + Send + 'static,
{
fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
super::Executor::spawn(self, Box::new(future))
}

fn status(&self) -> Result<(), SpawnError> {
super::Executor::status(self)
}
}

impl<T> future::Executor<T> for DefaultExecutor
where
T: Future<Item = (), Error = ()> + Send + 'static,
Expand Down
Loading

0 comments on commit b1172f8

Please sign in to comment.