From f4daba315287c2cce8e6b6d24180b3c6e2a9142a Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 6 Apr 2023 17:55:08 +0200 Subject: [PATCH 01/11] replace FutureDriver with mpsc and tokio::task --- core/src/server/helpers.rs | 10 +- server/src/future.rs | 102 +-------------- server/src/transport/ws.rs | 260 +++++++++++++++++++++---------------- 3 files changed, 157 insertions(+), 215 deletions(-) diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index 7bf723562d..9c900c9049 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -32,7 +32,7 @@ use crate::Error; use jsonrpsee_types::error::{ErrorCode, ErrorObject, ErrorResponse, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG}; use jsonrpsee_types::{Id, InvalidRequest, Response}; use serde::Serialize; -use tokio::sync::mpsc::{self, Permit}; +use tokio::sync::mpsc::{self, OwnedPermit}; use super::{DisconnectError, SendTimeoutError, SubscriptionMessage, TrySendError}; @@ -145,7 +145,7 @@ impl MethodSink { /// Waits for channel capacity. Once capacity to send one message is available, it is reserved for the caller. pub async fn reserve(&self) -> Result { - match self.tx.reserve().await { + match self.tx.clone().reserve_owned().await { Ok(permit) => Ok(MethodSinkPermit { tx: permit, max_log_length: self.max_log_length }), Err(_) => Err(DisconnectError(SubscriptionMessage::empty())), } @@ -154,12 +154,12 @@ impl MethodSink { /// A method sink with reserved spot in the bounded queue. #[derive(Debug)] -pub struct MethodSinkPermit<'a> { - tx: Permit<'a, String>, +pub struct MethodSinkPermit { + tx: OwnedPermit, max_log_length: u32, } -impl<'a> MethodSinkPermit<'a> { +impl MethodSinkPermit { /// Send a JSON-RPC error to the client pub fn send_error(self, id: Id, error: ErrorObject) { let json = serde_json::to_string(&ErrorResponse::borrowed(error, id)).expect("valid JSON; qed"); diff --git a/server/src/future.rs b/server/src/future.rs index 515331f205..beb69b0f19 100644 --- a/server/src/future.rs +++ b/server/src/future.rs @@ -26,110 +26,10 @@ //! Utilities for handling async code. -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use futures_util::future::FutureExt; use jsonrpsee_core::Error; +use std::sync::Arc; use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError}; -/// This is a flexible collection of futures that need to be driven to completion -/// alongside some other future, such as connection handlers that need to be -/// handled along with a listener for new connections. -/// -/// In order to `.await` on these futures and drive them to completion, call -/// `select_with` providing some other future, the result of which you need. -pub(crate) struct FutureDriver { - futures: Vec, -} - -impl Default for FutureDriver { - fn default() -> Self { - FutureDriver { futures: Vec::new() } - } -} - -impl FutureDriver { - /// Add a new future to this driver - pub(crate) fn add(&mut self, future: F) { - self.futures.push(future); - } -} - -impl FutureDriver -where - F: Future + Unpin, -{ - pub(crate) async fn select_with(&mut self, selector: S) -> S::Output { - tokio::pin!(selector); - - DriverSelect { selector, driver: self }.await - } - - fn drive(&mut self, cx: &mut Context) { - let mut i = 0; - - while i < self.futures.len() { - if self.futures[i].poll_unpin(cx).is_ready() { - // Using `swap_remove` since we don't care about ordering - // but we do care about removing being `O(1)`. 
- // - // We don't increment `i` in this branch, since we now - // have a shorter length, and potentially a new value at - // current index - self.futures.swap_remove(i); - } else { - i += 1; - } - } - } -} - -impl Future for FutureDriver -where - F: Future + Unpin, -{ - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - - this.drive(cx); - - if this.futures.is_empty() { - Poll::Ready(()) - } else { - Poll::Pending - } - } -} - -/// This is a glorified select `Future` that will attempt to drive all -/// connection futures `F` to completion on each `poll`, while also -/// handling incoming connections. -struct DriverSelect<'a, S, F> { - selector: S, - driver: &'a mut FutureDriver, -} - -impl<'a, R, F> Future for DriverSelect<'a, R, F> -where - R: Future + Unpin, - F: Future + Unpin, -{ - type Output = R::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - - this.driver.drive(cx); - - this.selector.poll_unpin(cx) - } -} - /// Represent a stop handle which is a wrapper over a `multi-consumer receiver` /// and cloning [`StopHandle`] will get a separate instance of the underlying receiver. #[derive(Debug, Clone)] diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 2d3110c027..f9b73f1ebb 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -1,13 +1,14 @@ +use std::sync::Arc; use std::time::Duration; -use crate::future::{FutureDriver, StopHandle}; +use crate::future::StopHandle; use crate::logger::{self, Logger, TransportProtocol}; use crate::server::ServiceData; use futures_util::future::{self, Either}; use futures_util::io::{BufReader, BufWriter}; -use futures_util::stream::FuturesOrdered; -use futures_util::{Future, FutureExt, StreamExt}; +use futures_util::stream::{FuturesOrdered, FuturesUnordered}; +use futures_util::{Future, StreamExt}; use hyper::upgrade::Upgraded; use jsonrpsee_core::server::helpers::{ batch_response_error, prepare_error, BatchResponseBuilder, MethodResponse, MethodSink, @@ -25,6 +26,7 @@ use jsonrpsee_types::error::{ use jsonrpsee_types::{ErrorObject, Id, InvalidRequest, Notification, Params, Request}; use soketto::connection::Error as SokettoError; use soketto::data::ByteSlice125; +use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; use tokio_util::compat::Compat; @@ -243,6 +245,7 @@ pub(crate) async fn background_task( .. } = svc; + let (method_executor, method_consumer) = mpsc::unbounded_channel::>(); let (tx, rx) = mpsc::channel::(message_buffer_capacity as usize); let (conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); @@ -251,10 +254,20 @@ pub(crate) async fn background_task( // Spawn another task that sends out the responses on the Websocket. tokio::spawn(send_task(rx, sender, stop_handle.clone(), ping_interval, conn_rx)); + let params = ExecuteParams { + conn_id, + methods, + max_log_length, + max_response_body_size, + bounded_subscriptions, + sink: sink.clone(), + id_provider, + logger: logger.clone(), + }; + tokio::spawn(method_executor_task(params, method_consumer)); + // Buffer for incoming data. 
let mut data = Vec::with_capacity(100); - let mut method_executor = FutureDriver::default(); - let logger = &logger; let stopped = stop_handle.shutdown(); tokio::pin!(stopped); @@ -262,15 +275,15 @@ pub(crate) async fn background_task( let result = loop { data.clear(); - let sink_permit = match wait_for_permit(&sink, &mut method_executor, stopped).await { - Some((p, s)) => { - stopped = s; - p + let sink_permit = match wait_for_reserve(&sink, stopped).await { + Some((permit, stop)) => { + stopped = stop; + permit } None => break Ok(()), }; - match try_recv(&mut receiver, &mut data, &mut method_executor, stopped).await { + match try_recv(&mut receiver, &mut data, stopped).await { Receive::Shutdown => break Ok(()), Receive::Ok(stop) => { stopped = stop; @@ -307,40 +320,9 @@ pub(crate) async fn background_task( match first_non_whitespace { Some(b'{') => { let data = std::mem::take(&mut data); - let sink = sink.clone(); - let methods = &methods; - let id_provider = &*id_provider; - let bounded_subscriptions = bounded_subscriptions.clone(); - - let fut = async move { - let call = CallData { - conn_id: conn_id as usize, - bounded_subscriptions, - max_response_body_size, - max_log_length, - methods, - sink: &sink, - id_provider, - logger, - request_start, - }; - - if let Some(rp) = process_single_request(data, call).await { - match rp { - CallOrSubscription::Subscription(r) => { - logger.on_response(&r.result, request_start, TransportProtocol::WebSocket); - } - - CallOrSubscription::Call(r) => { - logger.on_response(&r.result, request_start, TransportProtocol::WebSocket); - sink_permit.send_raw(r.result); - } - } - }; + if method_executor.send(MethodOp::Call { data, sink_permit, start: request_start }).is_err() { + break Ok(()); } - .boxed(); - - method_executor.add(fut); } Some(b'[') if !batch_requests_supported => { let response = MethodResponse::error( @@ -352,37 +334,10 @@ pub(crate) async fn background_task( } Some(b'[') => { // Make sure the following variables are not moved into async closure below. - let methods = &methods; - let sink = sink.clone(); - let id_provider = id_provider.clone(); let data = std::mem::take(&mut data); - let bounded_subscriptions = bounded_subscriptions.clone(); - - let fut = async move { - let response = process_batch_request(Batch { - data, - call: CallData { - conn_id: conn_id as usize, - bounded_subscriptions, - max_response_body_size, - max_log_length, - methods, - sink: &sink, - id_provider: &*id_provider, - logger, - request_start, - }, - }) - .await; - - if let Some(response) = response { - tx_log_from_str(&response, max_log_length); - logger.on_response(&response, request_start, TransportProtocol::WebSocket); - sink_permit.send_raw(response); - } - }; - - method_executor.add(Box::pin(fut)); + if method_executor.send(MethodOp::Batch { data, sink_permit, start: request_start }).is_err() { + break Ok(()); + } } _ => { sink_permit.send_error(Id::Null, ErrorCode::ParseError.into()); @@ -392,14 +347,8 @@ pub(crate) async fn background_task( logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); - // Drive all running methods to completion. - // **NOTE** Do not return early in this function. This `await` needs to run to guarantee - // proper drop behaviour. 
- method_executor.await; - let _ = conn_tx.send(()); drop(conn); - result } @@ -472,14 +421,21 @@ enum Receive { Ok(S), } -async fn try_recv( - receiver: &mut Receiver, - data: &mut Vec, - method_executor: &mut FutureDriver, - stopped: S, -) -> Receive +async fn wait_for_reserve(sink: &MethodSink, stopped: S) -> Option<(MethodSinkPermit, S)> +where + S: Future + Unpin, +{ + let reserve = sink.reserve(); + tokio::pin!(reserve); + + match futures_util::future::select(reserve, stopped).await { + Either::Left((Ok(sink), s)) => Some((sink, s)), + _ => None, + } +} + +async fn try_recv(receiver: &mut Receiver, data: &mut Vec, stopped: S) -> Receive where - F: Future + Unpin, S: Future + Unpin, { let receive = async { @@ -497,7 +453,6 @@ where } }; - let receive = method_executor.select_with(receive); tokio::pin!(receive); match futures_util::future::select(receive, stopped).await { @@ -507,28 +462,115 @@ where } } -// Wait until there is a slot in the bounded channel which means that -// the underlying TCP socket won't be read. -// -// This will force the client to read socket on the other side -// otherwise the socket will not be read again. -// -// Fails if the connection was closed or if the server was stopped, -async fn wait_for_permit<'a, F, S>( - sink: &'a MethodSink, - method_executor: &mut FutureDriver, - stopped: S, -) -> Option<(MethodSinkPermit<'a>, S)> -where - F: Future + Unpin, - S: Future + Unpin, -{ - let sink_permit_fut = method_executor.select_with(sink.reserve()); - tokio::pin!(sink_permit_fut); +enum MethodOp { + Call { data: Vec, sink_permit: MethodSinkPermit, start: L::Instant }, + Batch { data: Vec, sink_permit: MethodSinkPermit, start: L::Instant }, +} - match futures_util::future::select(sink_permit_fut, stopped).await { - Either::Left((Ok(permit), s)) => Some((permit, s)), - // The sink or stopped were triggered, just terminate. - _ => None, +#[derive(Clone)] +struct ExecuteParams { + conn_id: u32, + bounded_subscriptions: BoundedSubscriptions, + id_provider: Arc, + methods: Methods, + max_response_body_size: u32, + max_log_length: u32, + sink: MethodSink, + logger: L, +} + +async fn method_executor_task(params: ExecuteParams, mut next_task: UnboundedReceiver>) { + let mut tasks = FuturesUnordered::new(); + + loop { + // This is way to not poll `FuturesUnordered` when it's empty to waste CPU cycles. + if tasks.is_empty() { + let Some(command) = next_task.recv().await else { + return; + }; + + tasks.push(execute_command(command, params.clone())); + } else { + tokio::select! 
{ + _ = tasks.next() => {}, + task = next_task.recv() => { + let Some(command) = task else { + return; + }; + + tasks.push(execute_command(command, params.clone())); + } + } + } + } +} + +async fn execute_command(command: MethodOp, params: ExecuteParams) { + let ExecuteParams { + conn_id, + sink, + max_response_body_size, + max_log_length, + methods, + id_provider, + bounded_subscriptions, + logger, + } = params; + + match command { + MethodOp::Call { data, sink_permit, start } => { + let call = CallData { + conn_id: conn_id as usize, + bounded_subscriptions, + max_response_body_size, + max_log_length, + methods: &methods, + sink: &sink, + id_provider: &*id_provider, + logger: &logger, + request_start: start, + }; + + if let Some(rp) = process_single_request(data, call).await { + match rp { + CallOrSubscription::Subscription(r) => { + logger.on_response(&r.result, start, TransportProtocol::WebSocket); + } + + CallOrSubscription::Call(r) => { + logger.on_response(&r.result, start, TransportProtocol::WebSocket); + sink_permit.send_raw(r.result); + } + } + } + } + MethodOp::Batch { data, sink_permit, start } => { + let methods = methods.clone(); + let logger = logger.clone(); + let id_provider = id_provider.clone(); + let bounded_subscriptions = bounded_subscriptions.clone(); + + let response = process_batch_request(Batch { + data, + call: CallData { + conn_id: conn_id as usize, + bounded_subscriptions, + max_response_body_size, + max_log_length, + methods: &methods, + sink: &sink, + id_provider: &*id_provider, + logger: &logger, + request_start: start, + }, + }) + .await; + + if let Some(response) = response { + tx_log_from_str(&response, max_log_length); + logger.on_response(&response, start, TransportProtocol::WebSocket); + sink_permit.send_raw(response); + } + } } } From bcde3fd31e751b6010b0bc12547210452779d0bf Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 7 Apr 2023 10:30:39 +0200 Subject: [PATCH 02/11] tokio spawn for calls --- server/src/transport/ws.rs | 43 +++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index f9b73f1ebb..9e429d1add 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -245,7 +245,6 @@ pub(crate) async fn background_task( .. } = svc; - let (method_executor, method_consumer) = mpsc::unbounded_channel::>(); let (tx, rx) = mpsc::channel::(message_buffer_capacity as usize); let (conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); @@ -254,18 +253,6 @@ pub(crate) async fn background_task( // Spawn another task that sends out the responses on the Websocket. tokio::spawn(send_task(rx, sender, stop_handle.clone(), ping_interval, conn_rx)); - let params = ExecuteParams { - conn_id, - methods, - max_log_length, - max_response_body_size, - bounded_subscriptions, - sink: sink.clone(), - id_provider, - logger: logger.clone(), - }; - tokio::spawn(method_executor_task(params, method_consumer)); - // Buffer for incoming data. 
let mut data = Vec::with_capacity(100); let stopped = stop_handle.shutdown(); @@ -320,9 +307,18 @@ pub(crate) async fn background_task( match first_non_whitespace { Some(b'{') => { let data = std::mem::take(&mut data); - if method_executor.send(MethodOp::Call { data, sink_permit, start: request_start }).is_err() { - break Ok(()); - } + let cmd = MethodOp::Call { data, sink_permit, start: request_start }; + let params = ExecuteParams { + conn_id, + methods: methods.clone(), + max_log_length, + max_response_body_size, + bounded_subscriptions: bounded_subscriptions.clone(), + sink: sink.clone(), + id_provider: id_provider.clone(), + logger: logger.clone(), + }; + tokio::spawn(execute_command(cmd, params)); } Some(b'[') if !batch_requests_supported => { let response = MethodResponse::error( @@ -335,9 +331,18 @@ pub(crate) async fn background_task( Some(b'[') => { // Make sure the following variables are not moved into async closure below. let data = std::mem::take(&mut data); - if method_executor.send(MethodOp::Batch { data, sink_permit, start: request_start }).is_err() { - break Ok(()); - } + let cmd = MethodOp::Batch { data, sink_permit, start: request_start }; + let params = ExecuteParams { + conn_id, + methods: methods.clone(), + max_log_length, + max_response_body_size, + bounded_subscriptions: bounded_subscriptions.clone(), + sink: sink.clone(), + id_provider: id_provider.clone(), + logger: logger.clone(), + }; + tokio::spawn(execute_command(cmd, params)); } _ => { sink_permit.send_error(Id::Null, ErrorCode::ParseError.into()); From bb8f6258f4752a54e0563de16edbe0cd964edc1c Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 11 Apr 2023 11:03:07 +0200 Subject: [PATCH 03/11] refactor round trip for multiple calls --- benches/bench.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index 8aa3b15169..f95767ab93 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -235,11 +235,21 @@ impl RequestBencher for AsyncBencher { } fn round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, name: &str, request: RequestType) { - for method in request.methods() { - let bench_name = format!("{}/{}", name, method); + for &size in [3, 9, 27, 81, 243, 729].iter() { + let bench_name = format!("{name}/{size}"); + crit.bench_function(&request.group_name(&bench_name), |b| { b.to_async(rt).iter(|| async { - black_box(client.request::(method, None).await.unwrap()); + let futs = FuturesUnordered::new(); + + // Make `n` concurrent calls by 1/3 slow calls, 1/3 fast calls and 1/3 memory intense calls. 
+ for _ in 0..(size / 3) { + for method in request.methods() { + futs.push(client.request::(method, None)); + } + } + + join_all(futs).await; }) }); } From 6b1b3fec545556dd5563ed8348f9d070e0a36357 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 11 Apr 2023 14:44:40 +0200 Subject: [PATCH 04/11] cleanup --- server/src/transport/ws.rs | 123 +++++++++++++------------------------ 1 file changed, 44 insertions(+), 79 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 9e429d1add..73e614a1ae 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -7,7 +7,7 @@ use crate::server::ServiceData; use futures_util::future::{self, Either}; use futures_util::io::{BufReader, BufWriter}; -use futures_util::stream::{FuturesOrdered, FuturesUnordered}; +use futures_util::stream::FuturesOrdered; use futures_util::{Future, StreamExt}; use hyper::upgrade::Upgraded; use jsonrpsee_core::server::helpers::{ @@ -26,7 +26,7 @@ use jsonrpsee_types::error::{ use jsonrpsee_types::{ErrorObject, Id, InvalidRequest, Notification, Params, Request}; use soketto::connection::Error as SokettoError; use soketto::data::ByteSlice125; -use tokio::sync::mpsc::UnboundedReceiver; + use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; use tokio_util::compat::Compat; @@ -306,19 +306,21 @@ pub(crate) async fn background_task( match first_non_whitespace { Some(b'{') => { - let data = std::mem::take(&mut data); - let cmd = MethodOp::Call { data, sink_permit, start: request_start }; let params = ExecuteParams { conn_id, methods: methods.clone(), max_log_length, max_response_body_size, bounded_subscriptions: bounded_subscriptions.clone(), + request_start, sink: sink.clone(), + sink_permit, id_provider: id_provider.clone(), logger: logger.clone(), + data: std::mem::take(&mut data), + command: MethodOp::Call, }; - tokio::spawn(execute_command(cmd, params)); + tokio::spawn(execute_command(params)); } Some(b'[') if !batch_requests_supported => { let response = MethodResponse::error( @@ -329,20 +331,21 @@ pub(crate) async fn background_task( sink_permit.send_raw(response.result); } Some(b'[') => { - // Make sure the following variables are not moved into async closure below. 
- let data = std::mem::take(&mut data); - let cmd = MethodOp::Batch { data, sink_permit, start: request_start }; let params = ExecuteParams { conn_id, methods: methods.clone(), max_log_length, max_response_body_size, bounded_subscriptions: bounded_subscriptions.clone(), + request_start, sink: sink.clone(), + sink_permit, id_provider: id_provider.clone(), logger: logger.clone(), + data: std::mem::take(&mut data), + command: MethodOp::Batch, }; - tokio::spawn(execute_command(cmd, params)); + tokio::spawn(execute_command(params)); } _ => { sink_permit.send_error(Id::Null, ErrorCode::ParseError.into()); @@ -467,113 +470,75 @@ where } } -enum MethodOp { - Call { data: Vec, sink_permit: MethodSinkPermit, start: L::Instant }, - Batch { data: Vec, sink_permit: MethodSinkPermit, start: L::Instant }, +enum MethodOp { + Call, + Batch, } -#[derive(Clone)] struct ExecuteParams { - conn_id: u32, bounded_subscriptions: BoundedSubscriptions, + command: MethodOp, + conn_id: u32, + data: Vec, id_provider: Arc, methods: Methods, max_response_body_size: u32, max_log_length: u32, sink: MethodSink, + sink_permit: MethodSinkPermit, logger: L, + request_start: L::Instant, } -async fn method_executor_task(params: ExecuteParams, mut next_task: UnboundedReceiver>) { - let mut tasks = FuturesUnordered::new(); - - loop { - // This is way to not poll `FuturesUnordered` when it's empty to waste CPU cycles. - if tasks.is_empty() { - let Some(command) = next_task.recv().await else { - return; - }; - - tasks.push(execute_command(command, params.clone())); - } else { - tokio::select! { - _ = tasks.next() => {}, - task = next_task.recv() => { - let Some(command) = task else { - return; - }; - - tasks.push(execute_command(command, params.clone())); - } - } - } - } -} - -async fn execute_command(command: MethodOp, params: ExecuteParams) { +async fn execute_command(params: ExecuteParams) { let ExecuteParams { conn_id, + command, + data, sink, + sink_permit, max_response_body_size, max_log_length, methods, id_provider, bounded_subscriptions, logger, + request_start, } = params; + let call_data = CallData { + conn_id: conn_id as usize, + bounded_subscriptions, + max_response_body_size, + max_log_length, + methods: &methods, + sink: &sink, + id_provider: &*id_provider, + logger: &logger, + request_start, + }; + match command { - MethodOp::Call { data, sink_permit, start } => { - let call = CallData { - conn_id: conn_id as usize, - bounded_subscriptions, - max_response_body_size, - max_log_length, - methods: &methods, - sink: &sink, - id_provider: &*id_provider, - logger: &logger, - request_start: start, - }; - - if let Some(rp) = process_single_request(data, call).await { + MethodOp::Call => { + if let Some(rp) = process_single_request(data, call_data).await { match rp { CallOrSubscription::Subscription(r) => { - logger.on_response(&r.result, start, TransportProtocol::WebSocket); + logger.on_response(&r.result, request_start, TransportProtocol::WebSocket); } CallOrSubscription::Call(r) => { - logger.on_response(&r.result, start, TransportProtocol::WebSocket); + logger.on_response(&r.result, request_start, TransportProtocol::WebSocket); sink_permit.send_raw(r.result); } } } } - MethodOp::Batch { data, sink_permit, start } => { - let methods = methods.clone(); - let logger = logger.clone(); - let id_provider = id_provider.clone(); - let bounded_subscriptions = bounded_subscriptions.clone(); - - let response = process_batch_request(Batch { - data, - call: CallData { - conn_id: conn_id as usize, - bounded_subscriptions, - 
max_response_body_size, - max_log_length, - methods: &methods, - sink: &sink, - id_provider: &*id_provider, - logger: &logger, - request_start: start, - }, - }) - .await; + MethodOp::Batch => { + let response = process_batch_request(Batch { data, call: call_data }).await; if let Some(response) = response { tx_log_from_str(&response, max_log_length); - logger.on_response(&response, start, TransportProtocol::WebSocket); + logger.on_response(&response, request_start, TransportProtocol::WebSocket); sink_permit.send_raw(response); } } From ee2b708611584924ee17884398fecbb147c1ea3f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 14 Apr 2023 13:54:07 +0200 Subject: [PATCH 05/11] cleanup --- server/src/transport/ws.rs | 161 +++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 85 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index eb4615462d..65ba14df2e 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -7,7 +7,7 @@ use crate::server::{BatchRequestConfig, ServiceData}; use futures_util::future::{self, Either}; use futures_util::io::{BufReader, BufWriter}; -use futures_util::stream::FuturesOrdered; +use futures_util::stream::{FuturesOrdered, FuturesUnordered}; use futures_util::{Future, StreamExt}; use hyper::upgrade::Upgraded; use jsonrpsee_core::server::helpers::{ @@ -254,6 +254,7 @@ pub(crate) async fn background_task( let (conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); + let mut pending_calls = FuturesUnordered::new(); // Spawn another task that sends out the responses on the Websocket. tokio::spawn(send_task(rx, sender, stop_handle.clone(), ping_interval, conn_rx)); @@ -304,67 +305,32 @@ pub(crate) async fn background_task( } }; } - } + }; - let request_start = logger.on_request(TransportProtocol::WebSocket); - let first_non_whitespace = data.iter().find(|byte| !byte.is_ascii_whitespace()); - - match first_non_whitespace { - Some(b'{') => { - let params = ExecuteParams { - conn_id, - methods: methods.clone(), - max_log_length, - max_response_body_size, - bounded_subscriptions: bounded_subscriptions.clone(), - request_start, - sink: sink.clone(), - sink_permit, - id_provider: id_provider.clone(), - logger: logger.clone(), - data: std::mem::take(&mut data), - command: MethodOp::Call, - }; - tokio::spawn(execute_command(params)); - } - Some(b'[') => { - let limit = match batch_requests_config { - BatchRequestConfig::Disabled => { - let response = MethodResponse::error( - Id::Null, - ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None), - ); - logger.on_response(&response.result, request_start, TransportProtocol::WebSocket); - sink_permit.send_raw(response.result); - continue; - } - BatchRequestConfig::Limit(limit) => limit as usize, - BatchRequestConfig::Unlimited => usize::MAX, - }; - let params = ExecuteParams { - conn_id, - methods: methods.clone(), - max_log_length, - max_response_body_size, - bounded_subscriptions: bounded_subscriptions.clone(), - request_start, - sink: sink.clone(), - sink_permit, - id_provider: id_provider.clone(), - logger: logger.clone(), - data: std::mem::take(&mut data), - command: MethodOp::Batch(limit), - }; - tokio::spawn(execute_command(params)); - } - _ => { - sink_permit.send_error(Id::Null, ErrorCode::ParseError.into()); - } - } + let params = ExecuteCallParams { + 
batch_requests_config, + bounded_subscriptions: bounded_subscriptions.clone(), + conn_id, + methods: methods.clone(), + max_log_length, + max_response_body_size, + sink: sink.clone(), + sink_permit, + id_provider: id_provider.clone(), + logger: logger.clone(), + data: std::mem::take(&mut data), + }; + + pending_calls.push(tokio::spawn(execute_unchecked_call(params))); }; logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); + // Drive all running methods to completion. + // **NOTE** Do not return early in this function. This `await` needs to run to guarantee + // proper drop behaviour. + while pending_calls.next().await.is_some() {} + let _ = conn_tx.send(()); drop(conn); result @@ -480,14 +446,9 @@ where } } -enum MethodOp { - Call, - Batch(usize), -} - -struct ExecuteParams { +struct ExecuteCallParams { + batch_requests_config: BatchRequestConfig, bounded_subscriptions: BoundedSubscriptions, - command: MethodOp, conn_id: u32, data: Vec, id_provider: Arc, @@ -497,13 +458,12 @@ struct ExecuteParams { sink: MethodSink, sink_permit: MethodSinkPermit, logger: L, - request_start: L::Instant, } -async fn execute_command(params: ExecuteParams) { - let ExecuteParams { +async fn execute_unchecked_call(params: ExecuteCallParams) { + let ExecuteCallParams { + batch_requests_config, conn_id, - command, data, sink, sink_permit, @@ -513,23 +473,25 @@ async fn execute_command(params: ExecuteParams) { id_provider, bounded_subscriptions, logger, - request_start, } = params; - let call_data = CallData { - conn_id: conn_id as usize, - bounded_subscriptions, - max_response_body_size, - max_log_length, - methods: &methods, - sink: &sink, - id_provider: &*id_provider, - logger: &logger, - request_start, - }; + let request_start = logger.on_request(TransportProtocol::WebSocket); + let first_non_whitespace = data.iter().find(|byte| !byte.is_ascii_whitespace()); + + match first_non_whitespace { + Some(b'{') => { + let call_data = CallData { + conn_id: conn_id as usize, + bounded_subscriptions, + max_response_body_size, + max_log_length, + methods: &methods, + sink: &sink, + id_provider: &*id_provider, + logger: &logger, + request_start, + }; - match command { - MethodOp::Call => { if let Some(rp) = process_single_request(data, call_data).await { match rp { CallOrSubscription::Subscription(r) => { @@ -543,8 +505,34 @@ async fn execute_command(params: ExecuteParams) { } } } - MethodOp::Batch(max_len) => { - let response = process_batch_request(Batch { data, call: call_data, max_len }).await; + Some(b'[') => { + let limit = match batch_requests_config { + BatchRequestConfig::Disabled => { + let response = MethodResponse::error( + Id::Null, + ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None), + ); + logger.on_response(&response.result, request_start, TransportProtocol::WebSocket); + sink_permit.send_raw(response.result); + return; + } + BatchRequestConfig::Limit(limit) => limit as usize, + BatchRequestConfig::Unlimited => usize::MAX, + }; + + let call_data = CallData { + conn_id: conn_id as usize, + bounded_subscriptions, + max_response_body_size, + max_log_length, + methods: &methods, + sink: &sink, + id_provider: &*id_provider, + logger: &logger, + request_start, + }; + + let response = process_batch_request(Batch { data, call: call_data, max_len: limit }).await; if let Some(response) = response { tx_log_from_str(&response, max_log_length); @@ -552,5 +540,8 @@ async fn execute_command(params: ExecuteParams) { sink_permit.send_raw(response); } } - } + _ => { + 
sink_permit.send_error(Id::Null, ErrorCode::ParseError.into()); + } + }; } From 05cb7894567eab0d294e09b2dfd21559e17daa6b Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 14 Apr 2023 16:55:49 +0200 Subject: [PATCH 06/11] fix graceful shutdown --- server/src/transport/ws.rs | 41 +++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 65ba14df2e..cb24aaec5e 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use std::time::Duration; -use crate::future::StopHandle; use crate::logger::{self, Logger, TransportProtocol}; use crate::server::{BatchRequestConfig, ServiceData}; @@ -254,10 +253,10 @@ pub(crate) async fn background_task( let (conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); - let mut pending_calls = FuturesUnordered::new(); + let pending_calls = FuturesUnordered::new(); // Spawn another task that sends out the responses on the Websocket. - tokio::spawn(send_task(rx, sender, stop_handle.clone(), ping_interval, conn_rx)); + tokio::spawn(send_task(rx, sender, ping_interval, conn_rx)); // Buffer for incoming data. let mut data = Vec::with_capacity(100); @@ -324,15 +323,13 @@ pub(crate) async fn background_task( pending_calls.push(tokio::spawn(execute_unchecked_call(params))); }; - logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); - // Drive all running methods to completion. // **NOTE** Do not return early in this function. This `await` needs to run to guarantee // proper drop behaviour. - while pending_calls.next().await.is_some() {} - - let _ = conn_tx.send(()); + graceful_shutdown(pending_calls, conn_tx).await; + logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); drop(conn); + result } @@ -340,21 +337,19 @@ pub(crate) async fn background_task( async fn send_task( rx: mpsc::Receiver, mut ws_sender: Sender, - stop_handle: StopHandle, ping_interval: Duration, - conn_closed: oneshot::Receiver<()>, + stop: oneshot::Receiver<()>, ) { // Interval to send out continuously `pings`. let ping_interval = IntervalStream::new(tokio::time::interval(ping_interval)); - let stopped = stop_handle.shutdown(); let rx = ReceiverStream::new(rx); - tokio::pin!(ping_interval, stopped, rx, conn_closed); + tokio::pin!(ping_interval, rx, stop); // Received messages from the WebSocket. let mut rx_item = rx.next(); let next_ping = ping_interval.next(); - let mut futs = future::select(next_ping, future::select(stopped, conn_closed)); + let mut futs = future::select(next_ping, stop); loop { // Ensure select is cancel-safe by fetching and storing the `rx_item` that did not finish yet. @@ -387,7 +382,7 @@ async fn send_task( futs = future::select(ping_interval.next(), stop); } - // Server is stopped or closed + // Server is stopped. Either::Right((Either::Right(_), _)) => { break; } @@ -545,3 +540,21 @@ async fn execute_unchecked_call(params: ExecuteCallParams) { } }; } + +async fn graceful_shutdown(mut pending_calls: FuturesUnordered, mut conn: oneshot::Sender<()>) { + loop { + tokio::select! { + // If connection is closed, there is no point waiting for the calls to terminate. + _ = conn.closed() => { + break; + } + maybe_done = pending_calls.next() => { + // If the all pending calls has been executed => it's ok to stop. 
+ if maybe_done.is_none() { + let _ = conn.send(()); + break; + } + } + } + } +} From 010a68a7662e15ab4e93ac328d001f11d724088b Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 14 Apr 2023 17:55:07 +0200 Subject: [PATCH 07/11] minor tweaks --- server/src/transport/ws.rs | 54 ++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index cb24aaec5e..9453420bd4 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -250,10 +250,10 @@ pub(crate) async fn background_task( } = svc; let (tx, rx) = mpsc::channel::(message_buffer_capacity as usize); - let (conn_tx, conn_rx) = oneshot::channel(); + let (mut conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); - let pending_calls = FuturesUnordered::new(); + let mut pending_calls = FuturesUnordered::new(); // Spawn another task that sends out the responses on the Websocket. tokio::spawn(send_task(rx, sender, ping_interval, conn_rx)); @@ -267,7 +267,7 @@ pub(crate) async fn background_task( let result = loop { data.clear(); - let sink_permit = match wait_for_reserve(&sink, stopped).await { + let sink_permit = match wait_for_permit(&sink, stopped).await { Some((permit, stop)) => { stopped = stop; permit @@ -326,7 +326,26 @@ pub(crate) async fn background_task( // Drive all running methods to completion. // **NOTE** Do not return early in this function. This `await` needs to run to guarantee // proper drop behaviour. - graceful_shutdown(pending_calls, conn_tx).await; + // + // This is not strictly not needed because `tokio::spawn` will drive these the completion + // but it's preferred the `stop_handle.stopped()` should not return until all methods has been + // executed and the connection has been closed. + loop { + tokio::select! { + // If connection is closed, there is no point waiting for the calls to terminate. + _ = conn_tx.closed() => { + break; + } + maybe_done = pending_calls.next() => { + // All pending calls has been executed => ok to stop. + if maybe_done.is_none() { + let _ = conn_tx.send(()); + break; + } + } + } + } + logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); drop(conn); @@ -400,7 +419,13 @@ enum Receive { Ok(S), } -async fn wait_for_reserve(sink: &MethodSink, stopped: S) -> Option<(MethodSinkPermit, S)> +// Wait until there is a slot in the bounded channel. +// +// This will force the client to read socket on the other side +// otherwise the socket will not be read again. +// +// Fails if the server was stopped. +async fn wait_for_permit(sink: &MethodSink, stopped: S) -> Option<(MethodSinkPermit, S)> where S: Future + Unpin, { @@ -413,6 +438,7 @@ where } } +/// Attempts to read data from WebSocket fails if the server was stopped. async fn try_recv(receiver: &mut Receiver, data: &mut Vec, stopped: S) -> Receive where S: Future + Unpin, @@ -540,21 +566,3 @@ async fn execute_unchecked_call(params: ExecuteCallParams) { } }; } - -async fn graceful_shutdown(mut pending_calls: FuturesUnordered, mut conn: oneshot::Sender<()>) { - loop { - tokio::select! { - // If connection is closed, there is no point waiting for the calls to terminate. - _ = conn.closed() => { - break; - } - maybe_done = pending_calls.next() => { - // If the all pending calls has been executed => it's ok to stop. 
- if maybe_done.is_none() { - let _ = conn.send(()); - break; - } - } - } - } -} From e3f47df4a2227cc369a4321be23d87dab56ff937 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 14 Apr 2023 19:49:16 +0200 Subject: [PATCH 08/11] add test for graceful shutdown --- server/src/transport/ws.rs | 19 ++------- tests/tests/integration_tests.rs | 69 +++++++++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 17 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 9453420bd4..89e68716a1 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -250,7 +250,7 @@ pub(crate) async fn background_task( } = svc; let (tx, rx) = mpsc::channel::(message_buffer_capacity as usize); - let (mut conn_tx, conn_rx) = oneshot::channel(); + let (conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); let mut pending_calls = FuturesUnordered::new(); @@ -330,21 +330,8 @@ pub(crate) async fn background_task( // This is not strictly not needed because `tokio::spawn` will drive these the completion // but it's preferred the `stop_handle.stopped()` should not return until all methods has been // executed and the connection has been closed. - loop { - tokio::select! { - // If connection is closed, there is no point waiting for the calls to terminate. - _ = conn_tx.closed() => { - break; - } - maybe_done = pending_calls.next() => { - // All pending calls has been executed => ok to stop. - if maybe_done.is_none() { - let _ = conn_tx.send(()); - break; - } - } - } - } + while pending_calls.next().await.is_some() {} + let _ = conn_tx.send(()); logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); drop(conn); diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index d2713c49f6..bf3e0c2044 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -29,9 +29,11 @@ mod helpers; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; +use futures::stream::FuturesUnordered; use futures::{channel::mpsc, StreamExt, TryStreamExt}; use helpers::{ init_logger, pipe_from_stream_and_drop, server, server_with_access_control, server_with_health_api, @@ -43,9 +45,10 @@ use jsonrpsee::core::params::{ArrayParams, BatchRequestBuilder}; use jsonrpsee::core::server::SubscriptionMessage; use jsonrpsee::core::{Error, JsonValue}; use jsonrpsee::http_client::HttpClientBuilder; -use jsonrpsee::rpc_params; +use jsonrpsee::server::ServerBuilder; use jsonrpsee::types::error::{ErrorObject, UNKNOWN_ERROR_CODE}; use jsonrpsee::ws_client::WsClientBuilder; +use jsonrpsee::{rpc_params, RpcModule}; use jsonrpsee_test_utils::TimeoutFutureExt; use tokio::time::interval; use tokio_stream::wrappers::IntervalStream; @@ -1140,3 +1143,67 @@ async fn subscription_ok_unit_not_sent() { // Assert that `result: null` is not sent. 
assert!(sub.next().with_timeout(std::time::Duration::from_secs(10)).await.is_err()); } + +#[tokio::test] +async fn graceful_shutdown_works() { + init_logger(); + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let call_answered = Arc::new(AtomicBool::new(false)); + + let (handle, addr) = { + let server = ServerBuilder::default().build("127.0.0.1:0").with_default_timeout().await.unwrap().unwrap(); + + let mut module = RpcModule::new((tx, call_answered.clone())); + + module + .register_async_method("sleep_10s", |_, mut ctx| async move { + let ctx = Arc::make_mut(&mut ctx); + let _ = ctx.0.send(()); + tokio::time::sleep(Duration::from_secs(10)).await; + ctx.1.store(true, std::sync::atomic::Ordering::SeqCst); + "ok" + }) + .unwrap(); + let addr = server.local_addr().unwrap(); + + (server.start(module).unwrap(), addr) + }; + + let client = Arc::new( + WsClientBuilder::default().build(format!("ws://{addr}")).with_default_timeout().await.unwrap().unwrap(), + ); + + let mut calls: FuturesUnordered<_> = (0..10) + .map(|_| { + let c = client.clone(); + async move { c.request::("sleep_10s", rpc_params!()).await } + }) + .collect(); + + let calls_len = calls.len(); + + let res = tokio::spawn(async move { + let mut c = 0; + while let Some(Ok(_)) = calls.next().await { + c += 1; + } + c + }); + + // All calls has been received by server => then stop. + for _ in 0..calls_len { + rx.recv().await.unwrap(); + } + + // Assert that no calls have been answered yet + // The assumption is that the server should be able to read 10 messages < 10s. + assert!(!call_answered.load(std::sync::atomic::Ordering::SeqCst)); + + // Stop the server. + handle.stop().unwrap(); + handle.stopped().await; + + // The pending calls should be answered before shutdown. + assert_eq!(res.await.unwrap(), calls_len); +} From 198355dc13bf3ae5d951d3c3413e68e2ba862a50 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 15 Apr 2023 19:55:58 +0200 Subject: [PATCH 09/11] add test for #585 --- server/src/tests/ws.rs | 36 ++++++++++++++++++++++++++++++++++++ server/src/transport/ws.rs | 14 ++++++++++---- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index 2f18ee8191..a5051c358c 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -802,3 +802,39 @@ async fn notif_is_ignored() { // This call should not be answered and a timeout is regarded as "not answered" assert!(client.send_request_text(r#"{"jsonrpc":"2.0","method":"bar"}"#).with_default_timeout().await.is_err()); } + +#[tokio::test] +async fn drop_client_with_pending_calls_works() { + init_logger(); + + let (handle, addr) = { + let server = ServerBuilder::default().build("127.0.0.1:0").with_default_timeout().await.unwrap().unwrap(); + + let mut module = RpcModule::new(()); + + module + .register_async_method("infinite_call", |_, _| async move { + futures_util::future::pending::<()>().await; + "ok" + }) + .unwrap(); + let addr = server.local_addr().unwrap(); + + (server.start(module).unwrap(), addr) + }; + + let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap(); + + for _ in 0..10 { + let req = r#"{"jsonrpc":"2.0","method":"sleep_10s","id":1}"#; + client.send(req).with_default_timeout().await.unwrap().unwrap(); + } + + client.close().await.unwrap(); + assert!(client.receive().await.is_err()); + + // Stop the server and ensure that the server doesn't wait for futures to complete + // when the connection has already been closed. 
+ handle.stop().unwrap(); + assert!(handle.stopped().with_default_timeout().await.is_ok()); +} diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 89e68716a1..3f44ca9f59 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -250,7 +250,7 @@ pub(crate) async fn background_task( } = svc; let (tx, rx) = mpsc::channel::(message_buffer_capacity as usize); - let (conn_tx, conn_rx) = oneshot::channel(); + let (mut conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); let mut pending_calls = FuturesUnordered::new(); @@ -328,10 +328,16 @@ pub(crate) async fn background_task( // proper drop behaviour. // // This is not strictly not needed because `tokio::spawn` will drive these the completion - // but it's preferred the `stop_handle.stopped()` should not return until all methods has been + // but it's preferred that the `stop_handle.stopped()` should not return until all methods has been // executed and the connection has been closed. - while pending_calls.next().await.is_some() {} - let _ = conn_tx.send(()); + tokio::select! { + // All pending calls executed. + _ = pending_calls.for_each(|_| async {}) => { + _ = conn_tx.send(()); + } + // The connection was closed, no point of waiting for the pending calls. + _ = conn_tx.closed() => {} + } logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); drop(conn); From 2aa7bbb1da956c2040500ff9712c00afb42a9092 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 15 Apr 2023 19:58:40 +0200 Subject: [PATCH 10/11] compile warn --- server/src/transport/ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 3f44ca9f59..b806d5f217 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -253,7 +253,7 @@ pub(crate) async fn background_task( let (mut conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); - let mut pending_calls = FuturesUnordered::new(); + let pending_calls = FuturesUnordered::new(); // Spawn another task that sends out the responses on the Websocket. tokio::spawn(send_task(rx, sender, ping_interval, conn_rx)); From 4c4c854feb03f1d4c2ec8d18084ba0fcae27608c Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 15 Apr 2023 20:01:23 +0200 Subject: [PATCH 11/11] fix nit --- server/src/tests/ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index a5051c358c..4a9d374d26 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -826,7 +826,7 @@ async fn drop_client_with_pending_calls_works() { let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap(); for _ in 0..10 { - let req = r#"{"jsonrpc":"2.0","method":"sleep_10s","id":1}"#; + let req = r#"{"jsonrpc":"2.0","method":"infinite_call","id":1}"#; client.send(req).with_default_timeout().await.unwrap().unwrap(); }
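
Note on the resulting design (the sketches below are illustrative only, not jsonrpsee's actual API):

Patch 01 switches `MethodSink::reserve` from `Sender::reserve` to `Sender::reserve_owned`, because a borrowed `Permit<'a, String>` cannot be moved into a task created with `tokio::spawn`, whereas an `OwnedPermit<String>` owns its queue slot and can. A minimal sketch of that difference, assuming only `tokio` (with the `macros`, `rt-multi-thread` and `sync` features); the channel wiring and names here are hypothetical, not the real `MethodSink`:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(4);

    // Reserve a slot up front (this is what gives back-pressure), then move the
    // reservation into a separate task. A borrowed `Permit` from `tx.reserve()`
    // could not cross the `tokio::spawn` boundary.
    let permit = tx.clone().reserve_owned().await.expect("receiver is alive; qed");
    tokio::spawn(async move {
        // Sending through an owned permit cannot fail; it hands the `Sender` back.
        let _tx = permit.send("response".to_string());
    });

    assert_eq!(rx.recv().await.as_deref(), Some("response"));
}
```

By the end of the series the hand-rolled `FutureDriver` is gone: every call or batch runs on its own spawned task, the `JoinHandle`s are collected in a `FuturesUnordered`, and shutdown drains the pending calls with `tokio::select!` unless the connection has already been closed (the case exercised by `drop_client_with_pending_calls_works`). A simplified, self-contained sketch of that shutdown pattern, assuming `tokio` (with the `macros`, `rt-multi-thread`, `sync` and `time` features) and `futures-util`; `execute_call` and the send-task stand-in are hypothetical:

```rust
use futures_util::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::sync::oneshot;

// Stand-in for executing one JSON-RPC call or batch.
async fn execute_call(id: u64) -> String {
    tokio::time::sleep(Duration::from_millis(10 * id)).await;
    format!("response {id}")
}

#[tokio::main]
async fn main() {
    // The receiving half plays the role of the send task: it keeps the
    // connection open until it gets `()` or the sender is dropped.
    let (mut conn_tx, conn_rx) = oneshot::channel::<()>();
    let send_task = tokio::spawn(async move {
        let _ = conn_rx.await;
        // ... close the WebSocket here ...
    });

    // Calls are spawned instead of being polled by a hand-rolled driver;
    // the join handles are kept so shutdown can wait for them.
    let pending_calls = FuturesUnordered::new();
    for id in 0..5 {
        pending_calls.push(tokio::spawn(execute_call(id)));
    }

    // Graceful shutdown: drain the in-flight calls, but give up early if the
    // connection is already gone.
    tokio::select! {
        _ = pending_calls.for_each(|_| async {}) => {
            let _ = conn_tx.send(());
        }
        _ = conn_tx.closed() => {}
    }

    let _ = send_task.await;
}
```

Waiting on `pending_calls` is what makes `stop_handle.stopped()` hold off until in-flight methods have finished, while the `conn_tx.closed()` arm keeps a dead connection with pending calls from blocking shutdown.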