Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: add support for anonymous unix pipes #6127

Merged
merged 10 commits into from
Dec 30, 2023
281 changes: 253 additions & 28 deletions tokio/src/net/unix/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
use mio::unix::pipe as mio_pipe;
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand All @@ -16,6 +16,59 @@ cfg_io_util! {
use bytes::BufMut;
}

/// Creates a new anonymous Unix pipe.
///
/// This function will open a new pipe and associate both pipe ends with the default
/// event loop.
///
/// If you need to create a pipe for communication with a spawned process, you can
/// use [`Stdio::piped()`] instead.
///
/// [`Stdio::piped()`]: std::process::Stdio::piped
///
/// # Errors
///
/// If creating a pipe fails, this function will return with the related OS error.
///
/// # Examples
///
/// Create a pipe and pass the writing end to a spawned process.
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use tokio::process::Command;
/// # use tokio::io::AsyncReadExt;
/// # use std::error::Error;
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// let (tx, mut rx) = pipe::pipe()?;
/// let mut buffer = String::new();
///
/// let status = Command::new("echo")
/// .arg("Hello, world!")
/// .stdout(tx.into_blocking_fd()?)
/// .status();
/// rx.read_to_string(&mut buffer).await?;
///
/// assert!(status.await?.success());
/// assert_eq!(buffer, "Hello, world!\n");
/// # Ok(())
/// # }
/// ```
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn pipe() -> io::Result<(Sender, Receiver)> {
let (tx, rx) = mio_pipe::new()?;
Ok((Sender::from_mio(tx)?, Receiver::from_mio(rx)?))
}

/// Options and flags which can be used to configure how a FIFO file is opened.
///
/// This builder allows configuring how to create a pipe end from a FIFO file.
Expand Down Expand Up @@ -218,7 +271,7 @@ impl OpenOptions {

let file = options.open(path)?;

if !self.unchecked && !is_fifo(&file)? {
if !self.unchecked && !is_pipe(file.as_fd())? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}

Expand Down Expand Up @@ -338,15 +391,40 @@ impl Sender {
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file(mut file: File) -> io::Result<Sender> {
if !is_fifo(&file)? {
pub fn from_file(file: File) -> io::Result<Sender> {
Sender::from_owned_fd(file.into())
}

/// Creates a new `Sender` from an [`OwnedFd`].
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. It will check if the file descriptor
/// is a pipe and has write access, set it in non-blocking mode and perform the
/// conversion.
///
/// # Errors
///
/// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
/// or it does not have write access. Also fails with any standard OS error if it
/// occurs.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender> {
if !is_pipe(owned_fd.as_fd())? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}

let flags = get_file_flags(&file)?;
let flags = get_file_flags(owned_fd.as_fd())?;
if has_write_access(flags) {
set_nonblocking(&mut file, flags)?;
Sender::from_file_unchecked(file)
set_nonblocking(owned_fd.as_fd(), flags)?;
Sender::from_owned_fd_unchecked(owned_fd)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
Expand Down Expand Up @@ -394,8 +472,28 @@ impl Sender {
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
let raw_fd = file.into_raw_fd();
let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) };
Sender::from_owned_fd_unchecked(file.into())
}

/// Creates a new `Sender` from an [`OwnedFd`] without checking pipe properties.
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
/// the underlying pipe; it is left up to the user to make sure that the file
/// descriptor represents the writing end of a pipe and the pipe is set in
/// non-blocking mode.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender> {
// Safety: OwnedFd represents a valid, open file descriptor.
let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(owned_fd.into_raw_fd()) };
Sender::from_mio(mio_tx)
}

Expand Down Expand Up @@ -623,6 +721,31 @@ impl Sender {
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}

/// Converts the pipe into an [`OwnedFd`] in blocking mode.
///
/// This function will deregister this pipe end from the event loop, set
/// it in blocking mode and perform the conversion.
pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
let fd = self.into_nonblocking_fd()?;
set_blocking(&fd)?;
Ok(fd)
}

/// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
///
/// This function will deregister this pipe end from the event loop and
/// perform the conversion. Returned file descriptor will be in nonblocking
/// mode.
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
let mio_pipe = self.io.into_inner()?;

// Safety: the pipe is now deregistered from the event loop
// and we are the only owner of this pipe end.
let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };

Ok(owned_fd)
}
}

