Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement RFC 1047 - socket timeouts #25818

Merged
merged 4 commits into from
May 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/liblibc/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6043,7 +6043,6 @@ pub mod funcs {
use types::common::c95::{c_void};
use types::os::common::bsd44::{socklen_t, sockaddr, SOCKET};
use types::os::arch::c95::c_int;
use types::os::arch::posix88::ssize_t;

extern "system" {
pub fn socket(domain: c_int, ty: c_int, protocol: c_int) -> SOCKET;
Expand All @@ -6068,7 +6067,7 @@ pub mod funcs {
flags: c_int) -> c_int;
pub fn recvfrom(socket: SOCKET, buf: *mut c_void, len: c_int,
flags: c_int, addr: *mut sockaddr,
addrlen: *mut c_int) -> ssize_t;
addrlen: *mut c_int) -> c_int;
pub fn sendto(socket: SOCKET, buf: *const c_void, len: c_int,
flags: c_int, addr: *const sockaddr,
addrlen: c_int) -> c_int;
Expand Down
111 changes: 111 additions & 0 deletions src/libstd/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use io;
use net::{ToSocketAddrs, SocketAddr, Shutdown};
use sys_common::net as net_imp;
use sys_common::{AsInner, FromInner};
use time::Duration;

/// A structure which represents a TCP stream between a local socket and a
/// remote socket.
Expand Down Expand Up @@ -139,6 +140,50 @@ impl TcpStream {
pub fn set_keepalive(&self, seconds: Option<u32>) -> io::Result<()> {
self.0.set_keepalive(seconds)
}

/// Sets the read timeout to the timeout specified.
///
/// If the value specified is `None`, then `read` calls will block
/// indefinitely. It is an error to pass the zero `Duration` to this
/// method.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.0.set_read_timeout(dur)
}

/// Sets the write timeout to the timeout specified.
///
/// If the value specified is `None`, then `write` calls will block
/// indefinitely. It is an error to pass the zero `Duration` to this
/// method.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.0.set_write_timeout(dur)
}

/// Returns the read timeout of this socket.
///
/// If the timeout is `None`, then `read` calls will block indefinitely.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these docs also mention that not all platforms support accessing the read/write timeouts, even if they have been set successfully?

///
/// # Note
///
/// Some platforms do not provide access to the current timeout.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
self.0.read_timeout()
}

/// Returns the write timeout of this socket.
///
/// If the timeout is `None`, then `write` calls will block indefinitely.
///
/// # Note
///
/// Some platforms do not provide access to the current timeout.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
self.0.write_timeout()
}
}

#[stable(feature = "rust1", since = "1.0.0")]
Expand Down Expand Up @@ -262,6 +307,7 @@ mod tests {
use net::test::{next_test_ip4, next_test_ip6};
use sync::mpsc::channel;
use sys_common::AsInner;
use time::Duration;
use thread;

fn each_ip(f: &mut FnMut(SocketAddr)) {
Expand Down Expand Up @@ -855,4 +901,69 @@ mod tests {
stream_inner);
assert_eq!(format!("{:?}", stream), compare);
}

#[test]
fn timeouts() {
let addr = next_test_ip4();
let listener = t!(TcpListener::bind(&addr));

let stream = t!(TcpStream::connect(&("localhost", addr.port())));
let dur = Duration::new(15410, 0);

assert_eq!(None, t!(stream.read_timeout()));

t!(stream.set_read_timeout(Some(dur)));
assert_eq!(Some(dur), t!(stream.read_timeout()));

assert_eq!(None, t!(stream.write_timeout()));

t!(stream.set_write_timeout(Some(dur)));
assert_eq!(Some(dur), t!(stream.write_timeout()));

t!(stream.set_read_timeout(None));
assert_eq!(None, t!(stream.read_timeout()));

t!(stream.set_write_timeout(None));
assert_eq!(None, t!(stream.write_timeout()));
}

#[test]
fn test_read_timeout() {
let addr = next_test_ip4();
let listener = t!(TcpListener::bind(&addr));

let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));

let mut buf = [0; 10];
let wait = Duration::span(|| {
let kind = stream.read(&mut buf).err().expect("expected error").kind();
assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
});
assert!(wait > Duration::from_millis(400));
assert!(wait < Duration::from_millis(1600));
}

#[test]
fn test_read_with_timeout() {
let addr = next_test_ip4();
let listener = t!(TcpListener::bind(&addr));

let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));

let mut other_end = t!(listener.accept()).0;
t!(other_end.write_all(b"hello world"));

let mut buf = [0; 11];
t!(stream.read(&mut buf));
assert_eq!(b"hello world", &buf[..]);

let wait = Duration::span(|| {
let kind = stream.read(&mut buf).err().expect("expected error").kind();
assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
});
assert!(wait > Duration::from_millis(400));
assert!(wait < Duration::from_millis(1600));
}
}
99 changes: 99 additions & 0 deletions src/libstd/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use io::{self, Error, ErrorKind};
use net::{ToSocketAddrs, SocketAddr, IpAddr};
use sys_common::net as net_imp;
use sys_common::{AsInner, FromInner};
use time::Duration;

/// A User Datagram Protocol socket.
///
Expand Down Expand Up @@ -127,6 +128,42 @@ impl UdpSocket {
pub fn set_time_to_live(&self, ttl: i32) -> io::Result<()> {
self.0.time_to_live(ttl)
}

/// Sets the read timeout to the timeout specified.
///
/// If the value specified is `None`, then `read` calls will block
/// indefinitely. It is an error to pass the zero `Duration` to this
/// method.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.0.set_read_timeout(dur)
}

