Skip to content

Commit

Permalink
Wrap multistream-select streams under a Negotiated (#1001)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka authored Mar 19, 2019
1 parent 63e9e39 commit 96e559b
Show file tree
Hide file tree
Showing 24 changed files with 162 additions and 111 deletions.
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

/// Multi-address re-export.
pub use multiaddr;
pub use multistream_select::Negotiated;

mod keys_proto;
mod peer_id;
Expand Down
2 changes: 1 addition & 1 deletion core/src/protocols_handler/one_shot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ where
self.pending_error = Some(error);
}
}

#[inline]
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
Expand Down
5 changes: 3 additions & 2 deletions core/src/upgrade/denied.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use futures::future;
use multistream_select::Negotiated;
use std::iter;
use void::Void;

Expand All @@ -42,7 +43,7 @@ impl<C> InboundUpgrade<C> for DeniedUpgrade {
type Error = Void;
type Future = future::Empty<Self::Output, Self::Error>;

fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future {
fn upgrade_inbound(self, _: Negotiated<C>, _: Self::Info) -> Self::Future {
future::empty()
}
}
Expand All @@ -52,7 +53,7 @@ impl<C> OutboundUpgrade<C> for DeniedUpgrade {
type Error = Void;
type Future = future::Empty<Self::Output, Self::Error>;

fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future {
fn upgrade_outbound(self, _: Negotiated<C>, _: Self::Info) -> Self::Future {
future::empty()
}
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/upgrade/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
either::{EitherOutput, EitherError, EitherFuture2, EitherName},
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}
};
use multistream_select::Negotiated;

/// A type to represent two possible upgrade types (inbound or outbound).
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -55,7 +56,7 @@ where
type Error = EitherError<EA, EB>;
type Future = EitherFuture2<A::Future, B::Future>;

fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_inbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
match (self, info) {
(EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_inbound(sock, info)),
(EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_inbound(sock, info)),
Expand All @@ -73,7 +74,7 @@ where
type Error = EitherError<EA, EB>;
type Future = EitherFuture2<A::Future, B::Future>;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
match (self, info) {
(EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info)),
(EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info)),
Expand Down
17 changes: 9 additions & 8 deletions core/src/upgrade/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use futures::{prelude::*, try_ready};
use multistream_select::Negotiated;

/// Wraps around an upgrade and applies a closure to the output.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -52,7 +53,7 @@ where
type Error = U::Error;
type Future = MapFuture<U::Future, F>;

fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_inbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
MapFuture {
inner: self.upgrade.upgrade_inbound(sock, info),
map: Some(self.fun)
Expand All @@ -68,7 +69,7 @@ where
type Error = U::Error;
type Future = U::Future;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
self.upgrade.upgrade_outbound(sock, info)
}
}
Expand Down Expand Up @@ -103,7 +104,7 @@ where
type Error = U::Error;
type Future = U::Future;

fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_inbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
self.upgrade.upgrade_inbound(sock, info)
}
}
Expand All @@ -117,7 +118,7 @@ where
type Error = U::Error;
type Future = MapFuture<U::Future, F>;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
MapFuture {
inner: self.upgrade.upgrade_outbound(sock, info),
map: Some(self.fun)
Expand Down Expand Up @@ -156,7 +157,7 @@ where
type Error = T;
type Future = MapErrFuture<U::Future, F>;

fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_inbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
MapErrFuture {
fut: self.upgrade.upgrade_inbound(sock, info),
fun: Some(self.fun)
Expand All @@ -172,7 +173,7 @@ where
type Error = U::Error;
type Future = U::Future;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
self.upgrade.upgrade_outbound(sock, info)
}
}
Expand Down Expand Up @@ -208,7 +209,7 @@ where
type Error = T;
type Future = MapErrFuture<U::Future, F>;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
MapErrFuture {
fut: self.upgrade.upgrade_outbound(sock, info),
fun: Some(self.fun)
Expand All @@ -224,7 +225,7 @@ where
type Error = U::Error;
type Future = U::Future;

fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_inbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
self.upgrade.upgrade_inbound(sock, info)
}
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/upgrade/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ mod transfer;

use futures::future::Future;

