Skip to content

Commit

Permalink
auto merge of #11894 : alexcrichton/rust/io-clone, r=brson
Browse files Browse the repository at this point in the history
This is part of the overall strategy I would like to take when approaching
issue #11165. The only two I/O objects that reasonably want to be "split" are
the network stream objects. Everything else can be "split" by just creating
another version.

The initial idea I had was the literally split the object into a reader and a
writer half, but that would just introduce lots of clutter with extra interfaces
that were a little unnnecssary, or it would return a ~Reader and a ~Writer which
means you couldn't access things like the remote peer name or local socket name.

The solution I found to be nicer was to just clone the stream itself. The clone
is just a clone of the handle, nothing fancy going on at the kernel level.
Conceptually I found this very easy to wrap my head around (everything else
supports clone()), and it solved the "split" problem at the same time.

The cloning support is pretty specific per platform/lib combination:

* native/win32 - uses some specific WSA apis to clone the SOCKET handle
* native/unix - uses dup() to get another file descriptor
* green/all - This is where things get interesting. When we support full clones
              of a handle, this implies that we're allowing simultaneous writes
              and reads to happen. It turns out that libuv doesn't support two
              simultaneous reads or writes of the same object. It does support
              *one* read and *one* write at the same time, however. Some extra
              infrastructure was added to just block concurrent writers/readers
              until the previous read/write operation was completed.

I've added tests to the tcp/unix modules to make sure that this functionality is
supported everywhere.
  • Loading branch information
bors committed Feb 5, 2014
2 parents 55684ba + 56080c4 commit 6aad3bf
Show file tree
Hide file tree
Showing 18 changed files with 812 additions and 82 deletions.
47 changes: 31 additions & 16 deletions src/libnative/io/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

