diff --git a/CHANGELOG.md b/CHANGELOG.md index c49535ac73a..678b4c336bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ initialization. Unless `KademliaConfig::set_replication_factor` is used change has no effect. [PR 1536](https://github.com/libp2p/rust-libp2p/pull/1536) +- `libp2p-tcp`: On listeners started with an IPv6 multi-address the socket + option `IPV6_V6ONLY` is set to true. Instead of relying on IPv4-mapped IPv6 + address support, two listeners can be started if IPv4 and IPv6 should both + be supported. IPv4 listener addresses are not affected by this change. + [PR 1555](https://github.com/libp2p/rust-libp2p/pull/1555) # Version 0.18.1 (2020-04-17) diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index b1d5d996975..1133941c159 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -17,6 +17,7 @@ get_if_addrs = "0.5.3" ipnet = "2.0.0" libp2p-core = { version = "0.18.0", path = "../../core" } log = "0.4.1" +socket2 = "0.3.12" tokio = { version = "0.2", default-features = false, features = ["tcp"], optional = true } [features] diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index aaa70f6ed1b..1e870f78c6a 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -39,8 +39,10 @@ use libp2p_core::{ transport::{ListenerEvent, TransportError} }; use log::{debug, trace}; +use socket2::{Socket, Domain, Type}; use std::{ collections::VecDeque, + convert::TryFrom, io, iter::{self, FromIterator}, net::{IpAddr, SocketAddr}, @@ -108,7 +110,22 @@ impl Transport for $tcp_config { async fn do_listen(cfg: $tcp_config, socket_addr: SocketAddr) -> Result>, io::Error>, io::Error>>, io::Error> { - let listener = <$tcp_listener>::bind(&socket_addr).await?; + let socket = if socket_addr.is_ipv4() { + Socket::new(Domain::ipv4(), Type::stream(), Some(socket2::Protocol::tcp()))? + } else { + let s = Socket::new(Domain::ipv6(), Type::stream(), Some(socket2::Protocol::tcp()))?; + s.set_only_v6(true)?; + s + }; + if cfg!(target_family = "unix") { + socket.set_reuse_address(true)?; + } + socket.bind(&socket_addr.into())?; + socket.listen(1024)?; // we may want to make this configurable + + let listener = <$tcp_listener>::try_from(socket.into_tcp_listener()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let local_addr = listener.local_addr()?; let port = local_addr.port(); @@ -485,42 +502,45 @@ mod tests { #[test] #[cfg(feature = "async-std")] fn wildcard_expansion() { - let mut listener = TcpConfig::new() - .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) - .expect("listener"); - - // Get the first address. - let addr = futures::executor::block_on_stream(listener.by_ref()) - .next() - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); - - // Process all initial `NewAddress` events and make sure they - // do not contain wildcard address or port. - let server = listener - .take_while(|event| match event.as_ref().unwrap() { - ListenerEvent::NewAddress(a) => { - let mut iter = a.iter(); - match iter.next().expect("ip address") { - Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), - Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), - other => panic!("Unexpected protocol: {}", other) - } - if let Protocol::Tcp(port) = iter.next().expect("port") { - assert_ne!(0, port) - } else { - panic!("No TCP port in address: {}", a) + fn test(addr: Multiaddr) { + let mut listener = TcpConfig::new().listen_on(addr).expect("listener"); + + // Get the first address. + let addr = futures::executor::block_on_stream(listener.by_ref()) + .next() + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + + // Process all initial `NewAddress` events and make sure they + // do not contain wildcard address or port. + let server = listener + .take_while(|event| match event.as_ref().unwrap() { + ListenerEvent::NewAddress(a) => { + let mut iter = a.iter(); + match iter.next().expect("ip address") { + Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), + Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), + other => panic!("Unexpected protocol: {}", other) + } + if let Protocol::Tcp(port) = iter.next().expect("port") { + assert_ne!(0, port) + } else { + panic!("No TCP port in address: {}", a) + } + futures::future::ready(true) } - futures::future::ready(true) - } - _ => futures::future::ready(false) - }) - .for_each(|_| futures::future::ready(())); + _ => futures::future::ready(false) + }) + .for_each(|_| futures::future::ready(())); - let client = TcpConfig::new().dial(addr).expect("dialer"); - async_std::task::block_on(futures::future::join(server, client)).1.unwrap(); + let client = TcpConfig::new().dial(addr).expect("dialer"); + async_std::task::block_on(futures::future::join(server, client)).1.unwrap(); + } + + test("/ip4/0.0.0.0/tcp/0".parse().unwrap()); + test("/ip6/::1/tcp/0".parse().unwrap()); } #[test] @@ -575,43 +595,47 @@ mod tests { #[test] #[cfg(feature = "async-std")] fn communicating_between_dialer_and_listener() { - let (ready_tx, ready_rx) = futures::channel::oneshot::channel(); - let mut ready_tx = Some(ready_tx); - - async_std::task::spawn(async move { - let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); - let tcp = TcpConfig::new(); - let mut listener = tcp.listen_on(addr).unwrap(); - - loop { - match listener.next().await.unwrap().unwrap() { - ListenerEvent::NewAddress(listen_addr) => { - ready_tx.take().unwrap().send(listen_addr).unwrap(); - }, - ListenerEvent::Upgrade { upgrade, .. } => { - let mut upgrade = upgrade.await.unwrap(); - let mut buf = [0u8; 3]; - upgrade.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [1, 2, 3]); - upgrade.write_all(&[4, 5, 6]).await.unwrap(); - }, - _ => unreachable!() + fn test(addr: Multiaddr) { + let (ready_tx, ready_rx) = futures::channel::oneshot::channel(); + let mut ready_tx = Some(ready_tx); + + async_std::task::spawn(async move { + let tcp = TcpConfig::new(); + let mut listener = tcp.listen_on(addr).unwrap(); + + loop { + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(listen_addr) => { + ready_tx.take().unwrap().send(listen_addr).unwrap(); + }, + ListenerEvent::Upgrade { upgrade, .. } => { + let mut upgrade = upgrade.await.unwrap(); + let mut buf = [0u8; 3]; + upgrade.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [1, 2, 3]); + upgrade.write_all(&[4, 5, 6]).await.unwrap(); + }, + _ => unreachable!() + } } - } - }); + }); - async_std::task::block_on(async move { - let addr = ready_rx.await.unwrap(); - let tcp = TcpConfig::new(); + async_std::task::block_on(async move { + let addr = ready_rx.await.unwrap(); + let tcp = TcpConfig::new(); - // Obtain a future socket through dialing - let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); - socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + // Obtain a future socket through dialing + let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); + socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + + let mut buf = [0u8; 3]; + socket.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [4, 5, 6]); + }); + } - let mut buf = [0u8; 3]; - socket.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [4, 5, 6]); - }); + test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); + test("/ip6/::1/tcp/0".parse().unwrap()); } #[test]