pub use multistream_select::Negotiated;
pub use self::{
apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply},
denied::DeniedUpgrade,
Expand Down Expand Up @@ -114,7 +115,7 @@ pub trait InboundUpgrade<C>: UpgradeInfo {
/// method is called to start the handshake.
///
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
fn upgrade_inbound(self, socket: C, info: Self::Info) -> Self::Future;
fn upgrade_inbound(self, socket: Negotiated<C>, info: Self::Info) -> Self::Future;
}

/// Extension trait for `InboundUpgrade`. Automatically implemented on all types that implement
Expand Down Expand Up @@ -154,7 +155,7 @@ pub trait OutboundUpgrade<C>: UpgradeInfo {
/// method is called to start the handshake.
///
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future;
fn upgrade_outbound(self, socket: Negotiated<C>, info: Self::Info) -> Self::Future;
}

/// Extention trait for `OutboundUpgrade`. Automatically implemented on all types that implement
Expand Down
5 changes: 3 additions & 2 deletions core/src/upgrade/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
either::{EitherOutput, EitherError, EitherFuture2, EitherName},
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}
};
use multistream_select::Negotiated;

/// Upgrade that combines two upgrades into one. Supports all the protocols supported by either
/// sub-upgrade.
Expand Down Expand Up @@ -64,7 +65,7 @@ where
type Error = EitherError<EA, EB>;
type Future = EitherFuture2<A::Future, B::Future>;

fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_inbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
match info {
EitherName::A(info) => EitherFuture2::A(self.0.upgrade_inbound(sock, info)),
EitherName::B(info) => EitherFuture2::B(self.1.upgrade_inbound(sock, info))
Expand All @@ -81,7 +82,7 @@ where
type Error = EitherError<EA, EB>;
type Future = EitherFuture2<A::Future, B::Future>;

fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
fn upgrade_outbound(self, sock: Negotiated<C>, info: Self::Info) -> Self::Future {
match info {
EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info)),
EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info))
Expand Down
10 changes: 5 additions & 5 deletions misc/multistream-select/src/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::protocol::{
use log::trace;
use std::mem;
use tokio_io::{AsyncRead, AsyncWrite};
use crate::ProtocolChoiceError;
use crate::{Negotiated, ProtocolChoiceError};

/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer
/// either sequentially of by considering all protocols in parallel.
Expand Down Expand Up @@ -125,7 +125,7 @@ where
I: Iterator,
I::Item: AsRef<[u8]> + Clone
{
type Item = (I::Item, R);
type Item = (I::Item, Negotiated<R>);
type Error = ProtocolChoiceError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Expand Down Expand Up @@ -207,7 +207,7 @@ where
ListenerToDialerMessage::ProtocolAck { ref name }
if name.as_ref() == proto_name.as_ref() =>
{
return Ok(Async::Ready((proto_name, r.into_inner())))
return Ok(Async::Ready((proto_name, Negotiated(r.into_inner()))))
}
ListenerToDialerMessage::NotAvailable => {
let proto_name = protocols.next()
Expand Down Expand Up @@ -300,7 +300,7 @@ where
I: Iterator,
I::Item: AsRef<[u8]> + Clone
{
type Item = (I::Item, R);
type Item = (I::Item, Negotiated<R>);
type Error = ProtocolChoiceError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Expand Down Expand Up @@ -423,7 +423,7 @@ where
Some(ListenerToDialerMessage::ProtocolAck { ref name })
if name.as_ref() == proto_name.as_ref() =>
{
return Ok(Async::Ready((proto_name, dialer.into_inner())))
return Ok(Async::Ready((proto_name, Negotiated(dialer.into_inner()))))
}
_ => return Err(ProtocolChoiceError::UnexpectedMessage)
}
Expand Down
43 changes: 43 additions & 0 deletions misc/multistream-select/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,49 @@ mod tests;

mod protocol;

use futures::prelude::*;
use std::io;

pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture};
pub use self::error::ProtocolChoiceError;
pub use self::listener_select::{listener_select_proto, ListenerSelectFuture};

/// A stream after it has been negotiated.
pub struct Negotiated<TInner>(pub(crate) TInner);

impl<TInner> io::Read for Negotiated<TInner>
where
TInner: io::Read
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
}

impl<TInner> tokio_io::AsyncRead for Negotiated<TInner>
where
TInner: tokio_io::AsyncRead
{
}

impl<TInner> io::Write for Negotiated<TInner>
where
TInner: io::Write
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}

fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}

impl<TInner> tokio_io::AsyncWrite for Negotiated<TInner>
where
TInner: tokio_io::AsyncWrite
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.0.shutdown()
}
}
6 changes: 3 additions & 3 deletions misc/multistream-select/src/listener_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::protocol::{
use log::{debug, trace};
use std::mem;
use tokio_io::{AsyncRead, AsyncWrite};
use crate::ProtocolChoiceError;
use crate::{Negotiated, ProtocolChoiceError};

/// Helps selecting a protocol amongst the ones supported.
///
Expand Down Expand Up @@ -99,7 +99,7 @@ where
for<'a> &'a I: IntoIterator<Item = X>,
X: AsRef<[u8]> + Clone
{
type Item = (X, R, I);
type Item = (X, Negotiated<R>, I);
type Error = ProtocolChoiceError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Expand Down Expand Up @@ -171,7 +171,7 @@ where
}
};
if let Some(p) = outcome {
return Ok(Async::Ready((p, listener.into_inner(), protocols)))
return Ok(Async::Ready((p, Negotiated(listener.into_inner()), protocols)))
} else {
let stream = listener.into_future();
self.inner = ListenerSelectState::Incoming { stream, protocols }
Expand Down
10 changes: 5 additions & 5 deletions muxers/mplex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use bytes::Bytes;
use libp2p_core::{
Endpoint,
StreamMuxer,
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}
};
use log::{debug, trace};
use parking_lot::Mutex;
Expand Down Expand Up @@ -158,11 +158,11 @@ impl<C> InboundUpgrade<C> for MplexConfig
where
C: AsyncRead + AsyncWrite,
{
type Output = Multiplex<C>;
type Output = Multiplex<Negotiated<C>>;
type Error = IoError;
type Future = future::FutureResult<Self::Output, IoError>;

fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
future::ok(self.upgrade(socket))
}
}
Expand All @@ -171,11 +171,11 @@ impl<C> OutboundUpgrade<C> for MplexConfig
where
C: AsyncRead + AsyncWrite,
{
type Output = Multiplex<C>;
type Output = Multiplex<Negotiated<C>>;
type Error = IoError;
type Future = future::FutureResult<Self::Output, IoError>;

fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
fn upgrade_outbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
future::ok(self.upgrade(socket))
}
}
Expand Down
14 changes: 7 additions & 7 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md).

use futures::{future::{self, FutureResult}, prelude::*};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
use log::debug;
use std::{io, iter, sync::atomic};
use std::io::{Error as IoError};
Expand Down Expand Up @@ -155,11 +155,11 @@ impl<C> InboundUpgrade<C> for Config
where
C: AsyncRead + AsyncWrite + 'static,
{
type Output = Yamux<C>;
type Output = Yamux<Negotiated<C>>;
type Error = io::Error;
type Future = FutureResult<Yamux<C>, io::Error>;
type Future = FutureResult<Yamux<Negotiated<C>>, io::Error>;

fn upgrade_inbound(self, i: C, _: Self::Info) -> Self::Future {
fn upgrade_inbound(self, i: Negotiated<C>, _: Self::Info) -> Self::Future {
future::ok(Yamux::new(i, self.0, yamux::Mode::Server))
}
}
Expand All @@ -168,11 +168,11 @@ impl<C> OutboundUpgrade<C> for Config
where
C: AsyncRead + AsyncWrite + 'static,
{
type Output = Yamux<C>;
type Output = Yamux<Negotiated<C>>;
type Error = io::Error;
type Future = FutureResult<Yamux<C>, io::Error>;
type Future = FutureResult<Yamux<Negotiated<C>>, io::Error>;

fn upgrade_outbound(self, i: C, _: Self::Info) -> Self::Future {
fn upgrade_outbound(self, i: Negotiated<C>, _: Self::Info) -> Self::Future {
future::ok(Yamux::new(i, self.0, yamux::Mode::Client))
}
}
Expand Down
Loading

0 comments on commit 96e559b

Please sign in to comment.