From adc2f690bad99f8179871f35248d22654792343a Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 6 Jun 2018 14:03:53 +0200 Subject: [PATCH 01/14] fixed ipc connection leak, closes #275 --- ipc/src/lib.rs | 1 + ipc/src/select_both.rs | 73 ++++++++++++++++++++++++++++++++++++++++++ ipc/src/server.rs | 3 +- 3 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 ipc/src/select_both.rs diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs index 629118fa8..414c249f4 100644 --- a/ipc/src/lib.rs +++ b/ipc/src/lib.rs @@ -15,6 +15,7 @@ pub extern crate jsonrpc_core; #[cfg(test)] mod logger; mod server; +mod select_both; mod meta; use jsonrpc_core as jsonrpc; diff --git a/ipc/src/select_both.rs b/ipc/src/select_both.rs new file mode 100644 index 000000000..4077b7cb3 --- /dev/null +++ b/ipc/src/select_both.rs @@ -0,0 +1,73 @@ +use jsonrpc::futures::{Poll, Async}; +use jsonrpc::futures::stream::{Stream, Fuse}; + +pub trait SelectBothExt: Stream { + fn select_both(self, other: S) -> SelectBoth + where S: Stream, Self: Sized; +} + +impl SelectBothExt for T where T: Stream { + fn select_both(self, other: S) -> SelectBoth + where S: Stream, Self: Sized { + new(self, other) + } +} + +/// An adapter for merging the output of two streams. +/// +/// The merged stream produces items from either of the underlying streams as +/// they become available, and the streams are polled in a round-robin fashion. +/// Errors, however, are not merged: you get at most one error at a time. +/// +/// Finishes when either of the streams stops responding +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct SelectBoth { + stream1: Fuse, + stream2: Fuse, + flag: bool, +} + +fn new(stream1: S1, stream2: S2) -> SelectBoth + where S1: Stream, + S2: Stream +{ + SelectBoth { + stream1: stream1.fuse(), + stream2: stream2.fuse(), + flag: false, + } +} + +impl Stream for SelectBoth + where S1: Stream, + S2: Stream +{ + type Item = S1::Item; + type Error = S1::Error; + + fn poll(&mut self) -> Poll, S1::Error> { + let (a, b) = if self.flag { + (&mut self.stream2 as &mut Stream, + &mut self.stream1 as &mut Stream) + } else { + (&mut self.stream1 as &mut Stream, + &mut self.stream2 as &mut Stream) + }; + self.flag = !self.flag; + + match a.poll()? { + Async::Ready(Some(item)) => return Ok(Some(item).into()), + Async::Ready(None) => return Ok(None.into()), + Async::NotReady => (), + }; + + self.flag = !self.flag; + + match b.poll()? { + Async::Ready(Some(item)) => Ok(Some(item).into()), + Async::Ready(None) => Ok(None.into()), + Async::NotReady => Ok(Async::NotReady), + } + } +} diff --git a/ipc/src/server.rs b/ipc/src/server.rs index ad4341c24..b328ff485 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -11,6 +11,7 @@ use server_utils::tokio_io::AsyncRead; use server_utils::{reactor, session, codecs}; use meta::{MetaExtractor, NoopExtractor, RequestContext}; +use select_both::SelectBothExt; /// IPC server session pub struct Service = NoopMiddleware> { @@ -172,7 +173,7 @@ impl> ServerBuilder { }) }) .filter_map(|x| x) - .select(receiver.map_err(|e| { + .select_both(receiver.map_err(|e| { warn!(target: "ipc", "Notification error: {:?}", e); std::io::ErrorKind::Other.into() })); From a3e4401f8b640f131a9c15472e45d360d247fad9 Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 6 Jun 2018 14:05:04 +0200 Subject: [PATCH 02/14] fixed indentation --- ipc/src/select_both.rs | 66 +++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/ipc/src/select_both.rs b/ipc/src/select_both.rs index 4077b7cb3..b162621f8 100644 --- a/ipc/src/select_both.rs +++ b/ipc/src/select_both.rs @@ -23,51 +23,51 @@ impl SelectBothExt for T where T: Stream { #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct SelectBoth { - stream1: Fuse, - stream2: Fuse, - flag: bool, + stream1: Fuse, + stream2: Fuse, + flag: bool, } fn new(stream1: S1, stream2: S2) -> SelectBoth - where S1: Stream, - S2: Stream + where S1: Stream, + S2: Stream { - SelectBoth { - stream1: stream1.fuse(), - stream2: stream2.fuse(), - flag: false, - } + SelectBoth { + stream1: stream1.fuse(), + stream2: stream2.fuse(), + flag: false, + } } impl Stream for SelectBoth - where S1: Stream, - S2: Stream + where S1: Stream, + S2: Stream { - type Item = S1::Item; - type Error = S1::Error; + type Item = S1::Item; + type Error = S1::Error; - fn poll(&mut self) -> Poll, S1::Error> { - let (a, b) = if self.flag { - (&mut self.stream2 as &mut Stream, - &mut self.stream1 as &mut Stream) - } else { - (&mut self.stream1 as &mut Stream, - &mut self.stream2 as &mut Stream) - }; - self.flag = !self.flag; + fn poll(&mut self) -> Poll, S1::Error> { + let (a, b) = if self.flag { + (&mut self.stream2 as &mut Stream, + &mut self.stream1 as &mut Stream) + } else { + (&mut self.stream1 as &mut Stream, + &mut self.stream2 as &mut Stream) + }; + self.flag = !self.flag; - match a.poll()? { - Async::Ready(Some(item)) => return Ok(Some(item).into()), - Async::Ready(None) => return Ok(None.into()), - Async::NotReady => (), - }; + match a.poll()? { + Async::Ready(Some(item)) => return Ok(Some(item).into()), + Async::Ready(None) => return Ok(None.into()), + Async::NotReady => (), + }; self.flag = !self.flag; - match b.poll()? { - Async::Ready(Some(item)) => Ok(Some(item).into()), + match b.poll()? { + Async::Ready(Some(item)) => Ok(Some(item).into()), Async::Ready(None) => Ok(None.into()), - Async::NotReady => Ok(Async::NotReady), - } - } + Async::NotReady => Ok(Async::NotReady), + } + } } From 2ccdeb68037bc88dc31bf3cc44c95b5808228691 Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 6 Jun 2018 17:15:13 +0200 Subject: [PATCH 03/14] fixed broken pipe issue in tests --- ipc/src/select_both.rs | 6 +----- ipc/src/server.rs | 28 +++++++++++++++------------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/ipc/src/select_both.rs b/ipc/src/select_both.rs index b162621f8..914c00219 100644 --- a/ipc/src/select_both.rs +++ b/ipc/src/select_both.rs @@ -64,10 +64,6 @@ impl Stream for SelectBoth self.flag = !self.flag; - match b.poll()? { - Async::Ready(Some(item)) => Ok(Some(item).into()), - Async::Ready(None) => Ok(None.into()), - Async::NotReady => Ok(Async::NotReady), - } + b.poll() } } diff --git a/ipc/src/server.rs b/ipc/src/server.rs index b328ff485..fda2e2790 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -264,7 +264,7 @@ mod tests { fn run(path: &str) -> Server { let builder = server_builder(); let server = builder.start(path).expect("Server must run with no issues"); - thread::sleep(::std::time::Duration::from_millis(50)); + thread::sleep(::std::time::Duration::from_millis(5000)); server } @@ -275,10 +275,12 @@ mod tests { let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split(); let reply = writer .send(data.to_owned()) - .and_then(move |_| { - reader.into_future().map_err(|(err, _)| err) + .and_then(move |stream| { + reader.into_future() + .map(|x| (stream, x)) + .map_err(|(err, _)| err) }) - .and_then(|(reply, _)| { + .and_then(|(_stream, (reply, _))| { future::ok(reply.expect("there should be one reply")) }); @@ -313,7 +315,7 @@ mod tests { fn request() { ::logger::init_log(); let path = "/tmp/test-ipc-40000"; - let _server = run(path); + let server = run(path); let result = dummy_request_str( path, @@ -324,7 +326,8 @@ mod tests { result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", "Response does not exactly match the expected response", - ); + ); + server.close(); } #[test] @@ -333,7 +336,7 @@ mod tests { ::logger::init_log(); let path = "/tmp/test-ipc-45000"; - let _server = run(path); + let server = run(path); let mut handles = Vec::new(); for _ in 0..4 { @@ -351,8 +354,6 @@ mod tests { "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", "Response does not exactly match the expected response", ); - - ::std::thread::sleep(::std::time::Duration::from_millis(10)); } }) ); @@ -361,6 +362,7 @@ mod tests { for handle in handles.drain(..) { handle.join().unwrap(); } + server.close(); } #[test] @@ -401,8 +403,8 @@ mod tests { }); let builder = ServerBuilder::new(io); - let _server = builder.start(path).expect("Server must run with no issues"); - thread::sleep(::std::time::Duration::from_millis(50)); + let server = builder.start(path).expect("Server must run with no issues"); + thread::sleep(::std::time::Duration::from_millis(5000)); let result = dummy_request_str(&path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}", @@ -412,8 +414,8 @@ mod tests { result, huge_response_test_json(), "Response does not exactly match the expected response", - ); - + ); + server.close(); } From feecb988354bb40844ecb656fd61ed1cce5e1d42 Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 6 Jun 2018 18:01:37 +0200 Subject: [PATCH 04/14] empirical tests fixes --- ipc/src/server.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/ipc/src/server.rs b/ipc/src/server.rs index fda2e2790..94cc70900 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -173,6 +173,8 @@ impl> ServerBuilder { }) }) .filter_map(|x| x) + // we use `select_both` here, instead of `select`, to close the stream + // as soon as the ipc pipe is closed .select_both(receiver.map_err(|e| { warn!(target: "ipc", "Notification error: {:?}", e); std::io::ErrorKind::Other.into() @@ -264,7 +266,6 @@ mod tests { fn run(path: &str) -> Server { let builder = server_builder(); let server = builder.start(path).expect("Server must run with no issues"); - thread::sleep(::std::time::Duration::from_millis(5000)); server } @@ -275,12 +276,11 @@ mod tests { let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split(); let reply = writer .send(data.to_owned()) - .and_then(move |stream| { + .and_then(move |_| { reader.into_future() - .map(|x| (stream, x)) .map_err(|(err, _)| err) }) - .and_then(|(_stream, (reply, _))| { + .and_then(|(reply, _)| { future::ok(reply.expect("there should be one reply")) }); @@ -316,6 +316,7 @@ mod tests { ::logger::init_log(); let path = "/tmp/test-ipc-40000"; let server = run(path); + thread::sleep(::std::time::Duration::from_millis(1000)); let result = dummy_request_str( path, @@ -327,6 +328,8 @@ mod tests { "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", "Response does not exactly match the expected response", ); + + thread::sleep(::std::time::Duration::from_millis(1000)); server.close(); } @@ -337,12 +340,14 @@ mod tests { ::logger::init_log(); let path = "/tmp/test-ipc-45000"; let server = run(path); + thread::sleep(::std::time::Duration::from_millis(1000)); let mut handles = Vec::new(); for _ in 0..4 { let path = path.clone(); handles.push( thread::spawn(move || { + thread::sleep(::std::time::Duration::from_millis(100)); for _ in 0..100 { let result = dummy_request_str( &path, @@ -362,6 +367,7 @@ mod tests { for handle in handles.drain(..) { handle.join().unwrap(); } + thread::sleep(::std::time::Duration::from_millis(1000)); server.close(); } @@ -404,7 +410,7 @@ mod tests { let builder = ServerBuilder::new(io); let server = builder.start(path).expect("Server must run with no issues"); - thread::sleep(::std::time::Duration::from_millis(5000)); + thread::sleep(::std::time::Duration::from_millis(1000)); let result = dummy_request_str(&path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}", @@ -415,6 +421,8 @@ mod tests { huge_response_test_json(), "Response does not exactly match the expected response", ); + + thread::sleep(::std::time::Duration::from_millis(1000)); server.close(); } From dd70e1fc4694eb425e1dc78a28214fff02be192a Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 11 Jun 2018 14:18:54 +0200 Subject: [PATCH 05/14] fix tests --- ipc/src/server.rs | 90 ++++++++++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 41 deletions(-) diff --git a/ipc/src/server.rs b/ipc/src/server.rs index 94cc70900..bca93b288 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -247,9 +247,11 @@ mod tests { extern crate tokio_uds; use std::thread; + use std::time::Duration; use super::{ServerBuilder, Server}; use jsonrpc::{MetaIoHandler, Value}; use jsonrpc::futures::{Future, future, Stream, Sink}; + use jsonrpc::futures::sync::{mpsc, oneshot}; use self::tokio_uds::UnixStream; use server_utils::tokio_core::reactor::Core; use server_utils::tokio_io::AsyncRead; @@ -270,8 +272,7 @@ mod tests { } fn dummy_request_str(path: &str, data: &str) -> String { - let mut core = Core::new().expect("Tokio Core should be created with no errors"); - + let core = Core::new().unwrap(); let stream = UnixStream::connect(path, &core.handle()).expect("Should have been connected to the server"); let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split(); let reply = writer @@ -284,7 +285,7 @@ mod tests { future::ok(reply.expect("there should be one reply")) }); - core.run(reply).unwrap() + reply.wait().unwrap() } #[test] @@ -316,58 +317,63 @@ mod tests { ::logger::init_log(); let path = "/tmp/test-ipc-40000"; let server = run(path); - thread::sleep(::std::time::Duration::from_millis(1000)); + let (stop_signal, stop_receiver) = oneshot::channel(); - let result = dummy_request_str( - path, - "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}", + let _ = thread::spawn(move || { + let result = dummy_request_str( + path, + "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}", + ); + stop_signal.send(result).unwrap(); + }).join().unwrap(); + + let _ = stop_receiver.map(move |result: String| { + assert_eq!( + result, + "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", + "Response does not exactly match the expected response", ); - - assert_eq!( - result, - "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", - "Response does not exactly match the expected response", - ); - - thread::sleep(::std::time::Duration::from_millis(1000)); - server.close(); + server.close(); + }).wait(); } #[test] fn req_parallel() { - use std::thread; - ::logger::init_log(); let path = "/tmp/test-ipc-45000"; let server = run(path); - thread::sleep(::std::time::Duration::from_millis(1000)); + thread::sleep(Duration::from_millis(100)); + let (stop_signal, stop_receiver) = mpsc::channel(400); let mut handles = Vec::new(); for _ in 0..4 { let path = path.clone(); + let mut stop_signal = stop_signal.clone(); handles.push( thread::spawn(move || { - thread::sleep(::std::time::Duration::from_millis(100)); for _ in 0..100 { let result = dummy_request_str( &path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}", ); - - assert_eq!( - result, - "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", - "Response does not exactly match the expected response", - ); + stop_signal.try_send(result).unwrap(); } }) ); } + thread::sleep(Duration::from_millis(100)); for handle in handles.drain(..) { handle.join().unwrap(); } - thread::sleep(::std::time::Duration::from_millis(1000)); + + let _ = stop_receiver.map(|result| { + assert_eq!( + result, + "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", + "Response does not exactly match the expected response", + ); + }).take(400).collect().wait(); server.close(); } @@ -410,22 +416,24 @@ mod tests { let builder = ServerBuilder::new(io); let server = builder.start(path).expect("Server must run with no issues"); - thread::sleep(::std::time::Duration::from_millis(1000)); + let (stop_signal, stop_receiver) = oneshot::channel(); - let result = dummy_request_str(&path, - "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}", - ); + let _ = thread::spawn(move || { + let result = dummy_request_str( + &path, + "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}", + ); - assert_eq!( - result, - huge_response_test_json(), - "Response does not exactly match the expected response", - ); + stop_signal.send(result).unwrap(); + }).join().unwrap(); - thread::sleep(::std::time::Duration::from_millis(1000)); - server.close(); + let _ = stop_receiver.map(move |result: String| { + assert_eq!( + result, + huge_response_test_json(), + "Response does not exactly match the expected response", + ); + server.close(); + }); } - - - } From e39aca6273397afcb41ba4104deea4e4c1cc0dc4 Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 11 Jun 2018 15:14:02 +0200 Subject: [PATCH 06/14] fix tests --- ipc/src/server.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/ipc/src/server.rs b/ipc/src/server.rs index bca93b288..17cd77143 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -317,15 +317,17 @@ mod tests { ::logger::init_log(); let path = "/tmp/test-ipc-40000"; let server = run(path); + thread::sleep(Duration::from_millis(1000)); let (stop_signal, stop_receiver) = oneshot::channel(); - let _ = thread::spawn(move || { + let t = thread::spawn(move || { let result = dummy_request_str( path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}", ); stop_signal.send(result).unwrap(); - }).join().unwrap(); + }); + t.join().unwrap(); let _ = stop_receiver.map(move |result: String| { assert_eq!( @@ -342,9 +344,9 @@ mod tests { ::logger::init_log(); let path = "/tmp/test-ipc-45000"; let server = run(path); - thread::sleep(Duration::from_millis(100)); let (stop_signal, stop_receiver) = mpsc::channel(400); + thread::sleep(Duration::from_millis(1000)); let mut handles = Vec::new(); for _ in 0..4 { let path = path.clone(); @@ -361,7 +363,6 @@ mod tests { }) ); } - thread::sleep(Duration::from_millis(100)); for handle in handles.drain(..) { handle.join().unwrap(); @@ -416,16 +417,18 @@ mod tests { let builder = ServerBuilder::new(io); let server = builder.start(path).expect("Server must run with no issues"); + thread::sleep(Duration::from_millis(1000)); let (stop_signal, stop_receiver) = oneshot::channel(); - let _ = thread::spawn(move || { + let t = thread::spawn(move || { let result = dummy_request_str( &path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}", ); stop_signal.send(result).unwrap(); - }).join().unwrap(); + }); + t.join().unwrap(); let _ = stop_receiver.map(move |result: String| { assert_eq!( From 93be6f1d111f4297049f6a34305e4247cbc3f525 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 12 Jun 2018 08:11:37 +0200 Subject: [PATCH 07/14] fix tests --- ipc/src/server.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ipc/src/server.rs b/ipc/src/server.rs index 17cd77143..867f82f2f 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -317,10 +317,10 @@ mod tests { ::logger::init_log(); let path = "/tmp/test-ipc-40000"; let server = run(path); - thread::sleep(Duration::from_millis(1000)); let (stop_signal, stop_receiver) = oneshot::channel(); let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); let result = dummy_request_str( path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}", @@ -346,13 +346,13 @@ mod tests { let server = run(path); let (stop_signal, stop_receiver) = mpsc::channel(400); - thread::sleep(Duration::from_millis(1000)); let mut handles = Vec::new(); for _ in 0..4 { let path = path.clone(); let mut stop_signal = stop_signal.clone(); handles.push( thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); for _ in 0..100 { let result = dummy_request_str( &path, @@ -417,10 +417,10 @@ mod tests { let builder = ServerBuilder::new(io); let server = builder.start(path).expect("Server must run with no issues"); - thread::sleep(Duration::from_millis(1000)); let (stop_signal, stop_receiver) = oneshot::channel(); let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); let result = dummy_request_str( &path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}", From 5ce80c776ac92b7b2411e8941feb3f708d757ff1 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 12 Jun 2018 09:59:49 +0200 Subject: [PATCH 08/14] move ipc start_signal.send after the incoming.for_each --- ipc/src/server.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ipc/src/server.rs b/ipc/src/server.rs index 867f82f2f..57485b402 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -132,7 +132,6 @@ impl> ServerBuilder { } }; - start_signal.send(Ok(())).expect("Cannot fail since receiver never dropped before receiving"); let remote = handle.remote().clone(); let connections = listener.incoming(); let mut id = 0u64; @@ -190,6 +189,7 @@ impl> ServerBuilder { Ok(()) }); + start_signal.send(Ok(())).expect("Cannot fail since receiver never dropped before receiving"); let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted.into()); future::Either::B( @@ -272,7 +272,7 @@ mod tests { } fn dummy_request_str(path: &str, data: &str) -> String { - let core = Core::new().unwrap(); + let mut core = Core::new().unwrap(); let stream = UnixStream::connect(path, &core.handle()).expect("Should have been connected to the server"); let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split(); let reply = writer @@ -285,7 +285,7 @@ mod tests { future::ok(reply.expect("there should be one reply")) }); - reply.wait().unwrap() + core.run(reply).unwrap() } #[test] @@ -320,7 +320,7 @@ mod tests { let (stop_signal, stop_receiver) = oneshot::channel(); let t = thread::spawn(move || { - thread::sleep(Duration::from_millis(1000)); + thread::sleep(Duration::from_millis(5000)); let result = dummy_request_str( path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}", @@ -352,7 +352,7 @@ mod tests { let mut stop_signal = stop_signal.clone(); handles.push( thread::spawn(move || { - thread::sleep(Duration::from_millis(1000)); + thread::sleep(Duration::from_millis(5000)); for _ in 0..100 { let result = dummy_request_str( &path, @@ -420,7 +420,7 @@ mod tests { let (stop_signal, stop_receiver) = oneshot::channel(); let t = thread::spawn(move || { - thread::sleep(Duration::from_millis(1000)); + thread::sleep(Duration::from_millis(5000)); let result = dummy_request_str( &path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}", From cf248b9c0eada0661fd6be4749deab259dd93c49 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 12 Jun 2018 10:37:03 +0200 Subject: [PATCH 09/14] log ipc traces on travis --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 6ac3d60d7..4dad30b1a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,7 @@ matrix: script: - cargo build --all - - cargo test --all + - RUST_LOG=ipc cargo test --all after_success: | [ $TRAVIS_BRANCH = master ] && From 98a7dc2b2073064dcc1154b082d972c623ad20cc Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 12 Jun 2018 11:02:25 +0200 Subject: [PATCH 10/14] keep writer in memory as long as possible --- ipc/src/server.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ipc/src/server.rs b/ipc/src/server.rs index 57485b402..b32185909 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -277,11 +277,12 @@ mod tests { let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split(); let reply = writer .send(data.to_owned()) - .and_then(move |_| { + .and_then(move |writer| { reader.into_future() .map_err(|(err, _)| err) + .map(|x| (writer, x)) }) - .and_then(|(reply, _)| { + .and_then(|(_writer, (reply, _))| { future::ok(reply.expect("there should be one reply")) }); From 8151fd547718650017addd2ab50e5e3616592593 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 12 Jun 2018 14:04:27 +0200 Subject: [PATCH 11/14] select_with_weak --- .travis.yml | 2 +- ipc/Cargo.toml | 2 +- ipc/src/lib.rs | 2 +- ipc/src/select_both.rs | 69 --------------------------------- ipc/src/select_with_weak.rs | 77 +++++++++++++++++++++++++++++++++++++ ipc/src/server.rs | 42 ++++++++++---------- 6 files changed, 100 insertions(+), 94 deletions(-) delete mode 100644 ipc/src/select_both.rs create mode 100644 ipc/src/select_with_weak.rs diff --git a/.travis.yml b/.travis.yml index 4dad30b1a..6ac3d60d7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,7 @@ matrix: script: - cargo build --all - - RUST_LOG=ipc cargo test --all + - cargo test --all after_success: | [ $TRAVIS_BRANCH = master ] && diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index a6a159b97..b0e24ced4 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -20,7 +20,7 @@ env_logger = "0.5" lazy_static = "1.0" [target.'cfg(not(windows))'.dev-dependencies] -tokio-uds = "0.1" +tokio-uds = "0.2" [badges] travis-ci = { repository = "paritytech/jsonrpc", branch = "master"} diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs index 414c249f4..f2b0ffbfd 100644 --- a/ipc/src/lib.rs +++ b/ipc/src/lib.rs @@ -15,7 +15,7 @@ pub extern crate jsonrpc_core; #[cfg(test)] mod logger; mod server; -mod select_both; +mod select_with_weak; mod meta; use jsonrpc_core as jsonrpc; diff --git a/ipc/src/select_both.rs b/ipc/src/select_both.rs deleted file mode 100644 index 914c00219..000000000 --- a/ipc/src/select_both.rs +++ /dev/null @@ -1,69 +0,0 @@ -use jsonrpc::futures::{Poll, Async}; -use jsonrpc::futures::stream::{Stream, Fuse}; - -pub trait SelectBothExt: Stream { - fn select_both(self, other: S) -> SelectBoth - where S: Stream, Self: Sized; -} - -impl SelectBothExt for T where T: Stream { - fn select_both(self, other: S) -> SelectBoth - where S: Stream, Self: Sized { - new(self, other) - } -} - -/// An adapter for merging the output of two streams. -/// -/// The merged stream produces items from either of the underlying streams as -/// they become available, and the streams are polled in a round-robin fashion. -/// Errors, however, are not merged: you get at most one error at a time. -/// -/// Finishes when either of the streams stops responding -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct SelectBoth { - stream1: Fuse, - stream2: Fuse, - flag: bool, -} - -fn new(stream1: S1, stream2: S2) -> SelectBoth - where S1: Stream, - S2: Stream -{ - SelectBoth { - stream1: stream1.fuse(), - stream2: stream2.fuse(), - flag: false, - } -} - -impl Stream for SelectBoth - where S1: Stream, - S2: Stream -{ - type Item = S1::Item; - type Error = S1::Error; - - fn poll(&mut self) -> Poll, S1::Error> { - let (a, b) = if self.flag { - (&mut self.stream2 as &mut Stream, - &mut self.stream1 as &mut Stream) - } else { - (&mut self.stream1 as &mut Stream, - &mut self.stream2 as &mut Stream) - }; - self.flag = !self.flag; - - match a.poll()? { - Async::Ready(Some(item)) => return Ok(Some(item).into()), - Async::Ready(None) => return Ok(None.into()), - Async::NotReady => (), - }; - - self.flag = !self.flag; - - b.poll() - } -} diff --git a/ipc/src/select_with_weak.rs b/ipc/src/select_with_weak.rs new file mode 100644 index 000000000..a3515f430 --- /dev/null +++ b/ipc/src/select_with_weak.rs @@ -0,0 +1,77 @@ +use jsonrpc::futures::{Poll, Async}; +use jsonrpc::futures::stream::{Stream, Fuse}; + +pub trait SelectWithWeakExt: Stream { + fn select_with_weak(self, other: S) -> SelectWithWeak + where S: Stream, Self: Sized; +} + +impl SelectWithWeakExt for T where T: Stream { + fn select_with_weak(self, other: S) -> SelectWithWeak + where S: Stream, Self: Sized { + new(self, other) + } +} + +/// An adapter for merging the output of two streams. +/// +/// The merged stream produces items from either of the underlying streams as +/// they become available, and the streams are polled in a round-robin fashion. +/// Errors, however, are not merged: you get at most one error at a time. +/// +/// Finishes when either of the streams stops responding +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct SelectWithWeak { + strong: Fuse, + weak: Fuse, + use_strong: bool, +} + +fn new(stream1: S1, stream2: S2) -> SelectWithWeak + where S1: Stream, + S2: Stream +{ + SelectWithWeak { + strong: stream1.fuse(), + weak: stream2.fuse(), + use_strong: false, + } +} + +impl Stream for SelectWithWeak + where S1: Stream, + S2: Stream +{ + type Item = S1::Item; + type Error = S1::Error; + + fn poll(&mut self) -> Poll, S1::Error> { + let mut checked_strong = false; + loop { + if self.use_strong { + match self.strong.poll()? { + Async::Ready(Some(item)) => { + self.use_strong = false; + return Ok(Some(item).into()) + }, + Async::Ready(None) => return Ok(None.into()), + Async::NotReady => { + if !checked_strong { + self.use_strong = false; + } else { + return Ok(Async::NotReady) + } + } + } + checked_strong = true; + } else { + self.use_strong = true; + match self.weak.poll()? { + Async::Ready(Some(item)) => return Ok(Some(item).into()), + Async::Ready(None) | Async::NotReady => (), + } + } + } + } +} diff --git a/ipc/src/server.rs b/ipc/src/server.rs index b32185909..c73f3ff06 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -11,7 +11,7 @@ use server_utils::tokio_io::AsyncRead; use server_utils::{reactor, session, codecs}; use meta::{MetaExtractor, NoopExtractor, RequestContext}; -use select_both::SelectBothExt; +use select_with_weak::SelectWithWeakExt; /// IPC server session pub struct Service = NoopMiddleware> { @@ -172,9 +172,9 @@ impl> ServerBuilder { }) }) .filter_map(|x| x) - // we use `select_both` here, instead of `select`, to close the stream + // we use `select_with_weak` here, instead of `select`, to close the stream // as soon as the ipc pipe is closed - .select_both(receiver.map_err(|e| { + .select_with_weak(receiver.map_err(|e| { warn!(target: "ipc", "Notification error: {:?}", e); std::io::ErrorKind::Other.into() })); @@ -253,7 +253,6 @@ mod tests { use jsonrpc::futures::{Future, future, Stream, Sink}; use jsonrpc::futures::sync::{mpsc, oneshot}; use self::tokio_uds::UnixStream; - use server_utils::tokio_core::reactor::Core; use server_utils::tokio_io::AsyncRead; use server_utils::codecs; @@ -272,21 +271,21 @@ mod tests { } fn dummy_request_str(path: &str, data: &str) -> String { - let mut core = Core::new().unwrap(); - let stream = UnixStream::connect(path, &core.handle()).expect("Should have been connected to the server"); - let (writer, reader) = stream.framed(codecs::StreamCodec::stream_incoming()).split(); - let reply = writer - .send(data.to_owned()) - .and_then(move |writer| { - reader.into_future() - .map_err(|(err, _)| err) - .map(|x| (writer, x)) - }) - .and_then(|(_writer, (reply, _))| { - future::ok(reply.expect("there should be one reply")) - }); + let stream_future = UnixStream::connect(path); + let reply = stream_future.and_then(|stream| { + let stream= stream.framed(codecs::StreamCodec::stream_incoming()); + let reply = stream + .send(data.to_owned()) + .and_then(move |stream| { + stream.into_future().map_err(|(err, _)| err) + }) + .and_then(|(reply, _)| { + future::ok(reply.expect("there should be one reply")) + }); + reply + }); - core.run(reply).unwrap() + reply.wait().expect("wait for reply") } #[test] @@ -309,8 +308,7 @@ mod tests { let path = "/tmp/test-ipc-30000"; let _server = run(path); - let core = Core::new().expect("Tokio Core should be created with no errors"); - UnixStream::connect(path, &core.handle()).expect("Socket should connect"); + UnixStream::connect(path).wait().expect("Socket should connect"); } #[test] @@ -387,8 +385,7 @@ mod tests { server.close(); assert!(::std::fs::metadata(path).is_err(), "There should be no socket file left"); - let core = Core::new().expect("Tokio Core should be created with no errors"); - assert!(UnixStream::connect(path, &core.handle()).is_err(), "Connection to the closed socket should fail"); + assert!(UnixStream::connect(path).wait().is_err(), "Connection to the closed socket should fail"); } fn huge_response_test_str() -> String { @@ -409,6 +406,7 @@ mod tests { #[test] fn test_huge_response() { + ::logger::init_log(); let path = "/tmp/test-ipc-60000"; let mut io = MetaIoHandler::<()>::default(); From 15e6c03fa7be8df56c15452f6da1dc04c0239d63 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 12 Jun 2018 14:12:38 +0200 Subject: [PATCH 12/14] remove redundant thread::sleep --- ipc/src/select_with_weak.rs | 2 +- ipc/src/server.rs | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/ipc/src/select_with_weak.rs b/ipc/src/select_with_weak.rs index a3515f430..204059c27 100644 --- a/ipc/src/select_with_weak.rs +++ b/ipc/src/select_with_weak.rs @@ -19,7 +19,7 @@ impl SelectWithWeakExt for T where T: Stream { /// they become available, and the streams are polled in a round-robin fashion. /// Errors, however, are not merged: you get at most one error at a time. /// -/// Finishes when either of the streams stops responding +/// Finishes when strong stream finishes #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct SelectWithWeak { diff --git a/ipc/src/server.rs b/ipc/src/server.rs index c73f3ff06..ead47b19d 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -247,7 +247,6 @@ mod tests { extern crate tokio_uds; use std::thread; - use std::time::Duration; use super::{ServerBuilder, Server}; use jsonrpc::{MetaIoHandler, Value}; use jsonrpc::futures::{Future, future, Stream, Sink}; @@ -319,7 +318,6 @@ mod tests { let (stop_signal, stop_receiver) = oneshot::channel(); let t = thread::spawn(move || { - thread::sleep(Duration::from_millis(5000)); let result = dummy_request_str( path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}", @@ -351,7 +349,6 @@ mod tests { let mut stop_signal = stop_signal.clone(); handles.push( thread::spawn(move || { - thread::sleep(Duration::from_millis(5000)); for _ in 0..100 { let result = dummy_request_str( &path, @@ -419,7 +416,6 @@ mod tests { let (stop_signal, stop_receiver) = oneshot::channel(); let t = thread::spawn(move || { - thread::sleep(Duration::from_millis(5000)); let result = dummy_request_str( &path, "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}", From a7630db618cbf10bd300586488f0b34e62ea3aab Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 12 Jun 2018 16:54:59 +0200 Subject: [PATCH 13/14] test session end --- ipc/Cargo.toml | 1 + ipc/src/server.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index b0e24ced4..977db5bd3 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -18,6 +18,7 @@ parity-tokio-ipc = { git = "https://github.com/nikvolf/parity-tokio-ipc" } [dev-dependencies] env_logger = "0.5" lazy_static = "1.0" +parking_lot = "0.5" [target.'cfg(not(windows))'.dev-dependencies] tokio-uds = "0.2" diff --git a/ipc/src/server.rs b/ipc/src/server.rs index ead47b19d..59450d196 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -245,15 +245,19 @@ impl Drop for Server { #[cfg(not(windows))] mod tests { extern crate tokio_uds; + extern crate parking_lot; use std::thread; + use std::sync::Arc; use super::{ServerBuilder, Server}; use jsonrpc::{MetaIoHandler, Value}; use jsonrpc::futures::{Future, future, Stream, Sink}; use jsonrpc::futures::sync::{mpsc, oneshot}; use self::tokio_uds::UnixStream; + use self::parking_lot::Mutex; use server_utils::tokio_io::AsyncRead; use server_utils::codecs; + use meta::{MetaExtractor, RequestContext}; fn server_builder() -> ServerBuilder { let mut io = MetaIoHandler::<()>::default(); @@ -432,6 +436,51 @@ mod tests { "Response does not exactly match the expected response", ); server.close(); - }); + }).wait(); + } + + #[test] + fn test_session_end() { + struct SessionEndMeta { + drop_signal: Option>, + } + + impl Drop for SessionEndMeta { + fn drop(&mut self) { + trace!(target: "ipc", "Dropping session meta"); + self.drop_signal.take().unwrap().send(()).unwrap() + } + } + + #[derive(Default)] + struct SessionEndExtractor { + drop_receivers: Arc>>>, + } + + impl MetaExtractor> for SessionEndExtractor { + fn extract(&self, _context: &RequestContext) -> Arc { + let (signal, receiver) = oneshot::channel(); + self.drop_receivers.lock().push(receiver); + let meta = SessionEndMeta { + drop_signal: Some(signal), + }; + Arc::new(meta) + } + } + + ::logger::init_log(); + let path = "/tmp/test-ipc-30009"; + let session_metadata_extractor = SessionEndExtractor::default(); + let drop_receivers = session_metadata_extractor.drop_receivers.clone(); + + let io = MetaIoHandler::>::default(); + let builder = ServerBuilder::with_meta_extractor(io, session_metadata_extractor); + let server = builder.start(path).expect("Server must run with no issues"); + UnixStream::connect(path).wait().expect("Socket should connect"); + server.close(); + + assert_eq!(drop_receivers.lock().len(), 1); + let receiver = drop_receivers.lock().pop().unwrap(); + let _ = receiver.map(|_| {}).wait().unwrap(); } } From bb76cf3c4f72e33c68f2a42bb984a05f3c32492a Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 12 Jun 2018 17:50:58 +0200 Subject: [PATCH 14/14] fixed race condition in test_session_end --- ipc/src/server.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/ipc/src/server.rs b/ipc/src/server.rs index 59450d196..97df6d8cd 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -452,15 +452,14 @@ mod tests { } } - #[derive(Default)] struct SessionEndExtractor { - drop_receivers: Arc>>>, + drop_receivers: Arc>>>, } impl MetaExtractor> for SessionEndExtractor { fn extract(&self, _context: &RequestContext) -> Arc { let (signal, receiver) = oneshot::channel(); - self.drop_receivers.lock().push(receiver); + self.drop_receivers.lock().try_send(receiver).unwrap(); let meta = SessionEndMeta { drop_signal: Some(signal), }; @@ -470,17 +469,22 @@ mod tests { ::logger::init_log(); let path = "/tmp/test-ipc-30009"; - let session_metadata_extractor = SessionEndExtractor::default(); - let drop_receivers = session_metadata_extractor.drop_receivers.clone(); + let (signal, receiver) = mpsc::channel(16); + let session_metadata_extractor = SessionEndExtractor { + drop_receivers: Arc::new(Mutex::new(signal)) + }; let io = MetaIoHandler::>::default(); let builder = ServerBuilder::with_meta_extractor(io, session_metadata_extractor); let server = builder.start(path).expect("Server must run with no issues"); - UnixStream::connect(path).wait().expect("Socket should connect"); - server.close(); + { + let _ = UnixStream::connect(path).wait().expect("Socket should connect"); + } - assert_eq!(drop_receivers.lock().len(), 1); - let receiver = drop_receivers.lock().pop().unwrap(); - let _ = receiver.map(|_| {}).wait().unwrap(); + receiver.into_future() + .map_err(|_| ()) + .and_then(|drop_receiver| drop_receiver.0.unwrap().map_err(|_| ())) + .wait().unwrap(); + server.close(); } }