Skip to content

Commit

Permalink
Auto merge of #506 - asomers:aio3, r=posborne
Browse files Browse the repository at this point in the history
Improve AIO API

- Turn most `aio_*` functions into `AioCb` methods
- Add runtime checks to `AioCb` methods
- Implement `Drop` for `AioCb`
  • Loading branch information
homu committed Feb 27, 2017
2 parents 7e27e4c + e29c0ef commit 06d9b04
Show file tree
Hide file tree
Showing 4 changed files with 279 additions and 136 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
and Android ([([#438](https://github.com/nix-rust/nix/pull/438))
- Added support for POSIX AIO
([#483](https://github.com/nix-rust/nix/pull/483))
([#506](https://github.com/nix-rust/nix/pull/506))
- Added support for XNU system control sockets
([#478](https://github.com/nix-rust/nix/pull/478))
- Added support for `ioctl` calls on BSD platforms
Expand Down
203 changes: 134 additions & 69 deletions src/sys/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ use {Error, Errno, Result};
use std::os::unix::io::RawFd;
use libc::{c_void, off_t, size_t};
use libc;
use std::fmt;
use std::fmt::Debug;
use std::io::Write;
use std::io::stderr;
use std::marker::PhantomData;
use std::mem;
use std::ptr::{null, null_mut};
use sys::signal::*;
use sys::time::TimeSpec;

/// Mode for `aio_fsync`. Controls whether only data or both data and metadata
/// are synced.
/// Mode for `AioCb::fsync`. Controls whether only data or both data and
/// metadata are synced.
#[repr(i32)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AioFsyncMode {
Expand Down Expand Up @@ -44,31 +48,34 @@ pub enum LioMode {
LIO_NOWAIT = libc::LIO_NOWAIT,
}

/// Return values for `aio_cancel`
/// Return values for `AioCb::cancel and aio_cancel_all`
#[repr(i32)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AioCancelStat {
/// All outstanding requests were canceled
AioCanceled = libc::AIO_CANCELED,
/// Some requests were not canceled. Their status should be checked with
/// `aio_error`
/// `AioCb::error`
AioNotCanceled = libc::AIO_NOTCANCELED,
/// All of the requests have already finished
AioAllDone = libc::AIO_ALLDONE,
}

/// The basic structure used by all aio functions. Each `aiocb` represents one
/// I/O request.
#[repr(C)]
pub struct AioCb<'a> {
aiocb: libc::aiocb,
/// Tracks whether the buffer pointed to by aiocb.aio_buf is mutable
mutable: bool,
/// Could this `AioCb` potentially have any in-kernel state?
in_progress: bool,
phantom: PhantomData<&'a mut [u8]>
}

impl<'a> AioCb<'a> {
/// Constructs a new `AioCb` with no associated buffer.
///
/// The resulting `AioCb` structure is suitable for use with `aio_fsync`.
/// The resulting `AioCb` structure is suitable for use with `AioCb::fsync`.
/// * `fd` File descriptor. Required for all aio functions.
/// * `prio` If POSIX Prioritized IO is supported, then the operation will
/// be prioritized at the process's priority level minus `prio`
Expand All @@ -81,7 +88,8 @@ impl<'a> AioCb<'a> {
a.aio_nbytes = 0;
a.aio_buf = null_mut();

let aiocb = AioCb { aiocb: a, phantom: PhantomData};
let aiocb = AioCb { aiocb: a, mutable: false, in_progress: false,
phantom: PhantomData};
aiocb
}

Expand All @@ -102,37 +110,41 @@ impl<'a> AioCb<'a> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = offs;
a.aio_nbytes = buf.len() as size_t;
// casting an immutable buffer to a mutable pointer looks unsafe, but
// technically its only unsafe to dereference it, not to create it.
a.aio_buf = buf.as_ptr() as *mut c_void;
a.aio_lio_opcode = opcode as ::c_int;

let aiocb = AioCb { aiocb: a, phantom: PhantomData};
let aiocb = AioCb { aiocb: a, mutable: true, in_progress: false,
phantom: PhantomData};
aiocb
}

/// Like `from_mut_slice`, but works on constant slices rather than
/// mutable slices.
///
/// This is technically unsafe, but in practice it's fine
/// to use with any aio functions except `aio_read` and `lio_listio` (with
/// `opcode` set to `LIO_READ`). This method is useful when writing a const
/// buffer with `aio_write`, since from_mut_slice can't work with const
/// buffers.
/// An `AioCb` created this way cannot be used with `read`, and its
/// `LioOpcode` cannot be set to `LIO_READ`. This method is useful when
/// writing a const buffer with `AioCb::write`, since from_mut_slice can't
/// work with const buffers.
// Note: another solution to the problem of writing const buffers would be
// to genericize AioCb for both &mut [u8] and &[u8] buffers. aio_read could
// take the former and aio_write could take the latter. However, then
// lio_listio wouldn't work, because that function needs a slice of AioCb,
// and they must all be the same type. We're basically stuck with using an
// unsafe function, since aio (as designed in C) is an unsafe API.
pub unsafe fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8],
prio: ::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> AioCb {
// to genericize AioCb for both &mut [u8] and &[u8] buffers. AioCb::read
// could take the former and AioCb::write could take the latter. However,
// then lio_listio wouldn't work, because that function needs a slice of
// AioCb, and they must all be the same type. We're basically stuck with
// using an unsafe function, since aio (as designed in C) is an unsafe API.
pub fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8],
prio: ::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> AioCb {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.aio_offset = offs;
a.aio_nbytes = buf.len() as size_t;
a.aio_buf = buf.as_ptr() as *mut c_void;
assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer");
a.aio_lio_opcode = opcode as ::c_int;

let aiocb = AioCb { aiocb: a, phantom: PhantomData};
let aiocb = AioCb { aiocb: a, mutable: false, in_progress: false,
phantom: PhantomData};
aiocb
}

Expand All @@ -153,56 +165,73 @@ impl<'a> AioCb<'a> {
pub fn set_sigev_notify(&mut self, sigev_notify: SigevNotify) {
self.aiocb.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
}
}

/// Cancels outstanding AIO requests. If `aiocb` is `None`, then all requests
/// for `fd` will be cancelled. Otherwise, only the given `AioCb` will be
/// cancelled.
pub fn aio_cancel(fd: RawFd, aiocb: Option<&mut AioCb>) -> Result<AioCancelStat> {
let p: *mut libc::aiocb = match aiocb {
None => null_mut(),
Some(x) => &mut x.aiocb
};
match unsafe { libc::aio_cancel(fd, p) } {
libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
-1 => Err(Error::last()),
_ => panic!("unknown aio_cancel return value")
/// Cancels an outstanding AIO request.
pub fn cancel(&mut self) -> Result<AioCancelStat> {
match unsafe { libc::aio_cancel(self.aiocb.aio_fildes, &mut self.aiocb) } {
libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
-1 => Err(Error::last()),
_ => panic!("unknown aio_cancel return value")
}
}
}

/// Retrieve error status of an asynchronous operation. If the request has not
/// yet completed, returns `EINPROGRESS`. Otherwise, returns `Ok` or any other
/// error.
pub fn aio_error(aiocb: &mut AioCb) -> Result<()> {
let p: *mut libc::aiocb = &mut aiocb.aiocb;
match unsafe { libc::aio_error(p) } {
0 => Ok(()),
num if num > 0 => Err(Error::from_errno(Errno::from_i32(num))),
-1 => Err(Error::last()),
num => panic!("unknown aio_error return value {:?}", num)
/// Retrieve error status of an asynchronous operation. If the request has
/// not yet completed, returns `EINPROGRESS`. Otherwise, returns `Ok` or
/// any other error.
pub fn error(&mut self) -> Result<()> {
match unsafe { libc::aio_error(&mut self.aiocb as *mut libc::aiocb) } {
0 => Ok(()),
num if num > 0 => Err(Error::from_errno(Errno::from_i32(num))),
-1 => Err(Error::last()),
num => panic!("unknown aio_error return value {:?}", num)
}
}
}

/// An asynchronous version of `fsync`.
pub fn aio_fsync(mode: AioFsyncMode, aiocb: &mut AioCb) -> Result<()> {
let p: *mut libc::aiocb = &mut aiocb.aiocb;
Errno::result(unsafe { libc::aio_fsync(mode as ::c_int, p) }).map(drop)
}
/// An asynchronous version of `fsync`.
pub fn fsync(&mut self, mode: AioFsyncMode) -> Result<()> {
let p: *mut libc::aiocb = &mut self.aiocb;
self.in_progress = true;
Errno::result(unsafe { libc::aio_fsync(mode as ::c_int, p) }).map(drop)
}

/// Asynchronously reads from a file descriptor into a buffer
pub fn read(&mut self) -> Result<()> {
assert!(self.mutable, "Can't read into an immutable buffer");
let p: *mut libc::aiocb = &mut self.aiocb;
self.in_progress = true;
Errno::result(unsafe { libc::aio_read(p) }).map(drop)
}

/// Retrieve return status of an asynchronous operation. Should only be
/// called once for each `AioCb`, after `AioCb::error` indicates that it has
/// completed. The result is the same as for `read`, `write`, of `fsync`.
// Note: this should be just `return`, but that's a reserved word
pub fn aio_return(&mut self) -> Result<isize> {
let p: *mut libc::aiocb = &mut self.aiocb;
self.in_progress = false;
Errno::result(unsafe { libc::aio_return(p) })
}

/// Asynchronously writes from a buffer to a file descriptor
pub fn write(&mut self) -> Result<()> {
let p: *mut libc::aiocb = &mut self.aiocb;
self.in_progress = true;
Errno::result(unsafe { libc::aio_write(p) }).map(drop)
}

/// Asynchronously reads from a file descriptor into a buffer
pub fn aio_read(aiocb: &mut AioCb) -> Result<()> {
let p: *mut libc::aiocb = &mut aiocb.aiocb;
Errno::result(unsafe { libc::aio_read(p) }).map(drop)
}

/// Retrieve return status of an asynchronous operation. Should only be called
/// once for each `AioCb`, after `aio_error` indicates that it has completed.
/// The result the same as for `read`, `write`, of `fsync`.
pub fn aio_return(aiocb: &mut AioCb) -> Result<isize> {
let p: *mut libc::aiocb = &mut aiocb.aiocb;
Errno::result(unsafe { libc::aio_return(p) })
/// Cancels outstanding AIO requests. All requests for `fd` will be cancelled.
pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> {
match unsafe { libc::aio_cancel(fd, null_mut()) } {
libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
-1 => Err(Error::last()),
_ => panic!("unknown aio_cancel return value")
}
}

/// Suspends the calling process until at least one of the specified `AioCb`s
Expand All @@ -224,11 +253,6 @@ pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> {
}).map(drop)
}

/// Asynchronously writes from a buffer to a file descriptor
pub fn aio_write(aiocb: &mut AioCb) -> Result<()> {
let p: *mut libc::aiocb = &mut aiocb.aiocb;
Errno::result(unsafe { libc::aio_write(p) }).map(drop)
}

/// Submits multiple asynchronous I/O requests with a single system call. The
/// order in which the requests are carried out is not specified.
Expand All @@ -247,3 +271,44 @@ pub fn lio_listio(mode: LioMode, list: &[&mut AioCb],
libc::lio_listio(mode as i32, p, list.len() as i32, sigevp)
}).map(drop)
}

impl<'a> Debug for AioCb<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("AioCb")
.field("aio_fildes", &self.aiocb.aio_fildes)
.field("aio_offset", &self.aiocb.aio_offset)
.field("aio_buf", &self.aiocb.aio_buf)
.field("aio_nbytes", &self.aiocb.aio_nbytes)
.field("aio_lio_opcode", &self.aiocb.aio_lio_opcode)
.field("aio_reqprio", &self.aiocb.aio_reqprio)
.field("aio_sigevent", &SigEvent::from(&self.aiocb.aio_sigevent))
.field("mutable", &self.mutable)
.field("in_progress", &self.in_progress)
.field("phantom", &self.phantom)
.finish()
}
}

impl<'a> Drop for AioCb<'a> {
/// If the `AioCb` has no remaining state in the kernel, just drop it.
/// Otherwise, collect its error and return values, so as not to leak
/// resources.
fn drop(&mut self) {
if self.in_progress {
// Well-written programs should never get here. They should always
// wait for an AioCb to complete before dropping it
let _ = write!(stderr(), "WARNING: dropped an in-progress AioCb");
loop {
let ret = aio_suspend(&[&self], None);
match ret {
Ok(()) => break,
Err(Error::Sys(Errno::EINVAL)) => panic!(
"Inconsistent AioCb.in_progress value"),
Err(Error::Sys(Errno::EINTR)) => (), // Retry interrupted syscall
_ => panic!("Unexpected aio_suspend return value {:?}", ret)
};
}
let _ = self.aio_return();
}
}
}
29 changes: 29 additions & 0 deletions src/sys/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

use libc;
use {Errno, Error, Result};
use std::fmt;
use std::fmt::Debug;
use std::mem;
#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
use std::os::unix::io::RawFd;
Expand Down Expand Up @@ -509,6 +511,33 @@ impl SigEvent {
}
}

impl Debug for SigEvent {
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("SigEvent")
.field("sigev_notify", &self.sigevent.sigev_notify)
.field("sigev_signo", &self.sigevent.sigev_signo)
.field("sigev_value", &self.sigevent.sigev_value.sival_ptr)
.field("sigev_notify_thread_id",
&self.sigevent.sigev_notify_thread_id)
.finish()
}

#[cfg(not(any(target_os = "linux", target_os = "freebsd")))]
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("SigEvent")
.field("sigev_notify", &self.sigevent.sigev_notify)
.field("sigev_signo", &self.sigevent.sigev_signo)
.field("sigev_value", &self.sigevent.sigev_value.sival_ptr)
.finish()
}
}

impl<'a> From<&'a libc::sigevent> for SigEvent {
fn from(sigevent: &libc::sigevent) -> Self {
SigEvent{ sigevent: sigevent.clone() }
}
}

#[cfg(test)]
mod tests {
Expand Down
Loading

0 comments on commit 06d9b04

Please sign in to comment.