/// Sets the write timeout to the timeout specified.
///
/// If the value specified is `None`, then `write` calls will block
/// indefinitely. It is an error to pass the zero `Duration` to this
/// method.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.0.set_write_timeout(dur)
}

/// Returns the read timeout of this socket.
///
/// If the timeout is `None`, then `read` calls will block indefinitely.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
self.0.read_timeout()
}

/// Returns the write timeout of this socket.
///
/// If the timeout is `None`, then `write` calls will block indefinitely.
#[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")]
pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
self.0.write_timeout()
}
}

impl AsInner<net_imp::UdpSocket> for UdpSocket {
Expand All @@ -152,6 +189,7 @@ mod tests {
use net::test::{next_test_ip4, next_test_ip6};
use sync::mpsc::channel;
use sys_common::AsInner;
use time::Duration;
use thread;

fn each_ip(f: &mut FnMut(SocketAddr, SocketAddr)) {
Expand Down Expand Up @@ -321,4 +359,65 @@ mod tests {
socket_addr, name, udpsock_inner);
assert_eq!(format!("{:?}", udpsock), compare);
}

#[test]
fn timeouts() {
let addr = next_test_ip4();

let stream = t!(UdpSocket::bind(&addr));
let dur = Duration::new(15410, 0);

assert_eq!(None, t!(stream.read_timeout()));

t!(stream.set_read_timeout(Some(dur)));
assert_eq!(Some(dur), t!(stream.read_timeout()));

assert_eq!(None, t!(stream.write_timeout()));

t!(stream.set_write_timeout(Some(dur)));
assert_eq!(Some(dur), t!(stream.write_timeout()));

t!(stream.set_read_timeout(None));
assert_eq!(None, t!(stream.read_timeout()));

t!(stream.set_write_timeout(None));
assert_eq!(None, t!(stream.write_timeout()));
}

#[test]
fn test_read_timeout() {
let addr = next_test_ip4();

let mut stream = t!(UdpSocket::bind(&addr));
t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));

let mut buf = [0; 10];
let wait = Duration::span(|| {
let kind = stream.recv_from(&mut buf).err().expect("expected error").kind();
assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
});
assert!(wait > Duration::from_millis(400));
assert!(wait < Duration::from_millis(1600));
}

#[test]
fn test_read_with_timeout() {
let addr = next_test_ip4();

let mut stream = t!(UdpSocket::bind(&addr));
t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));

t!(stream.send_to(b"hello world", &addr));

let mut buf = [0; 11];
t!(stream.recv_from(&mut buf));
assert_eq!(b"hello world", &buf[..]);

let wait = Duration::span(|| {
let kind = stream.recv_from(&mut buf).err().expect("expected error").kind();
assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
});
assert!(wait > Duration::from_millis(400));
assert!(wait < Duration::from_millis(1600));
}
}
46 changes: 39 additions & 7 deletions src/libstd/sys/common/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use str::from_utf8;
use sys::c;
use sys::net::{cvt, cvt_r, cvt_gai, Socket, init, wrlen_t};
use sys_common::{AsInner, FromInner, IntoInner};
use time::Duration;

////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
////////////////////////////////////////////////////////////////////////////////

fn setsockopt<T>(sock: &Socket, opt: c_int, val: c_int,
pub fn setsockopt<T>(sock: &Socket, opt: c_int, val: c_int,
payload: T) -> io::Result<()> {
unsafe {
let payload = &payload as *const T as *const c_void;
Expand All @@ -35,16 +36,15 @@ fn setsockopt<T>(sock: &Socket, opt: c_int, val: c_int,
}
}

#[allow(dead_code)]
fn getsockopt<T: Copy>(sock: &Socket, opt: c_int,
pub fn getsockopt<T: Copy>(sock: &Socket, opt: c_int,
val: c_int) -> io::Result<T> {
unsafe {
let mut slot: T = mem::zeroed();
let mut len = mem::size_of::<T>() as socklen_t;
let ret = try!(cvt(c::getsockopt(*sock.as_inner(), opt, val,
&mut slot as *mut _ as *mut _,
&mut len)));
assert_eq!(ret as usize, mem::size_of::<T>());
try!(cvt(c::getsockopt(*sock.as_inner(), opt, val,
&mut slot as *mut _ as *mut _,
&mut len)));
assert_eq!(len as usize, mem::size_of::<T>());
Ok(slot)
}
}
Expand Down Expand Up @@ -220,6 +220,22 @@ impl TcpStream {
Ok(())
}

pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.set_timeout(dur, libc::SO_RCVTIMEO)
}

pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.set_timeout(dur, libc::SO_SNDTIMEO)
}

pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
self.inner.timeout(libc::SO_RCVTIMEO)
}

pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
self.inner.timeout(libc::SO_SNDTIMEO)
}

pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
Expand Down Expand Up @@ -471,6 +487,22 @@ impl UdpSocket {
pub fn duplicate(&self) -> io::Result<UdpSocket> {
self.inner.duplicate().map(|s| UdpSocket { inner: s })
}

pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.set_timeout(dur, libc::SO_RCVTIMEO)
}

pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.set_timeout(dur, libc::SO_SNDTIMEO)
}

pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
self.inner.timeout(libc::SO_RCVTIMEO)
}

pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
self.inner.timeout(libc::SO_SNDTIMEO)
}
}

impl FromInner<Socket> for UdpSocket {
Expand Down
Loading