From 2b29d82adf3869817bfb02ce1a2053f1df3cac9d Mon Sep 17 00:00:00 2001 From: William Manley Date: Fri, 12 Feb 2021 17:24:31 +0000 Subject: [PATCH] File: Try doing a non-blocking read before punting to the threadpool ...on Linux. If the data is already available in cache this will avoid cross-thread interaction and remove a copy. It should help with latency too as reads that can be satisfied now won't need to wait in queue until other fs operations are complete. --- tokio/Cargo.toml | 6 ++-- tokio/src/fs/file.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 33c371c98ae..2998574d9ad 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -43,7 +43,7 @@ full = [ "time", ] -fs = [] +fs = ["libc"] io-util = ["memchr", "bytes"] # stdin, stdout, stderr io-std = [] @@ -104,11 +104,11 @@ parking_lot = { version = "0.11.0", optional = true } tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] -libc = { version = "0.2.42", optional = true } +libc = { git = "https://github.com/rust-lang/libc", rev = "a61fd8c79c21b387cdf6e4bcabc8b04afe594960", optional = true } signal-hook-registry = { version = "1.1.1", optional = true } [target.'cfg(unix)'.dev-dependencies] -libc = { version = "0.2.42" } +libc = { git = "https://github.com/rust-lang/libc", rev = "a61fd8c79c21b387cdf6e4bcabc8b04afe594960" } nix = { version = "0.19.0" } [target.'cfg(windows)'.dependencies.winapi] diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 5c06e732b09..67d33592ee7 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -499,6 +499,10 @@ impl AsyncRead for File { return Ready(Ok(())); } + if let Some(x) = read_nowait::try_nonblocking_read(me.std.as_ref(), dst) { + return Ready(x); + } + buf.ensure_capacity_for(dst); let std = me.std.clone(); @@ -756,3 +760,78 @@ impl Inner { } } } + +#[cfg(all(target_os = "linux", not(test)))] +mod read_nowait { + use crate::io::ReadBuf; + use libc::{c_int, c_void, iovec, off_t, preadv2}; + use std::{ + os::unix::prelude::AsRawFd, + sync::atomic::{AtomicBool, Ordering}, + }; + + static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true); + + pub(crate) fn try_nonblocking_read( + file: &crate::fs::sys::File, + dst: &mut ReadBuf<'_>, + ) -> Option> { + if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) { + return None; + } + let out = preadv2_safe(file, dst, -1, libc::RWF_NOWAIT); + if let Err(err) = &out { + match err.raw_os_error() { + Some(libc::ENOSYS) => { + NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed); + return None; + } + Some(libc::ENOTSUP) | Some(libc::EAGAIN) => return None, + _ => {} + } + } + Some(out) + } + + fn preadv2_safe( + file: &crate::fs::sys::File, + dst: &mut ReadBuf<'_>, + offset: off_t, + flags: c_int, + ) -> std::io::Result<()> { + unsafe { + /* We're manually have to defend against buffer overflows here. The slice API makes + * this fairly streightforward. */ + let unfilled = dst.unfilled_mut(); + let iov = iovec { + iov_base: unfilled.as_mut_ptr() as *mut c_void, + iov_len: unfilled.len(), + }; + /* We take a File object rather than an fd as reading from a sensitive fd may confuse + * other unsafe code that assumes that only they have access to that fd. */ + let bytes_read = preadv2(file.as_raw_fd(), &iov as *const iovec, 1, offset, flags); + if bytes_read < 0 { + Err(std::io::Error::last_os_error()) + } else { + /* preadv2 returns the number of bytes read, e.g. the number of bytes that have + * written into `unfilled`. So it's safe to assume that the data is now + * initialised */ + dst.assume_init(dst.filled().len() + bytes_read as usize); + dst.advance(bytes_read as usize); + Ok(()) + } + } + } +} + +#[cfg(any(not(target_os = "linux"), test))] +mod read_nowait { + use crate::io::ReadBuf; + + pub(crate) fn try_nonblocking_read( + _file: &crate::fs::sys::File, + _dst: &mut ReadBuf<'_>, + ) -> Option> { + None + } +}