Skip to content

Commit

Permalink
Merge pull request #1366 from tomaka/review-stable-fut
Browse files Browse the repository at this point in the history
Address some review comments on #1328
  • Loading branch information
tomaka authored Jan 2, 2020
2 parents ff0d2d5 + 23fc6ee commit d0944ed
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
24 changes: 23 additions & 1 deletion core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{muxing::StreamMuxer, ProtocolName, transport::ListenerEvent};
use futures::prelude::*;
use futures::{prelude::*, io::{IoSlice, IoSliceMut}};
use pin_project::{pin_project, project};
use std::{fmt, io::{Error as IoError}, pin::Pin, task::Context, task::Poll};

Expand Down Expand Up @@ -77,6 +77,17 @@ where
EitherOutput::Second(b) => AsyncRead::poll_read(b, cx, buf),
}
}

#[project]
fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut])
-> Poll<Result<usize, IoError>>
{
#[project]
match self.project() {
EitherOutput::First(a) => AsyncRead::poll_read_vectored(a, cx, bufs),
EitherOutput::Second(b) => AsyncRead::poll_read_vectored(b, cx, bufs),
}
}
}

impl<A, B> AsyncWrite for EitherOutput<A, B>
Expand All @@ -93,6 +104,17 @@ where
}
}

#[project]
fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice])
-> Poll<Result<usize, IoError>>
{
#[project]
match self.project() {
EitherOutput::First(a) => AsyncWrite::poll_write_vectored(a, cx, bufs),
EitherOutput::Second(b) => AsyncWrite::poll_write_vectored(b, cx, bufs),
}
}

#[project]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
#[project]
Expand Down
6 changes: 3 additions & 3 deletions core/src/nodes/tasks/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
}
};
use fnv::FnvHashMap;
use futures::{prelude::*, channel::mpsc, executor::ThreadPool, stream::FuturesUnordered, task::SpawnExt as _};
use futures::{prelude::*, channel::mpsc, executor::ThreadPool, stream::FuturesUnordered};
use std::{collections::hash_map::{Entry, OccupiedEntry}, error, fmt, pin::Pin, task::Context, task::Poll};
use super::{TaskId, task::{Task, FromTaskMessage, ToTaskMessage}, Error};

Expand Down Expand Up @@ -177,7 +177,7 @@ impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {

let task = Box::pin(Task::new(task_id, self.events_tx.clone(), rx, future, handler));
if let Some(threads_pool) = &mut self.threads_pool {
threads_pool.spawn(task).expect("spawning a task on a thread pool never fails; qed");
threads_pool.spawn_ok(task);
} else {
self.local_spawns.push(task);
}
Expand Down Expand Up @@ -213,7 +213,7 @@ impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {
Task::node(task_id, self.events_tx.clone(), rx, HandledNode::new(muxer, handler));

if let Some(threads_pool) = &mut self.threads_pool {
threads_pool.spawn(Box::pin(task)).expect("spawning a task on a threads pool never fails; qed");
threads_pool.spawn_ok(Box::pin(task));
} else {
self.local_spawns.push(Box::pin(task));
}
Expand Down

0 comments on commit d0944ed

Please sign in to comment.