Skip to content

Commit

Permalink
Rework the transport upgrade API.
Browse files Browse the repository at this point in the history
ALthough transport upgrades must follow a specific pattern
in order fot the resulting transport to be usable with a
`Network` or `Swarm`, that pattern is currently not well
reflected in the transport upgrade API. Rather, transport
upgrades are rather laborious and involve non-trivial code
duplication.

This commit introduces a `transport::upgrade::Builder` that is
obtained from `Transport::upgrade`. The `Builder` encodes the
previously implicit rules for transport upgrades:

  1. Authentication upgrades must happen first.
  2. Any number of upgrades may follow.
  3. A multiplexer upgrade must happen last.

Since multiplexing is the last (regular) transport upgrade (because
that upgrade yields a `StreamMuxer` which is no longer a `AsyncRead`
/ `AsyncWrite` resource, which the upgrade process is based on),
the upgrade starts with `Transport::upgrade` and ends with
`Builder::multiplex`, which drops back down to the `Transport`,
providing a fluent API.

Authentication and multiplexer upgrades must furthermore adhere
to a minimal contract w.r.t their outputs:

  1. An authentication upgrade is given an (async) I/O resource `C`
     and must produce a pair `(I, D)` where `I: ConnectionInfo` and
     `D` is a new (async) I/O resource `D`.
  2. A multiplexer upgrade is given an (async) I/O resource `C`
     and must produce a `M: StreamMuxer`.
  • Loading branch information