//! Blocking posix-based file I/O
use std::sync::arc::UnsafeArc;
use std::c_str::CString;
use std::io::IoError;
use std::io;
Expand Down Expand Up @@ -55,9 +56,13 @@ pub fn keep_going(data: &[u8], f: |*u8, uint| -> i64) -> i64 {

pub type fd_t = libc::c_int;

struct Inner {
fd: fd_t,
close_on_drop: bool,
}

pub struct FileDesc {
priv fd: fd_t,
priv close_on_drop: bool,
priv inner: UnsafeArc<Inner>
}

impl FileDesc {
Expand All @@ -70,7 +75,10 @@ impl FileDesc {
/// Note that all I/O operations done on this object will be *blocking*, but
/// they do not require the runtime to be active.
pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc {
FileDesc { fd: fd, close_on_drop: close_on_drop }
FileDesc { inner: UnsafeArc::new(Inner {
fd: fd,
close_on_drop: close_on_drop
}) }
}

// FIXME(#10465) these functions should not be public, but anything in
Expand All @@ -80,7 +88,7 @@ impl FileDesc {
#[cfg(windows)] type rlen = libc::c_uint;
#[cfg(not(windows))] type rlen = libc::size_t;
let ret = retry(|| unsafe {
libc::read(self.fd,
libc::read(self.fd(),
buf.as_ptr() as *mut libc::c_void,
buf.len() as rlen) as libc::c_int
});
Expand All @@ -97,7 +105,7 @@ impl FileDesc {
#[cfg(not(windows))] type wlen = libc::size_t;
let ret = keep_going(buf, |buf, len| {
unsafe {
libc::write(self.fd, buf as *libc::c_void, len as wlen) as i64
libc::write(self.fd(), buf as *libc::c_void, len as wlen) as i64
}
});
if ret < 0 {
Expand All @@ -107,7 +115,11 @@ impl FileDesc {
}
}

pub fn fd(&self) -> fd_t { self.fd }
pub fn fd(&self) -> fd_t {
// This unsafety is fine because we're just reading off the file
// descriptor, no one is modifying this.
unsafe { (*self.inner.get()).fd }
}
}

impl io::Reader for FileDesc {
Expand All @@ -130,7 +142,7 @@ impl rtio::RtioFileStream for FileDesc {
self.inner_write(buf)
}
fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
return os_pread(self.fd, buf.as_ptr(), buf.len(), offset);
return os_pread(self.fd(), buf.as_ptr(), buf.len(), offset);

#[cfg(windows)]
fn os_pread(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<int> {
Expand Down Expand Up @@ -162,7 +174,7 @@ impl rtio::RtioFileStream for FileDesc {
}
}
fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
return os_pwrite(self.fd, buf.as_ptr(), buf.len(), offset);
return os_pwrite(self.fd(), buf.as_ptr(), buf.len(), offset);

#[cfg(windows)]
fn os_pwrite(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<()> {
Expand Down Expand Up @@ -197,7 +209,7 @@ impl rtio::RtioFileStream for FileDesc {
io::SeekCur => libc::FILE_CURRENT,
};
unsafe {
let handle = libc::get_osfhandle(self.fd) as libc::HANDLE;
let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE;
let mut newpos = 0;
match libc::SetFilePointerEx(handle, pos, &mut newpos, whence) {
0 => Err(super::last_error()),
Expand All @@ -212,23 +224,23 @@ impl rtio::RtioFileStream for FileDesc {
io::SeekEnd => libc::SEEK_END,
io::SeekCur => libc::SEEK_CUR,
};
let n = unsafe { libc::lseek(self.fd, pos as libc::off_t, whence) };
let n = unsafe { libc::lseek(self.fd(), pos as libc::off_t, whence) };
if n < 0 {
Err(super::last_error())
} else {
Ok(n as u64)
}
}
fn tell(&self) -> Result<u64, IoError> {
let n = unsafe { libc::lseek(self.fd, 0, libc::SEEK_CUR) };
let n = unsafe { libc::lseek(self.fd(), 0, libc::SEEK_CUR) };
if n < 0 {
Err(super::last_error())
} else {
Ok(n as u64)
}
}
fn fsync(&mut self) -> Result<(), IoError> {
return os_fsync(self.fd);
return os_fsync(self.fd());

#[cfg(windows)]
fn os_fsync(fd: c_int) -> IoResult<()> {
Expand All @@ -247,7 +259,7 @@ impl rtio::RtioFileStream for FileDesc {

#[cfg(not(windows))]
fn datasync(&mut self) -> Result<(), IoError> {
return super::mkerr_libc(os_datasync(self.fd));
return super::mkerr_libc(os_datasync(self.fd()));

#[cfg(target_os = "macos")]
fn os_datasync(fd: c_int) -> c_int {
Expand All @@ -270,7 +282,7 @@ impl rtio::RtioFileStream for FileDesc {
Ok(_) => {}, Err(e) => return Err(e),
};
let ret = unsafe {
let handle = libc::get_osfhandle(self.fd) as libc::HANDLE;
let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE;
match libc::SetEndOfFile(handle) {
0 => Err(super::last_error()),
_ => Ok(())
Expand All @@ -282,7 +294,7 @@ impl rtio::RtioFileStream for FileDesc {
#[cfg(unix)]
fn truncate(&mut self, offset: i64) -> Result<(), IoError> {
super::mkerr_libc(retry(|| unsafe {
libc::ftruncate(self.fd, offset as libc::off_t)
libc::ftruncate(self.fd(), offset as libc::off_t)
}))
}
}
Expand All @@ -294,6 +306,9 @@ impl rtio::RtioPipe for FileDesc {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
self.inner_write(buf)
}
fn clone(&self) -> ~rtio::RtioPipe {
~FileDesc { inner: self.inner.clone() } as ~rtio::RtioPipe
}
}

impl rtio::RtioTTY for FileDesc {
Expand All @@ -312,7 +327,7 @@ impl rtio::RtioTTY for FileDesc {
fn isatty(&self) -> bool { false }
}

impl Drop for FileDesc {
impl Drop for Inner {
fn drop(&mut self) {
// closing stdio file handles makes no sense, so never do it. Also, note
// that errors are ignored when closing a file descriptor. The reason
Expand Down
Loading

0 comments on commit 6aad3bf

Please sign in to comment.