Skip to content

Commit

Permalink
Documentation dump
Browse files Browse the repository at this point in the history
  • Loading branch information
h33p committed Dec 6, 2023
1 parent 3a793ce commit f971465
Show file tree
Hide file tree
Showing 13 changed files with 469 additions and 36 deletions.
2 changes: 2 additions & 0 deletions mfio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ windows-sys = { version = "0.48", features = ["Win32_System_Threading", "Win32_F

[dev-dependencies]
tokio = { version = "1.24", features = ["rt", "macros", "rt-multi-thread"] }
smol = "1"
criterion = { version = "0.5", git = "https://github.com/h33p/criterion.rs", branch = "tput2", features = ["async_tokio", "async_smol", "async_futures"] }
pollster = "0.2"
bytemuck = { version = "1", features = ["derive"] }

[features]
default = ["std", "http"]
std = ["parking_lot"]
cglue-trait = []
38 changes: 38 additions & 0 deletions mfio/src/backend/fd.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
//! File descriptor based waker.
//!
//! [`FdWaker`] allows one to wake a runtime by pushing a write operation to the underlying file
//! descriptor, be it a pipe, eventfd, or anything else that can be pollable for readability.
//!
//! Create a [`FdWakerOwner`] from a [`AsRawFd`] object to allow controlling the waker properties.
use core::mem::ManuallyDrop;
use core::sync::atomic::{AtomicU8, Ordering};
use core::task::{RawWaker, RawWakerVTable, Waker};
Expand All @@ -6,6 +13,37 @@ use std::io::{ErrorKind, Write};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd};
use tarc::{Arc, BaseArc};

/// Owner of [`FdWaker`]s.
///
/// When this type gets dropped, the underlying file descriptor gets closed and released. This
/// effectively breaks all remaining wakers, however, the references to them stay valid.
///
/// # Examples
///
/// Poll for the pipe to become readable:
///
/// ```
/// use mfio::backend::fd::FdWakerOwner;
/// use nix::poll::*;
///
/// let (wake_read, wake_write) = nix::unistd::pipe().unwrap();
///
/// let waker_owner = FdWakerOwner::from(wake_write);
///
/// std::thread::spawn({
/// let waker = waker_owner.clone().into_waker();
/// move || {
/// std::thread::sleep(std::time::Duration::from_millis(500));
/// waker.wake();
/// }
/// });
///
/// let mut fd = [PollFd::new(wake_read, PollFlags::POLLIN)];
/// assert_ne!(0, poll(&mut fd[..], 5000).unwrap());
///
/// // Let's verify that we did indeed get woken up.
/// assert!(fd[0].revents().unwrap().contains(PollFlags::POLLIN));
/// ```
#[repr(transparent)]
pub struct FdWakerOwner<F: AsRawFd>(FdWaker<F>);

Expand Down
35 changes: 35 additions & 0 deletions mfio/src/backend/integrations/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,40 @@ use std::os::fd::BorrowedFd;
use super::super::*;
use super::{BorrowingFn, Integration};

/// async-io integration.
///
/// Unlike [`Null`], this integration supports backends with polling handles, however, only
/// async-io based runtimes are supported, such as smol and async_std.
///
/// Internally, this uses async-io's [`Async`] to wait for readiness of the polling FD, which means
/// only unix platforms are supported.
///
/// # Examples
///
/// Using the integration with smol:
///
/// ```
/// # mod sample {
/// # include!("../../sample.rs");
/// # }
/// # use sample::SampleIo;
/// use mfio::prelude::v1::*;
///
/// # #[cfg(all(unix, not(miri)))]
/// smol::block_on(async {
/// let mut handle = SampleIo::new(vec![1, 2, 3, 4]);
///
/// // Run the integration. Prefer to use `run_with_mut`, so that panics can be avoided.
/// AsyncIo::run_with_mut(&mut handle, |handle| async move {
/// // Read value
/// let val = handle.read(0).await.unwrap();
/// assert_eq!(1u8, val);
/// })
/// .await
/// });
/// # #[cfg(not(all(unix, not(miri))))]
/// # fn main() {}
/// ```
#[derive(Clone, Copy, Default)]
pub struct AsyncIo;

Expand Down Expand Up @@ -37,6 +71,7 @@ enum AsyncIoState<'a, B: IoBackend + ?Sized + 'a, Func, F> {
Finished,
}

#[doc(hidden)]
pub struct AsyncIoImpl<'a, B: LinksIoBackend + 'a, Func, F> {
backend: B,
state: AsyncIoState<'a, B::Link, Func, F>,
Expand Down
29 changes: 29 additions & 0 deletions mfio/src/backend/integrations/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,34 @@
use super::super::*;
use super::{BorrowingFn, Integration};

/// Minimal integration.
///
/// This integration works in all async runtimes, however, it does not support the backend's
/// `PollingHandle`. If the backend returns `Some(handle)`, then this integration panics.
///
/// # Examples
///
/// Running with `pollster`:
///
/// ```
/// # mod sample {
/// # include!("../../sample.rs");
/// # }
/// # use sample::SampleIo;
/// use mfio::prelude::v1::*;
///
/// pollster::block_on(async {
/// let mut handle = SampleIo::new(vec![1, 2, 3, 4]);
///
/// // Run the integration. Prefer to use `run_with_mut`, so that panics can be avoided.
/// Null::run_with_mut(&mut handle, |handle| async move {
/// // Read value
/// let val = handle.read(0).await.unwrap();
/// assert_eq!(1u8, val);
/// })
/// .await
/// });
/// ```
#[derive(Clone, Copy, Default)]
pub struct Null;

Expand All @@ -31,6 +59,7 @@ enum NullState<'a, B: IoBackend + ?Sized + 'a, Func, F> {
Finished,
}

#[doc(hidden)]
pub struct NullImpl<'a, B: LinksIoBackend + 'a, Func, F> {
backend: B,
state: NullState<'a, B::Link, Func, F>,
Expand Down
34 changes: 34 additions & 0 deletions mfio/src/backend/integrations/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,39 @@ use tokio::io::{unix::AsyncFd, Interest};
use super::super::*;
use super::{BorrowingFn, Integration};

/// Tokio integration.
///
/// Unlike [`Null`], this integration supports backends with polling handles, however, only tokio
/// runtime is supported.
///
/// Internally, this uses tokio's [`AsyncFd`] to wait for readiness of the polling handle, which
/// means only unix platforms are supported.
///
/// # Examples
///
/// ```
/// # mod sample {
/// # include!("../../sample.rs");
/// # }
/// # use sample::SampleIo;
/// use mfio::prelude::v1::*;
///
/// #[tokio::main]
/// # #[cfg(all(unix, not(miri)))]
/// async fn main() {
/// let mut handle = SampleIo::new(vec![1, 2, 3, 4]);
///
/// // Run the integration. Prefer to use `run_with_mut`, so that panics can be avoided.
/// Tokio::run_with_mut(&mut handle, |handle| async move {
/// // Read value
/// let val = handle.read(0).await.unwrap();
/// assert_eq!(1u8, val);
/// })
/// .await
/// }
/// # #[cfg(not(all(unix, not(miri))))]
/// # fn main() {}
/// ```
#[derive(Clone, Copy, Default)]
pub struct Tokio;

Expand Down Expand Up @@ -42,6 +75,7 @@ enum TokioState<'a, B: IoBackend + ?Sized + 'a, Func, F> {
Finished,
}

#[doc(hidden)]
pub struct TokioImpl<'a, B: LinksIoBackend + 'a, Func, F> {
backend: B,
state: TokioState<'a, B::Link, Func, F>,
Expand Down
50 changes: 49 additions & 1 deletion mfio/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub mod handle;
#[cfg(all(windows, feature = "std"))]
pub mod windows;

// TODO: rename DefaultHandle to OsHandle, and get rid of Infallible one.

#[cfg(all(unix, feature = "std"))]
pub type DefaultHandle = RawFd;
#[cfg(all(windows, feature = "std"))]
Expand All @@ -53,6 +55,12 @@ struct NestedBackend {
release: unsafe extern "C" fn(*const ()),
}

/// Stores a backend.
///
/// This type is always stored on backends, and is acquired by users in [`IoBackend::get_backend`].
/// A backend can only be acquired once at a time, however, it does not matter who does it.
///
/// Once the backend is acquired, it can be used to drive I/O to completion.
#[repr(C)]
pub struct BackendContainer<B: ?Sized> {
nest: UnsafeCell<Option<NestedBackend>>,
Expand All @@ -64,8 +72,16 @@ unsafe impl<B: ?Sized + Send> Send for BackendContainer<B> {}
unsafe impl<B: ?Sized + Send> Sync for BackendContainer<B> {}

impl<B: ?Sized> BackendContainer<B> {
/// Acquire a backend.
///
/// This function locks the backend and the returned handle keeps it locked, until the handle
/// gets released.
///
/// # Panics
///
/// Panics if the backend has already been acquired.
pub fn acquire(&self, wake_flags: Option<Arc<AtomicU8>>) -> BackendHandle<B> {
if self.lock.swap(true, Ordering::Acquire) {
if self.lock.swap(true, Ordering::AcqRel) {
panic!("Tried to acquire backend twice!");
}

Expand All @@ -78,6 +94,12 @@ impl<B: ?Sized> BackendContainer<B> {
}
}

/// Acquires a backend in nested mode.
///
/// This function is useful when layered I/O backends are desirable. When polling, first, this
/// backend will be polled, and afterwards, the provided handle. The ordering is consistent
/// with the behavior of first polling the user's future, and then polling the backend. In the
/// end, backends will be peeled off layer by layer, until the innermost backend is reached.
pub fn acquire_nested<B2: ?Sized + Future<Output = ()>>(
&self,
mut handle: BackendHandle<B2>,
Expand Down Expand Up @@ -122,6 +144,7 @@ impl<B: ?Sized> BackendContainer<B> {
}

impl BackendContainer<DynBackend> {
/// Creates a new [`DynBackend`] container.
pub fn new_dyn<T: Future<Output = ()> + Send + 'static>(backend: T) -> Self {
Self {
backend: UnsafeCell::new(Box::pin(backend) as Pin<Box<dyn Future<Output = ()> + Send>>),
Expand All @@ -131,6 +154,14 @@ impl BackendContainer<DynBackend> {
}
}

/// Handle to a backend.
///
/// This handle can be used to drive arbitrary future to completion by attaching a backend to it.
/// This is typically done using [`WithBackend`] that is constructed in
/// [`IoBackendExt::with_backend`].
///
/// Usually, the user would want to bypass this type and use [`IoBackendExt::block_on`], or an
/// [`Integration`] equivalent.
pub struct BackendHandle<'a, B: ?Sized> {
owner: &'a BackendContainer<B>,
backend: Pin<&'a mut B>,
Expand Down Expand Up @@ -166,6 +197,13 @@ impl<'a, B: ?Sized> core::ops::DerefMut for BackendHandle<'a, B> {
}
}

/// Future combined with a backend.
///
/// This future can be used to drive arbitrary future to completion by attaching a backend to it.
/// Construct this type using [`IoBackendExt::with_backend`].
///
/// Usually, the user would want to bypass this type and use [`IoBackendExt::block_on`], or an
/// [`Integration`] equivalent.
pub struct WithBackend<'a, Backend: ?Sized, Fut: ?Sized> {
backend: BackendHandle<'a, Backend>,
future: Fut,
Expand Down Expand Up @@ -221,6 +259,12 @@ impl<'a, Backend: Future + ?Sized, Fut: Future + ?Sized> Future for WithBackend<
}
}

/// Cooperative polling handle.
///
/// This handle contains a handle and necessary metadata needed to cooperatively drive mfio code to
/// completion.
///
/// This handle is typically created on the [`IoBackend`] side.
pub struct PollingHandle<'a, Handle = DefaultHandle> {
pub handle: Handle,
pub cur_flags: &'a PollingFlags,
Expand Down Expand Up @@ -337,6 +381,9 @@ impl PollingFlags {
///
/// This trait is implemented at the outer-most stateful object of the I/O context. A `IoBackend`
/// has the opportunity to expose efficient ways of driving said backend to completion.
///
/// Users may want to call methods available on [`IoBackendExt`], instead of the ones on this
/// trait.
pub trait IoBackend<Handle: Pollable = DefaultHandle> {
type Backend: Future<Output = ()> + Send + ?Sized;

Expand All @@ -362,6 +409,7 @@ pub trait IoBackend<Handle: Pollable = DefaultHandle> {
fn get_backend(&self) -> BackendHandle<Self::Backend>;
}

/// Helpers for [`IoBackend`].
pub trait IoBackendExt<Handle: Pollable>: IoBackend<Handle> {
/// Builds a composite future that also polls the backend future.
///
Expand Down
Loading

0 comments on commit f971465

Please sign in to comment.