Skip to content

Commit

Permalink
tcp: Set IPV6_V6ONLY for IPv6 listeners. (#1555)
Browse files Browse the repository at this point in the history
* tcp: Set IPV6_V6ONLY for IPv6 listeners.

The current behaviour of listening on an IPv6 address varies depending
on the operating system's IP address stack implementation. Some support
IPv4-mapped IPv6 addresses (e.g. Linux and newer versions of Windows)
so a single IPv6 address would support IPv4-mapped addresses too.
Others do not (e.g. OpenBSD). If they do, then some support them by
default (e.g. Linux) and some do not (e.g. Windows).

This PR attempts to implement the same behaviour accross operating
systems. The strategy is as follows:

Disable IPv4-mapped IPv6 addresses, hence the socket option IPV6_V6ONLY
is always set to true.

This allows binding two sockets to the same port and also avoids the
problem of comparing mixed addresses which leads issues such as #1552.

* Update CHANGELOG and address review concerns.

* Update CHANGELOG.md

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
  • Loading branch information
twittner and tomaka authored Apr 24, 2020
1 parent f187fd4 commit bdbab32
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 68 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
initialization. Unless `KademliaConfig::set_replication_factor` is used change
has no effect.
[PR 1536](https://github.com/libp2p/rust-libp2p/pull/1536)
- `libp2p-tcp`: On listeners started with an IPv6 multi-address the socket
option `IPV6_V6ONLY` is set to true. Instead of relying on IPv4-mapped IPv6
address support, two listeners can be started if IPv4 and IPv6 should both
be supported. IPv4 listener addresses are not affected by this change.
[PR 1555](https://github.com/libp2p/rust-libp2p/pull/1555)

# Version 0.18.1 (2020-04-17)

Expand Down
1 change: 1 addition & 0 deletions transports/tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ get_if_addrs = "0.5.3"
ipnet = "2.0.0"
libp2p-core = { version = "0.18.0", path = "../../core" }
log = "0.4.1"
socket2 = "0.3.12"
tokio = { version = "0.2", default-features = false, features = ["tcp"], optional = true }

[features]
Expand Down
160 changes: 92 additions & 68 deletions transports/tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ use libp2p_core::{
transport::{ListenerEvent, TransportError}
};
use log::{debug, trace};
use socket2::{Socket, Domain, Type};
use std::{
collections::VecDeque,
convert::TryFrom,
io,
iter::{self, FromIterator},
net::{IpAddr, SocketAddr},
Expand Down Expand Up @@ -108,7 +110,22 @@ impl Transport for $tcp_config {
async fn do_listen(cfg: $tcp_config, socket_addr: SocketAddr)
-> Result<impl Stream<Item = Result<ListenerEvent<Ready<Result<$tcp_trans_stream, io::Error>>, io::Error>, io::Error>>, io::Error>
{
let listener = <$tcp_listener>::bind(&socket_addr).await?;
let socket = if socket_addr.is_ipv4() {
Socket::new(Domain::ipv4(), Type::stream(), Some(socket2::Protocol::tcp()))?
} else {
let s = Socket::new(Domain::ipv6(), Type::stream(), Some(socket2::Protocol::tcp()))?;
s.set_only_v6(true)?;
s
};
if cfg!(target_family = "unix") {
socket.set_reuse_address(true)?;
}
socket.bind(&socket_addr.into())?;
socket.listen(1024)?; // we may want to make this configurable

let listener = <$tcp_listener>::try_from(socket.into_tcp_listener())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

let local_addr = listener.local_addr()?;
let port = local_addr.port();

Expand Down Expand Up @@ -485,42 +502,45 @@ mod tests {
#[test]
#[cfg(feature = "async-std")]
fn wildcard_expansion() {
let mut listener = TcpConfig::new()
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.expect("listener");

// Get the first address.
let addr = futures::executor::block_on_stream(listener.by_ref())
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");

// Process all initial `NewAddress` events and make sure they
// do not contain wildcard address or port.
let server = listener
.take_while(|event| match event.as_ref().unwrap() {
ListenerEvent::NewAddress(a) => {
let mut iter = a.iter();
match iter.next().expect("ip address") {
Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
other => panic!("Unexpected protocol: {}", other)
}
if let Protocol::Tcp(port) = iter.next().expect("port") {
assert_ne!(0, port)
} else {
panic!("No TCP port in address: {}", a)
fn test(addr: Multiaddr) {
let mut listener = TcpConfig::new().listen_on(addr).expect("listener");

// Get the first address.
let addr = futures::executor::block_on_stream(listener.by_ref())
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");

// Process all initial `NewAddress` events and make sure they
// do not contain wildcard address or port.
let server = listener
.take_while(|event| match event.as_ref().unwrap() {
ListenerEvent::NewAddress(a) => {
let mut iter = a.iter();
match iter.next().expect("ip address") {
Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
other => panic!("Unexpected protocol: {}", other)
}
if let Protocol::Tcp(port) = iter.next().expect("port") {
assert_ne!(0, port)
} else {
panic!("No TCP port in address: {}", a)
}
futures::future::ready(true)
}
futures::future::ready(true)
}
_ => futures::future::ready(false)
})
.for_each(|_| futures::future::ready(()));
_ => futures::future::ready(false)
})
.for_each(|_| futures::future::ready(()));

let client = TcpConfig::new().dial(addr).expect("dialer");
async_std::task::block_on(futures::future::join(server, client)).1.unwrap();
let client = TcpConfig::new().dial(addr).expect("dialer");
async_std::task::block_on(futures::future::join(server, client)).1.unwrap();
}

test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
test("/ip6/::1/tcp/0".parse().unwrap());
}

#[test]
Expand Down Expand Up @@ -575,43 +595,47 @@ mod tests {
#[test]
#[cfg(feature = "async-std")]
fn communicating_between_dialer_and_listener() {
let (ready_tx, ready_rx) = futures::channel::oneshot::channel();
let mut ready_tx = Some(ready_tx);

async_std::task::spawn(async move {
let addr = "/ip4/127.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
let tcp = TcpConfig::new();
let mut listener = tcp.listen_on(addr).unwrap();

loop {
match listener.next().await.unwrap().unwrap() {
ListenerEvent::NewAddress(listen_addr) => {
ready_tx.take().unwrap().send(listen_addr).unwrap();
},
ListenerEvent::Upgrade { upgrade, .. } => {
let mut upgrade = upgrade.await.unwrap();
let mut buf = [0u8; 3];
upgrade.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3]);
upgrade.write_all(&[4, 5, 6]).await.unwrap();
},
_ => unreachable!()
fn test(addr: Multiaddr) {
let (ready_tx, ready_rx) = futures::channel::oneshot::channel();
let mut ready_tx = Some(ready_tx);

async_std::task::spawn(async move {
let tcp = TcpConfig::new();
let mut listener = tcp.listen_on(addr).unwrap();

loop {
match listener.next().await.unwrap().unwrap() {
ListenerEvent::NewAddress(listen_addr) => {
ready_tx.take().unwrap().send(listen_addr).unwrap();
},
ListenerEvent::Upgrade { upgrade, .. } => {
let mut upgrade = upgrade.await.unwrap();
let mut buf = [0u8; 3];
upgrade.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3]);
upgrade.write_all(&[4, 5, 6]).await.unwrap();
},
_ => unreachable!()
}
}
}
});
});

async_std::task::block_on(async move {
let addr = ready_rx.await.unwrap();
let tcp = TcpConfig::new();
async_std::task::block_on(async move {
let addr = ready_rx.await.unwrap();
let tcp = TcpConfig::new();

// Obtain a future socket through dialing
let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
// Obtain a future socket through dialing
let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();

let mut buf = [0u8; 3];
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [4, 5, 6]);
});
}

let mut buf = [0u8; 3];
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [4, 5, 6]);
});
test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
test("/ip6/::1/tcp/0".parse().unwrap());
}

#[test]
Expand Down

0 comments on commit bdbab32

Please sign in to comment.