Skip to content

Commit

Permalink
Merge pull request #1342 from tomaka/cleanup-stable
Browse files Browse the repository at this point in the history
Cleanups in libp2p-core in stable-futures branch
  • Loading branch information
tomaka authored Dec 12, 2019
2 parents 168f5d8 + d738f41 commit a720601
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 201 deletions.
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ multiaddr = { package = "parity-multiaddr", version = "0.6.0", path = "../misc/m
multihash = { package = "parity-multihash", version = "0.2.0", path = "../misc/multihash" }
multistream-select = { version = "0.6.0", path = "../misc/multistream-select" }
parking_lot = "0.9.0"
pin-project = "0.4.6"
protobuf = "2.8"
quick-error = "1.2"
rand = "0.7"
Expand Down
196 changes: 95 additions & 101 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

use crate::{muxing::StreamMuxer, ProtocolName, transport::ListenerEvent};
use futures::prelude::*;
use std::{fmt, io::{Error as IoError, Read, Write}, pin::Pin, task::Context, task::Poll};
use pin_project::{pin_project, project};
use std::{fmt, io::{Error as IoError}, pin::Pin, task::Context, task::Poll};

#[derive(Debug, Copy, Clone)]
pub enum EitherError<A, B> {
Expand Down Expand Up @@ -56,99 +57,75 @@ where

/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to
/// either `First` or `Second`.
#[pin_project]
#[derive(Debug, Copy, Clone)]
pub enum EitherOutput<A, B> {
First(A),
Second(B),
First(#[pin] A),
Second(#[pin] B),
}

impl<A, B> AsyncRead for EitherOutput<A, B>
where
A: AsyncRead + Unpin,
B: AsyncRead + Unpin,
A: AsyncRead,
B: AsyncRead,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
match &mut *self {
EitherOutput::First(a) => AsyncRead::poll_read(Pin::new(a), cx, buf),
EitherOutput::Second(b) => AsyncRead::poll_read(Pin::new(b), cx, buf),
}
}
}

// TODO: remove?
impl<A, B> Read for EitherOutput<A, B>
where
A: Read,
B: Read,
{
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
match self {
EitherOutput::First(a) => a.read(buf),
EitherOutput::Second(b) => b.read(buf),
#[project]
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
#[project]
match self.project() {
EitherOutput::First(a) => AsyncRead::poll_read(a, cx, buf),
EitherOutput::Second(b) => AsyncRead::poll_read(b, cx, buf),
}
}
}

impl<A, B> AsyncWrite for EitherOutput<A, B>
where
A: AsyncWrite + Unpin,
B: AsyncWrite + Unpin,
A: AsyncWrite,
B: AsyncWrite,
{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, IoError>> {
match &mut *self {
EitherOutput::First(a) => AsyncWrite::poll_write(Pin::new(a), cx, buf),
EitherOutput::Second(b) => AsyncWrite::poll_write(Pin::new(b), cx, buf),
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
match &mut *self {
EitherOutput::First(a) => AsyncWrite::poll_flush(Pin::new(a), cx),
EitherOutput::Second(b) => AsyncWrite::poll_flush(Pin::new(b), cx),
#[project]
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, IoError>> {
#[project]
match self.project() {
EitherOutput::First(a) => AsyncWrite::poll_write(a, cx, buf),
EitherOutput::Second(b) => AsyncWrite::poll_write(b, cx, buf),
}
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
match &mut *self {
EitherOutput::First(a) => AsyncWrite::poll_close(Pin::new(a), cx),
EitherOutput::Second(b) => AsyncWrite::poll_close(Pin::new(b), cx),
#[project]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
#[project]
match self.project() {
EitherOutput::First(a) => AsyncWrite::poll_flush(a, cx),
EitherOutput::Second(b) => AsyncWrite::poll_flush(b, cx),
}
}
}

// TODO: remove?
impl<A, B> Write for EitherOutput<A, B>
where
A: Write,
B: Write,
{
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
match self {
EitherOutput::First(a) => a.write(buf),
EitherOutput::Second(b) => b.write(buf),
}
}

fn flush(&mut self) -> Result<(), IoError> {
match self {
EitherOutput::First(a) => a.flush(),
EitherOutput::Second(b) => b.flush(),
#[project]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
#[project]
match self.project() {
EitherOutput::First(a) => AsyncWrite::poll_close(a, cx),
EitherOutput::Second(b) => AsyncWrite::poll_close(b, cx),
}
}
}

impl<A, B, I> Stream for EitherOutput<A, B>
where
A: TryStream<Ok = I> + Unpin,
B: TryStream<Ok = I> + Unpin,
A: TryStream<Ok = I>,
B: TryStream<Ok = I>,
{
type Item = Result<I, EitherError<A::Error, B::Error>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match &mut *self {
EitherOutput::First(a) => TryStream::try_poll_next(Pin::new(a), cx)
#[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
#[project]
match self.project() {
EitherOutput::First(a) => TryStream::try_poll_next(a, cx)
.map(|v| v.map(|r| r.map_err(EitherError::A))),
EitherOutput::Second(b) => TryStream::try_poll_next(Pin::new(b), cx)
EitherOutput::Second(b) => TryStream::try_poll_next(b, cx)
.map(|v| v.map(|r| r.map_err(EitherError::B))),
}
}
Expand All @@ -161,31 +138,39 @@ where
{
type Error = EitherError<A::Error, B::Error>;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
match &mut *self {
EitherOutput::First(a) => Sink::poll_ready(Pin::new(a), cx).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::poll_ready(Pin::new(b), cx).map_err(EitherError::B),
#[project]
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
#[project]
match self.project() {
EitherOutput::First(a) => Sink::poll_ready(a, cx).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::poll_ready(b, cx).map_err(EitherError::B),
}
}

fn start_send(mut self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
match &mut *self {
EitherOutput::First(a) => Sink::start_send(Pin::new(a), item).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::start_send(Pin::new(b), item).map_err(EitherError::B),
#[project]
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
#[project]
match self.project() {
EitherOutput::First(a) => Sink::start_send(a, item).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::start_send(b, item).map_err(EitherError::B),
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
match &mut *self {
EitherOutput::First(a) => Sink::poll_flush(Pin::new(a), cx).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::poll_flush(Pin::new(b), cx).map_err(EitherError::B),
#[project]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
#[project]
match self.project() {
EitherOutput::First(a) => Sink::poll_flush(a, cx).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::poll_flush(b, cx).map_err(EitherError::B),
}
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
match &mut *self {
EitherOutput::First(a) => Sink::poll_close(Pin::new(a), cx).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::poll_close(Pin::new(b), cx).map_err(EitherError::B),
#[project]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
#[project]
match self.project() {
EitherOutput::First(a) => Sink::poll_close(a, cx).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::poll_close(b, cx).map_err(EitherError::B),
}
}
}
Expand Down Expand Up @@ -337,29 +322,32 @@ pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
}

/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[pin_project]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherListenStream<A, B> {
First(A),
Second(B),
First(#[pin] A),
Second(#[pin] B),
}

impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
where
AStream: TryStream<Ok = ListenerEvent<AInner>> + Unpin,
BStream: TryStream<Ok = ListenerEvent<BInner>> + Unpin,
AStream: TryStream<Ok = ListenerEvent<AInner>>,
BStream: TryStream<Ok = ListenerEvent<BInner>>,
{
type Item = Result<ListenerEvent<EitherFuture<AInner, BInner>>, EitherError<AStream::Error, BStream::Error>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match &mut *self {
EitherListenStream::First(a) => match TryStream::try_poll_next(Pin::new(a), cx) {
#[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
#[project]
match self.project() {
EitherListenStream::First(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::First)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
},
EitherListenStream::Second(a) => match TryStream::try_poll_next(Pin::new(a), cx) {
EitherListenStream::Second(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::Second)))),
Expand All @@ -370,33 +358,37 @@ where
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherFuture<A, B> {
First(A),
Second(B),
First(#[pin] A),
Second(#[pin] B),
}

impl<AFuture, BFuture, AInner, BInner> Future for EitherFuture<AFuture, BFuture>
where
AFuture: TryFuture<Ok = AInner> + Unpin,
BFuture: TryFuture<Ok = BInner> + Unpin,
AFuture: TryFuture<Ok = AInner>,
BFuture: TryFuture<Ok = BInner>,
{
type Output = Result<EitherOutput<AInner, BInner>, EitherError<AFuture::Error, BFuture::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match &mut *self {
EitherFuture::First(a) => TryFuture::try_poll(Pin::new(a), cx)
#[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
#[project]
match self.project() {
EitherFuture::First(a) => TryFuture::try_poll(a, cx)
.map_ok(EitherOutput::First).map_err(EitherError::A),
EitherFuture::Second(a) => TryFuture::try_poll(Pin::new(a), cx)
EitherFuture::Second(a) => TryFuture::try_poll(a, cx)
.map_ok(EitherOutput::Second).map_err(EitherError::B),
}
}
}

#[pin_project]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherFuture2<A, B> { A(A), B(B) }
pub enum EitherFuture2<A, B> { A(#[pin] A), B(#[pin] B) }

impl<AFut, BFut, AItem, BItem, AError, BError> Future for EitherFuture2<AFut, BFut>
where
Expand All @@ -405,11 +397,13 @@ where
{
type Output = Result<EitherOutput<AItem, BItem>, EitherError<AError, BError>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match &mut *self {
EitherFuture2::A(a) => TryFuture::try_poll(Pin::new(a), cx)
#[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
#[project]
match self.project() {
EitherFuture2::A(a) => TryFuture::try_poll(a, cx)
.map_ok(EitherOutput::First).map_err(EitherError::A),
EitherFuture2::B(a) => TryFuture::try_poll(Pin::new(a), cx)
EitherFuture2::B(a) => TryFuture::try_poll(a, cx)
.map_ok(EitherOutput::Second).map_err(EitherError::B),
}
}
Expand Down
2 changes: 0 additions & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

#![cfg_attr(feature = "async-await", feature(async_await))]

//! Transports, upgrades, multiplexing and node handling of *libp2p*.
//!
//! The main concepts of libp2p-core are:
Expand Down
6 changes: 0 additions & 6 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,7 @@ impl<A, B> OrTransport<A, B> {
impl<A, B> Transport for OrTransport<A, B>
where
B: Transport,
B::Dial: Unpin,
B::Listener: Unpin,
B::ListenerUpgrade: Unpin,
A: Transport,
A::Dial: Unpin,
A::Listener: Unpin,
A::ListenerUpgrade: Unpin,
{
type Output = EitherOutput<A::Output, B::Output>;
type Error = EitherError<A::Error, B::Error>;
Expand Down
Loading

0 comments on commit a720601

Please sign in to comment.