diff --git a/core/src/lib.rs b/core/src/lib.rs index 0dfa8c57b61..c3276415c67 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -60,6 +60,7 @@ pub use identity::PublicKey; pub use transport::Transport; pub use translation::address_translation; pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName}; +pub use nodes::ConnectionInfo; #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum Endpoint { diff --git a/core/src/nodes/network.rs b/core/src/nodes/network.rs index 5b2eb63fed1..abe9e631681 100644 --- a/core/src/nodes/network.rs +++ b/core/src/nodes/network.rs @@ -782,9 +782,8 @@ where TTrans: Transport, TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, - TMuxer: StreamMuxer + Send + Sync + 'static, + TMuxer: Send + Sync + 'static, TMuxer::OutboundSubstream: Send, - TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, TConnInfo: Send + 'static, @@ -937,12 +936,10 @@ where TTrans: Transport, TTrans::Dial: Send + 'static, TTrans::Error: Send + 'static, - TMuxer: StreamMuxer + Send + Sync + 'static, + TMuxer: Send + Sync + 'static, TMuxer::OutboundSubstream: Send, - TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - TConnInfo: Send + 'static, TPeerId: Send + 'static, { let reach_id = match self.transport().clone().dial(first.clone()) { @@ -985,14 +982,12 @@ where TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, TTrans::ListenerUpgrade: Send + 'static, - TMuxer: StreamMuxer + Send + Sync + 'static, + TMuxer: Send + Sync + 'static, TMuxer::OutboundSubstream: Send, - TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, - ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, TConnInfo: Clone, TPeerId: AsRef<[u8]> + Send + 'static, @@ -1151,7 +1146,6 @@ where TTrans: Transport + Clone, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, - TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, TConnInfo: ConnectionInfo + Clone + Send + 'static, diff --git a/core/src/transport/mod.rs b/core/src/transport/mod.rs index 27e4753b410..6cc9dc7bb30 100644 --- a/core/src/transport/mod.rs +++ b/core/src/transport/mod.rs @@ -25,12 +25,11 @@ //! any desired protocols. The rest of the module defines combinators for //! modifying a transport through composition with other transports or protocol upgrades. -use crate::{InboundUpgrade, OutboundUpgrade, ConnectedPoint}; +use crate::ConnectedPoint; use futures::prelude::*; use multiaddr::Multiaddr; -use std::{error, fmt}; +use std::{error::Error, fmt}; use std::time::Duration; -use tokio_io::{AsyncRead, AsyncWrite}; pub mod and_then; pub mod boxed; @@ -69,11 +68,7 @@ pub use self::upgrade::Upgrade; /// /// Additional protocols can be layered on top of the connections established /// by a [`Transport`] through an upgrade mechanism that is initiated via -/// [`with_upgrade`](Transport::with_upgrade) and optionally followed by further upgrades -/// through chaining calls to [`with_upgrade`](Transport::with_upgrade) and -/// [`and_then`](Transport::and_then). Thereby every upgrade yields a new [`Transport`] -/// whose connection setup incorporates all earlier upgrades followed by the new upgrade, -/// i.e. the order of the upgrades is significant. +/// [`upgrade`](Transport::upgrade). /// /// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other /// > words, listening or dialing consumes the transport object. This has been designed @@ -88,7 +83,7 @@ pub trait Transport { type Output; /// An error that occurred during connection setup. - type Error: error::Error; + type Error: Error; /// A stream of [`Output`](Transport::Output)s for inbound connections. /// @@ -127,7 +122,7 @@ pub trait Transport { where Self: Sized; - /// Turns this `Transport` into an abstract boxed transport. + /// Turns the transport into an abstract boxed (i.e. heap-allocated) transport. fn boxed(self) -> boxed::Boxed where Self: Sized + Clone + Send + Sync + 'static, Self::Dial: Send + 'static, @@ -138,93 +133,89 @@ pub trait Transport { } /// Applies a function on the connections created by the transport. - fn map(self, map: F) -> map::Map + fn map(self, f: F) -> map::Map where Self: Sized, F: FnOnce(Self::Output, ConnectedPoint) -> O + Clone { - map::Map::new(self, map) + map::Map::new(self, f) } /// Applies a function on the errors generated by the futures of the transport. - fn map_err(self, map_err: F) -> map_err::MapErr + fn map_err(self, f: F) -> map_err::MapErr where Self: Sized, - F: FnOnce(Self::Error) -> TNewErr + Clone + F: FnOnce(Self::Error) -> E + Clone { - map_err::MapErr::new(self, map_err) + map_err::MapErr::new(self, f) } - /// Builds a new transport that falls back to another transport when - /// encountering errors on dialing or listening for connections. + /// Adds a fallback transport that is used when encountering errors + /// while establishing inbound or outbound connections. /// /// The returned transport will act like `self`, except that if `listen_on` or `dial` /// return an error then `other` will be tried. - fn or_transport(self, other: T) -> OrTransport + fn or_transport(self, other: U) -> OrTransport where Self: Sized, + U: Transport, + ::Error: 'static { OrTransport::new(self, other) } - /// Wraps this transport inside an [`Upgrade`]. - /// - /// Whenever an inbound or outbound connection is established by this - /// transport, the upgrade is applied on the current state of the - /// connection (which may have already gone through previous upgrades) - /// as an [`upgrade::InboundUpgrade`] or [`upgrade::OutboundUpgrade`], - /// respectively. - fn with_upgrade(self, upgrade: U) -> Upgrade - where - Self: Sized, - Self::Output: AsyncRead + AsyncWrite, - U: InboundUpgrade, - U: OutboundUpgrade - { - Upgrade::new(self, upgrade) - } - /// Applies a function producing an asynchronous result to every connection /// created by this transport. /// - /// This function can be used for ad-hoc protocol upgrades on a transport or - /// for processing or adapting the output of an earlier upgrade before - /// applying the next upgrade. - fn and_then(self, upgrade: C) -> and_then::AndThen + /// This function can be used for ad-hoc protocol upgrades or + /// for processing or adapting the output for following configurations. + /// + /// For the high-level transport upgrade procedure, see [`Transport::upgrade`]. + fn and_then(self, f: C) -> and_then::AndThen where Self: Sized, C: FnOnce(Self::Output, ConnectedPoint) -> F + Clone, - F: IntoFuture + F: IntoFuture, + ::Error: Error + 'static { - and_then::AndThen::new(self, upgrade) + and_then::AndThen::new(self, f) } - /// Adds a timeout to the connection setup (including upgrades) for all inbound - /// and outbound connection attempts. - fn with_timeout(self, timeout: Duration) -> timeout::TransportTimeout + /// Adds a timeout to the connection setup (including upgrades) for all + /// inbound and outbound connections established through the transport. + fn timeout(self, timeout: Duration) -> timeout::TransportTimeout where - Self: Sized, + Self: Sized { timeout::TransportTimeout::new(self, timeout) } /// Adds a timeout to the connection setup (including upgrades) for all outbound - /// connection attempts. - fn with_outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout + /// connections established through the transport. + fn outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout where - Self: Sized, + Self: Sized { timeout::TransportTimeout::with_outgoing_timeout(self, timeout) } /// Adds a timeout to the connection setup (including upgrades) for all inbound - /// connection attempts. - fn with_inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout + /// connections established through the transport. + fn inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout where - Self: Sized, + Self: Sized { timeout::TransportTimeout::with_ingoing_timeout(self, timeout) } + + /// Begins a series of protocol upgrades via an [`upgrade::Builder`]. + fn upgrade(self) -> upgrade::Builder + where + Self: Sized, + Self::Error: 'static + { + upgrade::Builder::new(self) + } } /// Event produced by [`Transport::Listener`]s. @@ -362,10 +353,10 @@ where TErr: fmt::Display, } } -impl error::Error for TransportError -where TErr: error::Error + 'static, +impl Error for TransportError +where TErr: Error + 'static, { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { + fn source(&self) -> Option<&(dyn Error + 'static)> { match self { TransportError::MultiaddrNotSupported(_) => None, TransportError::Other(err) => Some(err), diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 68406bd00be..4a4535ff381 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -1,4 +1,4 @@ -// Copyright 2017-2018 Parity Technologies (UK) Ltd. +// Copyright 2017-2019 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -18,9 +18,20 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +//! Configuration of transport protocol upgrades. + use crate::{ - transport::{Transport, TransportError, ListenerEvent}, + ConnectedPoint, + ConnectionInfo, + transport::{ + Transport, + TransportError, + ListenerEvent, + and_then::AndThen, + }, + muxing::StreamMuxer, upgrade::{ + self, OutboundUpgrade, InboundUpgrade, apply_inbound, @@ -30,11 +41,193 @@ use crate::{ InboundUpgradeApply } }; -use futures::{future::Either, prelude::*, try_ready}; +use futures::{future, prelude::*, try_ready}; use multiaddr::Multiaddr; -use std::{error, fmt}; +use std::{error::Error, fmt}; use tokio_io::{AsyncRead, AsyncWrite}; +/// A `Builder` facilitates upgrading of a [`Transport`] for use with +/// a [`Network`]. +/// +/// The upgrade process is defined by the following stages: +/// +/// [`authenticate`](Builder::authenticate)`{1}` +/// -> [`apply`](Builder::apply)`{*}` +/// -> [`multiplex`](Builder::multiplex)`{1}` +/// +/// It thus enforces the following invariants on every transport +/// obtained from [`multiplex`](Builder::multiplex): +/// +/// 1. The transport must be [authenticated](Builder::authenticate) +/// and [multiplexed](Builder::multiplex). +/// 2. Authentication must precede the negotiation of a multiplexer. +/// 3. Applying a multiplexer is the last step in the upgrade process. +/// 4. The [`Transport::Output`] conforms to the requirements of a [`Network`], +/// namely a tuple of a [`ConnectionInfo`] (from the authentication upgrade) and a +/// [`StreamMuxer`] (from the multiplexing upgrade). +/// +/// [`Network`]: crate::nodes::Network +pub struct Builder { + inner: T +} + +impl Builder +where + T: Transport, + T::Error: 'static, +{ + /// Creates a `Builder` over the given (base) `Transport`. + pub fn new(transport: T) -> Builder { + Builder { inner: transport } + } + + /// Upgrades the transport to perform authentication of the remote. + /// + /// The supplied upgrade receives the I/O resource `C` and must + /// produce a pair `(I, D)`, where `I` is a [`ConnectionInfo`] and + /// `D` is a new I/O resource. The upgrade must thus at a minimum + /// identify the remote, which typically involves the use of a + /// cryptographic authentication protocol in the context of establishing + /// a secure channel. + /// + /// ## Transitions + /// + /// * I/O upgrade: `C -> (I, D)`. + /// * Transport output: `C -> (I, D)` + pub fn authenticate(self, upgrade: U) -> Builder< + AndThen Authenticate + Clone> + > where + T: Transport, + I: ConnectionInfo, + C: AsyncRead + AsyncWrite, + D: AsyncRead + AsyncWrite, + U: InboundUpgrade, + U: OutboundUpgrade + Clone, + E: Error + 'static, + { + Builder::new(self.inner.and_then(move |conn, endpoint| { + Authenticate { + inner: upgrade::apply(conn, upgrade, endpoint) + } + })) + } + + /// Applies an arbitrary upgrade on an authenticated, non-multiplexed + /// transport. + /// + /// The upgrade receives the I/O resource (i.e. connection) `C` and + /// must produce a new I/O resource `D`. Any number of such upgrades + /// can be performed. + /// + /// ## Transitions + /// + /// * I/O upgrade: `C -> D`. + /// * Transport output: `(I, C) -> (I, D)`. + pub fn apply(self, upgrade: U) -> Builder> + where + T: Transport, + C: AsyncRead + AsyncWrite, + D: AsyncRead + AsyncWrite, + I: ConnectionInfo, + U: InboundUpgrade, + U: OutboundUpgrade + Clone, + E: Error + 'static, + { + Builder::new(Upgrade::new(self.inner, upgrade)) + } + + /// Upgrades the transport with a (sub)stream multiplexer. + /// + /// The supplied upgrade receives the I/O resource `C` and must + /// produce a [`StreamMuxer`] `M`. The transport must already be authenticated. + /// This ends the (regular) transport upgrade process, yielding the underlying, + /// configured transport. + /// + /// ## Transitions + /// + /// * I/O upgrade: `C -> M`. + /// * Transport output: `(I, C) -> (I, M)`. + pub fn multiplex(self, upgrade: U) + -> AndThen Multiplex + Clone> + where + T: Transport, + C: AsyncRead + AsyncWrite, + M: StreamMuxer, + I: ConnectionInfo, + U: InboundUpgrade, + U: OutboundUpgrade + Clone, + E: Error + 'static, + { + self.inner.and_then(move |(i, c), endpoint| { + let upgrade = upgrade::apply(c, upgrade, endpoint); + Multiplex { info: Some(i), upgrade } + }) + } +} + +/// An upgrade that authenticates the remote peer, typically +/// in the context of negotiating a secure channel. +/// +/// Configured through [`Builder::authenticate`]. +pub struct Authenticate +where + C: AsyncRead + AsyncWrite, + U: InboundUpgrade + OutboundUpgrade +{ + inner: EitherUpgrade +} + +impl Future for Authenticate +where + C: AsyncRead + AsyncWrite, + U: InboundUpgrade + OutboundUpgrade>::Output, + Error = >::Error + > +{ + type Item = as Future>::Item; + type Error = as Future>::Error; + + fn poll(&mut self) -> Poll { + self.inner.poll() + } +} + +/// An upgrade that negotiates a (sub)stream multiplexer on +/// top of an authenticated transport. +/// +/// Configured through [`Builder::multiplex`]. +pub struct Multiplex +where + C: AsyncRead + AsyncWrite, + U: InboundUpgrade + OutboundUpgrade, +{ + info: Option, + upgrade: EitherUpgrade, +} + +impl Future for Multiplex +where + C: AsyncRead + AsyncWrite, + U: InboundUpgrade, + U: OutboundUpgrade +{ + type Item = (I, M); + type Error = UpgradeError; + + fn poll(&mut self) -> Poll { + let m = try_ready!(self.upgrade.poll()); + let i = self.info.take().expect("Multiplex future polled after completion."); + Ok(Async::Ready((i, m))) + } +} + +/// An inbound or outbound upgrade. +type EitherUpgrade = future::Either, OutboundUpgradeApply>; + +/// An upgrade on an authenticated, non-multiplexed [`Transport`]. +/// +/// See [`Builder::upgrade`](Builder::upgrade). #[derive(Debug, Copy, Clone)] pub struct Upgrade { inner: T, upgrade: U } @@ -44,50 +237,53 @@ impl Upgrade { } } -impl Transport for Upgrade +impl Transport for Upgrade where - D: Transport, - D::Output: AsyncRead + AsyncWrite, - D::Error: 'static, - U: InboundUpgrade, - U: OutboundUpgrade + Clone, - TUpgrErr: std::error::Error + Send + Sync + 'static // TODO: remove bounds + T: Transport, + T::Error: 'static, + C: AsyncRead + AsyncWrite, + U: InboundUpgrade, + U: OutboundUpgrade + Clone, + E: Error + 'static { - type Output = O; - type Error = TransportUpgradeError; - type Listener = ListenerStream; - type ListenerUpgrade = ListenerUpgradeFuture; - type Dial = DialUpgradeFuture; + type Output = (I, D); + type Error = TransportUpgradeError; + type Listener = ListenerStream; + type ListenerUpgrade = ListenerUpgradeFuture; + type Dial = DialUpgradeFuture; fn dial(self, addr: Multiaddr) -> Result> { - let outbound = self.inner.dial(addr.clone()) + let future = self.inner.dial(addr.clone()) .map_err(|err| err.map(TransportUpgradeError::Transport))?; Ok(DialUpgradeFuture { - future: outbound, - upgrade: Either::A(Some(self.upgrade)) + future, + upgrade: future::Either::A(Some(self.upgrade)) }) } fn listen_on(self, addr: Multiaddr) -> Result> { - let inbound = self.inner.listen_on(addr) + let stream = self.inner.listen_on(addr) .map_err(|err| err.map(TransportUpgradeError::Transport))?; - Ok(ListenerStream { stream: inbound, upgrade: self.upgrade }) + Ok(ListenerStream { + stream, + upgrade: self.upgrade + }) } } -/// Error produced by a transport upgrade. +/// Errors produced by a transport upgrade. #[derive(Debug)] -pub enum TransportUpgradeError { +pub enum TransportUpgradeError { /// Error in the transport. - Transport(TTransErr), + Transport(T), /// Error while upgrading to a protocol. - Upgrade(UpgradeError), + Upgrade(UpgradeError), } -impl fmt::Display for TransportUpgradeError +impl fmt::Display for TransportUpgradeError where - TTransErr: fmt::Display, - TUpgrErr: fmt::Display, + T: fmt::Display, + U: fmt::Display, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -97,12 +293,12 @@ where } } -impl error::Error for TransportUpgradeError +impl Error for TransportUpgradeError where - TTransErr: error::Error + 'static, - TUpgrErr: error::Error + 'static, + T: Error + 'static, + U: Error + 'static, { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { + fn source(&self) -> Option<&(dyn Error + 'static)> { match self { TransportUpgradeError::Transport(e) => Some(e), TransportUpgradeError::Upgrade(e) => Some(e), @@ -110,63 +306,67 @@ where } } -pub struct DialUpgradeFuture +/// The [`Transport::Dial`] future of an [`Upgrade`]d transport. +pub struct DialUpgradeFuture where - T: Future, - T::Item: AsyncRead + AsyncWrite, - U: OutboundUpgrade + U: OutboundUpgrade, + C: AsyncRead + AsyncWrite, { - future: T, - upgrade: Either, OutboundUpgradeApply> + future: F, + upgrade: future::Either, (Option, OutboundUpgradeApply)> } -impl Future for DialUpgradeFuture +impl Future for DialUpgradeFuture where - T: Future, - T::Item: AsyncRead + AsyncWrite, - U: OutboundUpgrade, - U::Error: std::error::Error + Send + Sync + 'static + F: Future, + C: AsyncRead + AsyncWrite, + U: OutboundUpgrade, + U::Error: Error { - type Item = U::Output; - type Error = TransportUpgradeError; + type Item = (I, D); + type Error = TransportUpgradeError; fn poll(&mut self) -> Poll { loop { - let next = match self.upgrade { - Either::A(ref mut up) => { - let x = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport)); + self.upgrade = match self.upgrade { + future::Either::A(ref mut up) => { + let (i, c) = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport)); let u = up.take().expect("DialUpgradeFuture is constructed with Either::A(Some)."); - Either::B(apply_outbound(x, u)) + future::Either::B((Some(i), apply_outbound(c, u))) + } + future::Either::B((ref mut i, ref mut up)) => { + let d = try_ready!(up.poll().map_err(TransportUpgradeError::Upgrade)); + let i = i.take().expect("DialUpgradeFuture polled after completion."); + return Ok(Async::Ready((i, d))) } - Either::B(ref mut up) => return up.poll().map_err(TransportUpgradeError::Upgrade) - }; - self.upgrade = next + } } } } -pub struct ListenerStream { - stream: T, +/// The [`Transport::Listener`] stream of an [`Upgrade`]d transport. +pub struct ListenerStream { + stream: S, upgrade: U } -impl Stream for ListenerStream +impl Stream for ListenerStream where - T: Stream>, - F: Future, - F::Item: AsyncRead + AsyncWrite, - U: InboundUpgrade + Clone + S: Stream>, + F: Future, + C: AsyncRead + AsyncWrite, + U: InboundUpgrade + Clone { - type Item = ListenerEvent>; - type Error = TransportUpgradeError; + type Item = ListenerEvent>; + type Error = TransportUpgradeError; fn poll(&mut self) -> Poll, Self::Error> { match try_ready!(self.stream.poll().map_err(TransportUpgradeError::Transport)) { Some(event) => { - let event = event.map(move |x| { + let event = event.map(move |future| { ListenerUpgradeFuture { - future: x, - upgrade: Either::A(Some(self.upgrade.clone())) + future, + upgrade: future::Either::A(Some(self.upgrade.clone())) } }); Ok(Async::Ready(Some(event))) @@ -176,37 +376,40 @@ where } } -pub struct ListenerUpgradeFuture +/// The [`Transport::ListenerUpgrade`] future of an [`Upgrade`]d transport. +pub struct ListenerUpgradeFuture where - T: Future, - T::Item: AsyncRead + AsyncWrite, - U: InboundUpgrade + C: AsyncRead + AsyncWrite, + U: InboundUpgrade { - future: T, - upgrade: Either, InboundUpgradeApply> + future: F, + upgrade: future::Either, (Option, InboundUpgradeApply)> } -impl Future for ListenerUpgradeFuture +impl Future for ListenerUpgradeFuture where - T: Future, - T::Item: AsyncRead + AsyncWrite, - U: InboundUpgrade, - U::Error: std::error::Error + Send + Sync + 'static + F: Future, + C: AsyncRead + AsyncWrite, + U: InboundUpgrade, + U::Error: Error { - type Item = U::Output; - type Error = TransportUpgradeError; + type Item = (I, D); + type Error = TransportUpgradeError; fn poll(&mut self) -> Poll { loop { - let next = match self.upgrade { - Either::A(ref mut up) => { - let x = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport)); + self.upgrade = match self.upgrade { + future::Either::A(ref mut up) => { + let (i, c) = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport)); let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::A(Some)."); - Either::B(apply_inbound(x, u)) + future::Either::B((Some(i), apply_inbound(c, u))) } - Either::B(ref mut up) => return up.poll().map_err(TransportUpgradeError::Upgrade) - }; - self.upgrade = next + future::Either::B((ref mut i, ref mut up)) => { + let d = try_ready!(up.poll().map_err(TransportUpgradeError::Upgrade)); + let i = i.take().expect("ListenerUpgradeFuture polled after completion."); + return Ok(Async::Ready((i, d))) + } + } } } } diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 606c6806771..cc9c3dfac53 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -24,7 +24,7 @@ use futures::{future, prelude::*}; use libp2p_core::identity; use libp2p_core::multiaddr::multiaddr; use libp2p_core::nodes::network::{Network, NetworkEvent, NetworkReachError, PeerState, UnknownPeerDialErr, IncomingError}; -use libp2p_core::{PeerId, Transport, upgrade, upgrade::InboundUpgradeExt, upgrade::OutboundUpgradeExt}; +use libp2p_core::{PeerId, Transport, upgrade}; use libp2p_swarm::{ ProtocolsHandler, KeepAlive, @@ -91,21 +91,13 @@ where fn deny_incoming_connec() { // Checks whether refusing an incoming connection on a swarm triggers the correct events. - // TODO: make creating the transport more elegant ; literaly half of the code of the test - // is about creating the transport let mut swarm1: Network<_, _, _, NodeHandlerWrapperBuilder>, _> = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .with_upgrade(libp2p_secio::SecioConfig::new(local_key)) - .and_then(move |out, endpoint| { - let peer_id = out.remote_key.into_peer_id(); - let peer_id2 = peer_id.clone(); - let upgrade = libp2p_mplex::MplexConfig::default() - .map_outbound(move |muxer| (peer_id, muxer)) - .map_inbound(move |muxer| (peer_id2, muxer)); - upgrade::apply(out.stream, upgrade, endpoint) - }); + .upgrade() + .authenticate(libp2p_secio::SecioConfig::new(local_key)) + .multiplex(libp2p_mplex::MplexConfig::new()); Network::new(transport, local_public_key.into()) }; @@ -113,15 +105,9 @@ fn deny_incoming_connec() { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .with_upgrade(libp2p_secio::SecioConfig::new(local_key)) - .and_then(move |out, endpoint| { - let peer_id = out.remote_key.into_peer_id(); - let peer_id2 = peer_id.clone(); - let upgrade = libp2p_mplex::MplexConfig::default() - .map_outbound(move |muxer| (peer_id, muxer)) - .map_inbound(move |muxer| (peer_id2, muxer)); - upgrade::apply(out.stream, upgrade, endpoint) - }); + .upgrade() + .authenticate(libp2p_secio::SecioConfig::new(local_key)) + .multiplex(libp2p_mplex::MplexConfig::new()); Network::new(transport, local_public_key.into()) }; @@ -180,27 +166,18 @@ fn dial_self() { // // The last two items can happen in any order. - // TODO: make creating the transport more elegant ; literaly half of the code of the test - // is about creating the transport let mut swarm = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .with_upgrade(libp2p_secio::SecioConfig::new(local_key)) - .and_then(move |out, endpoint| { - let peer_id = out.remote_key.into_peer_id(); - let peer_id2 = peer_id.clone(); - let upgrade = libp2p_mplex::MplexConfig::default() - .map_outbound(move |muxer| (peer_id, muxer)) - .map_inbound(move |muxer| (peer_id2, muxer)); - upgrade::apply(out.stream, upgrade, endpoint) - }) + .upgrade() + .authenticate(libp2p_secio::SecioConfig::new(local_key)) + .multiplex(libp2p_mplex::MplexConfig::new()) .and_then(|(peer, mplex), _| { // Gracefully close the connection to allow protocol // negotiation to complete. util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex)) }); - Network::new(transport, local_public_key.into()) }; @@ -268,21 +245,13 @@ fn dial_self_by_id() { // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first // place. - // TODO: make creating the transport more elegant ; literaly half of the code of the test - // is about creating the transport let mut swarm: Network<_, _, _, NodeHandlerWrapperBuilder>, _> = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .with_upgrade(libp2p_secio::SecioConfig::new(local_key)) - .and_then(move |out, endpoint| { - let peer_id = out.remote_key.into_peer_id(); - let peer_id2 = peer_id.clone(); - let upgrade = libp2p_mplex::MplexConfig::default() - .map_outbound(move |muxer| (peer_id, muxer)) - .map_inbound(move |muxer| (peer_id2, muxer)); - upgrade::apply(out.stream, upgrade, endpoint) - }); + .upgrade() + .authenticate(libp2p_secio::SecioConfig::new(local_key)) + .multiplex(libp2p_mplex::MplexConfig::new()); Network::new(transport, local_public_key.into()) }; @@ -294,21 +263,13 @@ fn dial_self_by_id() { fn multiple_addresses_err() { // Tries dialing multiple addresses, and makes sure there's one dialing error per addresses. - // TODO: make creating the transport more elegant ; literaly half of the code of the test - // is about creating the transport let mut swarm = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .with_upgrade(libp2p_secio::SecioConfig::new(local_key)) - .and_then(move |out, endpoint| { - let peer_id = out.remote_key.into_peer_id(); - let peer_id2 = peer_id.clone(); - let upgrade = libp2p_mplex::MplexConfig::default() - .map_outbound(move |muxer| (peer_id, muxer)) - .map_inbound(move |muxer| (peer_id2, muxer)); - upgrade::apply(out.stream, upgrade, endpoint) - }); + .upgrade() + .authenticate(libp2p_secio::SecioConfig::new(local_key)) + .multiplex(libp2p_mplex::MplexConfig::new()); Network::new(transport, local_public_key.into()) }; diff --git a/core/tests/network_simult.rs b/core/tests/network_simult.rs index 0b5c23839d6..958631b5142 100644 --- a/core/tests/network_simult.rs +++ b/core/tests/network_simult.rs @@ -21,16 +21,15 @@ mod util; use futures::{future, prelude::*}; -use libp2p_core::identity; +use libp2p_core::{identity, upgrade, Transport}; use libp2p_core::nodes::{Network, NetworkEvent, Peer}; use libp2p_core::nodes::network::IncomingError; -use libp2p_core::{Transport, upgrade, upgrade::OutboundUpgradeExt, upgrade::InboundUpgradeExt}; use libp2p_swarm::{ ProtocolsHandler, KeepAlive, SubstreamProtocol, ProtocolsHandlerEvent, - ProtocolsHandlerUpgrErr + ProtocolsHandlerUpgrErr, }; use std::{io, time::Duration}; use wasm_timer::{Delay, Instant}; @@ -107,21 +106,13 @@ fn raw_swarm_simultaneous_connect() { // despite the fact that it adds a dependency. for _ in 0 .. 10 { - // TODO: make creating the transport more elegant ; literaly half of the code of the test - // is about creating the transport let mut swarm1 = { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .with_upgrade(libp2p_secio::SecioConfig::new(local_key)) - .and_then(move |out, endpoint| { - let peer_id = out.remote_key.into_peer_id(); - let peer_id2 = peer_id.clone(); - let upgrade = libp2p_mplex::MplexConfig::default() - .map_outbound(move |muxer| (peer_id, muxer)) - .map_inbound(move |muxer| (peer_id2, muxer)); - upgrade::apply(out.stream, upgrade, endpoint) - }) + .upgrade() + .authenticate(libp2p_secio::SecioConfig::new(local_key)) + .multiplex(libp2p_mplex::MplexConfig::new()) .and_then(|(peer, mplex), _| { // Gracefully close the connection to allow protocol // negotiation to complete. @@ -134,15 +125,9 @@ fn raw_swarm_simultaneous_connect() { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = libp2p_tcp::TcpConfig::new() - .with_upgrade(libp2p_secio::SecioConfig::new(local_key)) - .and_then(move |out, endpoint| { - let peer_id = out.remote_key.into_peer_id(); - let peer_id2 = peer_id.clone(); - let upgrade = libp2p_mplex::MplexConfig::default() - .map_outbound(move |muxer| (peer_id, muxer)) - .map_inbound(move |muxer| (peer_id2, muxer)); - upgrade::apply(out.stream, upgrade, endpoint) - }) + .upgrade() + .authenticate(libp2p_secio::SecioConfig::new(local_key)) + .multiplex(libp2p_mplex::MplexConfig::new()) .and_then(|(peer, mplex), _| { // Gracefully close the connection to allow protocol // negotiation to complete. @@ -313,3 +298,4 @@ fn raw_swarm_simultaneous_connect() { } } } + diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs new file mode 100644 index 00000000000..61b96f35e80 --- /dev/null +++ b/core/tests/transport_upgrade.rs @@ -0,0 +1,128 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +mod util; + +use futures::future::Future; +use futures::stream::Stream; +use libp2p_core::identity; +use libp2p_core::transport::{Transport, MemoryTransport, ListenerEvent}; +use libp2p_core::upgrade::{UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade}; +use libp2p_mplex::MplexConfig; +use libp2p_secio::SecioConfig; +use multiaddr::Multiaddr; +use rand::random; +use std::io; +use tokio_io::{io as nio, AsyncWrite, AsyncRead}; + +#[derive(Clone)] +struct HelloUpgrade {} + +impl UpgradeInfo for HelloUpgrade { + type Info = &'static str; + type InfoIter = std::iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + std::iter::once("/hello/1") + } +} + +impl InboundUpgrade for HelloUpgrade +where + C: AsyncRead + AsyncWrite + Send + 'static +{ + type Output = Negotiated; + type Error = io::Error; + type Future = Box + Send>; + + fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + Box::new(nio::read_exact(socket, [0u8; 5]).map(|(io, buf)| { + assert_eq!(&buf[..], "hello".as_bytes()); + io + })) + } +} + +impl OutboundUpgrade for HelloUpgrade +where + C: AsyncWrite + AsyncRead + Send + 'static, +{ + type Output = Negotiated; + type Error = io::Error; + type Future = Box + Send>; + + fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + Box::new(nio::write_all(socket, "hello").map(|(io, _)| io)) + } +} + +#[test] +fn upgrade_pipeline() { + let listener_keys = identity::Keypair::generate_ed25519(); + let listener_id = listener_keys.public().into_peer_id(); + let listener_transport = MemoryTransport::default() + .upgrade() + .authenticate(SecioConfig::new(listener_keys)) + .apply(HelloUpgrade {}) + .apply(HelloUpgrade {}) + .apply(HelloUpgrade {}) + .multiplex(MplexConfig::default()) + .and_then(|(peer, mplex), _| { + // Gracefully close the connection to allow protocol + // negotiation to complete. + util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex)) + }); + + let dialer_keys = identity::Keypair::generate_ed25519(); + let dialer_id = dialer_keys.public().into_peer_id(); + let dialer_transport = MemoryTransport::default() + .upgrade() + .authenticate(SecioConfig::new(dialer_keys)) + .apply(HelloUpgrade {}) + .apply(HelloUpgrade {}) + .apply(HelloUpgrade {}) + .multiplex(MplexConfig::default()) + .and_then(|(peer, mplex), _| { + // Gracefully close the connection to allow protocol + // negotiation to complete. + util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex)) + }); + + let listen_addr: Multiaddr = format!("/memory/{}", random::()).parse().unwrap(); + let listener = listener_transport.listen_on(listen_addr.clone()).unwrap() + .filter_map(ListenerEvent::into_upgrade) + .for_each(move |(upgrade, _remote_addr)| { + let dialer = dialer_id.clone(); + upgrade.map(move |(peer, _mplex)| { + assert_eq!(peer, dialer) + }) + }) + .map_err(|e| panic!("Listener error: {}", e)); + + let dialer = dialer_transport.dial(listen_addr).unwrap() + .map(move |(peer, _mplex)| { + assert_eq!(peer, listener_id) + }); + + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.spawn(listener); + rt.block_on(dialer).unwrap() +} + diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index be1a2b5a3d6..8d728302437 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use libp2p_core::{muxing, Transport, transport::ListenerEvent}; +use libp2p_core::{muxing, upgrade, Transport, transport::ListenerEvent}; use libp2p_tcp::TcpConfig; use futures::prelude::*; use std::sync::{Arc, mpsc}; @@ -32,8 +32,9 @@ fn async_write() { let (tx, rx) = mpsc::channel(); let bg_thread = thread::spawn(move || { - let transport = - TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new()); + let mplex = libp2p_mplex::MplexConfig::new(); + + let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); let mut listener = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -67,7 +68,8 @@ fn async_write() { let _ = rt.block_on(future).unwrap(); }); - let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new()); + let mplex = libp2p_mplex::MplexConfig::new(); + let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); let future = transport .dial(rx.recv().unwrap()) diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 580ed5c17cc..aaa4fca2baa 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use libp2p_core::{muxing, Transport, transport::ListenerEvent}; +use libp2p_core::{muxing, upgrade, Transport, transport::ListenerEvent}; use libp2p_tcp::TcpConfig; use futures::prelude::*; use std::sync::{Arc, mpsc}; @@ -35,8 +35,9 @@ fn client_to_server_outbound() { let (tx, rx) = mpsc::channel(); let bg_thread = thread::spawn(move || { - let transport = - TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new()); + let mplex = libp2p_mplex::MplexConfig::new(); + + let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); let mut listener = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -75,7 +76,8 @@ fn client_to_server_outbound() { let _ = rt.block_on(future).unwrap(); }); - let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new()); + let mplex = libp2p_mplex::MplexConfig::new(); + let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); let future = transport .dial(rx.recv().unwrap()) @@ -98,8 +100,8 @@ fn client_to_server_inbound() { let (tx, rx) = mpsc::channel(); let bg_thread = thread::spawn(move || { - let transport = - TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new()); + let mplex = libp2p_mplex::MplexConfig::new(); + let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); let mut listener = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -139,7 +141,8 @@ fn client_to_server_inbound() { let _ = rt.block_on(future).unwrap(); }); - let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new()); + let mplex = libp2p_mplex::MplexConfig::new(); + let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e)); let future = transport .dial(rx.recv().unwrap()) diff --git a/protocols/deflate/tests/test.rs b/protocols/deflate/tests/test.rs index 7ea9b116570..a0b2c07fa8c 100644 --- a/protocols/deflate/tests/test.rs +++ b/protocols/deflate/tests/test.rs @@ -20,7 +20,7 @@ use futures::prelude::*; use libp2p_core::transport::{ListenerEvent, Transport}; -use libp2p_core::upgrade::Negotiated; +use libp2p_core::upgrade::{self, Negotiated}; use libp2p_deflate::{DeflateConfig, DeflateOutput}; use libp2p_tcp::{TcpConfig, TcpTransStream}; use log::info; @@ -32,9 +32,9 @@ fn deflate() { let _ = env_logger::try_init(); fn prop(message: Vec) -> bool { - let server_transport = TcpConfig::new().with_upgrade(DeflateConfig {}); - let client_transport = TcpConfig::new().with_upgrade(DeflateConfig {}); - run(server_transport, client_transport, message); + let client = TcpConfig::new().and_then(|c, e| upgrade::apply(c, DeflateConfig {}, e)); + let server = client.clone(); + run(server, client, message); true } diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index dd416c19dc6..7c8b68e4467 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -259,10 +259,9 @@ mod tests { use libp2p_core::{ identity, PeerId, - upgrade::{self, OutboundUpgradeExt, InboundUpgradeExt}, muxing::StreamMuxer, Multiaddr, - Transport + Transport, }; use libp2p_tcp::TcpConfig; use libp2p_secio::SecioConfig; @@ -283,15 +282,9 @@ mod tests { let pubkey = id_keys.public(); let transport = TcpConfig::new() .nodelay(true) - .with_upgrade(SecioConfig::new(id_keys)) - .and_then(move |out, endpoint| { - let peer_id = out.remote_key.into_peer_id(); - let peer_id2 = peer_id.clone(); - let upgrade = MplexConfig::default() - .map_outbound(move |muxer| (peer_id, muxer)) - .map_inbound(move |muxer| (peer_id2, muxer)); - upgrade::apply(out.stream, upgrade, endpoint) - }); + .upgrade() + .authenticate(SecioConfig::new(id_keys)) + .multiplex(MplexConfig::new()); (pubkey, transport) } diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 3b65f1cb37f..d39a312fe30 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -34,7 +34,6 @@ use libp2p_core::{ nodes::Substream, multiaddr::{Protocol, multiaddr}, muxing::StreamMuxerBox, - upgrade, }; use libp2p_secio::SecioConfig; use libp2p_swarm::Swarm; @@ -61,18 +60,13 @@ fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec> = Vec::with_capacity(num); for _ in 0 .. num { - // TODO: make creating the transport more elegant ; literaly half of the code of the test - // is about creating the transport let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let transport = MemoryTransport::default() - .with_upgrade(SecioConfig::new(local_key)) - .and_then(move |out, endpoint| { - let peer_id = out.remote_key.into_peer_id(); - let yamux = yamux::Config::default(); - upgrade::apply(out.stream, yamux, endpoint) - .map(|muxer| (peer_id, StreamMuxerBox::new(muxer))) - }) + .upgrade() + .authenticate(SecioConfig::new(local_key)) + .multiplex(yamux::Config::default()) + .map(|(p, m), _| (p, StreamMuxerBox::new(m))) .map_err(|e| panic!("Failed to create transport: {:?}", e)) .boxed(); diff --git a/protocols/noise/src/error.rs b/protocols/noise/src/error.rs index 8f51b0dfb50..b074a45c606 100644 --- a/protocols/noise/src/error.rs +++ b/protocols/noise/src/error.rs @@ -31,6 +31,9 @@ pub enum NoiseError { Noise(SnowError), /// A public key is invalid. InvalidKey, + /// Authentication in a [`NoiseAuthenticated`](crate::NoiseAuthenticated) + /// upgrade failed. + AuthenticationFailed, /// A handshake payload is invalid. InvalidPayload(protobuf::ProtobufError), /// A signature was required and could not be created. @@ -46,6 +49,7 @@ impl fmt::Display for NoiseError { NoiseError::Noise(e) => write!(f, "{}", e), NoiseError::InvalidKey => f.write_str("invalid public key"), NoiseError::InvalidPayload(e) => write!(f, "{}", e), + NoiseError::AuthenticationFailed => f.write_str("Authentication failed"), NoiseError::SigningError(e) => write!(f, "{}", e), NoiseError::__Nonexhaustive => f.write_str("__Nonexhaustive") } @@ -58,6 +62,7 @@ impl Error for NoiseError { NoiseError::Io(e) => Some(e), NoiseError::Noise(_) => None, // TODO: `SnowError` should implement `Error`. NoiseError::InvalidKey => None, + NoiseError::AuthenticationFailed => None, NoiseError::InvalidPayload(e) => Some(e), NoiseError::SigningError(e) => Some(e), NoiseError::__Nonexhaustive => None diff --git a/protocols/noise/src/lib.rs b/protocols/noise/src/lib.rs index 6fb93a440d9..fc6ed25e308 100644 --- a/protocols/noise/src/lib.rs +++ b/protocols/noise/src/lib.rs @@ -43,9 +43,9 @@ //! # fn main() { //! let id_keys = identity::Keypair::generate_ed25519(); //! let dh_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); -//! let noise = NoiseConfig::xx(dh_keys); -//! let transport = TcpConfig::new().with_upgrade(noise); -//! // ... +//! let noise = NoiseConfig::xx(dh_keys).into_authenticated(); +//! let builder = TcpConfig::new().upgrade().authenticate(noise); +//! // let transport = builder.multiplex(...); //! # } //! ``` //! @@ -61,7 +61,8 @@ pub use io::handshake::{Handshake, RemoteIdentity, IdentityExchange}; pub use protocol::{Keypair, AuthenticKeypair, KeypairIdentity, PublicKey, SecretKey}; pub use protocol::{Protocol, ProtocolParams, x25519::X25519, IX, IK, XX}; -use libp2p_core::{identity, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::Negotiated}; +use futures::{future::{self, FutureResult}, Future}; +use libp2p_core::{identity, PeerId, UpgradeInfo, InboundUpgrade, OutboundUpgrade, Negotiated}; use tokio_io::{AsyncRead, AsyncWrite}; use zeroize::Zeroize; @@ -74,6 +75,14 @@ pub struct NoiseConfig { _marker: std::marker::PhantomData

} +impl NoiseConfig { + /// Turn the `NoiseConfig` into an authenticated upgrade for use + /// with a [`Network`](libp2p_core::nodes::Network). + pub fn into_authenticated(self) -> NoiseAuthenticated { + NoiseAuthenticated { config: self } + } +} + impl NoiseConfig where C: Protocol + Zeroize @@ -277,3 +286,84 @@ where } } +// Authenticated Upgrades ///////////////////////////////////////////////////// + +/// A `NoiseAuthenticated` transport upgrade that wraps around any +/// `NoiseConfig` handshake and verifies that the remote identified with a +/// [`RemoteIdentity::IdentityKey`], aborting otherwise. +/// +/// See [`NoiseConfig::into_authenticated`]. +/// +/// On success, the upgrade yields the [`PeerId`] obtained from the +/// `RemoteIdentity`. The output of this upgrade is thus directly suitable +/// for creating an [`authenticated`](libp2p_core::TransportBuilder::authenticate) +/// transport for use with a [`Network`](libp2p_core::nodes::Network). +#[derive(Clone)] +pub struct NoiseAuthenticated { + config: NoiseConfig +} + +impl UpgradeInfo for NoiseAuthenticated +where + NoiseConfig: UpgradeInfo +{ + type Info = as UpgradeInfo>::Info; + type InfoIter = as UpgradeInfo>::InfoIter; + + fn protocol_info(&self) -> Self::InfoIter { + self.config.protocol_info() + } +} + +impl InboundUpgrade for NoiseAuthenticated +where + NoiseConfig: UpgradeInfo + InboundUpgrade, NoiseOutput>), + Error = NoiseError + >, + T: AsyncRead + AsyncWrite + Send + 'static, + C: Protocol + AsRef<[u8]> + Zeroize + Send + 'static, +{ + type Output = (PeerId, NoiseOutput>); + type Error = NoiseError; + type Future = future::AndThen< + as InboundUpgrade>::Future, + FutureResult, + fn((RemoteIdentity, NoiseOutput>)) -> FutureResult + >; + + fn upgrade_inbound(self, socket: Negotiated, info: Self::Info) -> Self::Future { + self.config.upgrade_inbound(socket, info) + .and_then(|(remote, io)| future::result(match remote { + RemoteIdentity::IdentityKey(pk) => Ok((pk.into_peer_id(), io)), + _ => Err(NoiseError::AuthenticationFailed) + })) + } +} + +impl OutboundUpgrade for NoiseAuthenticated +where + NoiseConfig: UpgradeInfo + OutboundUpgrade, NoiseOutput>), + Error = NoiseError + >, + T: AsyncRead + AsyncWrite + Send + 'static, + C: Protocol + AsRef<[u8]> + Zeroize + Send + 'static, +{ + type Output = (PeerId, NoiseOutput>); + type Error = NoiseError; + type Future = future::AndThen< + as OutboundUpgrade>::Future, + FutureResult, + fn((RemoteIdentity, NoiseOutput>)) -> FutureResult + >; + + fn upgrade_outbound(self, socket: Negotiated, info: Self::Info) -> Self::Future { + self.config.upgrade_outbound(socket, info) + .and_then(|(remote, io)| future::result(match remote { + RemoteIdentity::IdentityKey(pk) => Ok((pk.into_peer_id(), io)), + _ => Err(NoiseError::AuthenticationFailed) + })) + } +} + diff --git a/protocols/noise/tests/smoke.rs b/protocols/noise/tests/smoke.rs index 712840046d0..ff7a9d5a163 100644 --- a/protocols/noise/tests/smoke.rs +++ b/protocols/noise/tests/smoke.rs @@ -20,7 +20,7 @@ use futures::{future::{self, Either}, prelude::*}; use libp2p_core::identity; -use libp2p_core::upgrade::{Negotiated, apply_inbound, apply_outbound}; +use libp2p_core::upgrade::{self, Negotiated, apply_inbound, apply_outbound}; use libp2p_core::transport::{Transport, ListenerEvent}; use libp2p_noise::{Keypair, X25519, NoiseConfig, RemoteIdentity, NoiseError, NoiseOutput}; use libp2p_tcp::{TcpConfig, TcpTransStream}; @@ -28,6 +28,16 @@ use log::info; use quickcheck::QuickCheck; use tokio::{self, io}; +#[allow(dead_code)] +fn core_upgrade_compat() { + // Tests API compaibility with the libp2p-core upgrade API, + // i.e. if it compiles, the "test" is considered a success. + let id_keys = identity::Keypair::generate_ed25519(); + let dh_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); + let noise = NoiseConfig::xx(dh_keys).into_authenticated(); + let _ = TcpConfig::new().upgrade().authenticate(noise); +} + #[test] fn xx() { let _ = env_logger::try_init(); @@ -40,12 +50,16 @@ fn xx() { let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); let server_transport = TcpConfig::new() - .with_upgrade(NoiseConfig::xx(server_dh)) + .and_then(move |output, endpoint| { + upgrade::apply(output, NoiseConfig::xx(server_dh), endpoint) + }) .and_then(move |out, _| expect_identity(out, &client_id_public)); let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); let client_transport = TcpConfig::new() - .with_upgrade(NoiseConfig::xx(client_dh)) + .and_then(move |output, endpoint| { + upgrade::apply(output, NoiseConfig::xx(client_dh), endpoint) + }) .and_then(move |out, _| expect_identity(out, &server_id_public)); run(server_transport, client_transport, message); @@ -66,12 +80,16 @@ fn ix() { let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); let server_transport = TcpConfig::new() - .with_upgrade(NoiseConfig::ix(server_dh)) + .and_then(move |output, endpoint| { + upgrade::apply(output, NoiseConfig::ix(server_dh), endpoint) + }) .and_then(move |out, _| expect_identity(out, &client_id_public)); let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); let client_transport = TcpConfig::new() - .with_upgrade(NoiseConfig::ix(client_dh)) + .and_then(move |output, endpoint| { + upgrade::apply(output, NoiseConfig::ix(client_dh), endpoint) + }) .and_then(move |out, _| expect_identity(out, &server_id_public)); run(server_transport, client_transport, message); diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 3a6b7661d56..6d6b98c2e6e 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -23,16 +23,18 @@ use libp2p_core::{ Multiaddr, PeerId, + Negotiated, identity, muxing::StreamMuxer, - upgrade::{self, OutboundUpgradeExt, InboundUpgradeExt}, - transport::Transport + transport::{Transport, boxed::Boxed}, + either::EitherError, + upgrade::UpgradeError }; use libp2p_ping::*; -use libp2p_yamux as yamux; -use libp2p_secio::SecioConfig; +use libp2p_yamux::{self as yamux, Yamux}; +use libp2p_secio::{SecioConfig, SecioOutput, SecioError}; use libp2p_swarm::Swarm; -use libp2p_tcp::TcpConfig; +use libp2p_tcp::{TcpConfig, TcpTransStream}; use futures::{future, prelude::*}; use std::{fmt, io, time::Duration, sync::mpsc::sync_channel}; use tokio::runtime::Runtime; @@ -101,26 +103,21 @@ fn ping() { assert!(rtt < Duration::from_millis(50)); } -fn mk_transport() -> (PeerId, impl Transport< - Output = (PeerId, impl StreamMuxer>), - Listener = impl Send, - ListenerUpgrade = impl Send, - Dial = impl Send, - Error = impl fmt::Debug -> + Clone) { +fn mk_transport() -> ( + PeerId, + Boxed< + (PeerId, Yamux>>>), + EitherError>, UpgradeError> + > +) { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().into_peer_id(); let transport = TcpConfig::new() .nodelay(true) - .with_upgrade(SecioConfig::new(id_keys)) - .and_then(move |out, endpoint| { - let peer_id = out.remote_key.into_peer_id(); - let peer_id2 = peer_id.clone(); - let upgrade = yamux::Config::default() - .map_outbound(move |muxer| (peer_id, muxer)) - .map_inbound(move |muxer| (peer_id2, muxer)); - upgrade::apply(out.stream, upgrade, endpoint) - }); + .upgrade() + .authenticate(SecioConfig::new(id_keys)) + .multiplex(yamux::Config::default()) + .boxed(); (peer_id, transport) } diff --git a/protocols/secio/Cargo.toml b/protocols/secio/Cargo.toml index 346f7cba76f..c65d13bd954 100644 --- a/protocols/secio/Cargo.toml +++ b/protocols/secio/Cargo.toml @@ -45,6 +45,7 @@ aes-all = ["aesni"] [dev-dependencies] criterion = "0.2" libp2p-tcp = { version = "0.12.0", path = "../../transports/tcp" } +libp2p-mplex = { version = "0.12.0", path = "../../muxers/mplex" } tokio = "0.1" tokio-tcp = "0.1" diff --git a/protocols/secio/src/lib.rs b/protocols/secio/src/lib.rs index 199802945e9..2965a92150c 100644 --- a/protocols/secio/src/lib.rs +++ b/protocols/secio/src/lib.rs @@ -21,65 +21,54 @@ //! The `secio` protocol is a middleware that will encrypt and decrypt communications going //! through a socket (or anything that implements `AsyncRead + AsyncWrite`). //! -//! # Connection upgrade +//! # Usage //! -//! The `SecioConfig` struct implements the `ConnectionUpgrade` trait. You can apply it over a -//! `Transport` by using the `with_upgrade` method. The returned object will also implement -//! `Transport` and will automatically apply the secio protocol over any connection that is opened -//! through it. +//! The `SecioConfig` implements [`InboundUpgrade`] and [`OutboundUpgrade`] and thus +//! serves as a connection upgrade for authentication of a transport. +//! See [`authenticate`](libp2p_core::transport::upgrade::builder::Builder::authenticate). //! //! ```no_run //! # fn main() { //! use futures::Future; //! use libp2p_secio::{SecioConfig, SecioOutput}; -//! use libp2p_core::{Multiaddr, identity, upgrade::apply_inbound}; +//! use libp2p_core::{PeerId, Multiaddr, identity}; //! use libp2p_core::transport::Transport; +//! use libp2p_mplex::MplexConfig; //! use libp2p_tcp::TcpConfig; -//! use tokio_io::io::write_all; -//! use tokio::runtime::current_thread::Runtime; //! -//! let dialer = TcpConfig::new() -//! .with_upgrade({ -//! # let private_key = &mut []; -//! // See the documentation of `identity::Keypair`. -//! let keypair = identity::Keypair::rsa_from_pkcs8(private_key).unwrap(); -//! SecioConfig::new(keypair) -//! }) -//! .map(|out: SecioOutput<_>, _| out.stream); +//! // Create a local peer identity. +//! let local_keys = identity::Keypair::generate_ed25519(); //! -//! let future = dialer.dial("/ip4/127.0.0.1/tcp/12345".parse::().unwrap()) -//! .unwrap() -//! .map_err(|e| panic!("error: {:?}", e)) -//! .and_then(|connection| { -//! // Sends "hello world" on the connection, will be encrypted. -//! write_all(connection, "hello world") -//! }) -//! .map_err(|e| panic!("error: {:?}", e)); +//! // Create a `Transport`. +//! let transport = TcpConfig::new() +//! .upgrade() +//! .authenticate(SecioConfig::new(local_keys.clone())) +//! .multiplex(MplexConfig::default()); //! -//! let mut rt = Runtime::new().unwrap(); -//! let _ = rt.block_on(future).unwrap(); +//! // The transport can be used with a `Network` from `libp2p-core`, or a +//! // `Swarm` from from `libp2p-swarm`. See the documentation of these +//! // crates for mode details. +//! +//! // let network = Network::new(transport, local_keys.public().into_peer_id()); +//! // let swarm = Swarm::new(transport, behaviour, local_keys.public().into_peer_id()); //! # } //! ``` //! -//! # Manual usage -//! -//! > **Note**: You are encouraged to use `SecioConfig` as described above. -//! -//! You can add the `secio` layer over a socket by calling `SecioMiddleware::handshake()`. This -//! method will perform a handshake with the host, and return a future that corresponds to the -//! moment when the handshake succeeds or errored. On success, the future produces a -//! `SecioMiddleware` that implements `Sink` and `Stream` and can be used to send packets of data. -//! pub use self::error::SecioError; use bytes::BytesMut; use futures::stream::MapErr as StreamMapErr; use futures::{Future, Poll, Sink, StartSend, Stream}; -use libp2p_core::{PublicKey, identity, upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, Negotiated}}; +use libp2p_core::{ + PeerId, + PublicKey, + identity, + upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, Negotiated} +}; use log::debug; use rw_stream_sink::RwStreamSink; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::io; use std::iter; use tokio_io::{AsyncRead, AsyncWrite}; @@ -145,7 +134,7 @@ impl SecioConfig { self } - fn handshake(self, socket: T) -> impl Future, Error=SecioError> + fn handshake(self, socket: T) -> impl Future), Error=SecioError> where T: AsyncRead + AsyncWrite + Send + 'static { @@ -153,11 +142,13 @@ impl SecioConfig { SecioMiddleware::handshake(socket, self) .map(|(stream_sink, pubkey, ephemeral)| { let mapped = stream_sink.map_err(map_err as fn(_) -> _); - SecioOutput { + let peer = pubkey.clone().into_peer_id(); + let io = SecioOutput { stream: RwStreamSink::new(mapped), remote_key: pubkey, ephemeral_public_key: ephemeral - } + }; + (peer, io) }) } } @@ -168,7 +159,7 @@ where S: AsyncRead + AsyncWrite, { /// The encrypted stream. - pub stream: RwStreamSink, fn(SecioError) -> IoError>>, + pub stream: RwStreamSink, fn(SecioError) -> io::Error>>, /// The public key of the remote. pub remote_key: PublicKey, /// Ephemeral public key used during the negotiation. @@ -188,7 +179,7 @@ impl InboundUpgrade for SecioConfig where T: AsyncRead + AsyncWrite + Send + 'static { - type Output = SecioOutput>; + type Output = (PeerId, SecioOutput>); type Error = SecioError; type Future = Box + Send>; @@ -201,7 +192,7 @@ impl OutboundUpgrade for SecioConfig where T: AsyncRead + AsyncWrite + Send + 'static { - type Output = SecioOutput>; + type Output = (PeerId, SecioOutput>); type Error = SecioError; type Future = Box + Send>; @@ -210,10 +201,37 @@ where } } -#[inline] -fn map_err(err: SecioError) -> IoError { +impl io::Read for SecioOutput { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.stream.read(buf) + } +} + +impl AsyncRead for SecioOutput { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.stream.prepare_uninitialized_buffer(buf) + } +} + +impl io::Write for SecioOutput { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.stream.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.stream.flush() + } +} + +impl AsyncWrite for SecioOutput { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.stream.shutdown() + } +} + +fn map_err(err: SecioError) -> io::Error { debug!("error during secio handshake {:?}", err); - IoError::new(IoErrorKind::InvalidData, err) + io::Error::new(io::ErrorKind::InvalidData, err) } /// Wraps around an object that implements `AsyncRead` and `AsyncWrite`. @@ -247,7 +265,7 @@ where S: AsyncRead + AsyncWrite, { type SinkItem = BytesMut; - type SinkError = IoError; + type SinkError = io::Error; #[inline] fn start_send(&mut self, item: Self::SinkItem) -> StartSend { diff --git a/src/lib.rs b/src/lib.rs index c6795abbd89..f69e50a0248 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,15 +82,15 @@ //! *upgraded*. Upgrading a transport is the process of negotiating an additional protocol //! with the remote, mediated through a negotiation protocol called [`multistream-select`]. //! -//! Example ([`secio`] Protocol Upgrade): +//! Example ([`secio`] + [`yamux`] Protocol Upgrade): //! //! ```rust //! # #[cfg(all(not(any(target_os = "emscripten", target_os = "unknown")), feature = "libp2p-secio"))] { -//! use libp2p::{Transport, tcp::TcpConfig, secio::SecioConfig, identity::Keypair}; +//! use libp2p::{Transport, tcp::TcpConfig, secio::SecioConfig, identity::Keypair, yamux}; //! let tcp = TcpConfig::new(); -//! let secio_upgrade = SecioConfig::new(Keypair::generate_ed25519()); -//! let tcp_secio = tcp.with_upgrade(secio_upgrade); -//! // let _ = tcp_secio.dial(...); +//! let secio = SecioConfig::new(Keypair::generate_ed25519()); +//! let yamux = yamux::Config::default(); +//! let transport = tcp.upgrade().authenticate(secio).multiplex(yamux); //! # } //! ``` //! In this example, `tcp_secio` is a new [`Transport`] that negotiates the secio protocol @@ -222,7 +222,6 @@ pub use self::simple::SimpleProtocol; pub use self::swarm::Swarm; pub use self::transport_ext::TransportExt; -use futures::prelude::*; use std::{error, io, time::Duration}; /// Builds a `Transport` that supports the most commonly-used protocols that libp2p supports. @@ -245,18 +244,11 @@ pub fn build_tcp_ws_secio_mplex_yamux(keypair: identity::Keypair) -> impl Transport> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone { CommonTransport::new() - .with_upgrade(secio::SecioConfig::new(keypair)) - .and_then(move |output, endpoint| { - let peer_id = output.remote_key.into_peer_id(); - let peer_id2 = peer_id.clone(); - let upgrade = core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()) - // TODO: use a single `.map` instead of two maps - .map_inbound(move |muxer| (peer_id, muxer)) - .map_outbound(move |muxer| (peer_id2, muxer)); - core::upgrade::apply(output.stream, upgrade, endpoint) - .map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer))) - }) - .with_timeout(Duration::from_secs(20)) + .upgrade() + .authenticate(secio::SecioConfig::new(keypair)) + .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())) + .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) + .timeout(Duration::from_secs(20)) } /// Implementation of `Transport` that supports the most common protocols. diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index ffbc535632b..026fbd5676b 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -18,7 +18,7 @@ rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" } tokio-codec = "0.1.1" tokio-io = "0.1.12" tokio-rustls = "0.10.0-alpha.3" -soketto = { version = "0.2.0", features = ["deflate"] } +soketto = { version = "0.2.3", features = ["deflate"] } url = "1.7.2" webpki-roots = "0.16.0"