Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport trait #1789

Closed
wants to merge 17 commits into from
12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p"
edition = "2018"
description = "Peer-to-peer networking library"
version = "0.29.0"
version = "0.28.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -25,7 +25,7 @@ default = [
"pnet",
"request-response",
"secp256k1",
"tcp-async-std",
"tcp",
"uds",
"wasm-ext",
"websocket",
Expand All @@ -45,8 +45,7 @@ ping = ["libp2p-ping"]
plaintext = ["libp2p-plaintext"]
pnet = ["libp2p-pnet"]
request-response = ["libp2p-request-response"]
tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"]
tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"]
tcp = ["libp2p-tcp"]
uds = ["libp2p-uds"]
wasm-ext = ["libp2p-wasm-ext"]
wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext/websocket"]
Expand Down Expand Up @@ -126,4 +125,7 @@ members = [

[[example]]
name = "chat-tokio"
required-features = ["tcp-tokio", "mdns-tokio"]
required-features = ["tcp", "mdns-tokio"]

[patch.crates-io]
async-io = { git = "https://github.com/stjepang/async-io", branch = "poll-methods" }
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal
async-std = "1.6.2"
libp2p-mplex = { path = "../muxers/mplex" }
libp2p-noise = { path = "../protocols/noise" }
libp2p-tcp = { path = "../transports/tcp", features = ["async-std"] }
libp2p-tcp = { path = "../transports/tcp" }
quickcheck = "0.9.0"
wasm-timer = "0.2"

Expand Down
107 changes: 91 additions & 16 deletions core/src/connection/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

//! Manage listening on multiple multiaddresses at once.

use crate::{Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
use crate::{address_translation, Dialer, Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
use futures::{prelude::*, task::Context, task::Poll};
use log::debug;
use smallvec::SmallVec;
Expand Down Expand Up @@ -110,6 +110,8 @@ where
/// The object that actually listens.
#[pin]
listener: TTrans::Listener,
/// The object that dials.
dialer: TTrans::Dialer,
/// Addresses it is listening on.
addresses: SmallVec<[Multiaddr; 4]>
}
Expand Down Expand Up @@ -196,10 +198,11 @@ where
where
TTrans: Clone,
{
let listener = self.transport.clone().listen_on(addr)?;
let (listener, dialer) = self.transport.clone().listen_on(addr)?;
self.listeners.push_back(Box::pin(Listener {
id: self.next_id,
listener,
dialer,
addresses: SmallVec::new()
}));
let id = self.next_id;
Expand Down Expand Up @@ -229,6 +232,42 @@ where
self.listeners.iter().flat_map(|l| l.addresses.iter())
}

/// Perform address translation.
pub fn address_translation(&self, observed_addr: &Multiaddr) -> Vec<Multiaddr> {
let mut addrs = Vec::with_capacity(4 * self.listeners.len());
for listener in &self.listeners {
if listener.dialer.requires_address_translation() {
for addr in &listener.addresses {
if let Some(new_addr) = address_translation(addr, observed_addr) {
if addrs.iter().find(|addr| *addr == &new_addr).is_none() {
addrs.push(new_addr);
}
}
}
} else {
if addrs.iter().find(|addr| *addr == observed_addr).is_none() {
addrs.push(observed_addr.clone());
}
}
}
addrs
}

/// Returns a dialer for an address.
pub fn dialer_for_addr(&self, addr: &Multiaddr) -> TTrans::Dialer {
if self.listen_addrs().find(|addr2| *addr2 == addr).is_some() {
return self.transport.dialer();
}

self.listeners.iter().filter_map(|listener| {
listener.addresses.iter().find(|address| {
address.can_dial(addr)
}).map(|_| listener.dialer.clone())
})
.next()
.unwrap_or_else(|| self.transport.dialer())
}

/// Provides an API similar to `Stream`, except that it cannot end.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ListenersEvent<TTrans>> {
// We remove each element from `listeners` one by one and add them back.
Expand Down Expand Up @@ -373,7 +412,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::transport;
use crate::transport::{self, Dialer, ListenerEvent};

#[test]
fn incoming_event() {
Expand All @@ -394,7 +433,7 @@ mod tests {

let address2 = address.clone();
async_std::task::spawn(async move {
mem_transport.dial(address2).unwrap().await.unwrap();
mem_transport.dialer().dial(address2).unwrap().await.unwrap();
});

match listeners.next().await.unwrap() {
Expand All @@ -417,20 +456,38 @@ mod tests {
impl transport::Transport for DummyTrans {
type Output = ();
type Error = std::io::Error;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, std::io::Error>, std::io::Error>>>>;
type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
type Dialer = DummyDialer;
type Listener = DummyListener;
type ListenerUpgrade = future::Ready<Result<Self::Output, Self::Error>>;

fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
Ok(Box::pin(stream::unfold((), |()| async move {
Some((Ok(ListenerEvent::Error(std::io::Error::from(std::io::ErrorKind::Other))), ()))
})))
fn dialer(&self) -> Self::Dialer {
DummyDialer
}

fn listen_on(self, _: Multiaddr) -> Result<(Self::Listener, Self::Dialer), transport::TransportError<Self::Error>> {
Ok((DummyListener, DummyDialer))
}
}
#[derive(Clone)]
struct DummyDialer;
impl transport::Dialer for DummyDialer {
type Output = ();
type Error = std::io::Error;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;

fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
}
struct DummyListener;
impl Stream for DummyListener {
type Item = Result<ListenerEvent<future::Ready<Result<(), std::io::Error>>, std::io::Error>, std::io::Error>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
Poll::Ready(Some(Ok(ListenerEvent::Error(std::io::Error::from(std::io::ErrorKind::Other)))))
}
}

async_std::task::block_on(async move {
let transport = DummyTrans;
Expand All @@ -455,20 +512,38 @@ mod tests {
impl transport::Transport for DummyTrans {
type Output = ();
type Error = std::io::Error;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, std::io::Error>, std::io::Error>>>>;
type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
type Dialer = DummyDialer;
type Listener = DummyListener;
type ListenerUpgrade = future::Ready<Result<Self::Output, Self::Error>>;

fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
Ok(Box::pin(stream::unfold((), |()| async move {
Some((Err(std::io::Error::from(std::io::ErrorKind::Other)), ()))
})))
fn dialer(&self) -> Self::Dialer {
DummyDialer
}

fn listen_on(self, _: Multiaddr) -> Result<(Self::Listener, Self::Dialer), transport::TransportError<Self::Error>> {
Ok((DummyListener, DummyDialer))
}
}
#[derive(Clone)]
struct DummyDialer;
impl transport::Dialer for DummyDialer {
type Output = ();
type Error = std::io::Error;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;

fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
}
struct DummyListener;
impl Stream for DummyListener {
type Item = Result<ListenerEvent<future::Ready<Result<(), std::io::Error>>, std::io::Error>, std::io::Error>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
Poll::Ready(Some(Err(std::io::Error::from(std::io::ErrorKind::Other))))
}
}

async_std::task::block_on(async move {
let transport = DummyTrans;
Expand Down
58 changes: 41 additions & 17 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
ProtocolName,
transport::{Transport, ListenerEvent, TransportError},
transport::{Dialer, Transport, ListenerEvent, TransportError},
Multiaddr
};
use futures::{prelude::*, io::{IoSlice, IoSliceMut}};
Expand Down Expand Up @@ -430,51 +430,75 @@ impl<A: ProtocolName, B: ProtocolName> ProtocolName for EitherName<A, B> {
}

#[derive(Debug, Copy, Clone)]
pub enum EitherTransport<A, B> {
pub enum EitherDialer<A, B> {
Left(A),
Right(B),
}

impl<A, B> Transport for EitherTransport<A, B>
impl<A, B> Dialer for EitherDialer<A, B>
where
B: Transport,
A: Transport,
B: Dialer,
A: Dialer,
{
type Output = EitherOutput<A::Output, B::Output>;
type Error = EitherError<A::Error, B::Error>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;

fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::First(listener)),
EitherDialer::Left(a) => match a.dial(addr) {
Ok(connec) => Ok(EitherFuture::First(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
},
EitherTransport::Right(b) => match b.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::Second(listener)),
EitherDialer::Right(b) => match b.dial(addr) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
},
}
}
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
#[derive(Debug, Copy, Clone)]
pub enum EitherTransport<A, B> {
Left(A),
Right(B),
}

impl<A, B> Transport for EitherTransport<A, B>
where
B: Transport,
A: Transport,
{
type Output = EitherOutput<A::Output, B::Output>;
type Error = EitherError<A::Error, B::Error>;
type Dial = EitherFuture<A::Dial, B::Dial>;
type Dialer = EitherDialer<A::Dialer, B::Dialer>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;

fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Self::Dialer), TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.dial(addr) {
Ok(connec) => Ok(EitherFuture::First(connec)),
EitherTransport::Left(a) => match a.listen_on(addr) {
Ok((listener, dialer)) => Ok((EitherListenStream::First(listener), EitherDialer::Left(dialer))),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
},
EitherTransport::Right(b) => match b.dial(addr) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
EitherTransport::Right(b) => match b.listen_on(addr) {
Ok((listener, dialer)) => Ok((EitherListenStream::Second(listener), EitherDialer::Right(dialer))),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
},
}
}

fn dialer(&self) -> Self::Dialer {
match self {
EitherTransport::Left(a) => EitherDialer::Left(a.dialer()),
EitherTransport::Right(b) => EitherDialer::Right(b.dialer()),
}
}
}
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub use multiaddr::Multiaddr;
pub use muxing::StreamMuxer;
pub use peer_id::PeerId;
pub use identity::PublicKey;
pub use transport::Transport;
pub use transport::{Dialer, Transport};
pub use translation::address_translation;
pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName};
pub use connection::{Connected, Endpoint, ConnectedPoint, ConnectionInfo};
Expand Down
Loading