From 9868f88ce0db299599fa22c5858516ccfead4c1c Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 27 Nov 2020 13:01:21 +0100 Subject: [PATCH 01/16] fix(ws client): drop subscriptions when full. This commit changes the behavior in the `WebSocket Client` where each subscription channel is used in a non-blocking matter until it is determined as full or disconnected. When that occurs the channel is simply dropped and when the user `poll` the subscription it will return all sent subscriptions before it was and terminate (return None) once it's polled one last time. Similarly as `Streams` works in Rust. It also adds configuration for the `WebSocket Client` to configure capacity for the different internal channels to avoid filling the buffers when it's not expected. --- benches/benches.rs | 4 +- examples/subscription.rs | 4 +- examples/ws.rs | 4 +- src/client/mod.rs | 4 +- src/client/ws/client.rs | 139 ++++++++++++++++++++++++++----------- src/client/ws/mod.rs | 7 +- src/client/ws/transport.rs | 5 +- src/ws/tests.rs | 20 +++--- 8 files changed, 126 insertions(+), 61 deletions(-) diff --git a/benches/benches.rs b/benches/benches.rs index 04510e3861..ba145ef077 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -1,7 +1,7 @@ use async_std::task::block_on; use criterion::*; use futures::channel::oneshot::{self, Sender}; -use jsonrpsee::client::{HttpClient, HttpConfig, WsClient}; +use jsonrpsee::client::{HttpClient, HttpConfig, WsClient, WsConfig}; use jsonrpsee::http::HttpServer; use jsonrpsee::types::jsonrpc::{JsonValue, Params}; use jsonrpsee::ws::WsServer; @@ -77,7 +77,7 @@ pub fn websocket_requests(c: &mut criterion::Criterion) { let (tx_addr, rx_addr) = oneshot::channel::(); async_std::task::spawn(ws_server(tx_addr)); let server_addr = block_on(rx_addr).unwrap(); - let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr))).unwrap()); + let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr), WsConfig::default())).unwrap()); c.bench_function("synchronous WebSocket round trip", |b| { b.iter(|| { diff --git a/examples/subscription.rs b/examples/subscription.rs index 0e58daed2c..401a1d9ff7 100644 --- a/examples/subscription.rs +++ b/examples/subscription.rs @@ -26,7 +26,7 @@ use async_std::task; use futures::channel::oneshot::{self, Sender}; -use jsonrpsee::client::{WsClient, WsSubscription}; +use jsonrpsee::client::{WsClient, WsConfig, WsSubscription}; use jsonrpsee::types::jsonrpc::{JsonValue, Params}; use jsonrpsee::ws::WsServer; @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box> { }); server_started_rx.await?; - let client = WsClient::new(SERVER_URI).await?; + let client = WsClient::new(SERVER_URI, WsConfig::default()).await?; let mut subscribe_hello: WsSubscription = client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await?; diff --git a/examples/ws.rs b/examples/ws.rs index e4f66c3a43..69cebeb33e 100644 --- a/examples/ws.rs +++ b/examples/ws.rs @@ -26,7 +26,7 @@ use async_std::task; use futures::channel::oneshot::{self, Sender}; -use jsonrpsee::client::WsClient; +use jsonrpsee::client::{WsClient, WsConfig}; use jsonrpsee::types::jsonrpc::{JsonValue, Params}; use jsonrpsee::ws::WsServer; @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box> { }); server_started_rx.await?; - let client = WsClient::new(SERVER_URI).await?; + let client = WsClient::new(SERVER_URI, WsConfig::default()).await?; let response: JsonValue = client.request("say_hello", Params::None).await?; println!("r: {:?}", response); diff --git a/src/client/mod.rs b/src/client/mod.rs index 9fff256970..950f2b3077 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -9,4 +9,6 @@ mod ws; #[cfg(feature = "http")] pub use http::{HttpClient, HttpConfig, HttpTransportClient}; #[cfg(feature = "ws")] -pub use ws::{Client as WsClient, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient}; +pub use ws::{ + Client as WsClient, Config as WsConfig, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient, +}; diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index 7e6df64330..7353749e6b 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -24,15 +24,20 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::client::ws::{RawClient, RawClientEvent, RawClientRequestId, WsTransportClient}; +use crate::client::ws::transport::WsConnectError; +use crate::client::ws::{RawClient, RawClientError, RawClientEvent, RawClientRequestId, WsTransportClient}; use crate::types::error::Error; use crate::types::jsonrpc::{self, JsonValue}; +// NOTE: this is a sign of a leaky abstraction to expose transport related details +// Should be removed after https://github.com/paritytech/jsonrpsee/issues/154 +use soketto::connection::Error as SokettoError; use futures::{ channel::{mpsc, oneshot}, future::Either, pin_mut, prelude::*, + sink::SinkExt, }; use std::{collections::HashMap, io, marker::PhantomData}; @@ -45,16 +50,45 @@ use std::{collections::HashMap, io, marker::PhantomData}; pub struct Client { /// Channel to send requests to the background task. to_back: mpsc::Sender, + /// Config. + config: Config, +} + +#[derive(Copy, Clone, Debug)] +/// Configuration. +pub struct Config { + /// Backend channel for serving requests and notifications. + pub request_channel_capacity: usize, + /// Backend channel for each unique subscription. + pub subscription_channel_capacity: usize, + /// Allow losses when the channel gets full + pub allow_subscription_losses: bool, + /// Allow losses when the request/notifications channel gets full + pub allow_request_losses: bool, + /// Max request body size + pub max_request_body_size: usize, +} + +impl Default for Config { + fn default() -> Self { + Self { + request_channel_capacity: 100, + subscription_channel_capacity: 4, + allow_subscription_losses: false, + allow_request_losses: false, + max_request_body_size: 10 * 1024 * 1024, + } + } } /// Active subscription on a [`Client`]. pub struct Subscription { /// Channel to send requests to the background task. to_back: mpsc::Sender, - /// Channel from which we receive notifications from the server, as undecoded `JsonValue`s. + /// Channel from which we receive notifications from the server, as un-decoded `JsonValue`s. notifs_rx: mpsc::Receiver, /// Marker in order to pin the `Notif` parameter. - marker: PhantomData>, + marker: PhantomData, } /// Message that the [`Client`] can send to the background task. @@ -104,14 +138,16 @@ impl Client { /// Initializes a new WebSocket client /// /// Fails when the URL is invalid. - pub async fn new(target: &str) -> Result { + pub async fn new(target: impl AsRef, config: Config) -> Result { let transport = WsTransportClient::new(target).await.map_err(|e| Error::TransportError(Box::new(e)))?; let client = RawClient::new(transport); - let (to_back, from_front) = mpsc::channel(16); + + let (to_back, from_front) = mpsc::channel(config.request_channel_capacity); + async_std::task::spawn(async move { - background_task(client, from_front).await; + background_task(client, from_front, config).await; }); - Ok(Client { to_back }) + Ok(Client { to_back, config }) } /// Send a notification to the server. @@ -123,7 +159,11 @@ impl Client { let method = method.into(); let params = params.into(); log::trace!("[frontend]: send notification: method={:?}, params={:?}", method, params); - self.to_back.clone().send(FrontToBack::Notification { method, params }).await.map_err(Error::Internal) + self.to_back + .clone() + .send(FrontToBack::Notification { method, params }) + .await + .map_err(|e| Error::Internal(e.into())) } /// Perform a request towards the server. @@ -143,9 +183,7 @@ impl Client { .clone() .send(FrontToBack::StartRequest { method, params, send_back: send_back_tx }) .await - .map_err(Error::Internal)?; - - // TODO: send a `ChannelClosed` message if we close the channel unexpectedly + .map_err(|e| Error::Internal(e.into()))?; let json_value = match send_back_rx.await { Ok(Ok(v)) => v, @@ -179,14 +217,13 @@ impl Client { let (send_back_tx, send_back_rx) = oneshot::channel(); self.to_back .clone() - .send(FrontToBack::Subscribe { + .try_send(FrontToBack::Subscribe { subscribe_method, unsubscribe_method, params: params.into(), send_back: send_back_tx, }) - .await - .map_err(Error::Internal)?; + .map_err(|e| Error::Internal(e.into_send_error().into()))?; let notifs_rx = match send_back_rx.await { Ok(Ok(v)) => v, @@ -205,18 +242,19 @@ impl Subscription where Notif: jsonrpc::DeserializeOwned, { - /// Returns the next notification sent from the server. + /// Returns the next notification from the stream + /// This may return `None` if finished subscription has been terminated /// /// Ignores any malformed packet. - pub async fn next(&mut self) -> Notif { + pub async fn next(&mut self) -> Option { loop { match self.notifs_rx.next().await { Some(n) => { if let Ok(parsed) = jsonrpc::from_value(n) { - return parsed; + return Some(parsed); } } - None => futures::pending!(), + None => return None, } } } @@ -225,20 +263,20 @@ where impl Drop for Subscription { fn drop(&mut self) { // We can't actually guarantee that this goes through. If the background task is busy, then - // the channel's buffer will be full, and our unsubscription request will never make it. + // the channel's buffer will be full, and our un-subscription request will never make it. // However, when a notification arrives, the background task will realize that the channel // to the `Subscription` has been closed, and will perform the unsubscribe. - let _ = self.to_back.send(FrontToBack::ChannelClosed).now_or_never(); + let _ = self.to_back.try_send(FrontToBack::ChannelClosed); } } /// Function being run in the background that processes messages from the frontend. -async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver) { +async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver, config: Config) { // List of subscription requests that have been sent to the server, with the method name to // unsubscribe. let mut pending_subscriptions: HashMap, _)> = HashMap::new(); // List of subscription that are active on the server, with the method name to unsubscribe. - let mut active_subscriptions: HashMap, _)> = HashMap::new(); + let mut active_subscriptions: HashMap, _)> = HashMap::new(); // List of requests that the server must answer. let mut ongoing_requests: HashMap>> = HashMap::new(); @@ -300,9 +338,10 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver { - // TODO: there's no way to cancel pending subscriptions and requests, otherwise - // we should clean them up as well + //TODO: there's no way to cancel pending subscriptions and requests + //TODO(niklasad1): using `iter().find()` is wrong, it's guessing (could close down the wrong channel) and inefficient while let Some(rq_id) = active_subscriptions.iter().find(|(_, (v, _))| v.is_closed()).map(|(k, _)| *k) { let (_, unsubscribe) = active_subscriptions.remove(&rq_id).unwrap(); client.subscription_by_id(rq_id).unwrap().into_active().unwrap().close(unsubscribe).await.unwrap(); @@ -315,15 +354,16 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver { log::trace!("[backend]: client received response to subscription: {:?}", result); let (send_back, unsubscribe) = pending_subscriptions.remove(&request_id).unwrap(); if let Err(err) = result { let _ = send_back.send(Err(Error::Request(err))); } else { - // TODO: what's a good limit here? way more tricky than it looks - let (notifs_tx, notifs_rx) = mpsc::channel(4); + let (notifs_tx, notifs_rx) = mpsc::channel(config.subscription_channel_capacity); + + // Send receiving end of `subscription channel` to the frontend if send_back.send(Ok(notifs_rx)).is_ok() { active_subscriptions.insert(request_id, (notifs_tx, unsubscribe)); } else { @@ -339,28 +379,47 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver { - // TODO: unsubscribe if channel is closed - let (notifs_tx, _) = active_subscriptions.get_mut(&request_id).unwrap(); - if notifs_tx.send(result).await.is_err() { - let (_, unsubscribe) = active_subscriptions.remove(&request_id).unwrap(); - client - .subscription_by_id(request_id) - .unwrap() - .into_active() - .unwrap() - .close(unsubscribe) - .await - .unwrap(); + let notifs_tx = match active_subscriptions.get_mut(&request_id) { + None => { + log::debug!("Invalid subscription response: {:?}", request_id); + continue; + } + Some((notifs_tx, _)) => notifs_tx, + }; + + match notifs_tx.try_send(result) { + Ok(()) => (), + // Channel is either full or disconnected, close it. + Err(e) => { + log::error!("Subscription ID: {:?} failed: {:?}", request_id, e); + let (_, unsubscribe) = active_subscriptions.remove(&request_id).unwrap(); + client + .subscription_by_id(request_id) + .unwrap() + .into_active() + .unwrap() + .close(unsubscribe) + .await + .unwrap(); + } } } // Request for the server to unsubscribe us has succeeded. Either::Right(Ok(RawClientEvent::Unsubscribed { request_id: _ })) => {} + Either::Right(Err(RawClientError::Inner(WsConnectError::Ws(SokettoError::UnexpectedOpCode(e))))) => { + log::error!( + "Client Error: {:?}, ", + SokettoError::UnexpectedOpCode(e) + ); + } Either::Right(Err(e)) => { // TODO: https://github.com/paritytech/jsonrpsee/issues/67 - log::error!("Client Error: {:?}", e); + log::error!("Client Error: {:?} terminating connection", e); + break; } } } diff --git a/src/client/ws/mod.rs b/src/client/ws/mod.rs index b23d0d920d..b27dbe8d92 100644 --- a/src/client/ws/mod.rs +++ b/src/client/ws/mod.rs @@ -3,6 +3,9 @@ pub mod raw; pub mod stream; pub mod transport; -pub use client::{Client, Subscription}; -pub use raw::{RawClient, RawClientEvent, RawClientRequestId}; +#[cfg(test)] +mod tests; + +pub use client::{Client, Config, Subscription}; +pub use raw::{RawClient, RawClientError, RawClientEvent, RawClientRequestId}; pub use transport::{WsConnectError, WsTransportClient}; diff --git a/src/client/ws/transport.rs b/src/client/ws/transport.rs index e4e98ea496..62362c4051 100644 --- a/src/client/ws/transport.rs +++ b/src/client/ws/transport.rs @@ -159,8 +159,9 @@ impl WsTransportClient { } /// Initializes a new WS client from a URL. - pub async fn new(target: &str) -> Result { - let url = url::Url::parse(target).map_err(|e| WsNewDnsError::Url(format!("Invalid URL: {}", e).into()))?; + pub async fn new(target: impl AsRef) -> Result { + let url = + url::Url::parse(target.as_ref()).map_err(|e| WsNewDnsError::Url(format!("Invalid URL: {}", e).into()))?; let mode = match url.scheme() { "ws" => Mode::Plain, "wss" => Mode::Tls, diff --git a/src/ws/tests.rs b/src/ws/tests.rs index 5063178074..f9b6b4ed11 100644 --- a/src/ws/tests.rs +++ b/src/ws/tests.rs @@ -1,6 +1,6 @@ #![cfg(test)] -use crate::client::{WsClient, WsSubscription}; +use crate::client::{WsClient, WsConfig, WsSubscription}; use crate::types::error::Error; use crate::types::jsonrpc::{JsonValue, Params}; use crate::ws::WsServer; @@ -97,15 +97,15 @@ async fn subscription_works() { server_subscribe_only(server_started_tx); let server_addr = server_started_rx.await.unwrap(); let uri = format!("ws://{}", server_addr); - let client = WsClient::new(&uri).await.unwrap(); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); let mut hello_sub: WsSubscription = client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); let mut foo_sub: WsSubscription = client.subscribe("subscribe_foo", Params::None, "unsubscribe_foo").await.unwrap(); for _ in 0..10 { - let hello = hello_sub.next().await; - let foo = foo_sub.next().await; + let hello = hello_sub.next().await.unwrap(); + let foo = foo_sub.next().await.unwrap(); assert_eq!(hello, JsonValue::String("hello from subscription".to_owned())); assert_eq!(foo, JsonValue::Number(1337_u64.into())); } @@ -120,7 +120,7 @@ async fn subscription_several_clients() { let mut clients = Vec::with_capacity(10); for _ in 0..10 { let uri = format!("ws://{}", server_addr); - let client = WsClient::new(&uri).await.unwrap(); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); let hello_sub: WsSubscription = client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); let foo_sub: WsSubscription = @@ -130,8 +130,8 @@ async fn subscription_several_clients() { for _ in 0..10 { for (_client, hello_sub, foo_sub) in &mut clients { - let hello = hello_sub.next().await; - let foo = foo_sub.next().await; + let hello = hello_sub.next().await.unwrap(); + let foo = foo_sub.next().await.unwrap(); assert_eq!(hello, JsonValue::String("hello from subscription".to_owned())); assert_eq!(foo, JsonValue::Number(1337_u64.into())); } @@ -142,13 +142,13 @@ async fn subscription_several_clients() { drop(client); } - // make sure nothing weird happend after dropping half the clients (should be `unsubscribed` in the server) + // make sure nothing weird happened after dropping half the clients (should be `unsubscribed` in the server) // would be good to know that subscriptions actually were removed but not possible to verify at // this layer. for _ in 0..10 { for (_client, hello_sub, foo_sub) in &mut clients { - let hello = hello_sub.next().await; - let foo = foo_sub.next().await; + let hello = hello_sub.next().await.unwrap(); + let foo = foo_sub.next().await.unwrap(); assert_eq!(hello, JsonValue::String("hello from subscription".to_owned())); assert_eq!(foo, JsonValue::Number(1337_u64.into())); } From 15cae68643a5fc070181630afabaefa311e0a5bf Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 27 Nov 2020 13:04:51 +0100 Subject: [PATCH 02/16] tests(ws client): simple subscription test. --- src/client/ws/tests.rs | 78 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 src/client/ws/tests.rs diff --git a/src/client/ws/tests.rs b/src/client/ws/tests.rs new file mode 100644 index 0000000000..093d2dacf6 --- /dev/null +++ b/src/client/ws/tests.rs @@ -0,0 +1,78 @@ +#![cfg(test)] + +use crate::client::{WsClient, WsConfig, WsSubscription}; +use crate::types::jsonrpc::{JsonValue, Params}; +use crate::ws::WsServer; + +use std::net::SocketAddr; +use std::time::Duration; + +use futures::channel::oneshot::{self, Sender}; +use futures::future::FutureExt; + +pub fn server(server_started: Sender) { + std::thread::spawn(move || { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + + let server = rt.block_on(WsServer::new("127.0.0.1:0")).unwrap(); + let mut sub = + server.register_subscription("subscribe_hello".to_owned(), "unsubscribe_hello".to_owned()).unwrap(); + server_started.send(*server.local_addr()).unwrap(); + let mut call = server.register_method("say_hello".to_owned()).unwrap(); + + rt.block_on(async move { + loop { + let hello_fut = async { + let handle = call.next().await; + handle.respond(Ok(JsonValue::String("hello".to_owned()))).await.unwrap(); + } + .fuse(); + + let timeout = tokio::time::delay_for(Duration::from_millis(200)).fuse(); + + futures::pin_mut!(hello_fut, timeout); + + futures::select! { + _ = hello_fut => (), + _ = timeout => { + sub.send(JsonValue::String("hello from subscription".to_owned())).await.unwrap(); + } + } + } + }); + }); +} + +#[tokio::test] +async fn subscription_without_polling_doesnt_make_client_unuseable() { + env_logger::init(); + let (server_started_tx, server_started_rx) = oneshot::channel::(); + server(server_started_tx); + let server_addr = server_started_rx.await.unwrap(); + + let uri = format!("ws://{}", server_addr); + let client = + WsClient::new(&uri, WsConfig { subscription_channel_capacity: 4, ..Default::default() }).await.unwrap(); + let mut hello_sub: WsSubscription = + client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); + + // don't poll the subscription stream for 2 seconds, should be full now. + std::thread::sleep(Duration::from_secs(2)); + + // Capacity is `num_sender` + `capacity` + for _ in 0..5 { + assert!(hello_sub.next().await.is_some()); + } + + // NOTE: this is now unuseable and unregistered. + assert!(hello_sub.next().await.is_none()); + + // The client should still be useable => make sure it still works. + let _hello_req: JsonValue = client.request("say_hello", Params::None).await.unwrap(); + + // The same subscription should be possible to register again. + let mut other_sub: WsSubscription = + client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); + + other_sub.next().await.unwrap(); +} From 29beb50b79431f4793ecbd72370d2e410aa0b4c1 Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 27 Nov 2020 13:28:00 +0100 Subject: [PATCH 03/16] fix: nits --- src/client/ws/client.rs | 10 ++++++---- src/client/ws/tests.rs | 1 - 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index 7353749e6b..395e37c61b 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -217,13 +217,14 @@ impl Client { let (send_back_tx, send_back_rx) = oneshot::channel(); self.to_back .clone() - .try_send(FrontToBack::Subscribe { + .send(FrontToBack::Subscribe { subscribe_method, unsubscribe_method, params: params.into(), send_back: send_back_tx, }) - .map_err(|e| Error::Internal(e.into_send_error().into()))?; + .await + .map_err(Error::Internal)?; let notifs_rx = match send_back_rx.await { Ok(Ok(v)) => v, @@ -233,7 +234,6 @@ impl Client { return Err(Error::TransportError(Box::new(err))); } }; - Ok(Subscription { to_back: self.to_back.clone(), notifs_rx, marker: PhantomData }) } } @@ -354,7 +354,7 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver { log::trace!("[backend]: client received response to subscription: {:?}", result); let (send_back, unsubscribe) = pending_subscriptions.remove(&request_id).unwrap(); @@ -389,6 +389,8 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver notifs_tx, }; + // NOTE: This is non_blocking but doesn't depend on any external handling to finish + // such as a response from a remote party. match notifs_tx.try_send(result) { Ok(()) => (), // Channel is either full or disconnected, close it. diff --git a/src/client/ws/tests.rs b/src/client/ws/tests.rs index 093d2dacf6..7b4ea602d4 100644 --- a/src/client/ws/tests.rs +++ b/src/client/ws/tests.rs @@ -29,7 +29,6 @@ pub fn server(server_started: Sender) { .fuse(); let timeout = tokio::time::delay_for(Duration::from_millis(200)).fuse(); - futures::pin_mut!(hello_fut, timeout); futures::select! { From f50becda3378ad36f73bf31c8548649b7ac94477 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 27 Nov 2020 13:53:09 +0100 Subject: [PATCH 04/16] Update src/client/ws/client.rs --- src/client/ws/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index 395e37c61b..eebf934202 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -389,7 +389,7 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver notifs_tx, }; - // NOTE: This is non_blocking but doesn't depend on any external handling to finish + // NOTE: This is non_blocking but it doesn't depend on any external handling to finish // such as a response from a remote party. match notifs_tx.try_send(result) { Ok(()) => (), From 213e9667b701f5993ecd9647cdfd57b79491a0e3 Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 27 Nov 2020 18:32:31 +0100 Subject: [PATCH 05/16] refactor(tests): introduce integration_tests Make the repo structure more understable w.r.t testing. --- src/client/ws/tests.rs | 76 ------------------- src/http/mod.rs | 1 + src/ws/tests.rs | 91 +--------------------- tests/helpers.rs | 61 +++++++++++++++ tests/integration_tests.rs | 151 +++++++++++++++++++++++++++++++++++++ 5 files changed, 214 insertions(+), 166 deletions(-) create mode 100644 tests/helpers.rs create mode 100644 tests/integration_tests.rs diff --git a/src/client/ws/tests.rs b/src/client/ws/tests.rs index 7b4ea602d4..67005ab4bc 100644 --- a/src/client/ws/tests.rs +++ b/src/client/ws/tests.rs @@ -1,77 +1 @@ #![cfg(test)] - -use crate::client::{WsClient, WsConfig, WsSubscription}; -use crate::types::jsonrpc::{JsonValue, Params}; -use crate::ws::WsServer; - -use std::net::SocketAddr; -use std::time::Duration; - -use futures::channel::oneshot::{self, Sender}; -use futures::future::FutureExt; - -pub fn server(server_started: Sender) { - std::thread::spawn(move || { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - - let server = rt.block_on(WsServer::new("127.0.0.1:0")).unwrap(); - let mut sub = - server.register_subscription("subscribe_hello".to_owned(), "unsubscribe_hello".to_owned()).unwrap(); - server_started.send(*server.local_addr()).unwrap(); - let mut call = server.register_method("say_hello".to_owned()).unwrap(); - - rt.block_on(async move { - loop { - let hello_fut = async { - let handle = call.next().await; - handle.respond(Ok(JsonValue::String("hello".to_owned()))).await.unwrap(); - } - .fuse(); - - let timeout = tokio::time::delay_for(Duration::from_millis(200)).fuse(); - futures::pin_mut!(hello_fut, timeout); - - futures::select! { - _ = hello_fut => (), - _ = timeout => { - sub.send(JsonValue::String("hello from subscription".to_owned())).await.unwrap(); - } - } - } - }); - }); -} - -#[tokio::test] -async fn subscription_without_polling_doesnt_make_client_unuseable() { - env_logger::init(); - let (server_started_tx, server_started_rx) = oneshot::channel::(); - server(server_started_tx); - let server_addr = server_started_rx.await.unwrap(); - - let uri = format!("ws://{}", server_addr); - let client = - WsClient::new(&uri, WsConfig { subscription_channel_capacity: 4, ..Default::default() }).await.unwrap(); - let mut hello_sub: WsSubscription = - client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); - - // don't poll the subscription stream for 2 seconds, should be full now. - std::thread::sleep(Duration::from_secs(2)); - - // Capacity is `num_sender` + `capacity` - for _ in 0..5 { - assert!(hello_sub.next().await.is_some()); - } - - // NOTE: this is now unuseable and unregistered. - assert!(hello_sub.next().await.is_none()); - - // The client should still be useable => make sure it still works. - let _hello_req: JsonValue = client.request("say_hello", Params::None).await.unwrap(); - - // The same subscription should be possible to register again. - let mut other_sub: WsSubscription = - client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); - - other_sub.next().await.unwrap(); -} diff --git a/src/http/mod.rs b/src/http/mod.rs index 5465a820ad..c7859e2423 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -31,6 +31,7 @@ mod transport; #[cfg(test)] mod tests; +pub use crate::types::http::HttpConfig; pub use raw::RawServer as HttpRawServer; pub use raw::RawServerEvent as HttpRawServerEvent; pub use raw::TypedResponder as HttpTypedResponder; diff --git a/src/ws/tests.rs b/src/ws/tests.rs index f9b6b4ed11..04347a2eaf 100644 --- a/src/ws/tests.rs +++ b/src/ws/tests.rs @@ -1,8 +1,7 @@ #![cfg(test)] -use crate::client::{WsClient, WsConfig, WsSubscription}; use crate::types::error::Error; -use crate::types::jsonrpc::{JsonValue, Params}; +use crate::types::jsonrpc::JsonValue; use crate::ws::WsServer; use std::net::SocketAddr; @@ -13,28 +12,6 @@ use futures::{pin_mut, select}; use jsonrpsee_test_utils::helpers::*; use jsonrpsee_test_utils::types::{Id, WebSocketTestClient}; -/// Spawns a dummy `JSONRPC v2 WebSocket` that just send subscriptions to `subscribe_hello` and -/// `subscribe_foo`. -// -// TODO: not sure why `tokio::spawn` doesn't works for this. -pub fn server_subscribe_only(server_started: Sender) { - std::thread::spawn(move || { - use async_std::task::block_on; - let server = block_on(WsServer::new("127.0.0.1:0")).unwrap(); - let mut hello = - server.register_subscription("subscribe_hello".to_owned(), "unsubscribe_hello".to_owned()).unwrap(); - let mut foo = server.register_subscription("subscribe_foo".to_owned(), "unsubscribe_foo".to_owned()).unwrap(); - server_started.send(*server.local_addr()).unwrap(); - - loop { - block_on(hello.send(JsonValue::String("hello from subscription".to_owned()))).unwrap(); - std::thread::sleep(std::time::Duration::from_millis(100)); - block_on(foo.send(JsonValue::Number(1337_u64.into()))).unwrap(); - std::thread::sleep(std::time::Duration::from_millis(100)); - } - }); -} - /// Spawns a dummy `JSONRPC v2 WebSocket` /// It has two hardcoded methods "say_hello" and "add", one hardcoded notification "notif" pub async fn server(server_started: Sender) { @@ -89,72 +66,6 @@ async fn single_method_call_works() { assert_eq!(response, ok_response(JsonValue::String("hello".to_owned()), Id::Num(i))); } } - -// TODO: technically more of a integration test because the "real" client is used. -#[tokio::test] -async fn subscription_works() { - let (server_started_tx, server_started_rx) = oneshot::channel::(); - server_subscribe_only(server_started_tx); - let server_addr = server_started_rx.await.unwrap(); - let uri = format!("ws://{}", server_addr); - let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); - let mut hello_sub: WsSubscription = - client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); - let mut foo_sub: WsSubscription = - client.subscribe("subscribe_foo", Params::None, "unsubscribe_foo").await.unwrap(); - - for _ in 0..10 { - let hello = hello_sub.next().await.unwrap(); - let foo = foo_sub.next().await.unwrap(); - assert_eq!(hello, JsonValue::String("hello from subscription".to_owned())); - assert_eq!(foo, JsonValue::Number(1337_u64.into())); - } -} - -#[tokio::test] -async fn subscription_several_clients() { - let (server_started_tx, server_started_rx) = oneshot::channel::(); - server_subscribe_only(server_started_tx); - let server_addr = server_started_rx.await.unwrap(); - - let mut clients = Vec::with_capacity(10); - for _ in 0..10 { - let uri = format!("ws://{}", server_addr); - let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); - let hello_sub: WsSubscription = - client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); - let foo_sub: WsSubscription = - client.subscribe("subscribe_foo", Params::None, "unsubscribe_foo").await.unwrap(); - clients.push((client, hello_sub, foo_sub)) - } - - for _ in 0..10 { - for (_client, hello_sub, foo_sub) in &mut clients { - let hello = hello_sub.next().await.unwrap(); - let foo = foo_sub.next().await.unwrap(); - assert_eq!(hello, JsonValue::String("hello from subscription".to_owned())); - assert_eq!(foo, JsonValue::Number(1337_u64.into())); - } - } - - for i in 0..5 { - let (client, _, _) = clients.remove(i); - drop(client); - } - - // make sure nothing weird happened after dropping half the clients (should be `unsubscribed` in the server) - // would be good to know that subscriptions actually were removed but not possible to verify at - // this layer. - for _ in 0..10 { - for (_client, hello_sub, foo_sub) in &mut clients { - let hello = hello_sub.next().await.unwrap(); - let foo = foo_sub.next().await.unwrap(); - assert_eq!(hello, JsonValue::String("hello from subscription".to_owned())); - assert_eq!(foo, JsonValue::Number(1337_u64.into())); - } - } -} - #[tokio::test] async fn single_method_call_with_params_works() { let (server_started_tx, server_started_rx) = oneshot::channel::(); diff --git a/tests/helpers.rs b/tests/helpers.rs new file mode 100644 index 0000000000..1c1cf2e736 --- /dev/null +++ b/tests/helpers.rs @@ -0,0 +1,61 @@ +use jsonrpsee::http::{HttpConfig, HttpServer}; +use jsonrpsee::types::jsonrpc::JsonValue; +use jsonrpsee::ws::WsServer; + +use std::net::SocketAddr; +use std::time::Duration; + +use futures::channel::oneshot::Sender; +use futures::future::FutureExt; + +pub fn websocket_server(server_started: Sender) { + std::thread::spawn(move || { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + + let server = rt.block_on(WsServer::new("127.0.0.1:0")).unwrap(); + let mut sub_hello = + server.register_subscription("subscribe_hello".to_owned(), "unsubscribe_hello".to_owned()).unwrap(); + let mut sub_foo = + server.register_subscription("subscribe_foo".to_owned(), "unsubscribe_foo".to_owned()).unwrap(); + server_started.send(*server.local_addr()).unwrap(); + let mut call = server.register_method("say_hello".to_owned()).unwrap(); + + rt.block_on(async move { + loop { + let hello_fut = async { + let handle = call.next().await; + handle.respond(Ok(JsonValue::String("hello".to_owned()))).await.unwrap(); + } + .fuse(); + + let timeout = tokio::time::delay_for(Duration::from_millis(100)).fuse(); + futures::pin_mut!(hello_fut, timeout); + + futures::select! { + _ = hello_fut => (), + _ = timeout => { + sub_hello.send(JsonValue::String("hello from subscription".to_owned())).await.unwrap(); + sub_foo.send(JsonValue::Number(1337_u64.into())).await.unwrap(); + } + } + } + }); + }); +} + +pub fn http_server(server_started: Sender) { + std::thread::spawn(move || { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + + let server = rt.block_on(HttpServer::new("127.0.0.1:0", HttpConfig::default())).unwrap(); + server_started.send(*server.local_addr()).unwrap(); + let mut call = server.register_method("say_hello".to_owned()).unwrap(); + + rt.block_on(async move { + loop { + let handle = call.next().await; + handle.respond(Ok(JsonValue::String("hello".to_owned()))).await.unwrap(); + } + }); + }); +} diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs new file mode 100644 index 0000000000..e8cc20aca1 --- /dev/null +++ b/tests/integration_tests.rs @@ -0,0 +1,151 @@ +#![cfg(test)] + +mod helpers; + +use std::net::SocketAddr; +use std::time::Duration; + +use futures::channel::oneshot; +use helpers::{http_server, websocket_server}; +use jsonrpsee::client::{HttpClient, HttpConfig, WsClient, WsConfig, WsSubscription}; +use jsonrpsee::types::jsonrpc::{JsonValue, Params}; + +#[tokio::test] +async fn ws_subscription_works() { + let (server_started_tx, server_started_rx) = oneshot::channel::(); + websocket_server(server_started_tx); + let server_addr = server_started_rx.await.unwrap(); + let uri = format!("ws://{}", server_addr); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + let mut hello_sub: WsSubscription = + client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); + let mut foo_sub: WsSubscription = + client.subscribe("subscribe_foo", Params::None, "unsubscribe_foo").await.unwrap(); + + for _ in 0..10 { + let hello = hello_sub.next().await.unwrap(); + let foo = foo_sub.next().await.unwrap(); + assert_eq!(hello, JsonValue::String("hello from subscription".to_owned())); + assert_eq!(foo, JsonValue::Number(1337_u64.into())); + } +} + +#[tokio::test] +async fn ws_method_call_works() { + let (server_started_tx, server_started_rx) = oneshot::channel::(); + websocket_server(server_started_tx); + let server_addr = server_started_rx.await.unwrap(); + let uri = format!("ws://{}", server_addr); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + let response: JsonValue = client.request("say_hello", Params::None).await.unwrap(); + assert_eq!(response, JsonValue::String("hello".into())); +} + +#[tokio::test] +async fn http_method_call_works() { + let (server_started_tx, server_started_rx) = oneshot::channel::(); + http_server(server_started_tx); + let server_addr = server_started_rx.await.unwrap(); + let uri = format!("http://{}", server_addr); + let client = HttpClient::new(&uri, HttpConfig::default()).unwrap(); + let response: JsonValue = client.request("say_hello", Params::None).await.unwrap(); + assert_eq!(response, JsonValue::String("hello".into())); +} + +#[tokio::test] +async fn ws_subscription_several_clients() { + let (server_started_tx, server_started_rx) = oneshot::channel::(); + websocket_server(server_started_tx); + let server_addr = server_started_rx.await.unwrap(); + + let mut clients = Vec::with_capacity(10); + for _ in 0..10 { + let uri = format!("ws://{}", server_addr); + let client = WsClient::new(&uri, WsConfig::default()).await.unwrap(); + let hello_sub: WsSubscription = + client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); + let foo_sub: WsSubscription = + client.subscribe("subscribe_foo", Params::None, "unsubscribe_foo").await.unwrap(); + clients.push((client, hello_sub, foo_sub)) + } +} + +#[tokio::test] +async fn ws_subscription_several_clients_with_drop() { + let (server_started_tx, server_started_rx) = oneshot::channel::(); + websocket_server(server_started_tx); + let server_addr = server_started_rx.await.unwrap(); + + let mut clients = Vec::with_capacity(10); + for _ in 0..10 { + let uri = format!("ws://{}", server_addr); + let client = + WsClient::new(&uri, WsConfig { subscription_channel_capacity: u32::MAX as usize, ..Default::default() }) + .await + .unwrap(); + let hello_sub: WsSubscription = + client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); + let foo_sub: WsSubscription = + client.subscribe("subscribe_foo", Params::None, "unsubscribe_foo").await.unwrap(); + clients.push((client, hello_sub, foo_sub)) + } + + for _ in 0..10 { + for (_client, hello_sub, foo_sub) in &mut clients { + let hello = hello_sub.next().await.unwrap(); + let foo = foo_sub.next().await.unwrap(); + assert_eq!(hello, JsonValue::String("hello from subscription".to_owned())); + assert_eq!(foo, JsonValue::Number(1337_u64.into())); + } + } + + for i in 0..5 { + let (client, _, _) = clients.remove(i); + drop(client); + } + + // make sure nothing weird happened after dropping half the clients (should be `unsubscribed` in the server) + // would be good to know that subscriptions actually were removed but not possible to verify at + // this layer. + for _ in 0..10 { + for (_client, hello_sub, foo_sub) in &mut clients { + let hello = hello_sub.next().await.unwrap(); + let foo = foo_sub.next().await.unwrap(); + assert_eq!(hello, JsonValue::String("hello from subscription".to_owned())); + assert_eq!(foo, JsonValue::Number(1337_u64.into())); + } + } +} + +#[tokio::test] +async fn ws_subscription_without_polling_doesnt_make_client_unuseable() { + let (server_started_tx, server_started_rx) = oneshot::channel::(); + websocket_server(server_started_tx); + let server_addr = server_started_rx.await.unwrap(); + + let uri = format!("ws://{}", server_addr); + let client = + WsClient::new(&uri, WsConfig { subscription_channel_capacity: 4, ..Default::default() }).await.unwrap(); + let mut hello_sub: WsSubscription = + client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); + + // don't poll the subscription stream for 2 seconds, should be full now. + std::thread::sleep(Duration::from_secs(2)); + + // Capacity is `num_sender` + `capacity` + for _ in 0..5 { + assert!(hello_sub.next().await.is_some()); + } + + // NOTE: this is now unuseable and unregistered. + assert!(hello_sub.next().await.is_none()); + + // The client should still be useable => make sure it still works. + let _hello_req: JsonValue = client.request("say_hello", Params::None).await.unwrap(); + + // The same subscription should be possible to register again. + let mut other_sub: WsSubscription = + client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap(); + + other_sub.next().await.unwrap(); +} From 2f72354d8a4f094ce0cbffe38520d7fe5a42b8d2 Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 27 Nov 2020 18:51:11 +0100 Subject: [PATCH 06/16] chore(license): add missing license headers --- tests/helpers.rs | 26 ++++++++++++++++++++++++++ tests/integration_tests.rs | 26 ++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/tests/helpers.rs b/tests/helpers.rs index 1c1cf2e736..fcb094fbe7 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -1,3 +1,29 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + use jsonrpsee::http::{HttpConfig, HttpServer}; use jsonrpsee::types::jsonrpc::JsonValue; use jsonrpsee::ws::WsServer; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index e8cc20aca1..9922a23df0 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,3 +1,29 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + #![cfg(test)] mod helpers; From 64e480f199440e4ee91a64af56f717f378cc38f8 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 28 Nov 2020 12:26:52 +0100 Subject: [PATCH 07/16] Update src/client/ws/client.rs --- src/client/ws/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index eebf934202..2db5b60950 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -85,7 +85,7 @@ impl Default for Config { pub struct Subscription { /// Channel to send requests to the background task. to_back: mpsc::Sender, - /// Channel from which we receive notifications from the server, as un-decoded `JsonValue`s. + /// Channel from which we receive notifications from the server, as undecoded `JsonValue`s. notifs_rx: mpsc::Receiver, /// Marker in order to pin the `Notif` parameter. marker: PhantomData, From b5392d2b66e2566ef5e3e1444124251d4d164608 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 28 Nov 2020 12:27:51 +0100 Subject: [PATCH 08/16] Update src/client/ws/client.rs --- src/client/ws/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index 2db5b60950..d2d7d9cd43 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -263,7 +263,7 @@ where impl Drop for Subscription { fn drop(&mut self) { // We can't actually guarantee that this goes through. If the background task is busy, then - // the channel's buffer will be full, and our un-subscription request will never make it. + // the channel's buffer will be full, and our unsubscription request will never make it. // However, when a notification arrives, the background task will realize that the channel // to the `Subscription` has been closed, and will perform the unsubscribe. let _ = self.to_back.try_send(FrontToBack::ChannelClosed); From 4dd85a59c3726dcbdb53e1760a90d66fb7cf7c02 Mon Sep 17 00:00:00 2001 From: Niklas Date: Sat, 28 Nov 2020 12:43:25 +0100 Subject: [PATCH 09/16] style: remove unintended spaces. --- tests/helpers.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/helpers.rs b/tests/helpers.rs index fcb094fbe7..3bf5f8c811 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -56,12 +56,11 @@ pub fn websocket_server(server_started: Sender) { let timeout = tokio::time::delay_for(Duration::from_millis(100)).fuse(); futures::pin_mut!(hello_fut, timeout); - futures::select! { _ = hello_fut => (), _ = timeout => { - sub_hello.send(JsonValue::String("hello from subscription".to_owned())).await.unwrap(); - sub_foo.send(JsonValue::Number(1337_u64.into())).await.unwrap(); + sub_hello.send(JsonValue::String("hello from subscription".to_owned())).await.unwrap(); + sub_foo.send(JsonValue::Number(1337_u64.into())).await.unwrap(); } } } From 29ffcd8138c0f50ffc5562529d26ea15247ace3f Mon Sep 17 00:00:00 2001 From: Niklas Date: Sun, 29 Nov 2020 13:08:59 +0100 Subject: [PATCH 10/16] tests: add concurrent deadlock test Ensure that if more than the requested channel buffer capacity is exceeded it should not deadlock. Such as spawning alot of concurrent requests, notifications or new subscriptions. --- tests/helpers.rs | 22 ++++++++++++++++++++-- tests/integration_tests.rs | 28 +++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/tests/helpers.rs b/tests/helpers.rs index 3bf5f8c811..d06dedf2b6 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -31,7 +31,7 @@ use jsonrpsee::ws::WsServer; use std::net::SocketAddr; use std::time::Duration; -use futures::channel::oneshot::Sender; +use futures::channel::oneshot::{Receiver, Sender}; use futures::future::FutureExt; pub fn websocket_server(server_started: Sender) { @@ -43,8 +43,8 @@ pub fn websocket_server(server_started: Sender) { server.register_subscription("subscribe_hello".to_owned(), "unsubscribe_hello".to_owned()).unwrap(); let mut sub_foo = server.register_subscription("subscribe_foo".to_owned(), "unsubscribe_foo".to_owned()).unwrap(); - server_started.send(*server.local_addr()).unwrap(); let mut call = server.register_method("say_hello".to_owned()).unwrap(); + server_started.send(*server.local_addr()).unwrap(); rt.block_on(async move { loop { @@ -68,6 +68,24 @@ pub fn websocket_server(server_started: Sender) { }); } +pub fn websocket_server_with_wait_period(server_started: Sender, wait: Receiver<()>) { + std::thread::spawn(move || { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + + let server = rt.block_on(WsServer::new("127.0.0.1:0")).unwrap(); + let mut respond = server.register_method("say_hello".to_owned()).unwrap(); + server_started.send(*server.local_addr()).unwrap(); + + rt.block_on(async move { + wait.await.unwrap(); + loop { + let handle = respond.next().await; + handle.respond(Ok(JsonValue::String("hello".to_owned()))).await.unwrap(); + } + }); + }); +} + pub fn http_server(server_started: Sender) { std::thread::spawn(move || { let mut rt = tokio::runtime::Runtime::new().unwrap(); diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 9922a23df0..5f558f245e 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -32,7 +32,7 @@ use std::net::SocketAddr; use std::time::Duration; use futures::channel::oneshot; -use helpers::{http_server, websocket_server}; +use helpers::{http_server, websocket_server, websocket_server_with_wait_period}; use jsonrpsee::client::{HttpClient, HttpConfig, WsClient, WsConfig, WsSubscription}; use jsonrpsee::types::jsonrpc::{JsonValue, Params}; @@ -175,3 +175,29 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() { other_sub.next().await.unwrap(); } + +#[tokio::test] +async fn ws_more_request_than_buffer_should_not_deadlock() { + env_logger::init(); + let (server_started_tx, server_started_rx) = oneshot::channel::(); + let (concurrent_tx, concurrent_rx) = oneshot::channel::<()>(); + websocket_server_with_wait_period(server_started_tx, concurrent_rx); + let server_addr = server_started_rx.await.unwrap(); + + let uri = format!("ws://{}", server_addr); + let client = WsClient::new(&uri, WsConfig { request_channel_capacity: 2, ..Default::default() }).await.unwrap(); + + let mut requests = Vec::new(); + //NOTE: we use less than 8 because of https://github.com/paritytech/jsonrpsee/issues/168. + for _ in 0..6 { + let c = client.clone(); + requests.push(tokio::spawn(async move { + let _: JsonValue = c.request("say_hello", Params::None).await.unwrap(); + })); + } + + concurrent_tx.send(()).unwrap(); + for req in requests { + req.await.unwrap(); + } +} From f82277fd3f2178ca8621f0edfba6fa99e7e0751b Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sun, 29 Nov 2020 19:39:31 +0100 Subject: [PATCH 11/16] Update src/client/ws/client.rs --- src/client/ws/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index d2d7d9cd43..8f7fd81a8e 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -243,7 +243,7 @@ where Notif: jsonrpc::DeserializeOwned, { /// Returns the next notification from the stream - /// This may return `None` if finished subscription has been terminated + /// This may return `None` if the subscription has been terminated, may happen if the channel becomes full or dropped. /// /// Ignores any malformed packet. pub async fn next(&mut self) -> Option { From 7678ed4642eaee56254b05c809e04b7e106068b8 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 1 Dec 2020 10:48:59 +0100 Subject: [PATCH 12/16] fix: review grumbles --- src/client/ws/client.rs | 74 +++++++++++++++++++------------------- tests/integration_tests.rs | 1 - 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index 8f7fd81a8e..73270b7b4b 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -61,10 +61,6 @@ pub struct Config { pub request_channel_capacity: usize, /// Backend channel for each unique subscription. pub subscription_channel_capacity: usize, - /// Allow losses when the channel gets full - pub allow_subscription_losses: bool, - /// Allow losses when the request/notifications channel gets full - pub allow_request_losses: bool, /// Max request body size pub max_request_body_size: usize, } @@ -74,8 +70,6 @@ impl Default for Config { Self { request_channel_capacity: 100, subscription_channel_capacity: 4, - allow_subscription_losses: false, - allow_request_losses: false, max_request_body_size: 10 * 1024 * 1024, } } @@ -249,11 +243,10 @@ where pub async fn next(&mut self) -> Option { loop { match self.notifs_rx.next().await { - Some(n) => { - if let Ok(parsed) = jsonrpc::from_value(n) { - return Some(parsed); - } - } + Some(n) => match jsonrpc::from_value(n) { + Ok(parsed) => return Some(parsed), + Err(e) => log::error!("Subscription response error: {:?}", e), + }, None => return None, } } @@ -338,20 +331,29 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver { //TODO: there's no way to cancel pending subscriptions and requests - //TODO(niklasad1): using `iter().find()` is wrong, it's guessing (could close down the wrong channel) and inefficient - while let Some(rq_id) = active_subscriptions.iter().find(|(_, (v, _))| v.is_closed()).map(|(k, _)| *k) { - let (_, unsubscribe) = active_subscriptions.remove(&rq_id).unwrap(); - client.subscription_by_id(rq_id).unwrap().into_active().unwrap().close(unsubscribe).await.unwrap(); + //TODO: https://github.com/paritytech/jsonrpsee/issues/169 + while let Some(req_id) = active_subscriptions.iter().find(|(_, (v, _))| v.is_closed()).map(|(k, _)| *k) + { + let (_, unsubscribe) = + active_subscriptions.remove(&req_id).expect("Subscription is active checked above; qed"); + close_subscription(&mut client, req_id, unsubscribe).await; } } // Received a response to a request from the server. Either::Right(Ok(RawClientEvent::Response { request_id, result })) => { log::trace!("[backend] client received response to req={:?}, result={:?}", request_id, result); - let _ = ongoing_requests.remove(&request_id).unwrap().send(result.map_err(Error::Request)); + match ongoing_requests.remove(&request_id) { + Some(r) => { + if let Err(e) = r.send(result.map_err(Error::Request)) { + log::error!("Could not dispatch pending request: {:?}", e); + } + } + None => log::error!("Invalid response ID to request received"), + } } // Received a response from the server that a subscription is registered. @@ -367,14 +369,7 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver notifs_tx, }; - // NOTE: This is non_blocking but it doesn't depend on any external handling to finish - // such as a response from a remote party. match notifs_tx.try_send(result) { Ok(()) => (), // Channel is either full or disconnected, close it. Err(e) => { log::error!("Subscription ID: {:?} failed: {:?}", request_id, e); - let (_, unsubscribe) = active_subscriptions.remove(&request_id).unwrap(); - client - .subscription_by_id(request_id) - .unwrap() - .into_active() - .unwrap() - .close(unsubscribe) - .await - .unwrap(); + let (_, unsubscribe) = + active_subscriptions.remove(&request_id).expect("Request is active checked above; qed"); + close_subscription(&mut client, request_id, unsubscribe).await; } } } - // Request for the server to unsubscribe us has succeeded. + // Request for the server to unsubscribe to us has succeeded. Either::Right(Ok(RawClientEvent::Unsubscribed { request_id: _ })) => {} Either::Right(Err(RawClientError::Inner(WsConnectError::Ws(SokettoError::UnexpectedOpCode(e))))) => { @@ -426,3 +413,16 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver { + if let Err(e) = sub.close(&unsubscribe_method).await { + log::error!("RequestID : {:?}, unsubscribe to {} failed: {:?}", request_id, unsubscribe_method, e); + } + } + None => log::error!("Request ID: {:?}, not an active subscription", request_id), + } +} diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 5f558f245e..dec44e24aa 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -178,7 +178,6 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() { #[tokio::test] async fn ws_more_request_than_buffer_should_not_deadlock() { - env_logger::init(); let (server_started_tx, server_started_rx) = oneshot::channel::(); let (concurrent_tx, concurrent_rx) = oneshot::channel::<()>(); websocket_server_with_wait_period(server_started_tx, concurrent_rx); From 2645f96a2c4d5e884f060e672e1dfa36d911ff1e Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 1 Dec 2020 10:54:27 +0100 Subject: [PATCH 13/16] fix nits: `remove needless closure` --- src/client/ws/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index 73270b7b4b..473fa75f3b 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -157,7 +157,7 @@ impl Client { .clone() .send(FrontToBack::Notification { method, params }) .await - .map_err(|e| Error::Internal(e.into())) + .map_err(Error::Internal) } /// Perform a request towards the server. @@ -177,7 +177,7 @@ impl Client { .clone() .send(FrontToBack::StartRequest { method, params, send_back: send_back_tx }) .await - .map_err(|e| Error::Internal(e.into()))?; + .map_err(Error::Internal)?; let json_value = match send_back_rx.await { Ok(Ok(v)) => v, From 488d626edfc3719cafb035db0e88091965a1f270 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 1 Dec 2020 12:10:58 +0100 Subject: [PATCH 14/16] fix: cargo fmt --- src/client/ws/client.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index 473fa75f3b..dd40622453 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -153,11 +153,7 @@ impl Client { let method = method.into(); let params = params.into(); log::trace!("[frontend]: send notification: method={:?}, params={:?}", method, params); - self.to_back - .clone() - .send(FrontToBack::Notification { method, params }) - .await - .map_err(Error::Internal) + self.to_back.clone().send(FrontToBack::Notification { method, params }).await.map_err(Error::Internal) } /// Perform a request towards the server. From bba15941a16d219be67e4923ac68f9c5a67c290f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 1 Dec 2020 14:22:16 +0100 Subject: [PATCH 15/16] Update src/client/ws/client.rs Co-authored-by: David --- src/client/ws/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index dd40622453..bd96e1d12e 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -348,7 +348,7 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver log::error!("Invalid response ID to request received"), + None => log::error!("No pending response found for request ID {}", request_id), } } From 3e94f6a7579e087f5a71442321ed319fca161534 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 1 Dec 2020 19:36:48 +0100 Subject: [PATCH 16/16] fix more nits --- src/client/ws/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index bd96e1d12e..60f1bf3ae8 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -345,10 +345,10 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver { if let Err(e) = r.send(result.map_err(Error::Request)) { - log::error!("Could not dispatch pending request: {:?}", e); + log::error!("Could not dispatch pending request ID: {:?}, error: {:?}", request_id, e); } } - None => log::error!("No pending response found for request ID {}", request_id), + None => log::error!("No pending response found for request ID {:?}", request_id), } }