impl AsyncWrite for Sender {
Expand Down Expand Up @@ -764,15 +887,40 @@ impl Receiver {
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file(mut file: File) -> io::Result<Receiver> {
if !is_fifo(&file)? {
pub fn from_file(file: File) -> io::Result<Receiver> {
Receiver::from_owned_fd(file.into())
}

/// Creates a new `Receiver` from an [`OwnedFd`].
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. It will check if the file descriptor
/// is a pipe and has read access, set it in non-blocking mode and perform the
/// conversion.
///
/// # Errors
///
/// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
/// or it does not have read access. Also fails with any standard OS error if it
/// occurs.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Receiver> {
if !is_pipe(owned_fd.as_fd())? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}

let flags = get_file_flags(&file)?;
let flags = get_file_flags(owned_fd.as_fd())?;
if has_read_access(flags) {
set_nonblocking(&mut file, flags)?;
Receiver::from_file_unchecked(file)
set_nonblocking(owned_fd.as_fd(), flags)?;
Receiver::from_owned_fd_unchecked(owned_fd)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
Expand Down Expand Up @@ -820,8 +968,28 @@ impl Receiver {
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
let raw_fd = file.into_raw_fd();
let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) };
Receiver::from_owned_fd_unchecked(file.into())
}

/// Creates a new `Receiver` from an [`OwnedFd`] without checking pipe properties.
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
/// the underlying pipe; it is left up to the user to make sure that the file
/// descriptor represents the reading end of a pipe and the pipe is set in
/// non-blocking mode.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Receiver> {
// Safety: OwnedFd represents a valid, open file descriptor.
let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(owned_fd.into_raw_fd()) };
Receiver::from_mio(mio_rx)
}

Expand Down Expand Up @@ -1146,6 +1314,31 @@ impl Receiver {
})
}
}

/// Converts the pipe into an [`OwnedFd`] in blocking mode.
///
/// This function will deregister this pipe end from the event loop, set
/// it in blocking mode and perform the conversion.
pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
let fd = self.into_nonblocking_fd()?;
set_blocking(&fd)?;
Ok(fd)
}

/// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
///
/// This function will deregister this pipe end from the event loop and
/// perform the conversion. Returned file descriptor will be in nonblocking
/// mode.
pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
let mio_pipe = self.io.into_inner()?;

// Safety: the pipe is now deregistered from the event loop
// and we are the only owner of this pipe end.
let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };

Ok(owned_fd)
}
}

impl AsyncRead for Receiver {
Expand All @@ -1172,15 +1365,27 @@ impl AsFd for Receiver {
}
}

/// Checks if file is a FIFO
fn is_fifo(file: &File) -> io::Result<bool> {
Ok(file.metadata()?.file_type().is_fifo())
/// Checks if the file descriptor is a pipe or a FIFO.
fn is_pipe(fd: BorrowedFd<'_>) -> io::Result<bool> {
// Safety: `libc::stat` is C-like struct used for syscalls and all-zero
// byte pattern forms a valid value.
let mut stat: libc::stat = unsafe { std::mem::zeroed() };

// Safety: it's safe to call `fstat` with a valid, open file descriptor
// and a valid pointer to a `stat` struct.
let r = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) };
Darksonn marked this conversation as resolved.
Show resolved Hide resolved

if r == -1 {
Err(io::Error::last_os_error())
} else {
Ok((stat.st_mode as libc::mode_t & libc::S_IFMT) == libc::S_IFIFO)
}
}

/// Gets file descriptor's flags by fcntl.
fn get_file_flags(file: &File) -> io::Result<libc::c_int> {
let fd = file.as_raw_fd();
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result<libc::c_int> {
// Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
if flags < 0 {
Err(io::Error::last_os_error())
} else {
Expand All @@ -1200,18 +1405,38 @@ fn has_write_access(flags: libc::c_int) -> bool {
mode == libc::O_WRONLY || mode == libc::O_RDWR
}

/// Sets file's flags with `O_NONBLOCK` by fcntl.
fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> {
let fd = file.as_raw_fd();

/// Sets file descriptor's flags with `O_NONBLOCK` by fcntl.
fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> {
let flags = current_flags | libc::O_NONBLOCK;

if flags != current_flags {
let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) };
// Safety: it's safe to use `fcntl` to set the `O_NONBLOCK` flag of a valid,
// open file descriptor.
let ret = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, flags) };
if ret < 0 {
return Err(io::Error::last_os_error());
}
}

Ok(())
}

/// Removes `O_NONBLOCK` from fd's flags.
fn set_blocking<T: AsRawFd>(fd: &T) -> io::Result<()> {
// Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
let previous = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
if previous == -1 {
return Err(io::Error::last_os_error());
}

let new = previous & !libc::O_NONBLOCK;

// Safety: it's safe to use `fcntl` to unset the `O_NONBLOCK` flag of a valid,
// open file descriptor.
let r = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, new) };
if r == -1 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
Loading
Loading