diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs index 0b2508a9257..7c279134dbf 100644 --- a/tokio/src/net/unix/pipe.rs +++ b/tokio/src/net/unix/pipe.rs @@ -6,8 +6,8 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready}; use mio::unix::pipe as mio_pipe; use std::fs::File; use std::io::{self, Read, Write}; -use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; -use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; +use std::os::unix::fs::OpenOptionsExt; +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; @@ -16,6 +16,59 @@ cfg_io_util! { use bytes::BufMut; } +/// Creates a new anonymous Unix pipe. +/// +/// This function will open a new pipe and associate both pipe ends with the default +/// event loop. +/// +/// If you need to create a pipe for communication with a spawned process, you can +/// use [`Stdio::piped()`] instead. +/// +/// [`Stdio::piped()`]: std::process::Stdio::piped +/// +/// # Errors +/// +/// If creating a pipe fails, this function will return with the related OS error. +/// +/// # Examples +/// +/// Create a pipe and pass the writing end to a spawned process. +/// +/// ```no_run +/// use tokio::net::unix::pipe; +/// use tokio::process::Command; +/// # use tokio::io::AsyncReadExt; +/// # use std::error::Error; +/// +/// # async fn dox() -> Result<(), Box> { +/// let (tx, mut rx) = pipe::pipe()?; +/// let mut buffer = String::new(); +/// +/// let status = Command::new("echo") +/// .arg("Hello, world!") +/// .stdout(tx.into_blocking_fd()?) +/// .status(); +/// rx.read_to_string(&mut buffer).await?; +/// +/// assert!(status.await?.success()); +/// assert_eq!(buffer, "Hello, world!\n"); +/// # Ok(()) +/// # } +/// ``` +/// +/// # Panics +/// +/// This function panics if it is not called from within a runtime with +/// IO enabled. +/// +/// The runtime is usually set implicitly when this function is called +/// from a future driven by a tokio runtime, otherwise runtime can be set +/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. +pub fn pipe() -> io::Result<(Sender, Receiver)> { + let (tx, rx) = mio_pipe::new()?; + Ok((Sender::from_mio(tx)?, Receiver::from_mio(rx)?)) +} + /// Options and flags which can be used to configure how a FIFO file is opened. /// /// This builder allows configuring how to create a pipe end from a FIFO file. @@ -218,7 +271,7 @@ impl OpenOptions { let file = options.open(path)?; - if !self.unchecked && !is_fifo(&file)? { + if !self.unchecked && !is_pipe(file.as_fd())? { return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); } @@ -338,15 +391,40 @@ impl Sender { /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn from_file(mut file: File) -> io::Result { - if !is_fifo(&file)? { + pub fn from_file(file: File) -> io::Result { + Sender::from_owned_fd(file.into()) + } + + /// Creates a new `Sender` from an [`OwnedFd`]. + /// + /// This function is intended to construct a pipe from an [`OwnedFd`] representing + /// an anonymous pipe or a special FIFO file. It will check if the file descriptor + /// is a pipe and has write access, set it in non-blocking mode and perform the + /// conversion. + /// + /// # Errors + /// + /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe + /// or it does not have write access. Also fails with any standard OS error if it + /// occurs. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result { + if !is_pipe(owned_fd.as_fd())? { return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); } - let flags = get_file_flags(&file)?; + let flags = get_file_flags(owned_fd.as_fd())?; if has_write_access(flags) { - set_nonblocking(&mut file, flags)?; - Sender::from_file_unchecked(file) + set_nonblocking(owned_fd.as_fd(), flags)?; + Sender::from_owned_fd_unchecked(owned_fd) } else { Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -394,8 +472,28 @@ impl Sender { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_file_unchecked(file: File) -> io::Result { - let raw_fd = file.into_raw_fd(); - let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) }; + Sender::from_owned_fd_unchecked(file.into()) + } + + /// Creates a new `Sender` from an [`OwnedFd`] without checking pipe properties. + /// + /// This function is intended to construct a pipe from an [`OwnedFd`] representing + /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about + /// the underlying pipe; it is left up to the user to make sure that the file + /// descriptor represents the writing end of a pipe and the pipe is set in + /// non-blocking mode. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result { + // Safety: OwnedFd represents a valid, open file descriptor. + let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(owned_fd.into_raw_fd()) }; Sender::from_mio(mio_tx) } @@ -623,6 +721,31 @@ impl Sender { .registration() .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) } + + /// Converts the pipe into an [`OwnedFd`] in blocking mode. + /// + /// This function will deregister this pipe end from the event loop, set + /// it in blocking mode and perform the conversion. + pub fn into_blocking_fd(self) -> io::Result { + let fd = self.into_nonblocking_fd()?; + set_blocking(&fd)?; + Ok(fd) + } + + /// Converts the pipe into an [`OwnedFd`] in nonblocking mode. + /// + /// This function will deregister this pipe end from the event loop and + /// perform the conversion. The returned file descriptor will be in nonblocking + /// mode. + pub fn into_nonblocking_fd(self) -> io::Result { + let mio_pipe = self.io.into_inner()?; + + // Safety: the pipe is now deregistered from the event loop + // and we are the only owner of this pipe end. + let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) }; + + Ok(owned_fd) + } } impl AsyncWrite for Sender { @@ -764,15 +887,40 @@ impl Receiver { /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn from_file(mut file: File) -> io::Result { - if !is_fifo(&file)? { + pub fn from_file(file: File) -> io::Result { + Receiver::from_owned_fd(file.into()) + } + + /// Creates a new `Receiver` from an [`OwnedFd`]. + /// + /// This function is intended to construct a pipe from an [`OwnedFd`] representing + /// an anonymous pipe or a special FIFO file. It will check if the file descriptor + /// is a pipe and has read access, set it in non-blocking mode and perform the + /// conversion. + /// + /// # Errors + /// + /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe + /// or it does not have read access. Also fails with any standard OS error if it + /// occurs. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result { + if !is_pipe(owned_fd.as_fd())? { return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); } - let flags = get_file_flags(&file)?; + let flags = get_file_flags(owned_fd.as_fd())?; if has_read_access(flags) { - set_nonblocking(&mut file, flags)?; - Receiver::from_file_unchecked(file) + set_nonblocking(owned_fd.as_fd(), flags)?; + Receiver::from_owned_fd_unchecked(owned_fd) } else { Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -820,8 +968,28 @@ impl Receiver { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_file_unchecked(file: File) -> io::Result { - let raw_fd = file.into_raw_fd(); - let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) }; + Receiver::from_owned_fd_unchecked(file.into()) + } + + /// Creates a new `Receiver` from an [`OwnedFd`] without checking pipe properties. + /// + /// This function is intended to construct a pipe from an [`OwnedFd`] representing + /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about + /// the underlying pipe; it is left up to the user to make sure that the file + /// descriptor represents the reading end of a pipe and the pipe is set in + /// non-blocking mode. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result { + // Safety: OwnedFd represents a valid, open file descriptor. + let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(owned_fd.into_raw_fd()) }; Receiver::from_mio(mio_rx) } @@ -1146,6 +1314,31 @@ impl Receiver { }) } } + + /// Converts the pipe into an [`OwnedFd`] in blocking mode. + /// + /// This function will deregister this pipe end from the event loop, set + /// it in blocking mode and perform the conversion. + pub fn into_blocking_fd(self) -> io::Result { + let fd = self.into_nonblocking_fd()?; + set_blocking(&fd)?; + Ok(fd) + } + + /// Converts the pipe into an [`OwnedFd`] in nonblocking mode. + /// + /// This function will deregister this pipe end from the event loop and + /// perform the conversion. Returned file descriptor will be in nonblocking + /// mode. + pub fn into_nonblocking_fd(self) -> io::Result { + let mio_pipe = self.io.into_inner()?; + + // Safety: the pipe is now deregistered from the event loop + // and we are the only owner of this pipe end. + let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) }; + + Ok(owned_fd) + } } impl AsyncRead for Receiver { @@ -1172,15 +1365,27 @@ impl AsFd for Receiver { } } -/// Checks if file is a FIFO -fn is_fifo(file: &File) -> io::Result { - Ok(file.metadata()?.file_type().is_fifo()) +/// Checks if the file descriptor is a pipe or a FIFO. +fn is_pipe(fd: BorrowedFd<'_>) -> io::Result { + // Safety: `libc::stat` is C-like struct used for syscalls and all-zero + // byte pattern forms a valid value. + let mut stat: libc::stat = unsafe { std::mem::zeroed() }; + + // Safety: it's safe to call `fstat` with a valid, open file descriptor + // and a valid pointer to a `stat` struct. + let r = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) }; + + if r == -1 { + Err(io::Error::last_os_error()) + } else { + Ok((stat.st_mode as libc::mode_t & libc::S_IFMT) == libc::S_IFIFO) + } } /// Gets file descriptor's flags by fcntl. -fn get_file_flags(file: &File) -> io::Result { - let fd = file.as_raw_fd(); - let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; +fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result { + // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor. + let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) }; if flags < 0 { Err(io::Error::last_os_error()) } else { @@ -1200,14 +1405,14 @@ fn has_write_access(flags: libc::c_int) -> bool { mode == libc::O_WRONLY || mode == libc::O_RDWR } -/// Sets file's flags with `O_NONBLOCK` by fcntl. -fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> { - let fd = file.as_raw_fd(); - +/// Sets file descriptor's flags with `O_NONBLOCK` by fcntl. +fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> { let flags = current_flags | libc::O_NONBLOCK; if flags != current_flags { - let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) }; + // Safety: it's safe to use `fcntl` to set the `O_NONBLOCK` flag of a valid, + // open file descriptor. + let ret = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, flags) }; if ret < 0 { return Err(io::Error::last_os_error()); } @@ -1215,3 +1420,23 @@ fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<() Ok(()) } + +/// Removes `O_NONBLOCK` from fd's flags. +fn set_blocking(fd: &T) -> io::Result<()> { + // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor. + let previous = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) }; + if previous == -1 { + return Err(io::Error::last_os_error()); + } + + let new = previous & !libc::O_NONBLOCK; + + // Safety: it's safe to use `fcntl` to unset the `O_NONBLOCK` flag of a valid, + // open file descriptor. + let r = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, new) }; + if r == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} diff --git a/tokio/tests/net_unix_pipe.rs b/tokio/tests/net_unix_pipe.rs index c96d6e70fbd..6706880ed1b 100644 --- a/tokio/tests/net_unix_pipe.rs +++ b/tokio/tests/net_unix_pipe.rs @@ -427,3 +427,108 @@ async fn try_read_buf() -> std::io::Result<()> { Ok(()) } + +#[tokio::test] +async fn anon_pipe_simple_send() -> io::Result<()> { + const DATA: &[u8] = b"this is some data to write to the pipe"; + + let (mut writer, mut reader) = pipe::pipe()?; + + // Create a reading task which should wait for data from the pipe. + let mut read_fut = task::spawn(async move { + let mut buf = vec![0; DATA.len()]; + reader.read_exact(&mut buf).await?; + Ok::<_, io::Error>(buf) + }); + assert_pending!(read_fut.poll()); + + writer.write_all(DATA).await?; + + // Let the IO driver poll events for the reader. + while !read_fut.is_woken() { + tokio::task::yield_now().await; + } + + // Reading task should be ready now. + let read_data = assert_ready_ok!(read_fut.poll()); + assert_eq!(&read_data, DATA); + + Ok(()) +} + +#[tokio::test] +async fn anon_pipe_spawn_echo() -> std::io::Result<()> { + use tokio::process::Command; + + const DATA: &str = "this is some data to write to the pipe"; + + let (tx, mut rx) = pipe::pipe()?; + + let status = Command::new("echo") + .arg("-n") + .arg(DATA) + .stdout(tx.into_blocking_fd()?) + .status(); + + let mut buf = vec![0; DATA.len()]; + rx.read_exact(&mut buf).await?; + assert_eq!(String::from_utf8(buf).unwrap(), DATA); + + let exit_code = status.await?; + assert!(exit_code.success()); + + // Check if the pipe is closed. + buf = Vec::new(); + let total = assert_ok!(rx.try_read(&mut buf)); + assert_eq!(total, 0); + + Ok(()) +} + +#[tokio::test] +#[cfg(target_os = "linux")] +async fn anon_pipe_from_owned_fd() -> std::io::Result<()> { + use nix::fcntl::OFlag; + use std::os::unix::io::{FromRawFd, OwnedFd}; + + const DATA: &[u8] = b"this is some data to write to the pipe"; + + let fds = nix::unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK)?; + let (rx_fd, tx_fd) = unsafe { (OwnedFd::from_raw_fd(fds.0), OwnedFd::from_raw_fd(fds.1)) }; + + let mut rx = pipe::Receiver::from_owned_fd(rx_fd)?; + let mut tx = pipe::Sender::from_owned_fd(tx_fd)?; + + let mut buf = vec![0; DATA.len()]; + tx.write_all(DATA).await?; + rx.read_exact(&mut buf).await?; + assert_eq!(buf, DATA); + + Ok(()) +} + +#[tokio::test] +async fn anon_pipe_into_nonblocking_fd() -> std::io::Result<()> { + let (tx, rx) = pipe::pipe()?; + + let tx_fd = tx.into_nonblocking_fd()?; + let rx_fd = rx.into_nonblocking_fd()?; + + assert!(is_nonblocking(&tx_fd)?); + assert!(is_nonblocking(&rx_fd)?); + + Ok(()) +} + +#[tokio::test] +async fn anon_pipe_into_blocking_fd() -> std::io::Result<()> { + let (tx, rx) = pipe::pipe()?; + + let tx_fd = tx.into_blocking_fd()?; + let rx_fd = rx.into_blocking_fd()?; + + assert!(!is_nonblocking(&tx_fd)?); + assert!(!is_nonblocking(&rx_fd)?); + + Ok(()) +}