Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed ipc leak #277

Merged
merged 14 commits into from
Jun 12, 2018
1 change: 1 addition & 0 deletions ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub extern crate jsonrpc_core;
#[cfg(test)] mod logger;

mod server;
mod select_both;
mod meta;

use jsonrpc_core as jsonrpc;
Expand Down
69 changes: 69 additions & 0 deletions ipc/src/select_both.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use jsonrpc::futures::{Poll, Async};
use jsonrpc::futures::stream::{Stream, Fuse};

pub trait SelectBothExt: Stream {
fn select_both<S>(self, other: S) -> SelectBoth<Self, S>
where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized;
}

impl<T> SelectBothExt for T where T: Stream {
fn select_both<S>(self, other: S) -> SelectBoth<Self, S>
where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized {
new(self, other)
}
}

/// An adapter for merging the output of two streams.
///
/// The merged stream produces items from either of the underlying streams as
/// they become available, and the streams are polled in a round-robin fashion.
/// Errors, however, are not merged: you get at most one error at a time.
///
/// Finishes when either of the streams stops responding
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct SelectBoth<S1, S2> {
stream1: Fuse<S1>,
stream2: Fuse<S2>,
flag: bool,
}

fn new<S1, S2>(stream1: S1, stream2: S2) -> SelectBoth<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
SelectBoth {
stream1: stream1.fuse(),
stream2: stream2.fuse(),
flag: false,
}
}

impl<S1, S2> Stream for SelectBoth<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
type Item = S1::Item;
type Error = S1::Error;

fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
let (a, b) = if self.flag {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eating a virtual dispatch on every call to poll doesn't seem great to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(a macro poll_inner!(a, b) would probably end up being less verbose in the end)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is just modified the version of select from futures library https://docs.rs/futures/0.1.21/src/futures/stream/select.rs.html#11-15

(&mut self.stream2 as &mut Stream<Item=_, Error=_>,
&mut self.stream1 as &mut Stream<Item=_, Error=_>)
} else {
(&mut self.stream1 as &mut Stream<Item=_, Error=_>,
&mut self.stream2 as &mut Stream<Item=_, Error=_>)
};
self.flag = !self.flag;

match a.poll()? {
Async::Ready(Some(item)) => return Ok(Some(item).into()),
Async::Ready(None) => return Ok(None.into()),
Async::NotReady => (),
};

self.flag = !self.flag;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this second negation of self.flag intended?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


b.poll()
}
}
31 changes: 17 additions & 14 deletions ipc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use server_utils::tokio_io::AsyncRead;
use server_utils::{reactor, session, codecs};

use meta::{MetaExtractor, NoopExtractor, RequestContext};
use select_both::SelectBothExt;

/// IPC server session
pub struct Service<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
Expand Down Expand Up @@ -172,7 +173,7 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
})
})
.filter_map(|x| x)
.select(receiver.map_err(|e| {
.select_both(receiver.map_err(|e| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I for one would appreciate a comment as to why we're using select_both here – doesn't seem super-obvious! ;)

warn!(target: "ipc", "Notification error: {:?}", e);
std::io::ErrorKind::Other.into()
}));
Expand Down Expand Up @@ -263,7 +264,7 @@ mod tests {
fn run(path: &str) -> Server {
let builder = server_builder();
let server = builder.start(path).expect("Server must run with no issues");
thread::sleep(::std::time::Duration::from_millis(50));
thread::sleep(::std::time::Duration::from_millis(5000));
server
}

Expand All @@ -274,10 +275,12 @@ mod tests {
let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split();
let reply = writer
.send(data.to_owned())
.and_then(move |_| {
reader.into_future().map_err(|(err, _)| err)
.and_then(move |stream| {
reader.into_future()
.map(|x| (stream, x))
.map_err(|(err, _)| err)
})
.and_then(|(reply, _)| {
.and_then(|(_stream, (reply, _))| {
future::ok(reply.expect("there should be one reply"))
});

Expand Down Expand Up @@ -312,7 +315,7 @@ mod tests {
fn request() {
::logger::init_log();
let path = "/tmp/test-ipc-40000";
let _server = run(path);
let server = run(path);

let result = dummy_request_str(
path,
Expand All @@ -323,7 +326,8 @@ mod tests {
result,
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
"Response does not exactly match the expected response",
);
);
server.close();
Copy link
Contributor Author

@debris debris Jun 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test issue no. 1

  • we were dropping the server before we even connected to it. everything was working only because of the leak

}

#[test]
Expand All @@ -332,7 +336,7 @@ mod tests {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test still occasionally fails here. Running for i in {1..20}; do cargo test; done it fails 2-3 times. Running it on by itself (for i in {1..20}; do cargo test req_parallel; done) makes things worse: about a fourth fails.

::logger::init_log();
let path = "/tmp/test-ipc-45000";
let _server = run(path);
let server = run(path);

let mut handles = Vec::new();
for _ in 0..4 {
Expand All @@ -350,8 +354,6 @@ mod tests {
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
"Response does not exactly match the expected response",
);

::std::thread::sleep(::std::time::Duration::from_millis(10));
}
})
);
Expand All @@ -360,6 +362,7 @@ mod tests {
for handle in handles.drain(..) {
handle.join().unwrap();
}
server.close();
}

#[test]
Expand Down Expand Up @@ -400,8 +403,8 @@ mod tests {
});
let builder = ServerBuilder::new(io);

let _server = builder.start(path).expect("Server must run with no issues");
thread::sleep(::std::time::Duration::from_millis(50));
let server = builder.start(path).expect("Server must run with no issues");
thread::sleep(::std::time::Duration::from_millis(5000));
Copy link
Contributor Author

@debris debris Jun 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test issue no. 2 (not solved)

  • server is not started synchronously, therefore we cannot assume that it's already available at the time of making the request. fixing this issue is actually quite complex, so I added only this workaround


let result = dummy_request_str(&path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
Expand All @@ -411,8 +414,8 @@ mod tests {
result,
huge_response_test_json(),
"Response does not exactly match the expected response",
);

);
server.close();
}


Expand Down