Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed ipc leak #277

Merged
merged 14 commits into from
Jun 12, 2018
1 change: 1 addition & 0 deletions ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub extern crate jsonrpc_core;
#[cfg(test)] mod logger;

mod server;
mod select_both;
mod meta;

use jsonrpc_core as jsonrpc;
Expand Down
73 changes: 73 additions & 0 deletions ipc/src/select_both.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use jsonrpc::futures::{Poll, Async};
use jsonrpc::futures::stream::{Stream, Fuse};

pub trait SelectBothExt: Stream {
fn select_both<S>(self, other: S) -> SelectBoth<Self, S>
where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized;
}

impl<T> SelectBothExt for T where T: Stream {
fn select_both<S>(self, other: S) -> SelectBoth<Self, S>
where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized {
new(self, other)
}
}

/// An adapter for merging the output of two streams.
///
/// The merged stream produces items from either of the underlying streams as
/// they become available, and the streams are polled in a round-robin fashion.
/// Errors, however, are not merged: you get at most one error at a time.
///
/// Finishes when either of the streams stops responding
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct SelectBoth<S1, S2> {
stream1: Fuse<S1>,
stream2: Fuse<S2>,
flag: bool,
}

fn new<S1, S2>(stream1: S1, stream2: S2) -> SelectBoth<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
SelectBoth {
stream1: stream1.fuse(),
stream2: stream2.fuse(),
flag: false,
}
}

impl<S1, S2> Stream for SelectBoth<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
type Item = S1::Item;
type Error = S1::Error;

fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
let (a, b) = if self.flag {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eating a virtual dispatch on every call to poll doesn't seem great to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(a macro poll_inner!(a, b) would probably end up being less verbose in the end)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is just modified the version of select from futures library https://docs.rs/futures/0.1.21/src/futures/stream/select.rs.html#11-15

(&mut self.stream2 as &mut Stream<Item=_, Error=_>,
&mut self.stream1 as &mut Stream<Item=_, Error=_>)
} else {
(&mut self.stream1 as &mut Stream<Item=_, Error=_>,
&mut self.stream2 as &mut Stream<Item=_, Error=_>)
};
self.flag = !self.flag;

match a.poll()? {
Async::Ready(Some(item)) => return Ok(Some(item).into()),
Async::Ready(None) => return Ok(None.into()),
Async::NotReady => (),
};

self.flag = !self.flag;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this second negation of self.flag intended?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


match b.poll()? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I right that this whole match can be replaced with simple b.poll() call? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂️ 👍

Async::Ready(Some(item)) => Ok(Some(item).into()),
Async::Ready(None) => Ok(None.into()),
Async::NotReady => Ok(Async::NotReady),
}
}
}
3 changes: 2 additions & 1 deletion ipc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use server_utils::tokio_io::AsyncRead;
use server_utils::{reactor, session, codecs};

use meta::{MetaExtractor, NoopExtractor, RequestContext};
use select_both::SelectBothExt;

/// IPC server session
pub struct Service<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
Expand Down Expand Up @@ -172,7 +173,7 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
})
})
.filter_map(|x| x)
.select(receiver.map_err(|e| {
.select_both(receiver.map_err(|e| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I for one would appreciate a comment as to why we're using select_both here – doesn't seem super-obvious! ;)

warn!(target: "ipc", "Notification error: {:?}", e);
std::io::ErrorKind::Other.into()
}));
Expand Down