From d0bd00af7653ff437f1f9ed596bb0778772f388b Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 26 Oct 2021 16:18:07 +0200 Subject: [PATCH 1/8] reproduce Kian's issue --- tests/Cargo.toml | 4 +++- tests/tests/helpers.rs | 7 +++++++ tests/tests/integration_tests.rs | 23 +++++++++++++++++++++++ ws-server/src/server.rs | 3 +++ 4 files changed, 36 insertions(+), 1 deletion(-) diff --git a/tests/Cargo.toml b/tests/Cargo.toml index e3f7682901..4494fb754e 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -8,9 +8,11 @@ license = "MIT" publish = false [dev-dependencies] +env_logger = "0.8" +tracing-subscriber = "0.3.1" +tracing = "0.1" beef = { version = "0.5.1", features = ["impl_serde"] } futures = { version = "0.3.14", default-features = false, features = ["std"] } jsonrpsee = { path = "../jsonrpsee", features = ["full"] } tokio = { version = "1", features = ["full"] } serde_json = "1" -tracing = "0.1" diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 4c2bafa764..2ee4db9011 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -98,6 +98,13 @@ pub async fn websocket_server() -> SocketAddr { let mut module = RpcModule::new(()); module.register_method("say_hello", |_, _| Ok("hello")).unwrap(); + module + .register_async_method("slow_hello", |_, _| async { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok("hello") + }) + .unwrap(); + let addr = server.local_addr().unwrap(); server.start(module).unwrap(); diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 2cc7570bd1..e8432c45c3 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -360,3 +360,26 @@ async fn ws_server_should_stop_subscription_after_client_drop() { // assert that the server received `SubscriptionClosed` after the client was dropped. assert!(matches!(rx.next().await.unwrap(), SubscriptionClosedError { .. })); } + +#[tokio::test] +async fn ws_batch_works() { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + // all spans/events with a level higher than TRACE (e.g, debug, info, warn, etc.) + // will be written to stdout. + .with_max_level(tracing::Level::TRACE) + // completes the builder. + .finish(); + + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + + let server_addr = websocket_server().await; + let server_url = format!("ws://{}", server_addr); + let client = Arc::new(WsClientBuilder::default().build(&server_url).await.unwrap()); + + let mut batch = Vec::new(); + + batch.push(("say_hello", rpc_params![])); + batch.push(("slow_hello", rpc_params![])); + + let response: Vec = client.batch_request(batch).await.unwrap(); +} diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 6a0abf25af..f8bd10549f 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -297,6 +297,7 @@ async fn background_task( } Some(b'[') => { if let Ok(batch) = serde_json::from_slice::>(&data) { + tracing::trace!("batch: {:?}", batch); if !batch.is_empty() { // Batch responses must be sent back as a single message so we read the results from each // request in the batch and read the results off of a new channel, `rx_batch`, and then send the @@ -309,10 +310,12 @@ async fn background_task( { method_executors.add(fut); } + tracing::info!("futures added"); // Closes the receiving half of a channel without dropping it. This prevents any further // messages from being sent on the channel. rx_batch.close(); + tracing::info!("close batch channel"); let results = collect_batch_response(rx_batch).await; if let Err(err) = tx.unbounded_send(results) { tracing::error!("Error sending batch response to the client: {:?}", err) From 0d1c18b57e54b92903113c77533d02d87c8648d3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 27 Oct 2021 10:31:10 +0200 Subject: [PATCH 2/8] fix ws server wait until batches has completed --- utils/src/server/rpc_module.rs | 6 +-- ws-server/src/server.rs | 69 +++++++++++++++++++--------------- 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 6275607094..412c4365ea 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -305,15 +305,15 @@ impl Methods { tracing::trace!("[Methods::execute_with_resources] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { Some(callback) => match callback.claim(&req.method, resources) { - Ok(guard) => callback.execute(tx, req, conn_id, Some(guard)), + Ok(guard) => callback.execute(&tx, req, conn_id, Some(guard)), Err(err) => { tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); - send_error(req.id, tx, ErrorCode::ServerIsBusy.into()); + send_error(req.id, &tx, ErrorCode::ServerIsBusy.into()); None } }, None => { - send_error(req.id, tx, ErrorCode::MethodNotFound.into()); + send_error(req.id, &tx, ErrorCode::MethodNotFound.into()); None } } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index f8bd10549f..b3a5892717 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -264,12 +264,10 @@ async fn background_task( // will be possible. }); - // Buffer for incoming data. - let mut data = Vec::with_capacity(100); let mut method_executors = FutureDriver::default(); while !stop_server.shutdown_requested() { - data.clear(); + let mut data = Vec::new(); if let Err(e) = method_executors.select_with(receiver.receive_data(&mut data)).await { tracing::error!("Could not receive WS data: {:?}; closing connection", e); @@ -296,37 +294,46 @@ async fn background_task( } } Some(b'[') => { - if let Ok(batch) = serde_json::from_slice::>(&data) { - tracing::trace!("batch: {:?}", batch); - if !batch.is_empty() { - // Batch responses must be sent back as a single message so we read the results from each - // request in the batch and read the results off of a new channel, `rx_batch`, and then send the - // complete batch response back to the client over `tx`. - let (tx_batch, mut rx_batch) = mpsc::unbounded::(); - - for fut in batch - .into_iter() - .filter_map(|req| methods.execute_with_resources(&tx_batch, req, conn_id, &resources)) - { - method_executors.add(fut); - } - tracing::info!("futures added"); - - // Closes the receiving half of a channel without dropping it. This prevents any further - // messages from being sent on the channel. - rx_batch.close(); - tracing::info!("close batch channel"); - let results = collect_batch_response(rx_batch).await; - if let Err(err) = tx.unbounded_send(results) { - tracing::error!("Error sending batch response to the client: {:?}", err) + // NOTE(niklasad1): we must move deserialization into the async block below + // because `Request` uses `serde(borrow)` and that buffer is used again + // for the next iteration of the loop. + + let d = std::mem::take(&mut data); + let r = &resources; + let m = &methods; + let tx2 = tx.clone(); + + let fut = async move { + // Batch responses must be sent back as a single message so we read the results from each + // request in the batch and read the results off of a new channel, `rx_batch`, and then send the + // complete batch response back to the client over `tx`. + let (tx_batch, mut rx_batch) = mpsc::unbounded(); + if let Ok(batch) = serde_json::from_slice::>(&d) { + if !batch.is_empty() { + let futs = batch + .into_iter() + .filter_map(|req| m.execute_with_resources(&tx_batch, req, conn_id, &r)); + futures_util::future::join_all(futs).await; + + // All methods has now completed by `join_all` above + // Closes the receiving half of a channel without dropping it. This prevents any further + // messages from being sent on the channel. + rx_batch.close(); + + let results = collect_batch_response(rx_batch).await; + if let Err(err) = tx2.unbounded_send(results) { + tracing::error!("Error sending batch response to the client: {:?}", err) + } + } else { + send_error(Id::Null, &tx2, ErrorCode::InvalidRequest.into()); } } else { - send_error(Id::Null, &tx, ErrorCode::InvalidRequest.into()); + let (id, code) = prepare_error(&data); + send_error(id, &tx2, code.into()); } - } else { - let (id, code) = prepare_error(&data); - send_error(id, &tx, code.into()); - } + }; + + method_executors.add(Box::pin(fut)); } _ => send_error(Id::Null, &tx, ErrorCode::ParseError.into()), } From f3e9c5c23d98ad643467b237ffce6fccd6732078 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 27 Oct 2021 10:37:18 +0200 Subject: [PATCH 3/8] fix nit --- ws-server/src/server.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index b3a5892717..df5498e549 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -264,10 +264,12 @@ async fn background_task( // will be possible. }); + // Buffer for incoming data. + let mut data = Vec::with_capacity(100); let mut method_executors = FutureDriver::default(); while !stop_server.shutdown_requested() { - let mut data = Vec::new(); + data.clear(); if let Err(e) = method_executors.select_with(receiver.receive_data(&mut data)).await { tracing::error!("Could not receive WS data: {:?}; closing connection", e); @@ -328,7 +330,7 @@ async fn background_task( send_error(Id::Null, &tx2, ErrorCode::InvalidRequest.into()); } } else { - let (id, code) = prepare_error(&data); + let (id, code) = prepare_error(&d); send_error(id, &tx2, code.into()); } }; From 33a54188821fcbf6580f15c84056c2b54e0f2425 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 27 Oct 2021 10:50:45 +0200 Subject: [PATCH 4/8] clippify --- tests/tests/integration_tests.rs | 2 +- utils/src/server/rpc_module.rs | 6 +++--- ws-server/src/server.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index e8432c45c3..db02021e36 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -381,5 +381,5 @@ async fn ws_batch_works() { batch.push(("say_hello", rpc_params![])); batch.push(("slow_hello", rpc_params![])); - let response: Vec = client.batch_request(batch).await.unwrap(); + assert!(client.batch_request::(batch).await.is_ok()) } diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 412c4365ea..6275607094 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -305,15 +305,15 @@ impl Methods { tracing::trace!("[Methods::execute_with_resources] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { Some(callback) => match callback.claim(&req.method, resources) { - Ok(guard) => callback.execute(&tx, req, conn_id, Some(guard)), + Ok(guard) => callback.execute(tx, req, conn_id, Some(guard)), Err(err) => { tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); - send_error(req.id, &tx, ErrorCode::ServerIsBusy.into()); + send_error(req.id, tx, ErrorCode::ServerIsBusy.into()); None } }, None => { - send_error(req.id, &tx, ErrorCode::MethodNotFound.into()); + send_error(req.id, tx, ErrorCode::MethodNotFound.into()); None } } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index df5498e549..df1d0b77d1 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -314,7 +314,7 @@ async fn background_task( if !batch.is_empty() { let futs = batch .into_iter() - .filter_map(|req| m.execute_with_resources(&tx_batch, req, conn_id, &r)); + .filter_map(|req| m.execute_with_resources(&tx_batch, req, conn_id, r)); futures_util::future::join_all(futs).await; // All methods has now completed by `join_all` above From 99347eb93dd77a8535eb012ce35ec633ca6c6234 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 27 Oct 2021 14:50:22 +0200 Subject: [PATCH 5/8] enable benches for ws batch requests --- benches/bench.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index 901cecd6cb..f69b7905ea 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -21,16 +21,14 @@ criterion_group!( SyncBencher::http_requests, SyncBencher::batched_http_requests, SyncBencher::websocket_requests, - // TODO: https://github.com/paritytech/jsonrpsee/issues/528 - // SyncBencher::batched_ws_requests, + SyncBencher::batched_ws_requests, ); criterion_group!( async_benches, AsyncBencher::http_requests, AsyncBencher::batched_http_requests, AsyncBencher::websocket_requests, - // TODO: https://github.com/paritytech/jsonrpsee/issues/528 - // AsyncBencher::batched_ws_requests + AsyncBencher::batched_ws_requests ); criterion_group!(subscriptions, AsyncBencher::subscriptions); criterion_main!(types_benches, sync_benches, async_benches, subscriptions); From 15bfc8d0c4c2183e65bf10140d3d4f0715652d74 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 27 Oct 2021 14:50:36 +0200 Subject: [PATCH 6/8] use stream instead of futures::join_all --- ws-server/src/server.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index df1d0b77d1..3ee3d67921 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -36,8 +36,9 @@ use crate::types::{ TEN_MB_SIZE_BYTES, }; use futures_channel::mpsc; +use futures_util::future::FutureExt; use futures_util::io::{BufReader, BufWriter}; -use futures_util::stream::StreamExt; +use futures_util::stream::{self, StreamExt}; use soketto::handshake::{server::Response, Server as SokettoServer}; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; @@ -296,13 +297,10 @@ async fn background_task( } } Some(b'[') => { - // NOTE(niklasad1): we must move deserialization into the async block below - // because `Request` uses `serde(borrow)` and that buffer is used again - // for the next iteration of the loop. - + // Make sure the following variables are not moved into async closure below. let d = std::mem::take(&mut data); - let r = &resources; - let m = &methods; + let resources = &resources; + let methods = &methods; let tx2 = tx.clone(); let fut = async move { @@ -312,17 +310,19 @@ async fn background_task( let (tx_batch, mut rx_batch) = mpsc::unbounded(); if let Ok(batch) = serde_json::from_slice::>(&d) { if !batch.is_empty() { - let futs = batch - .into_iter() - .filter_map(|req| m.execute_with_resources(&tx_batch, req, conn_id, r)); - futures_util::future::join_all(futs).await; - - // All methods has now completed by `join_all` above - // Closes the receiving half of a channel without dropping it. This prevents any further - // messages from being sent on the channel. - rx_batch.close(); + let methods_stream = + stream::iter(batch.into_iter().filter_map(|req| { + methods.execute_with_resources(&tx_batch, req, conn_id, &resources) + })); + + let results = methods_stream + .for_each_concurrent(None, |item| item) + .then(|_| { + rx_batch.close(); + collect_batch_response(rx_batch) + }) + .await; - let results = collect_batch_response(rx_batch).await; if let Err(err) = tx2.unbounded_send(results) { tracing::error!("Error sending batch response to the client: {:?}", err) } From 43cb4ececde5b6e30382ea1e0c3ad94e374cf8f0 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 27 Oct 2021 15:22:16 +0200 Subject: [PATCH 7/8] clippify --- ws-server/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 3ee3d67921..6fccc7b8a3 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -312,7 +312,7 @@ async fn background_task( if !batch.is_empty() { let methods_stream = stream::iter(batch.into_iter().filter_map(|req| { - methods.execute_with_resources(&tx_batch, req, conn_id, &resources) + methods.execute_with_resources(&tx_batch, req, conn_id, resources) })); let results = methods_stream From 75f5415de400dd93314bdcbc40e45dbf7b600edb Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 1 Nov 2021 12:00:57 +0100 Subject: [PATCH 8/8] address grumbles: better assert --- tests/Cargo.toml | 3 +-- tests/tests/integration_tests.rs | 14 +++----------- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 4494fb754e..1a450dedf2 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -9,10 +9,9 @@ publish = false [dev-dependencies] env_logger = "0.8" -tracing-subscriber = "0.3.1" -tracing = "0.1" beef = { version = "0.5.1", features = ["impl_serde"] } futures = { version = "0.3.14", default-features = false, features = ["std"] } jsonrpsee = { path = "../jsonrpsee", features = ["full"] } tokio = { version = "1", features = ["full"] } +tracing = "0.1" serde_json = "1" diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index db02021e36..118c270a81 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -363,23 +363,15 @@ async fn ws_server_should_stop_subscription_after_client_drop() { #[tokio::test] async fn ws_batch_works() { - let subscriber = tracing_subscriber::FmtSubscriber::builder() - // all spans/events with a level higher than TRACE (e.g, debug, info, warn, etc.) - // will be written to stdout. - .with_max_level(tracing::Level::TRACE) - // completes the builder. - .finish(); - - tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); - let server_addr = websocket_server().await; let server_url = format!("ws://{}", server_addr); - let client = Arc::new(WsClientBuilder::default().build(&server_url).await.unwrap()); + let client = WsClientBuilder::default().build(&server_url).await.unwrap(); let mut batch = Vec::new(); batch.push(("say_hello", rpc_params![])); batch.push(("slow_hello", rpc_params![])); - assert!(client.batch_request::(batch).await.is_ok()) + let responses: Vec = client.batch_request(batch).await.unwrap(); + assert_eq!(responses, vec!["hello".to_string(), "hello".to_string()]); }