Skip to content

Commit

Permalink
Implement RFC 1047 - socket timeouts
Browse files Browse the repository at this point in the history
Closes #25619
  • Loading branch information
sfackler committed May 29, 2015
1 parent 1a3cffb commit 69a0e1a
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 7 deletions.
111 changes: 111 additions & 0 deletions src/libstd/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use io;
use net::{ToSocketAddrs, SocketAddr, Shutdown};
use sys_common::net as net_imp;
use sys_common::{AsInner, FromInner};
use time::Duration;

/// A structure which represents a TCP stream between a local socket and a
/// remote socket.
Expand Down Expand Up @@ -139,6 +140,50 @@ impl TcpStream {
pub fn set_keepalive(&self, seconds: Option<u32>) -> io::Result<()> {
self.0.set_keepalive(seconds)
}

/// Sets the read timeout to the timeout specified.
///
/// If the value specified is `None`, then `read` calls will block
/// indefinitely. It is an error to pass the zero `Duration` to this
/// method.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.0.set_read_timeout(dur)
}

/// Sets the write timeout to the timeout specified.
///
/// If the value specified is `None`, then `write` calls will block
/// indefinitely. It is an error to pass the zero `Duration` to this
/// method.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.0.set_write_timeout(dur)
}

/// Returns the read timeout of this socket.
///
/// If the timeout is `None`, then `read` calls will block indefinitely.
///
/// # Note
///
/// Some platforms do not provide access to the current timeout.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
self.0.read_timeout()
}

/// Returns the write timeout of this socket.
///
/// If the timeout is `None`, then `write` calls will block indefinitely.
///
/// # Note
///
/// Some platforms do not provide access to the current timeout.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
self.0.write_timeout()
}
}

#[stable(feature = "rust1", since = "1.0.0")]
Expand Down Expand Up @@ -262,6 +307,7 @@ mod tests {
use net::test::{next_test_ip4, next_test_ip6};
use sync::mpsc::channel;
use sys_common::AsInner;
use time::Duration;
use thread;

fn each_ip(f: &mut FnMut(SocketAddr)) {
Expand Down Expand Up @@ -855,4 +901,69 @@ mod tests {
stream_inner);
assert_eq!(format!("{:?}", stream), compare);
}

#[test]
fn timeouts() {
let addr = next_test_ip4();
let listener = t!(TcpListener::bind(&addr));

let stream = t!(TcpStream::connect(&("localhost", addr.port())));
let dur = Duration::new(15410, 0);

assert_eq!(None, t!(stream.read_timeout()));

t!(stream.set_read_timeout(Some(dur)));
assert_eq!(Some(dur), t!(stream.read_timeout()));

assert_eq!(None, t!(stream.write_timeout()));

t!(stream.set_write_timeout(Some(dur)));
assert_eq!(Some(dur), t!(stream.write_timeout()));

t!(stream.set_read_timeout(None));
assert_eq!(None, t!(stream.read_timeout()));

t!(stream.set_write_timeout(None));
assert_eq!(None, t!(stream.write_timeout()));
}

#[test]
fn test_read_timeout() {
let addr = next_test_ip4();
let listener = t!(TcpListener::bind(&addr));

let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
t!(stream.set_read_timeout(Some(Duration::from_millis(10))));

let mut buf = [0; 10];
let wait = Duration::span(|| {
let kind = stream.read(&mut buf).err().expect("expected error").kind();
assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
});
assert!(wait > Duration::from_millis(5));
assert!(wait < Duration::from_millis(15));
}

#[test]
fn test_read_with_timeout() {
let addr = next_test_ip4();
let listener = t!(TcpListener::bind(&addr));

let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
t!(stream.set_read_timeout(Some(Duration::from_millis(10))));

let mut other_end = t!(listener.accept()).0;
t!(other_end.write_all(b"hello world"));

let mut buf = [0; 11];
t!(stream.read(&mut buf));
assert_eq!(b"hello world", &buf[..]);

let wait = Duration::span(|| {
let kind = stream.read(&mut buf).err().expect("expected error").kind();
assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
});
assert!(wait > Duration::from_millis(5));
assert!(wait < Duration::from_millis(15));
}
}
99 changes: 99 additions & 0 deletions src/libstd/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use io::{self, Error, ErrorKind};
use net::{ToSocketAddrs, SocketAddr, IpAddr};
use sys_common::net as net_imp;
use sys_common::{AsInner, FromInner};
use time::Duration;

/// A User Datagram Protocol socket.
///
Expand Down Expand Up @@ -127,6 +128,42 @@ impl UdpSocket {
pub fn set_time_to_live(&self, ttl: i32) -> io::Result<()> {
self.0.time_to_live(ttl)
}

/// Sets the read timeout to the timeout specified.
///
/// If the value specified is `None`, then `read` calls will block
/// indefinitely. It is an error to pass the zero `Duration` to this
/// method.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.0.set_read_timeout(dur)
}

/// Sets the write timeout to the timeout specified.
///
/// If the value specified is `None`, then `write` calls will block
/// indefinitely. It is an error to pass the zero `Duration` to this
/// method.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.0.set_write_timeout(dur)
}

/// Returns the read timeout of this socket.
///
/// If the timeout is `None`, then `read` calls will block indefinitely.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
self.0.read_timeout()
}