Roman S. Borschel committed Sep 5, 2019
1 parent c0b379b commit 38fe01f
Show file tree
Hide file tree
Showing 20 changed files with 625 additions and 349 deletions.
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub use identity::PublicKey;
pub use transport::Transport;
pub use translation::address_translation;
pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName};
pub use nodes::ConnectionInfo;

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
Expand Down
12 changes: 3 additions & 9 deletions core/src/nodes/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,8 @@ where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: Send + 'static,
Expand Down Expand Up @@ -937,12 +936,10 @@ where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Dial: Send + 'static,
TTrans::Error: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: Send + 'static,
TPeerId: Send + 'static,
{
let reach_id = match self.transport().clone().dial(first.clone()) {
Expand Down Expand Up @@ -985,14 +982,12 @@ where
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandlerErr: error::Error + Send + 'static,
TConnInfo: Clone,
TPeerId: AsRef<[u8]> + Send + 'static,
Expand Down Expand Up @@ -1151,7 +1146,6 @@ where
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Clone + Send + 'static,
Expand Down
124 changes: 58 additions & 66 deletions core/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
//! any desired protocols. The rest of the module defines combinators for
//! modifying a transport through composition with other transports or protocol upgrades.

use crate::{InboundUpgrade, OutboundUpgrade, ConnectedPoint};
use crate::ConnectedPoint;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error, fmt};
use std::{error::Error, fmt};
use std::time::Duration;
use tokio_io::{AsyncRead, AsyncWrite};

pub mod and_then;
pub mod boxed;
Expand Down Expand Up @@ -69,11 +68,7 @@ pub use self::upgrade::Upgrade;
///
/// Additional protocols can be layered on top of the connections established
/// by a [`Transport`] through an upgrade mechanism that is initiated via
/// [`with_upgrade`](Transport::with_upgrade) and optionally followed by further upgrades
/// through chaining calls to [`with_upgrade`](Transport::with_upgrade) and
/// [`and_then`](Transport::and_then). Thereby every upgrade yields a new [`Transport`]
/// whose connection setup incorporates all earlier upgrades followed by the new upgrade,
/// i.e. the order of the upgrades is significant.
/// [`upgrade`](Transport::upgrade).
///
/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other
/// > words, listening or dialing consumes the transport object. This has been designed
Expand All @@ -88,7 +83,7 @@ pub trait Transport {
type Output;

/// An error that occurred during connection setup.
type Error: error::Error;
type Error: Error;

/// A stream of [`Output`](Transport::Output)s for inbound connections.
///
Expand Down Expand Up @@ -127,104 +122,101 @@ pub trait Transport {
where
Self: Sized;

/// Turns this `Transport` into an abstract boxed transport.
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
where Self: Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Self::Listener: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
{
boxed::boxed(self)
}

/// Applies a function on the connections created by the transport.
fn map<F, O>(self, map: F) -> map::Map<Self, F>
fn map<F, O>(self, f: F) -> map::Map<Self, F>
where
Self: Sized,
F: FnOnce(Self::Output, ConnectedPoint) -> O + Clone
{
map::Map::new(self, map)
map::Map::new(self, f)
}

/// Applies a function on the errors generated by the futures of the transport.
fn map_err<F, TNewErr>(self, map_err: F) -> map_err::MapErr<Self, F>
fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
where
Self: Sized,
F: FnOnce(Self::Error) -> TNewErr + Clone
E: Error + 'static,
F: FnOnce(Self::Error) -> E + Clone
{
map_err::MapErr::new(self, map_err)
map_err::MapErr::new(self, f)
}

/// Builds a new transport that falls back to another transport when
/// encountering errors on dialing or listening for connections.
/// Applies a function producing an asynchronous result to every connection
/// created by this transport.
///
/// The returned transport will act like `self`, except that if `listen_on` or `dial`
/// return an error then `other` will be tried.
fn or_transport<T>(self, other: T) -> OrTransport<Self, T>
/// This function can be used for ad-hoc protocol upgrades or
/// for processing or adapting the output for following configurations.
///
/// For the high-level transport upgrade procedure, see [`Transport::upgrade`].
fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
where
Self: Sized,
C: FnOnce(Self::Output, ConnectedPoint) -> F + Clone,
F: IntoFuture<Item = O>,
<F as IntoFuture>::Error: Error + 'static
{
OrTransport::new(self, other)
and_then::AndThen::new(self, f)
}

/// Wraps this transport inside an [`Upgrade`].
///
/// Whenever an inbound or outbound connection is established by this
/// transport, the upgrade is applied on the current state of the
/// connection (which may have already gone through previous upgrades)
/// as an [`upgrade::InboundUpgrade`] or [`upgrade::OutboundUpgrade`],
/// respectively.
fn with_upgrade<U, O, E>(self, upgrade: U) -> Upgrade<Self, U>
where
Self: Sized,
Self::Output: AsyncRead + AsyncWrite,
U: InboundUpgrade<Self::Output, Output = O, Error = E>,
U: OutboundUpgrade<Self::Output, Output = O, Error = E>
/// Turns the transport into an abstract boxed (i.e. heap-allocated) transport.
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
where Self: Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Self::Listener: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
{
Upgrade::new(self, upgrade)
boxed::boxed(self)
}

/// Applies a function producing an asynchronous result to every connection
/// created by this transport.
/// Adds a fallback transport that is used when encountering errors
/// while establishing inbound or outbound connections.
///
/// This function can be used for ad-hoc protocol upgrades on a transport or
/// for processing or adapting the output of an earlier upgrade before
/// applying the next upgrade.
fn and_then<C, F, O>(self, upgrade: C) -> and_then::AndThen<Self, C>
/// The returned transport will act like `self`, except that if `listen_on` or `dial`
/// return an error then `other` will be tried.
fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
where
Self: Sized,
C: FnOnce(Self::Output, ConnectedPoint) -> F + Clone,
F: IntoFuture<Item = O>
U: Transport,
<U as Transport>::Error: 'static
{
and_then::AndThen::new(self, upgrade)
OrTransport::new(self, other)
}

/// Adds a timeout to the connection setup (including upgrades) for all inbound
/// and outbound connection attempts.
fn with_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
/// Adds a timeout to the connection setup (including upgrades) for all
/// inbound and outbound connections established through the transport.
fn timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized,
Self: Sized
{
timeout::TransportTimeout::new(self, timeout)
}

/// Adds a timeout to the connection setup (including upgrades) for all outbound
/// connection attempts.
fn with_outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
/// connections established through the transport.
fn outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized,
Self: Sized
{
timeout::TransportTimeout::with_outgoing_timeout(self, timeout)
}

/// Adds a timeout to the connection setup (including upgrades) for all inbound
/// connection attempts.
fn with_inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
/// connections established through the transport.
fn inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized,
Self: Sized
{
timeout::TransportTimeout::with_ingoing_timeout(self, timeout)
}

/// Begins a series of protocol upgrades via an [`upgrade::Builder`].
fn upgrade(self) -> upgrade::Builder<Self>
where
Self: Sized,
Self::Error: 'static
{
upgrade::Builder::new(self)
}
}

/// Event produced by [`Transport::Listener`]s.
Expand Down Expand Up @@ -362,10 +354,10 @@ where TErr: fmt::Display,
}
}

impl<TErr> error::Error for TransportError<TErr>
where TErr: error::Error + 'static,
impl<TErr> Error for TransportError<TErr>
where TErr: Error + 'static,
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
TransportError::MultiaddrNotSupported(_) => None,
TransportError::Other(err) => Some(err),
Expand Down
Loading

0 comments on commit 38fe01f

Please sign in to comment.