Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed ipc leak #277

Merged
merged 14 commits into from
Jun 12, 2018
90 changes: 49 additions & 41 deletions ipc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ mod tests {
extern crate tokio_uds;

use std::thread;
use std::time::Duration;
use super::{ServerBuilder, Server};
use jsonrpc::{MetaIoHandler, Value};
use jsonrpc::futures::{Future, future, Stream, Sink};
use jsonrpc::futures::sync::{mpsc, oneshot};
use self::tokio_uds::UnixStream;
use server_utils::tokio_core::reactor::Core;
use server_utils::tokio_io::AsyncRead;
Expand All @@ -270,8 +272,7 @@ mod tests {
}

fn dummy_request_str(path: &str, data: &str) -> String {
let mut core = Core::new().expect("Tokio Core should be created with no errors");

let core = Core::new().unwrap();
let stream = UnixStream::connect(path, &core.handle()).expect("Should have been connected to the server");
let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split();
let reply = writer
Expand All @@ -284,7 +285,7 @@ mod tests {
future::ok(reply.expect("there should be one reply"))
});

core.run(reply).unwrap()
reply.wait().unwrap()
}

#[test]
Expand Down Expand Up @@ -316,58 +317,63 @@ mod tests {
::logger::init_log();
let path = "/tmp/test-ipc-40000";
let server = run(path);
thread::sleep(::std::time::Duration::from_millis(1000));
let (stop_signal, stop_receiver) = oneshot::channel();

let result = dummy_request_str(
path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
let _ = thread::spawn(move || {
let result = dummy_request_str(
path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
);
stop_signal.send(result).unwrap();
}).join().unwrap();

let _ = stop_receiver.map(move |result: String| {
assert_eq!(
result,
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
"Response does not exactly match the expected response",
);

assert_eq!(
result,
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
"Response does not exactly match the expected response",
);

thread::sleep(::std::time::Duration::from_millis(1000));
server.close();
server.close();
}).wait();
}

#[test]
fn req_parallel() {
use std::thread;

::logger::init_log();
let path = "/tmp/test-ipc-45000";
let server = run(path);
thread::sleep(::std::time::Duration::from_millis(1000));
thread::sleep(Duration::from_millis(100));
let (stop_signal, stop_receiver) = mpsc::channel(400);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we parametrize the nr of threads/loops so we can do mpsc::channel(TEST_THRDS * TEST_ITERS);?


let mut handles = Vec::new();
for _ in 0..4 {
let path = path.clone();
let mut stop_signal = stop_signal.clone();
handles.push(
thread::spawn(move || {
thread::sleep(::std::time::Duration::from_millis(100));
for _ in 0..100 {
let result = dummy_request_str(
&path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
);

assert_eq!(
result,
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
"Response does not exactly match the expected response",
);
stop_signal.try_send(result).unwrap();
}
})
);
}
thread::sleep(Duration::from_millis(100));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: why do we still need to nap?


for handle in handles.drain(..) {
handle.join().unwrap();
}
thread::sleep(::std::time::Duration::from_millis(1000));

let _ = stop_receiver.map(|result| {
assert_eq!(
result,
"{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
"Response does not exactly match the expected response",
);
}).take(400).collect().wait();
server.close();
}

Expand Down Expand Up @@ -410,22 +416,24 @@ mod tests {
let builder = ServerBuilder::new(io);

let server = builder.start(path).expect("Server must run with no issues");
thread::sleep(::std::time::Duration::from_millis(1000));
let (stop_signal, stop_receiver) = oneshot::channel();

let result = dummy_request_str(&path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
);
let _ = thread::spawn(move || {
let result = dummy_request_str(
&path,
"{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
);

assert_eq!(
result,
huge_response_test_json(),
"Response does not exactly match the expected response",
);
stop_signal.send(result).unwrap();
}).join().unwrap();

thread::sleep(::std::time::Duration::from_millis(1000));
server.close();
let _ = stop_receiver.map(move |result: String| {
assert_eq!(
result,
huge_response_test_json(),
"Response does not exactly match the expected response",
);
server.close();
});
}



}