/// Returns the write timeout of this socket.
///
/// If the timeout is `None`, then `write` calls will block indefinitely.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
self.0.write_timeout()
}
}

impl AsInner<net_imp::UdpSocket> for UdpSocket {
Expand All @@ -152,6 +189,7 @@ mod tests {
use net::test::{next_test_ip4, next_test_ip6};
use sync::mpsc::channel;
use sys_common::AsInner;
use time::Duration;
use thread;

fn each_ip(f: &mut FnMut(SocketAddr, SocketAddr)) {
Expand Down Expand Up @@ -321,4 +359,65 @@ mod tests {
socket_addr, name, udpsock_inner);
assert_eq!(format!("{:?}", udpsock), compare);
}

#[test]
fn timeouts() {
let addr = next_test_ip4();

let stream = t!(UdpSocket::bind(&addr));
let dur = Duration::new(15410, 0);

assert_eq!(None, t!(stream.read_timeout()));

t!(stream.set_read_timeout(Some(dur)));
assert_eq!(Some(dur), t!(stream.read_timeout()));

assert_eq!(None, t!(stream.write_timeout()));

t!(stream.set_write_timeout(Some(dur)));
assert_eq!(Some(dur), t!(stream.write_timeout()));

t!(stream.set_read_timeout(None));
assert_eq!(None, t!(stream.read_timeout()));

t!(stream.set_write_timeout(None));
assert_eq!(None, t!(stream.write_timeout()));
}

#[test]
fn test_read_timeout() {
let addr = next_test_ip4();

let mut stream = t!(UdpSocket::bind(&addr));
t!(stream.set_read_timeout(Some(Duration::from_millis(10))));

let mut buf = [0; 10];
let wait = Duration::span(|| {
let kind = stream.recv_from(&mut buf).err().expect("expected error").kind();
assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
});
assert!(wait > Duration::from_millis(5));
assert!(wait < Duration::from_millis(15));
}

#[test]
fn test_read_with_timeout() {
let addr = next_test_ip4();

let mut stream = t!(UdpSocket::bind(&addr));
t!(stream.set_read_timeout(Some(Duration::from_millis(10))));

t!(stream.send_to(b"hello world", &addr));

let mut buf = [0; 11];
t!(stream.recv_from(&mut buf));
assert_eq!(b"hello world", &buf[..]);

let wait = Duration::span(|| {
let kind = stream.recv_from(&mut buf).err().expect("expected error").kind();
assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
});
assert!(wait > Duration::from_millis(5));
assert!(wait < Duration::from_millis(15));
}
}
46 changes: 39 additions & 7 deletions src/libstd/sys/common/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use str::from_utf8;
use sys::c;
use sys::net::{cvt, cvt_r, cvt_gai, Socket, init, wrlen_t};
use sys_common::{AsInner, FromInner, IntoInner};
use time::Duration;

////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
////////////////////////////////////////////////////////////////////////////////

fn setsockopt<T>(sock: &Socket, opt: c_int, val: c_int,
pub fn setsockopt<T>(sock: &Socket, opt: c_int, val: c_int,
payload: T) -> io::Result<()> {
unsafe {
let payload = &payload as *const T as *const c_void;
Expand All @@ -35,16 +36,15 @@ fn setsockopt<T>(sock: &Socket, opt: c_int, val: c_int,
}
}

#[allow(dead_code)]
fn getsockopt<T: Copy>(sock: &Socket, opt: c_int,
pub fn getsockopt<T: Copy>(sock: &Socket, opt: c_int,
val: c_int) -> io::Result<T> {
unsafe {
let mut slot: T = mem::zeroed();
let mut len = mem::size_of::<T>() as socklen_t;
let ret = try!(cvt(c::getsockopt(*sock.as_inner(), opt, val,
&mut slot as *mut _ as *mut _,
&mut len)));
assert_eq!(ret as usize, mem::size_of::<T>());
try!(cvt(c::getsockopt(*sock.as_inner(), opt, val,
&mut slot as *mut _ as *mut _,
&mut len)));
assert_eq!(len as usize, mem::size_of::<T>());
Ok(slot)
}
}
Expand Down Expand Up @@ -220,6 +220,22 @@ impl TcpStream {
Ok(())
}

pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.set_timeout(dur, libc::SO_RCVTIMEO)
}

pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.set_timeout(dur, libc::SO_SNDTIMEO)
}

pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
self.inner.timeout(libc::SO_RCVTIMEO)
}

pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
self.inner.timeout(libc::SO_SNDTIMEO)
}

pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
Expand Down Expand Up @@ -471,6 +487,22 @@ impl UdpSocket {
pub fn duplicate(&self) -> io::Result<UdpSocket> {
self.inner.duplicate().map(|s| UdpSocket { inner: s })
}

pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.set_timeout(dur, libc::SO_RCVTIMEO)
}

pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.set_timeout(dur, libc::SO_SNDTIMEO)
}

pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
self.inner.timeout(libc::SO_RCVTIMEO)
}

pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
self.inner.timeout(libc::SO_SNDTIMEO)
}
}

impl FromInner<Socket> for UdpSocket {
Expand Down
Loading

0 comments on commit 69a0e1a

Please sign in to comment.