From ad42b0098183262b33b18d45e471cd2ef3594e60 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 10 Dec 2019 11:46:30 +0100 Subject: [PATCH 1/2] Cleanups in libp2p-core in stable-futures branch --- core/Cargo.toml | 1 + core/src/either.rs | 196 ++++++++++++++++------------------ core/src/lib.rs | 2 - core/src/transport/choice.rs | 6 -- core/src/transport/map.rs | 33 +++--- core/src/transport/map_err.rs | 51 ++++----- core/src/transport/timeout.rs | 28 +++-- core/src/upgrade/map.rs | 35 +++--- core/src/upgrade/mod.rs | 2 - core/src/upgrade/select.rs | 2 - 10 files changed, 161 insertions(+), 195 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 8884097cbc2..2e1b18cb52f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -25,6 +25,7 @@ multiaddr = { package = "parity-multiaddr", version = "0.6.0", path = "../misc/m multihash = { package = "parity-multihash", version = "0.2.0", path = "../misc/multihash" } multistream-select = { version = "0.6.0", path = "../misc/multistream-select" } parking_lot = "0.9.0" +pin-project = "0.4.6" protobuf = "2.8" quick-error = "1.2" rand = "0.7" diff --git a/core/src/either.rs b/core/src/either.rs index f1b69e41bc6..0d0fc794a7b 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -20,7 +20,8 @@ use crate::{muxing::StreamMuxer, ProtocolName, transport::ListenerEvent}; use futures::prelude::*; -use std::{fmt, io::{Error as IoError, Read, Write}, pin::Pin, task::Context, task::Poll}; +use pin_project::{pin_project, project}; +use std::{fmt, io::{Error as IoError}, pin::Pin, task::Context, task::Poll}; #[derive(Debug, Copy, Clone)] pub enum EitherError { @@ -56,99 +57,75 @@ where /// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to /// either `First` or `Second`. +#[pin_project] #[derive(Debug, Copy, Clone)] pub enum EitherOutput { - First(A), - Second(B), + First(#[pin] A), + Second(#[pin] B), } impl AsyncRead for EitherOutput where - A: AsyncRead + Unpin, - B: AsyncRead + Unpin, + A: AsyncRead, + B: AsyncRead, { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { - match &mut *self { - EitherOutput::First(a) => AsyncRead::poll_read(Pin::new(a), cx, buf), - EitherOutput::Second(b) => AsyncRead::poll_read(Pin::new(b), cx, buf), - } - } -} - -// TODO: remove? -impl Read for EitherOutput -where - A: Read, - B: Read, -{ - fn read(&mut self, buf: &mut [u8]) -> Result { - match self { - EitherOutput::First(a) => a.read(buf), - EitherOutput::Second(b) => b.read(buf), + #[project] + fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + #[project] + match self.project() { + EitherOutput::First(a) => AsyncRead::poll_read(a, cx, buf), + EitherOutput::Second(b) => AsyncRead::poll_read(b, cx, buf), } } } impl AsyncWrite for EitherOutput where - A: AsyncWrite + Unpin, - B: AsyncWrite + Unpin, + A: AsyncWrite, + B: AsyncWrite, { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - match &mut *self { - EitherOutput::First(a) => AsyncWrite::poll_write(Pin::new(a), cx, buf), - EitherOutput::Second(b) => AsyncWrite::poll_write(Pin::new(b), cx, buf), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match &mut *self { - EitherOutput::First(a) => AsyncWrite::poll_flush(Pin::new(a), cx), - EitherOutput::Second(b) => AsyncWrite::poll_flush(Pin::new(b), cx), + #[project] + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + #[project] + match self.project() { + EitherOutput::First(a) => AsyncWrite::poll_write(a, cx, buf), + EitherOutput::Second(b) => AsyncWrite::poll_write(b, cx, buf), } } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match &mut *self { - EitherOutput::First(a) => AsyncWrite::poll_close(Pin::new(a), cx), - EitherOutput::Second(b) => AsyncWrite::poll_close(Pin::new(b), cx), + #[project] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + #[project] + match self.project() { + EitherOutput::First(a) => AsyncWrite::poll_flush(a, cx), + EitherOutput::Second(b) => AsyncWrite::poll_flush(b, cx), } } -} -// TODO: remove? -impl Write for EitherOutput -where - A: Write, - B: Write, -{ - fn write(&mut self, buf: &[u8]) -> Result { - match self { - EitherOutput::First(a) => a.write(buf), - EitherOutput::Second(b) => b.write(buf), - } - } - - fn flush(&mut self) -> Result<(), IoError> { - match self { - EitherOutput::First(a) => a.flush(), - EitherOutput::Second(b) => b.flush(), + #[project] + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + #[project] + match self.project() { + EitherOutput::First(a) => AsyncWrite::poll_close(a, cx), + EitherOutput::Second(b) => AsyncWrite::poll_close(b, cx), } } } impl Stream for EitherOutput where - A: TryStream + Unpin, - B: TryStream + Unpin, + A: TryStream, + B: TryStream, { type Item = Result>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match &mut *self { - EitherOutput::First(a) => TryStream::try_poll_next(Pin::new(a), cx) + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + #[project] + match self.project() { + EitherOutput::First(a) => TryStream::try_poll_next(a, cx) .map(|v| v.map(|r| r.map_err(EitherError::A))), - EitherOutput::Second(b) => TryStream::try_poll_next(Pin::new(b), cx) + EitherOutput::Second(b) => TryStream::try_poll_next(b, cx) .map(|v| v.map(|r| r.map_err(EitherError::B))), } } @@ -161,31 +138,39 @@ where { type Error = EitherError; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match &mut *self { - EitherOutput::First(a) => Sink::poll_ready(Pin::new(a), cx).map_err(EitherError::A), - EitherOutput::Second(b) => Sink::poll_ready(Pin::new(b), cx).map_err(EitherError::B), + #[project] + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + #[project] + match self.project() { + EitherOutput::First(a) => Sink::poll_ready(a, cx).map_err(EitherError::A), + EitherOutput::Second(b) => Sink::poll_ready(b, cx).map_err(EitherError::B), } } - fn start_send(mut self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - match &mut *self { - EitherOutput::First(a) => Sink::start_send(Pin::new(a), item).map_err(EitherError::A), - EitherOutput::Second(b) => Sink::start_send(Pin::new(b), item).map_err(EitherError::B), + #[project] + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + #[project] + match self.project() { + EitherOutput::First(a) => Sink::start_send(a, item).map_err(EitherError::A), + EitherOutput::Second(b) => Sink::start_send(b, item).map_err(EitherError::B), } } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match &mut *self { - EitherOutput::First(a) => Sink::poll_flush(Pin::new(a), cx).map_err(EitherError::A), - EitherOutput::Second(b) => Sink::poll_flush(Pin::new(b), cx).map_err(EitherError::B), + #[project] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + #[project] + match self.project() { + EitherOutput::First(a) => Sink::poll_flush(a, cx).map_err(EitherError::A), + EitherOutput::Second(b) => Sink::poll_flush(b, cx).map_err(EitherError::B), } } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match &mut *self { - EitherOutput::First(a) => Sink::poll_close(Pin::new(a), cx).map_err(EitherError::A), - EitherOutput::Second(b) => Sink::poll_close(Pin::new(b), cx).map_err(EitherError::B), + #[project] + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + #[project] + match self.project() { + EitherOutput::First(a) => Sink::poll_close(a, cx).map_err(EitherError::A), + EitherOutput::Second(b) => Sink::poll_close(b, cx).map_err(EitherError::B), } } } @@ -337,29 +322,32 @@ pub enum EitherOutbound { } /// Implements `Stream` and dispatches all method calls to either `First` or `Second`. +#[pin_project] #[derive(Debug, Copy, Clone)] #[must_use = "futures do nothing unless polled"] pub enum EitherListenStream { - First(A), - Second(B), + First(#[pin] A), + Second(#[pin] B), } impl Stream for EitherListenStream where - AStream: TryStream> + Unpin, - BStream: TryStream> + Unpin, + AStream: TryStream>, + BStream: TryStream>, { type Item = Result>, EitherError>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match &mut *self { - EitherListenStream::First(a) => match TryStream::try_poll_next(Pin::new(a), cx) { + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + #[project] + match self.project() { + EitherListenStream::First(a) => match TryStream::try_poll_next(a, cx) { Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::First)))), Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))), }, - EitherListenStream::Second(a) => match TryStream::try_poll_next(Pin::new(a), cx) { + EitherListenStream::Second(a) => match TryStream::try_poll_next(a, cx) { Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::Second)))), @@ -370,33 +358,37 @@ where } /// Implements `Future` and dispatches all method calls to either `First` or `Second`. +#[pin_project] #[derive(Debug, Copy, Clone)] #[must_use = "futures do nothing unless polled"] pub enum EitherFuture { - First(A), - Second(B), + First(#[pin] A), + Second(#[pin] B), } impl Future for EitherFuture where - AFuture: TryFuture + Unpin, - BFuture: TryFuture + Unpin, + AFuture: TryFuture, + BFuture: TryFuture, { type Output = Result, EitherError>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - match &mut *self { - EitherFuture::First(a) => TryFuture::try_poll(Pin::new(a), cx) + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + #[project] + match self.project() { + EitherFuture::First(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::First).map_err(EitherError::A), - EitherFuture::Second(a) => TryFuture::try_poll(Pin::new(a), cx) + EitherFuture::Second(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::Second).map_err(EitherError::B), } } } +#[pin_project] #[derive(Debug, Copy, Clone)] #[must_use = "futures do nothing unless polled"] -pub enum EitherFuture2 { A(A), B(B) } +pub enum EitherFuture2 { A(#[pin] A), B(#[pin] B) } impl Future for EitherFuture2 where @@ -405,11 +397,13 @@ where { type Output = Result, EitherError>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - match &mut *self { - EitherFuture2::A(a) => TryFuture::try_poll(Pin::new(a), cx) + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + #[project] + match self.project() { + EitherFuture2::A(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::First).map_err(EitherError::A), - EitherFuture2::B(a) => TryFuture::try_poll(Pin::new(a), cx) + EitherFuture2::B(a) => TryFuture::try_poll(a, cx) .map_ok(EitherOutput::Second).map_err(EitherError::B), } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 471e928f1db..f6af9c10dc8 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -18,8 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -#![cfg_attr(feature = "async-await", feature(async_await))] - //! Transports, upgrades, multiplexing and node handling of *libp2p*. //! //! The main concepts of libp2p-core are: diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index c3bfc15d1f0..c6593912761 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -35,13 +35,7 @@ impl OrTransport { impl Transport for OrTransport where B: Transport, - B::Dial: Unpin, - B::Listener: Unpin, - B::ListenerUpgrade: Unpin, A: Transport, - A::Dial: Unpin, - A::Listener: Unpin, - A::ListenerUpgrade: Unpin, { type Output = EitherOutput; type Error = EitherError; diff --git a/core/src/transport/map.rs b/core/src/transport/map.rs index 7652e89232a..33772cf2b0e 100644 --- a/core/src/transport/map.rs +++ b/core/src/transport/map.rs @@ -39,9 +39,6 @@ impl Map { impl Transport for Map where T: Transport, - T::Dial: Unpin, - T::Listener: Unpin, - T::ListenerUpgrade: Unpin, F: FnOnce(T::Output, ConnectedPoint) -> D + Clone { type Output = D; @@ -65,22 +62,21 @@ where /// Custom `Stream` implementation to avoid boxing. /// /// Maps a function over every stream item. +#[pin_project::pin_project] #[derive(Clone, Debug)] -pub struct MapStream { stream: T, fun: F } - -impl Unpin for MapStream { -} +pub struct MapStream { #[pin] stream: T, fun: F } impl Stream for MapStream where - T: TryStream> + Unpin, + T: TryStream>, X: TryFuture, F: FnOnce(A, ConnectedPoint) -> B + Clone { type Item = Result>, T::Error>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match TryStream::try_poll_next(Pin::new(&mut self.stream), cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + match TryStream::try_poll_next(this.stream, cx) { Poll::Ready(Some(Ok(event))) => { let event = match event { ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => { @@ -91,7 +87,7 @@ where ListenerEvent::Upgrade { upgrade: MapFuture { inner: upgrade, - args: Some((self.fun.clone(), point)) + args: Some((this.fun.clone(), point)) }, local_addr, remote_addr @@ -112,30 +108,29 @@ where /// Custom `Future` to avoid boxing. /// /// Applies a function to the inner future's result. +#[pin_project::pin_project] #[derive(Clone, Debug)] pub struct MapFuture { + #[pin] inner: T, args: Option<(F, ConnectedPoint)> } -impl Unpin for MapFuture { -} - impl Future for MapFuture where - T: TryFuture + Unpin, + T: TryFuture, F: FnOnce(A, ConnectedPoint) -> B { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let item = match TryFuture::try_poll(Pin::new(&mut self.inner), cx) { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + let item = match TryFuture::try_poll(this.inner, cx) { Poll::Pending => return Poll::Pending, Poll::Ready(Ok(v)) => v, Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), }; - let (f, a) = self.args.take().expect("MapFuture has already finished."); + let (f, a) = this.args.take().expect("MapFuture has already finished."); Poll::Ready(Ok(f(item, a))) } } - diff --git a/core/src/transport/map_err.rs b/core/src/transport/map_err.rs index 36f48209b5d..ba36114682b 100644 --- a/core/src/transport/map_err.rs +++ b/core/src/transport/map_err.rs @@ -40,9 +40,6 @@ impl MapErr { impl Transport for MapErr where T: Transport, - T::Dial: Unpin, - T::Listener: Unpin, - T::ListenerUpgrade: Unpin, F: FnOnce(T::Error) -> TErr + Clone, TErr: error::Error, { @@ -70,67 +67,62 @@ where } /// Listening stream for `MapErr`. +#[pin_project::pin_project] pub struct MapErrListener { + #[pin] inner: T::Listener, map: F, } -impl Unpin for MapErrListener - where T: Transport -{ -} - impl Stream for MapErrListener where T: Transport, - T::Listener: Unpin, F: FnOnce(T::Error) -> TErr + Clone, TErr: error::Error, { type Item = Result>, TErr>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match TryStream::try_poll_next(Pin::new(&mut self.inner), cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + match TryStream::try_poll_next(this.inner, cx) { Poll::Ready(Some(Ok(event))) => { + let map = &*this.map; let event = event.map(move |value| { MapErrListenerUpgrade { inner: value, - map: Some(self.map.clone()) + map: Some(map.clone()) } }); Poll::Ready(Some(Ok(event))) } Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, - Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err((self.map.clone())(err)))), + Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err((this.map.clone())(err)))), } } } /// Listening upgrade future for `MapErr`. +#[pin_project::pin_project] pub struct MapErrListenerUpgrade { + #[pin] inner: T::ListenerUpgrade, map: Option, } -impl Unpin for MapErrListenerUpgrade - where T: Transport -{ -} - impl Future for MapErrListenerUpgrade where T: Transport, - T::ListenerUpgrade: Unpin, F: FnOnce(T::Error) -> TErr, { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - match Future::poll(Pin::new(&mut self.inner), cx) { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + match Future::poll(this.inner, cx) { Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)), Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { - let map = self.map.take().expect("poll() called again after error"); + let map = this.map.take().expect("poll() called again after error"); Poll::Ready(Err(map(err))) } } @@ -138,30 +130,27 @@ where T: Transport, } /// Dialing future for `MapErr`. +#[pin_project::pin_project] pub struct MapErrDial { + #[pin] inner: T::Dial, map: Option, } -impl Unpin for MapErrDial - where T: Transport -{ -} - impl Future for MapErrDial where T: Transport, - T::Dial: Unpin, F: FnOnce(T::Error) -> TErr, { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - match Future::poll(Pin::new(&mut self.inner), cx) { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + match Future::poll(this.inner, cx) { Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)), Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { - let map = self.map.take().expect("poll() called again after error"); + let map = this.map.take().expect("poll() called again after error"); Poll::Ready(Err(map(err))) } } diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index c254d24186f..15fcf855ac6 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -74,9 +74,6 @@ impl Transport for TransportTimeout where InnerTrans: Transport, InnerTrans::Error: 'static, - InnerTrans::Dial: Unpin, - InnerTrans::Listener: Unpin, - InnerTrans::ListenerUpgrade: Unpin, { type Output = InnerTrans::Output; type Error = TransportTimeoutError; @@ -108,29 +105,34 @@ where // TODO: can be removed and replaced with an `impl Stream` once impl Trait is fully stable // in Rust (https://github.com/rust-lang/rust/issues/34511) +#[pin_project::pin_project] pub struct TimeoutListener { + #[pin] inner: InnerStream, timeout: Duration, } impl Stream for TimeoutListener where - InnerStream: TryStream> + Unpin + InnerStream: TryStream>, { type Item = Result>, TransportTimeoutError>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let poll_out = match TryStream::try_poll_next(Pin::new(&mut self.inner), cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + + let poll_out = match TryStream::try_poll_next(this.inner, cx) { Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(TransportTimeoutError::Other(err)))), Poll::Ready(Some(Ok(v))) => v, Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => return Poll::Pending, }; + let timeout = *this.timeout; let event = poll_out.map(move |inner_fut| { Timeout { inner: inner_fut, - timer: Delay::new(self.timeout), + timer: Delay::new(timeout), } }); @@ -142,31 +144,35 @@ where /// `TransportTimeoutError`. // TODO: can be replaced with `impl Future` once `impl Trait` are fully stable in Rust // (https://github.com/rust-lang/rust/issues/34511) +#[pin_project::pin_project] #[must_use = "futures do nothing unless polled"] pub struct Timeout { + #[pin] inner: InnerFut, timer: Delay, } impl Future for Timeout where - InnerFut: TryFuture + Unpin, + InnerFut: TryFuture, { type Output = Result>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { // It is debatable whether we should poll the inner future first or the timer first. // For example, if you start dialing with a timeout of 10 seconds, then after 15 seconds // the dialing succeeds on the wire, then after 20 seconds you poll, then depending on // which gets polled first, the outcome will be success or failure. - match TryFuture::try_poll(Pin::new(&mut self.inner), cx) { + let mut this = self.project(); + + match TryFuture::try_poll(this.inner, cx) { Poll::Pending => {}, Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), Poll::Ready(Err(err)) => return Poll::Ready(Err(TransportTimeoutError::Other(err))), } - match TryFuture::try_poll(Pin::new(&mut self.timer), cx) { + match TryFuture::try_poll(Pin::new(&mut this.timer), cx) { Poll::Pending => Poll::Pending, Poll::Ready(Ok(())) => Poll::Ready(Err(TransportTimeoutError::Timeout)), Poll::Ready(Err(err)) => Poll::Ready(Err(TransportTimeoutError::TimerError(err))), diff --git a/core/src/upgrade/map.rs b/core/src/upgrade/map.rs index ebbd9a246c5..50da58d97dd 100644 --- a/core/src/upgrade/map.rs +++ b/core/src/upgrade/map.rs @@ -48,7 +48,6 @@ where impl InboundUpgrade for MapInboundUpgrade where U: InboundUpgrade, - U::Future: Unpin, F: FnOnce(U::Output) -> T { type Output = T; @@ -66,7 +65,6 @@ where impl OutboundUpgrade for MapInboundUpgrade where U: OutboundUpgrade, - U::Future: Unpin, { type Output = U::Output; type Error = U::Error; @@ -102,7 +100,6 @@ where impl InboundUpgrade for MapOutboundUpgrade where U: InboundUpgrade, - U::Future: Unpin, { type Output = U::Output; type Error = U::Error; @@ -116,7 +113,6 @@ where impl OutboundUpgrade for MapOutboundUpgrade where U: OutboundUpgrade, - U::Future: Unpin, F: FnOnce(U::Output) -> T { type Output = T; @@ -156,7 +152,6 @@ where impl InboundUpgrade for MapInboundUpgradeErr where U: InboundUpgrade, - U::Future: Unpin, F: FnOnce(U::Error) -> T { type Output = U::Output; @@ -174,7 +169,6 @@ where impl OutboundUpgrade for MapInboundUpgradeErr where U: OutboundUpgrade, - U::Future: Unpin, { type Output = U::Output; type Error = U::Error; @@ -210,7 +204,6 @@ where impl OutboundUpgrade for MapOutboundUpgradeErr where U: OutboundUpgrade, - U::Future: Unpin, F: FnOnce(U::Error) -> T { type Output = U::Output; @@ -238,54 +231,54 @@ where } } +#[pin_project::pin_project] pub struct MapFuture { + #[pin] inner: TInnerFut, map: Option, } -impl Unpin for MapFuture { -} - impl Future for MapFuture where - TInnerFut: TryFuture + Unpin, + TInnerFut: TryFuture, TMap: FnOnce(TIn) -> TOut, { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let item = match TryFuture::try_poll(Pin::new(&mut self.inner), cx) { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + let item = match TryFuture::try_poll(this.inner, cx) { Poll::Ready(Ok(v)) => v, Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => return Poll::Pending, }; - let map = self.map.take().expect("Future has already finished"); + let map = this.map.take().expect("Future has already finished"); Poll::Ready(Ok(map(item))) } } +#[pin_project::pin_project] pub struct MapErrFuture { + #[pin] fut: T, fun: Option, } -impl Unpin for MapErrFuture { -} - impl Future for MapErrFuture where - T: TryFuture + Unpin, + T: TryFuture, F: FnOnce(E) -> A, { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - match TryFuture::try_poll(Pin::new(&mut self.fut), cx) { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + match TryFuture::try_poll(this.fut, cx) { Poll::Pending => Poll::Pending, Poll::Ready(Ok(x)) => Poll::Ready(Ok(x)), Poll::Ready(Err(e)) => { - let f = self.fun.take().expect("Future has not resolved yet"); + let f = this.fun.take().expect("Future has not resolved yet"); Poll::Ready(Err(f(e))) } } diff --git a/core/src/upgrade/mod.rs b/core/src/upgrade/mod.rs index e2043c5ad6b..b0babe7c450 100644 --- a/core/src/upgrade/mod.rs +++ b/core/src/upgrade/mod.rs @@ -144,7 +144,6 @@ pub trait InboundUpgrade: UpgradeInfo { /// Possible error during the handshake. type Error; /// Future that performs the handshake with the remote. - // TODO: remove Unpin type Future: Future> + Unpin; /// After we have determined that the remote supports one of the protocols we support, this @@ -185,7 +184,6 @@ pub trait OutboundUpgrade: UpgradeInfo { /// Possible error during the handshake. type Error; /// Future that performs the handshake with the remote. - // TODO: remove Unpin type Future: Future> + Unpin; /// After we have determined that the remote supports one of the protocols we support, this diff --git a/core/src/upgrade/select.rs b/core/src/upgrade/select.rs index 8adcbabcebf..a8ee6504de7 100644 --- a/core/src/upgrade/select.rs +++ b/core/src/upgrade/select.rs @@ -59,9 +59,7 @@ where impl InboundUpgrade for SelectUpgrade where A: InboundUpgrade, - >::Future: Unpin, B: InboundUpgrade, - >::Future: Unpin, { type Output = EitherOutput; type Error = EitherError; From d738f4158f66590991249ea643fc75357df80e39 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 10 Dec 2019 13:40:40 +0100 Subject: [PATCH 2/2] More work --- core/src/upgrade/either.rs | 4 ---- core/src/upgrade/select.rs | 2 -- 2 files changed, 6 deletions(-) diff --git a/core/src/upgrade/either.rs b/core/src/upgrade/either.rs index 6eb99bb3313..9e6d07421ea 100644 --- a/core/src/upgrade/either.rs +++ b/core/src/upgrade/either.rs @@ -50,9 +50,7 @@ where impl InboundUpgrade for EitherUpgrade where A: InboundUpgrade, - >::Future: Unpin, B: InboundUpgrade, - >::Future: Unpin, { type Output = EitherOutput; type Error = EitherError; @@ -70,9 +68,7 @@ where impl OutboundUpgrade for EitherUpgrade where A: OutboundUpgrade, - >::Future: Unpin, B: OutboundUpgrade, - >::Future: Unpin, { type Output = EitherOutput; type Error = EitherError; diff --git a/core/src/upgrade/select.rs b/core/src/upgrade/select.rs index a8ee6504de7..35d82042db6 100644 --- a/core/src/upgrade/select.rs +++ b/core/src/upgrade/select.rs @@ -76,9 +76,7 @@ where impl OutboundUpgrade for SelectUpgrade where A: OutboundUpgrade, - >::Future: Unpin, B: OutboundUpgrade, - >::Future: Unpin, { type Output = EitherOutput; type Error = EitherError;