fixed ipc leak #277

Merged: 14 commits, Jun 12, 2018
3 changes: 2 additions & 1 deletion ipc/Cargo.toml
@@ -18,9 +18,10 @@ parity-tokio-ipc = { git = "https://github.com/nikvolf/parity-tokio-ipc" }
[dev-dependencies]
env_logger = "0.5"
lazy_static = "1.0"
parking_lot = "0.5"

[target.'cfg(not(windows))'.dev-dependencies]
tokio-uds = "0.1"
tokio-uds = "0.2"

[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
1 change: 1 addition & 0 deletions ipc/src/lib.rs
@@ -15,6 +15,7 @@ pub extern crate jsonrpc_core;
#[cfg(test)] mod logger;

mod server;
mod select_with_weak;
mod meta;

use jsonrpc_core as jsonrpc;
77 changes: 77 additions & 0 deletions ipc/src/select_with_weak.rs
@@ -0,0 +1,77 @@
use jsonrpc::futures::{Poll, Async};
use jsonrpc::futures::stream::{Stream, Fuse};

pub trait SelectWithWeakExt: Stream {
    fn select_with_weak<S>(self, other: S) -> SelectWithWeak<Self, S>
        where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized;
}

impl<T> SelectWithWeakExt for T where T: Stream {
    fn select_with_weak<S>(self, other: S) -> SelectWithWeak<Self, S>
        where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized {
        new(self, other)
    }
}

/// An adapter for merging the output of two streams.
///
/// The merged stream produces items from either of the underlying streams as
/// they become available, and the streams are polled in a round-robin fashion.
/// Errors, however, are not merged: you get at most one error at a time.
///
/// Finishes when the strong stream finishes.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct SelectWithWeak<S1, S2> {
    strong: Fuse<S1>,
    weak: Fuse<S2>,
    use_strong: bool,
}

fn new<S1, S2>(stream1: S1, stream2: S2) -> SelectWithWeak<S1, S2>
    where S1: Stream,
          S2: Stream<Item = S1::Item, Error = S1::Error>
{
    SelectWithWeak {
        strong: stream1.fuse(),
        weak: stream2.fuse(),
        use_strong: false,
    }
}

impl<S1, S2> Stream for SelectWithWeak<S1, S2>
    where S1: Stream,
          S2: Stream<Item = S1::Item, Error = S1::Error>
{
    type Item = S1::Item;
    type Error = S1::Error;

    fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
        let mut checked_strong = false;
        loop {
            if self.use_strong {
                match self.strong.poll()? {
                    Async::Ready(Some(item)) => {
                        self.use_strong = false;
                        return Ok(Some(item).into())
                    },
                    Async::Ready(None) => return Ok(None.into()),
                    Async::NotReady => {
                        if !checked_strong {
                            self.use_strong = false;
                        } else {
                            return Ok(Async::NotReady)
                        }
                    }
                }
                checked_strong = true;
            } else {
                self.use_strong = true;
                match self.weak.poll()? {
                    Async::Ready(Some(item)) => return Ok(Some(item).into()),
                    Async::Ready(None) | Async::NotReady => (),
                }
            }
        }
    }
}
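
For context, here is a minimal usage sketch (not part of this diff, and assuming the SelectWithWeakExt trait were publicly exported from the crate). The weak mpsc receiver below never terminates while its sender is alive, so a plain `select` would leave the merged stream pending forever; `select_with_weak` finishes as soon as the strong stream does:

extern crate futures;

use futures::stream::{self, Stream};
use futures::sync::mpsc;
// use jsonrpc_ipc_server::SelectWithWeakExt; // hypothetical public export

fn main() {
    // The strong stream models the framed socket: it ends when the pipe closes.
    let strong = stream::iter_ok::<_, ()>(vec![1, 2, 3]);
    // The weak stream models the notification receiver: it stays open as long
    // as `_tx` is alive, so `strong.select(weak)` would never terminate.
    let (_tx, weak) = mpsc::channel::<i32>(8);

    // The merged stream yields 1, 2, 3 and then finishes together with `strong`.
    let merged = strong.select_with_weak(weak);
    for item in merged.wait() {
        println!("{:?}", item.unwrap());
    }
}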
172 changes: 121 additions & 51 deletions ipc/src/server.rs
@@ -11,6 +11,7 @@ use server_utils::tokio_io::AsyncRead;
use server_utils::{reactor, session, codecs};

use meta::{MetaExtractor, NoopExtractor, RequestContext};
use select_with_weak::SelectWithWeakExt;

/// IPC server session
pub struct Service<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
@@ -131,7 +132,6 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
            }
        };

        start_signal.send(Ok(())).expect("Cannot fail since receiver never dropped before receiving");
        let remote = handle.remote().clone();
        let connections = listener.incoming();
        let mut id = 0u64;
@@ -172,7 +172,9 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
                })
            })
            .filter_map(|x| x)
            .select(receiver.map_err(|e| {
            // we use `select_with_weak` here, instead of `select`, to close the stream
            // as soon as the ipc pipe is closed
            .select_with_weak(receiver.map_err(|e| {

Author comment: this maintains the stream as long as the connection exists; the outgoing stream may be closed earlier.

                warn!(target: "ipc", "Notification error: {:?}", e);
                std::io::ErrorKind::Other.into()
            }));
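
Schematically, the session future now has the shape sketched below (a simplified model with hypothetical names; mpsc channels stand in for the two halves of the socket). The point is that the forward future resolves once the strong response stream ends, even though the notification channel is still open, so the session and everything it captures can be dropped when the pipe closes:

extern crate futures;

use futures::{Future, Sink, Stream};
use futures::stream;
use futures::sync::mpsc;
// use jsonrpc_ipc_server::SelectWithWeakExt; // hypothetical public export

fn main() {
    // `responses` stands in for the framed socket reader (the strong side),
    // `notifications` for the server-push channel (the weak side).
    let responses = stream::iter_ok::<_, ()>(vec!["response"]);
    let (_notif_tx, notifications) = mpsc::channel::<&str>(4);

    // Everything the session emits is forwarded into the writer half.
    let (writer_tx, writer_rx) = mpsc::channel::<&str>(4);
    let session = responses
        .select_with_weak(notifications) // with plain `select` this future would never resolve
        .forward(writer_tx.sink_map_err(|_| ()));

    session.wait().expect("session finishes when the socket stream ends");
    let written: Vec<_> = writer_rx.wait().filter_map(Result::ok).collect();
    assert_eq!(written, vec!["response"]);
}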
@@ -187,6 +189,7 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {

            Ok(())
        });
        start_signal.send(Ok(())).expect("Cannot fail since receiver never dropped before receiving");

Author comment: start_signal was triggered before the server was fully set up. After fixing the order of initialisation, we no longer need thread::sleep in the tests.
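
The ordering can be pictured with a small self-contained sketch (hypothetical names, not this crate's API): the readiness signal handed back by start() must only fire once the listener and session wiring are fully in place, which is what makes the old thread::sleep calls in the tests unnecessary:

extern crate futures;

use std::thread;
use futures::Future;
use futures::sync::oneshot;

fn main() {
    let (ready_tx, ready_rx) = oneshot::channel::<()>();

    let server = thread::spawn(move || {
        // Bind the socket and build the whole session pipeline first...
        // ...and only then announce readiness; signalling any earlier is the
        // ordering bug this commit fixes.
        ready_tx.send(()).expect("receiver kept alive by the caller");
        // run the event loop here
    });

    // The caller blocks until the server is actually ready to accept connections.
    ready_rx.wait().expect("server failed to start");
    server.join().unwrap();
}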


        let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted.into());
        future::Either::B(
@@ -242,15 +245,19 @@ impl Drop for Server {
#[cfg(not(windows))]
mod tests {
    extern crate tokio_uds;
    extern crate parking_lot;

    use std::thread;
    use std::sync::Arc;
    use super::{ServerBuilder, Server};
    use jsonrpc::{MetaIoHandler, Value};
    use jsonrpc::futures::{Future, future, Stream, Sink};
    use jsonrpc::futures::sync::{mpsc, oneshot};
    use self::tokio_uds::UnixStream;
    use server_utils::tokio_core::reactor::Core;
    use self::parking_lot::Mutex;
    use server_utils::tokio_io::AsyncRead;
    use server_utils::codecs;
    use meta::{MetaExtractor, RequestContext};

    fn server_builder() -> ServerBuilder {
        let mut io = MetaIoHandler::<()>::default();
@@ -263,25 +270,25 @@ mod tests {
    fn run(path: &str) -> Server {
        let builder = server_builder();
        let server = builder.start(path).expect("Server must run with no issues");
        thread::sleep(::std::time::Duration::from_millis(50));
        server
    }

    fn dummy_request_str(path: &str, data: &str) -> String {
        let mut core = Core::new().expect("Tokio Core should be created with no errors");

        let stream = UnixStream::connect(path, &core.handle()).expect("Should have been connected to the server");
        let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split();
        let reply = writer
            .send(data.to_owned())
            .and_then(move |_| {
                reader.into_future().map_err(|(err, _)| err)
            })
            .and_then(|(reply, _)| {
                future::ok(reply.expect("there should be one reply"))
            });
        let stream_future = UnixStream::connect(path);
        let reply = stream_future.and_then(|stream| {
            let stream = stream.framed(codecs::StreamCodec::stream_incoming());
            let reply = stream
                .send(data.to_owned())
                .and_then(move |stream| {
                    stream.into_future().map_err(|(err, _)| err)
                })
                .and_then(|(reply, _)| {
                    future::ok(reply.expect("there should be one reply"))
                });
            reply
        });

        core.run(reply).unwrap()
        reply.wait().expect("wait for reply")
    }

    #[test]
@@ -304,54 +311,54 @@ mod tests {
let path = "/tmp/test-ipc-30000";
let _server = run(path);

let core = Core::new().expect("Tokio Core should be created with no errors");
UnixStream::connect(path, &core.handle()).expect("Socket should connect");
UnixStream::connect(path).wait().expect("Socket should connect");
}

    #[test]
    fn request() {
        ::logger::init_log();
        let path = "/tmp/test-ipc-40000";
        let _server = run(path);
        let server = run(path);
        let (stop_signal, stop_receiver) = oneshot::channel();

        let result = dummy_request_str(
            path,
            "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
        );
        let t = thread::spawn(move || {
            let result = dummy_request_str(
                path,
                "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
            );
            stop_signal.send(result).unwrap();
        });
        t.join().unwrap();

        assert_eq!(
            result,
            "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
            "Response does not exactly match the expected response",
        let _ = stop_receiver.map(move |result: String| {
            assert_eq!(
                result,
                "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
                "Response does not exactly match the expected response",
            );
            server.close();
        }).wait();
    }

    #[test]
    fn req_parallel() {
        use std::thread;

        ::logger::init_log();
        let path = "/tmp/test-ipc-45000";
        let _server = run(path);
        let server = run(path);
        let (stop_signal, stop_receiver) = mpsc::channel(400);

        let mut handles = Vec::new();
        for _ in 0..4 {
            let path = path.clone();
            let mut stop_signal = stop_signal.clone();
            handles.push(
                thread::spawn(move || {
                    for _ in 0..100 {
                        let result = dummy_request_str(
                            &path,
                            "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
                        );

                        assert_eq!(
                            result,
                            "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
                            "Response does not exactly match the expected response",
                        );

                        ::std::thread::sleep(::std::time::Duration::from_millis(10));
                        stop_signal.try_send(result).unwrap();
                    }
                })
            );
@@ -360,6 +367,15 @@ mod tests {
        for handle in handles.drain(..) {
            handle.join().unwrap();
        }

        let _ = stop_receiver.map(|result| {
            assert_eq!(
                result,
                "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
                "Response does not exactly match the expected response",
            );
        }).take(400).collect().wait();
        server.close();
    }

    #[test]
@@ -370,8 +386,7 @@ mod tests {
        server.close();

        assert!(::std::fs::metadata(path).is_err(), "There should be no socket file left");
        let core = Core::new().expect("Tokio Core should be created with no errors");
        assert!(UnixStream::connect(path, &core.handle()).is_err(), "Connection to the closed socket should fail");
        assert!(UnixStream::connect(path).wait().is_err(), "Connection to the closed socket should fail");
    }

    fn huge_response_test_str() -> String {
@@ -392,6 +407,7 @@ mod tests {

    #[test]
    fn test_huge_response() {
        ::logger::init_log();
        let path = "/tmp/test-ipc-60000";

        let mut io = MetaIoHandler::<()>::default();
@@ -400,21 +416,75 @@ mod tests {
        });
        let builder = ServerBuilder::new(io);

        let _server = builder.start(path).expect("Server must run with no issues");
        thread::sleep(::std::time::Duration::from_millis(50));

        let result = dummy_request_str(&path,
            "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
        );
        let server = builder.start(path).expect("Server must run with no issues");
        let (stop_signal, stop_receiver) = oneshot::channel();

        assert_eq!(
            result,
            huge_response_test_json(),
            "Response does not exactly match the expected response",
        let t = thread::spawn(move || {
            let result = dummy_request_str(
                &path,
                "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
            );

            stop_signal.send(result).unwrap();
        });
        t.join().unwrap();

        let _ = stop_receiver.map(move |result: String| {
            assert_eq!(
                result,
                huge_response_test_json(),
                "Response does not exactly match the expected response",
            );
            server.close();
        }).wait();
    }

    #[test]
    fn test_session_end() {
        struct SessionEndMeta {
            drop_signal: Option<oneshot::Sender<()>>,
        }

        impl Drop for SessionEndMeta {
            fn drop(&mut self) {
                trace!(target: "ipc", "Dropping session meta");
                self.drop_signal.take().unwrap().send(()).unwrap()
            }
        }

        struct SessionEndExtractor {
            drop_receivers: Arc<Mutex<mpsc::Sender<oneshot::Receiver<()>>>>,
        }

        impl MetaExtractor<Arc<SessionEndMeta>> for SessionEndExtractor {
            fn extract(&self, _context: &RequestContext) -> Arc<SessionEndMeta> {
                let (signal, receiver) = oneshot::channel();
                self.drop_receivers.lock().try_send(receiver).unwrap();
                let meta = SessionEndMeta {
                    drop_signal: Some(signal),
                };
                Arc::new(meta)
            }
        }

        ::logger::init_log();
        let path = "/tmp/test-ipc-30009";
        let (signal, receiver) = mpsc::channel(16);
        let session_metadata_extractor = SessionEndExtractor {
            drop_receivers: Arc::new(Mutex::new(signal))
        };

        let io = MetaIoHandler::<Arc<SessionEndMeta>>::default();
        let builder = ServerBuilder::with_meta_extractor(io, session_metadata_extractor);
        let server = builder.start(path).expect("Server must run with no issues");
        {
            let _ = UnixStream::connect(path).wait().expect("Socket should connect");
        }

        receiver.into_future()
            .map_err(|_| ())
            .and_then(|drop_receiver| drop_receiver.0.unwrap().map_err(|_| ()))
            .wait().unwrap();
        server.close();
    }
}
}