diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs index 3d19e95211e8d..8fe1ddcf27533 100644 --- a/src/liblibc/lib.rs +++ b/src/liblibc/lib.rs @@ -6043,7 +6043,6 @@ pub mod funcs { use types::common::c95::{c_void}; use types::os::common::bsd44::{socklen_t, sockaddr, SOCKET}; use types::os::arch::c95::c_int; - use types::os::arch::posix88::ssize_t; extern "system" { pub fn socket(domain: c_int, ty: c_int, protocol: c_int) -> SOCKET; @@ -6068,7 +6067,7 @@ pub mod funcs { flags: c_int) -> c_int; pub fn recvfrom(socket: SOCKET, buf: *mut c_void, len: c_int, flags: c_int, addr: *mut sockaddr, - addrlen: *mut c_int) -> ssize_t; + addrlen: *mut c_int) -> c_int; pub fn sendto(socket: SOCKET, buf: *const c_void, len: c_int, flags: c_int, addr: *const sockaddr, addrlen: c_int) -> c_int; diff --git a/src/libstd/net/tcp.rs b/src/libstd/net/tcp.rs index 28063c1edb3f7..cffccab7e09d0 100644 --- a/src/libstd/net/tcp.rs +++ b/src/libstd/net/tcp.rs @@ -19,6 +19,7 @@ use io; use net::{ToSocketAddrs, SocketAddr, Shutdown}; use sys_common::net as net_imp; use sys_common::{AsInner, FromInner}; +use time::Duration; /// A structure which represents a TCP stream between a local socket and a /// remote socket. @@ -139,6 +140,50 @@ impl TcpStream { pub fn set_keepalive(&self, seconds: Option) -> io::Result<()> { self.0.set_keepalive(seconds) } + + /// Sets the read timeout to the timeout specified. + /// + /// If the value specified is `None`, then `read` calls will block + /// indefinitely. It is an error to pass the zero `Duration` to this + /// method. + #[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")] + pub fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.0.set_read_timeout(dur) + } + + /// Sets the write timeout to the timeout specified. + /// + /// If the value specified is `None`, then `write` calls will block + /// indefinitely. It is an error to pass the zero `Duration` to this + /// method. + #[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")] + pub fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.0.set_write_timeout(dur) + } + + /// Returns the read timeout of this socket. + /// + /// If the timeout is `None`, then `read` calls will block indefinitely. + /// + /// # Note + /// + /// Some platforms do not provide access to the current timeout. + #[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")] + pub fn read_timeout(&self) -> io::Result> { + self.0.read_timeout() + } + + /// Returns the write timeout of this socket. + /// + /// If the timeout is `None`, then `write` calls will block indefinitely. + /// + /// # Note + /// + /// Some platforms do not provide access to the current timeout. + #[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")] + pub fn write_timeout(&self) -> io::Result> { + self.0.write_timeout() + } } #[stable(feature = "rust1", since = "1.0.0")] @@ -262,6 +307,7 @@ mod tests { use net::test::{next_test_ip4, next_test_ip6}; use sync::mpsc::channel; use sys_common::AsInner; + use time::Duration; use thread; fn each_ip(f: &mut FnMut(SocketAddr)) { @@ -855,4 +901,69 @@ mod tests { stream_inner); assert_eq!(format!("{:?}", stream), compare); } + + #[test] + fn timeouts() { + let addr = next_test_ip4(); + let listener = t!(TcpListener::bind(&addr)); + + let stream = t!(TcpStream::connect(&("localhost", addr.port()))); + let dur = Duration::new(15410, 0); + + assert_eq!(None, t!(stream.read_timeout())); + + t!(stream.set_read_timeout(Some(dur))); + assert_eq!(Some(dur), t!(stream.read_timeout())); + + assert_eq!(None, t!(stream.write_timeout())); + + t!(stream.set_write_timeout(Some(dur))); + assert_eq!(Some(dur), t!(stream.write_timeout())); + + t!(stream.set_read_timeout(None)); + assert_eq!(None, t!(stream.read_timeout())); + + t!(stream.set_write_timeout(None)); + assert_eq!(None, t!(stream.write_timeout())); + } + + #[test] + fn test_read_timeout() { + let addr = next_test_ip4(); + let listener = t!(TcpListener::bind(&addr)); + + let mut stream = t!(TcpStream::connect(&("localhost", addr.port()))); + t!(stream.set_read_timeout(Some(Duration::from_millis(1000)))); + + let mut buf = [0; 10]; + let wait = Duration::span(|| { + let kind = stream.read(&mut buf).err().expect("expected error").kind(); + assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut); + }); + assert!(wait > Duration::from_millis(400)); + assert!(wait < Duration::from_millis(1600)); + } + + #[test] + fn test_read_with_timeout() { + let addr = next_test_ip4(); + let listener = t!(TcpListener::bind(&addr)); + + let mut stream = t!(TcpStream::connect(&("localhost", addr.port()))); + t!(stream.set_read_timeout(Some(Duration::from_millis(1000)))); + + let mut other_end = t!(listener.accept()).0; + t!(other_end.write_all(b"hello world")); + + let mut buf = [0; 11]; + t!(stream.read(&mut buf)); + assert_eq!(b"hello world", &buf[..]); + + let wait = Duration::span(|| { + let kind = stream.read(&mut buf).err().expect("expected error").kind(); + assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut); + }); + assert!(wait > Duration::from_millis(400)); + assert!(wait < Duration::from_millis(1600)); + } } diff --git a/src/libstd/net/udp.rs b/src/libstd/net/udp.rs index 67c7096904d6f..ebabba7def1be 100644 --- a/src/libstd/net/udp.rs +++ b/src/libstd/net/udp.rs @@ -18,6 +18,7 @@ use io::{self, Error, ErrorKind}; use net::{ToSocketAddrs, SocketAddr, IpAddr}; use sys_common::net as net_imp; use sys_common::{AsInner, FromInner}; +use time::Duration; /// A User Datagram Protocol socket. /// @@ -127,6 +128,42 @@ impl UdpSocket { pub fn set_time_to_live(&self, ttl: i32) -> io::Result<()> { self.0.time_to_live(ttl) } + + /// Sets the read timeout to the timeout specified. + /// + /// If the value specified is `None`, then `read` calls will block + /// indefinitely. It is an error to pass the zero `Duration` to this + /// method. + #[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")] + pub fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.0.set_read_timeout(dur) + } + + /// Sets the write timeout to the timeout specified. + /// + /// If the value specified is `None`, then `write` calls will block + /// indefinitely. It is an error to pass the zero `Duration` to this + /// method. + #[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")] + pub fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.0.set_write_timeout(dur) + } + + /// Returns the read timeout of this socket. + /// + /// If the timeout is `None`, then `read` calls will block indefinitely. + #[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")] + pub fn read_timeout(&self) -> io::Result> { + self.0.read_timeout() + } + + /// Returns the write timeout of this socket. + /// + /// If the timeout is `None`, then `write` calls will block indefinitely. + #[unstable(feature = "socket_timeout", reason = "RFC 1047 - recently added")] + pub fn write_timeout(&self) -> io::Result> { + self.0.write_timeout() + } } impl AsInner for UdpSocket { @@ -152,6 +189,7 @@ mod tests { use net::test::{next_test_ip4, next_test_ip6}; use sync::mpsc::channel; use sys_common::AsInner; + use time::Duration; use thread; fn each_ip(f: &mut FnMut(SocketAddr, SocketAddr)) { @@ -321,4 +359,65 @@ mod tests { socket_addr, name, udpsock_inner); assert_eq!(format!("{:?}", udpsock), compare); } + + #[test] + fn timeouts() { + let addr = next_test_ip4(); + + let stream = t!(UdpSocket::bind(&addr)); + let dur = Duration::new(15410, 0); + + assert_eq!(None, t!(stream.read_timeout())); + + t!(stream.set_read_timeout(Some(dur))); + assert_eq!(Some(dur), t!(stream.read_timeout())); + + assert_eq!(None, t!(stream.write_timeout())); + + t!(stream.set_write_timeout(Some(dur))); + assert_eq!(Some(dur), t!(stream.write_timeout())); + + t!(stream.set_read_timeout(None)); + assert_eq!(None, t!(stream.read_timeout())); + + t!(stream.set_write_timeout(None)); + assert_eq!(None, t!(stream.write_timeout())); + } + + #[test] + fn test_read_timeout() { + let addr = next_test_ip4(); + + let mut stream = t!(UdpSocket::bind(&addr)); + t!(stream.set_read_timeout(Some(Duration::from_millis(1000)))); + + let mut buf = [0; 10]; + let wait = Duration::span(|| { + let kind = stream.recv_from(&mut buf).err().expect("expected error").kind(); + assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut); + }); + assert!(wait > Duration::from_millis(400)); + assert!(wait < Duration::from_millis(1600)); + } + + #[test] + fn test_read_with_timeout() { + let addr = next_test_ip4(); + + let mut stream = t!(UdpSocket::bind(&addr)); + t!(stream.set_read_timeout(Some(Duration::from_millis(1000)))); + + t!(stream.send_to(b"hello world", &addr)); + + let mut buf = [0; 11]; + t!(stream.recv_from(&mut buf)); + assert_eq!(b"hello world", &buf[..]); + + let wait = Duration::span(|| { + let kind = stream.recv_from(&mut buf).err().expect("expected error").kind(); + assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut); + }); + assert!(wait > Duration::from_millis(400)); + assert!(wait < Duration::from_millis(1600)); + } } diff --git a/src/libstd/sys/common/net.rs b/src/libstd/sys/common/net.rs index efbcda3fecec0..5890e6a78892c 100644 --- a/src/libstd/sys/common/net.rs +++ b/src/libstd/sys/common/net.rs @@ -20,12 +20,13 @@ use str::from_utf8; use sys::c; use sys::net::{cvt, cvt_r, cvt_gai, Socket, init, wrlen_t}; use sys_common::{AsInner, FromInner, IntoInner}; +use time::Duration; //////////////////////////////////////////////////////////////////////////////// // sockaddr and misc bindings //////////////////////////////////////////////////////////////////////////////// -fn setsockopt(sock: &Socket, opt: c_int, val: c_int, +pub fn setsockopt(sock: &Socket, opt: c_int, val: c_int, payload: T) -> io::Result<()> { unsafe { let payload = &payload as *const T as *const c_void; @@ -35,16 +36,15 @@ fn setsockopt(sock: &Socket, opt: c_int, val: c_int, } } -#[allow(dead_code)] -fn getsockopt(sock: &Socket, opt: c_int, +pub fn getsockopt(sock: &Socket, opt: c_int, val: c_int) -> io::Result { unsafe { let mut slot: T = mem::zeroed(); let mut len = mem::size_of::() as socklen_t; - let ret = try!(cvt(c::getsockopt(*sock.as_inner(), opt, val, - &mut slot as *mut _ as *mut _, - &mut len))); - assert_eq!(ret as usize, mem::size_of::()); + try!(cvt(c::getsockopt(*sock.as_inner(), opt, val, + &mut slot as *mut _ as *mut _, + &mut len))); + assert_eq!(len as usize, mem::size_of::()); Ok(slot) } } @@ -220,6 +220,22 @@ impl TcpStream { Ok(()) } + pub fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.inner.set_timeout(dur, libc::SO_RCVTIMEO) + } + + pub fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.inner.set_timeout(dur, libc::SO_SNDTIMEO) + } + + pub fn read_timeout(&self) -> io::Result> { + self.inner.timeout(libc::SO_RCVTIMEO) + } + + pub fn write_timeout(&self) -> io::Result> { + self.inner.timeout(libc::SO_SNDTIMEO) + } + pub fn read(&self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } @@ -471,6 +487,22 @@ impl UdpSocket { pub fn duplicate(&self) -> io::Result { self.inner.duplicate().map(|s| UdpSocket { inner: s }) } + + pub fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.inner.set_timeout(dur, libc::SO_RCVTIMEO) + } + + pub fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.inner.set_timeout(dur, libc::SO_SNDTIMEO) + } + + pub fn read_timeout(&self) -> io::Result> { + self.inner.timeout(libc::SO_RCVTIMEO) + } + + pub fn write_timeout(&self) -> io::Result> { + self.inner.timeout(libc::SO_SNDTIMEO) + } } impl FromInner for UdpSocket { diff --git a/src/libstd/sys/unix/net.rs b/src/libstd/sys/unix/net.rs index 2e1cbb2a1e127..1f40c18be2f10 100644 --- a/src/libstd/sys/unix/net.rs +++ b/src/libstd/sys/unix/net.rs @@ -18,6 +18,8 @@ use sys::c; use net::SocketAddr; use sys::fd::FileDesc; use sys_common::{AsInner, FromInner}; +use sys_common::net::{getsockopt, setsockopt}; +use time::Duration; pub use sys::{cvt, cvt_r}; @@ -73,6 +75,49 @@ impl Socket { pub fn read(&self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } + + pub fn set_timeout(&self, dur: Option, kind: libc::c_int) -> io::Result<()> { + let timeout = match dur { + Some(dur) => { + if dur.secs() == 0 && dur.extra_nanos() == 0 { + return Err(io::Error::new(io::ErrorKind::InvalidInput, + "cannot set a 0 duration timeout")); + } + + let secs = if dur.secs() > libc::time_t::max_value() as u64 { + libc::time_t::max_value() + } else { + dur.secs() as libc::time_t + }; + let mut timeout = libc::timeval { + tv_sec: secs, + tv_usec: (dur.extra_nanos() / 1000) as libc::suseconds_t, + }; + if timeout.tv_sec == 0 && timeout.tv_usec == 0 { + timeout.tv_usec = 1; + } + timeout + } + None => { + libc::timeval { + tv_sec: 0, + tv_usec: 0, + } + } + }; + setsockopt(self, libc::SOL_SOCKET, kind, timeout) + } + + pub fn timeout(&self, kind: libc::c_int) -> io::Result> { + let raw: libc::timeval = try!(getsockopt(self, libc::SOL_SOCKET, kind)); + if raw.tv_sec == 0 && raw.tv_usec == 0 { + Ok(None) + } else { + let sec = raw.tv_sec as u64; + let nsec = (raw.tv_usec as u32) * 1000; + Ok(Some(Duration::new(sec, nsec))) + } + } } impl AsInner for Socket { diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs index 6b7bff2c1c6f8..18c8add17a6d6 100644 --- a/src/libstd/sys/windows/mod.rs +++ b/src/libstd/sys/windows/mod.rs @@ -60,6 +60,7 @@ pub fn decode_error_kind(errno: i32) -> ErrorKind { libc::WSAEINVAL => ErrorKind::InvalidInput, libc::WSAENOTCONN => ErrorKind::NotConnected, libc::WSAEWOULDBLOCK => ErrorKind::WouldBlock, + libc::WSAETIMEDOUT => ErrorKind::TimedOut, _ => ErrorKind::Other, } diff --git a/src/libstd/sys/windows/net.rs b/src/libstd/sys/windows/net.rs index 71e064bcc6b82..0b9052672369d 100644 --- a/src/libstd/sys/windows/net.rs +++ b/src/libstd/sys/windows/net.rs @@ -19,8 +19,11 @@ use num::One; use ops::Neg; use rt; use sync::Once; +use sys; use sys::c; use sys_common::{AsInner, FromInner}; +use sys_common::net::{setsockopt, getsockopt}; +use time::Duration; pub type wrlen_t = i32; @@ -127,6 +130,32 @@ impl Socket { } } } + + pub fn set_timeout(&self, dur: Option, kind: libc::c_int) -> io::Result<()> { + let timeout = match dur { + Some(dur) => { + let timeout = sys::dur2timeout(dur); + if timeout == 0 { + return Err(io::Error::new(io::ErrorKind::InvalidInput, + "cannot set a 0 duration timeout")); + } + timeout + } + None => 0 + }; + setsockopt(self, libc::SOL_SOCKET, kind, timeout) + } + + pub fn timeout(&self, kind: libc::c_int) -> io::Result> { + let raw: libc::DWORD = try!(getsockopt(self, libc::SOL_SOCKET, kind)); + if raw == 0 { + Ok(None) + } else { + let secs = raw / 1000; + let nsec = (raw % 1000) * 1000000; + Ok(Some(Duration::new(secs as u64, nsec as u32))) + } + } } impl Drop for Socket {