From 1162d3e95dc16a88bf713a2444410369e35a883b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 17 May 2022 16:49:39 +0300 Subject: [PATCH 01/38] ws: Implement ping for `TransportSenderT` trait Signed-off-by: Alexandru Vasile --- client/transport/src/ws/mod.rs | 11 +++++++++++ core/src/client/mod.rs | 3 +++ 2 files changed, 14 insertions(+) diff --git a/client/transport/src/ws/mod.rs b/client/transport/src/ws/mod.rs index ba5f2cb175..4ce03186d3 100644 --- a/client/transport/src/ws/mod.rs +++ b/client/transport/src/ws/mod.rs @@ -41,6 +41,7 @@ use thiserror::Error; use tokio::net::TcpStream; pub use http::{uri::InvalidUri, Uri}; +use soketto::data::ByteSlice125; pub use soketto::handshake::client::Header; /// Sending end of WebSocket transport. @@ -195,6 +196,16 @@ impl TransportSenderT for Sender { Ok(()) } + /// Sends out a ping request. Returns a `Future` that finishes when the request has been + /// successfully sent. + async fn send_ping(&mut self, data: &[u8]) -> Result<(), Self::Error> { + tracing::debug!("send ping: {:?}", data); + let byte_slice = ByteSlice125::try_from(data).expect("Found invalid ping slice"); + self.inner.send_ping(byte_slice).await?; + self.inner.flush().await?; + Ok(()) + } + /// Send a close message and close the connection. async fn close(&mut self) -> Result<(), WsError> { self.inner.close().await.map_err(Into::into) diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index eb97048447..6ba0773dbc 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -132,6 +132,9 @@ pub trait TransportSenderT: MaybeSend + 'static { /// Send. async fn send(&mut self, msg: String) -> Result<(), Self::Error>; + /// Send ping frame (opcode of 0x9). + async fn send_ping(&mut self, data: &[u8]) -> Result<(), Self::Error>; + /// If the transport supports sending customized close messages. async fn close(&mut self) -> Result<(), Self::Error> { Ok(()) From 2a767016c066e84edd3fb63269089557826abe67 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 17 May 2022 17:43:27 +0300 Subject: [PATCH 02/38] ws/client: Receive pong frames Signed-off-by: Alexandru Vasile --- client/transport/src/ws/mod.rs | 20 ++++++++++++++------ core/src/client/async_client/mod.rs | 11 +++++------ core/src/client/mod.rs | 12 +++++++++++- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/client/transport/src/ws/mod.rs b/client/transport/src/ws/mod.rs index 4ce03186d3..0c3e126a43 100644 --- a/client/transport/src/ws/mod.rs +++ b/client/transport/src/ws/mod.rs @@ -31,11 +31,11 @@ use std::net::{SocketAddr, ToSocketAddrs}; use std::time::Duration; use futures_util::io::{BufReader, BufWriter}; -use jsonrpsee_core::client::{CertificateStore, TransportReceiverT, TransportSenderT}; +use jsonrpsee_core::client::{CertificateStore, ReceivedMessage, TransportReceiverT, TransportSenderT}; use jsonrpsee_core::TEN_MB_SIZE_BYTES; use jsonrpsee_core::{async_trait, Cow}; -use soketto::connection; use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; +use soketto::{connection, Incoming}; use stream::EitherStream; use thiserror::Error; use tokio::net::TcpStream; @@ -217,11 +217,19 @@ impl TransportReceiverT for Receiver { type Error = WsError; /// Returns a `Future` resolving when the server sent us something back. - async fn receive(&mut self) -> Result { + async fn receive(&mut self) -> Result { let mut message = Vec::new(); - self.inner.receive_data(&mut message).await?; - let s = String::from_utf8(message).expect("Found invalid UTF-8"); - Ok(s) + + loop { + let recv = self.inner.receive(&mut message).await?; + + if let Incoming::Data(_) = recv { + let s = String::from_utf8(message).expect("Found invalid UTF-8"); + return Ok(ReceivedMessage::Data(s)); + } else if let Incoming::Pong(pong_data) = recv { + return Ok(ReceivedMessage::Pong(Vec::from(pong_data))); + } + } } } diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index db81ba0559..c8043ac052 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -5,11 +5,7 @@ mod manager; use core::time::Duration; -use crate::client::{ - async_client::helpers::process_subscription_close_response, BatchMessage, ClientT, RegisterNotificationMessage, - RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, - TransportSenderT, -}; +use crate::client::{async_client::helpers::process_subscription_close_response, BatchMessage, ClientT, ReceivedMessage, RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT}; use helpers::{ build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification, process_single_response, process_subscription_response, stop_subscription, @@ -492,7 +488,10 @@ async fn background_task( tracing::trace!("[backend] unregistering notification handler: {:?}", method); let _ = manager.remove_notification_handler(method); } - Either::Right((Some(Ok(raw)), _)) => { + Either::Right((Some(Ok(ReceivedMessage::Pong(pong_data))), _)) => { + tracing::debug!("[backend]: recv pong {:?}", pong_data); + } + Either::Right((Some(Ok(ReceivedMessage::Data(raw))), _)) => { // Single response to a request. if let Ok(single) = serde_json::from_str::>(&raw) { tracing::debug!("[backend]: recv method_call {:?}", single); diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 6ba0773dbc..0acbbe5641 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -141,6 +141,16 @@ pub trait TransportSenderT: MaybeSend + 'static { } } +/// Message type received from the RPC server. +/// It can either be plain `String` data, or a `Pong` reply to a previously submitted `Ping`. +#[derive(Debug)] +pub enum ReceivedMessage { + /// Incoming packet contains plain `String` data. + Data(String), + /// Incoming packet is a `Pong` frame sent in response to a `Ping`. + Pong(Vec), +} + /// Transport interface to receive data asynchronous. #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] @@ -149,7 +159,7 @@ pub trait TransportReceiverT: 'static { type Error: std::error::Error + Send + Sync; /// Receive. - async fn receive(&mut self) -> Result; + async fn receive(&mut self) -> Result; } #[macro_export] From 80af45a4f9c783d42b7a4d7cd18fb08e75015676 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 17 May 2022 17:51:30 +0300 Subject: [PATCH 03/38] core/client: Use `select!` macro for the background task Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 275 ++++++++++++++-------------- 1 file changed, 138 insertions(+), 137 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index c8043ac052..6b678cdb14 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -18,6 +18,7 @@ use async_trait::async_trait; use futures_channel::{mpsc, oneshot}; use futures_timer::Delay; use futures_util::future::{self, Either}; +use futures_util::select; use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use jsonrpsee_types::{ @@ -399,166 +400,166 @@ async fn background_task( let next_backend = backend_event.next(); futures_util::pin_mut!(next_frontend, next_backend); - match futures_util::future::select(next_frontend, next_backend).await { - // User dropped the sender side of the channel. - // There is nothing to do just terminate. - Either::Left((None, _)) => { - tracing::trace!("[backend]: frontend dropped; terminate client"); - break; - } - - Either::Left((Some(FrontToBack::Batch(batch)), _)) => { - tracing::trace!("[backend]: client prepares to send batch request: {:?}", batch.raw); - // NOTE(niklasad1): annoying allocation. - if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) { - tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids); - let _ = send_back.send(Err(Error::InvalidRequestId)); - continue; + select! { + frontend_value = next_frontend => match frontend_value { + // User dropped the sender side of the channel. + // There is nothing to do just terminate. + None => { + tracing::trace!("[backend]: frontend dropped; terminate client"); + break; } + Some(FrontToBack::Batch(batch)) => { + tracing::trace!("[backend]: client prepares to send batch request: {:?}", batch.raw); + // NOTE(niklasad1): annoying allocation. + if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) { + tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids); + let _ = send_back.send(Err(Error::InvalidRequestId)); + continue; + } - if let Err(e) = sender.send(batch.raw).await { - tracing::warn!("[backend]: client batch request failed: {:?}", e); - manager.complete_pending_batch(batch.ids); + if let Err(e) = sender.send(batch.raw).await { + tracing::warn!("[backend]: client batch request failed: {:?}", e); + manager.complete_pending_batch(batch.ids); + } } - } - // User called `notification` on the front-end - Either::Left((Some(FrontToBack::Notification(notif)), _)) => { - tracing::trace!("[backend]: client prepares to send notification: {:?}", notif); - if let Err(e) = sender.send(notif).await { - tracing::warn!("[backend]: client notif failed: {:?}", e); + // User called `notification` on the front-end + Some(FrontToBack::Notification(notif)) => { + tracing::trace!("[backend]: client prepares to send notification: {:?}", notif); + if let Err(e) = sender.send(notif).await { + tracing::warn!("[backend]: client notif failed: {:?}", e); + } } - } - - // User called `request` on the front-end - Either::Left((Some(FrontToBack::Request(request)), _)) => { - tracing::trace!("[backend]: client prepares to send request={:?}", request); - match sender.send(request.raw).await { + // User called `request` on the front-end + Some(FrontToBack::Request(request)) => { + tracing::trace!("[backend]: client prepares to send request={:?}", request); + match sender.send(request.raw).await { + Ok(_) => manager + .insert_pending_call(request.id, request.send_back) + .expect("ID unused checked above; qed"), + Err(e) => { + tracing::warn!("[backend]: client request failed: {:?}", e); + let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into())))); + } + } + } + // User called `subscribe` on the front-end. + Some(FrontToBack::Subscribe(sub)) => match sender.send(sub.raw).await { Ok(_) => manager - .insert_pending_call(request.id, request.send_back) - .expect("ID unused checked above; qed"), + .insert_pending_subscription( + sub.subscribe_id, + sub.unsubscribe_id, + sub.send_back, + sub.unsubscribe_method, + ) + .expect("Request ID unused checked above; qed"), Err(e) => { - tracing::warn!("[backend]: client request failed: {:?}", e); - let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into())))); + tracing::warn!("[backend]: client subscription failed: {:?}", e); + let _ = sub.send_back.send(Err(Error::Transport(e.into()))); } } - } - - // User called `subscribe` on the front-end. - Either::Left((Some(FrontToBack::Subscribe(sub)), _)) => match sender.send(sub.raw).await { - Ok(_) => manager - .insert_pending_subscription( - sub.subscribe_id, - sub.unsubscribe_id, - sub.send_back, - sub.unsubscribe_method, - ) - .expect("Request ID unused checked above; qed"), - Err(e) => { - tracing::warn!("[backend]: client subscription failed: {:?}", e); - let _ = sub.send_back.send(Err(Error::Transport(e.into()))); + // User dropped a subscription. + Some(FrontToBack::SubscriptionClosed(sub_id)) => { + tracing::trace!("Closing subscription: {:?}", sub_id); + // NOTE: The subscription may have been closed earlier if + // the channel was full or disconnected. + if let Some(unsub) = manager + .get_request_id_by_subscription_id(&sub_id) + .and_then(|req_id| build_unsubscribe_message(&mut manager, req_id, sub_id)) + { + stop_subscription(&mut sender, &mut manager, unsub).await; + } } - }, - // User dropped a subscription. - Either::Left((Some(FrontToBack::SubscriptionClosed(sub_id)), _)) => { - tracing::trace!("Closing subscription: {:?}", sub_id); - // NOTE: The subscription may have been closed earlier if - // the channel was full or disconnected. - if let Some(unsub) = manager - .get_request_id_by_subscription_id(&sub_id) - .and_then(|req_id| build_unsubscribe_message(&mut manager, req_id, sub_id)) - { - stop_subscription(&mut sender, &mut manager, unsub).await; + // User called `register_notification` on the front-end. + Some(FrontToBack::RegisterNotification(reg)) => { + tracing::trace!("[backend] registering notification handler: {:?}", reg.method); + let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription); + + if manager.insert_notification_handler(®.method, subscribe_tx).is_ok() { + let _ = reg.send_back.send(Ok((subscribe_rx, reg.method))); + } else { + let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method))); + } } - } - - // User called `register_notification` on the front-end. - Either::Left((Some(FrontToBack::RegisterNotification(reg)), _)) => { - tracing::trace!("[backend] registering notification handler: {:?}", reg.method); - let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription); - - if manager.insert_notification_handler(®.method, subscribe_tx).is_ok() { - let _ = reg.send_back.send(Ok((subscribe_rx, reg.method))); - } else { - let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method))); + // User dropped the notificationHandler for this method + Some(FrontToBack::UnregisterNotification(method)) => { + tracing::trace!("[backend] unregistering notification handler: {:?}", method); + let _ = manager.remove_notification_handler(method); } - } + }, - // User dropped the notificationHandler for this method - Either::Left((Some(FrontToBack::UnregisterNotification(method)), _)) => { - tracing::trace!("[backend] unregistering notification handler: {:?}", method); - let _ = manager.remove_notification_handler(method); - } - Either::Right((Some(Ok(ReceivedMessage::Pong(pong_data))), _)) => { - tracing::debug!("[backend]: recv pong {:?}", pong_data); - } - Either::Right((Some(Ok(ReceivedMessage::Data(raw))), _)) => { - // Single response to a request. - if let Ok(single) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv method_call {:?}", single); - match process_single_response(&mut manager, single, max_notifs_per_subscription) { - Ok(Some(unsub)) => { - stop_subscription(&mut sender, &mut manager, unsub).await; + backend_value = next_backend => match backend_value { + Some(Ok(ReceivedMessage::Pong(pong_data))) => { + tracing::debug!("[backend]: recv pong {:?}", pong_data); + } + Some(Ok(ReceivedMessage::Data(raw))) => { + // Single response to a request. + if let Ok(single) = serde_json::from_str::>(&raw) { + tracing::debug!("[backend]: recv method_call {:?}", single); + match process_single_response(&mut manager, single, max_notifs_per_subscription) { + Ok(Some(unsub)) => { + stop_subscription(&mut sender, &mut manager, unsub).await; + } + Ok(None) => (), + Err(err) => { + let _ = front_error.send(err); + break; + } } - Ok(None) => (), - Err(err) => { - let _ = front_error.send(err); + } + // Subscription response. + else if let Ok(response) = serde_json::from_str::>(&raw) { + tracing::debug!("[backend]: recv subscription {:?}", response); + if let Err(Some(unsub)) = process_subscription_response(&mut manager, response) { + let _ = stop_subscription(&mut sender, &mut manager, unsub).await; + } + } + // Subscription error response. + else if let Ok(response) = serde_json::from_str::>(&raw) { + tracing::debug!("[backend]: recv subscription closed {:?}", response); + let _ = process_subscription_close_response(&mut manager, response); + } + // Incoming Notification + else if let Ok(notif) = serde_json::from_str::>(&raw) { + tracing::debug!("[backend]: recv notification {:?}", notif); + let _ = process_notification(&mut manager, notif); + } + // Batch response. + else if let Ok(batch) = serde_json::from_str::>>(&raw) { + tracing::debug!("[backend]: recv batch {:?}", batch); + if let Err(e) = process_batch_response(&mut manager, batch) { + let _ = front_error.send(e); break; } } - } - // Subscription response. - else if let Ok(response) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv subscription {:?}", response); - if let Err(Some(unsub)) = process_subscription_response(&mut manager, response) { - let _ = stop_subscription(&mut sender, &mut manager, unsub).await; + // Error response + else if let Ok(err) = serde_json::from_str::(&raw) { + tracing::debug!("[backend]: recv error response {:?}", err); + if let Err(e) = process_error_response(&mut manager, err) { + let _ = front_error.send(e); + break; + } } - } - // Subscription error response. - else if let Ok(response) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv subscription closed {:?}", response); - let _ = process_subscription_close_response(&mut manager, response); - } - // Incoming Notification - else if let Ok(notif) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv notification {:?}", notif); - let _ = process_notification(&mut manager, notif); - } - // Batch response. - else if let Ok(batch) = serde_json::from_str::>>(&raw) { - tracing::debug!("[backend]: recv batch {:?}", batch); - if let Err(e) = process_batch_response(&mut manager, batch) { - let _ = front_error.send(e); + // Unparsable response + else { + tracing::debug!( + "[backend]: recv unparseable message: {:?}", + serde_json::from_str::(&raw) + ); + let _ = front_error.send(Error::Custom("Unparsable response".into())); break; } } - // Error response - else if let Ok(err) = serde_json::from_str::(&raw) { - tracing::debug!("[backend]: recv error response {:?}", err); - if let Err(e) = process_error_response(&mut manager, err) { - let _ = front_error.send(e); - break; - } + Some(Err(e)) => { + tracing::error!("Error: {:?} terminating client", e); + let _ = front_error.send(Error::Transport(e.into())); + break; } - // Unparsable response - else { - tracing::debug!( - "[backend]: recv unparseable message: {:?}", - serde_json::from_str::(&raw) - ); - let _ = front_error.send(Error::Custom("Unparsable response".into())); + None => { + tracing::error!("[backend]: WebSocket receiver dropped; terminate client"); + let _ = front_error.send(Error::Custom("WebSocket receiver dropped".into())); break; } - } - Either::Right((Some(Err(e)), _)) => { - tracing::error!("Error: {:?} terminating client", e); - let _ = front_error.send(Error::Transport(e.into())); - break; - } - Either::Right((None, _)) => { - tracing::error!("[backend]: WebSocket receiver dropped; terminate client"); - let _ = front_error.send(Error::Custom("WebSocket receiver dropped".into())); - break; - } + }, } } // Send close message to the server. From 63ba7e76958b15a18eeb033012610d8ab680b5b8 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 17 May 2022 18:14:21 +0300 Subject: [PATCH 04/38] client: Propagate ping interval to background task Signed-off-by: Alexandru Vasile --- client/ws-client/src/lib.rs | 9 +++++++++ core/src/client/async_client/mod.rs | 17 ++++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/client/ws-client/src/lib.rs b/client/ws-client/src/lib.rs index 06c6a11757..701e8033da 100644 --- a/client/ws-client/src/lib.rs +++ b/client/ws-client/src/lib.rs @@ -73,6 +73,7 @@ pub struct WsClientBuilder<'a> { max_request_body_size: u32, request_timeout: Duration, connection_timeout: Duration, + ping_interval: Duration, headers: Vec>, max_concurrent_requests: usize, max_notifs_per_subscription: usize, @@ -87,6 +88,7 @@ impl<'a> Default for WsClientBuilder<'a> { max_request_body_size: TEN_MB_SIZE_BYTES, request_timeout: Duration::from_secs(60), connection_timeout: Duration::from_secs(10), + ping_interval: Duration::from_secs(300), headers: Vec::new(), max_concurrent_requests: 256, max_notifs_per_subscription: 1024, @@ -121,6 +123,12 @@ impl<'a> WsClientBuilder<'a> { self } + /// See documentation [`ClientBuilder::ping_interval`] (default is 5 minutes). + pub fn ping_interval(mut self, interval: Duration) -> Self { + self.ping_interval = interval; + self + } + /// See documentation [`WsTransportClientBuilder::add_header`] (default is none). pub fn add_header(mut self, name: &'a str, value: &'a str) -> Self { self.headers.push(Header { name, value: value.as_bytes() }); @@ -174,6 +182,7 @@ impl<'a> WsClientBuilder<'a> { .request_timeout(self.request_timeout) .max_concurrent_requests(self.max_concurrent_requests) .id_format(self.id_kind) + .ping_interval(self.ping_interval) .build_with_tokio(sender, receiver)) } } diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 6b678cdb14..dd644240a9 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -66,6 +66,7 @@ pub struct ClientBuilder { max_concurrent_requests: usize, max_notifs_per_subscription: usize, id_kind: IdKind, + ping_interval: Duration, } impl Default for ClientBuilder { @@ -75,6 +76,7 @@ impl Default for ClientBuilder { max_concurrent_requests: 256, max_notifs_per_subscription: 1024, id_kind: IdKind::Number, + ping_interval: Duration::from_secs(300), } } } @@ -112,6 +114,17 @@ impl ClientBuilder { self } + /// Set the interval at which pings are submitted (default is 5 minutes). + /// + /// Note: The interval duration is restarted when + /// - submitted frontend command + /// - received backend reply + /// - submitted ping + pub fn ping_interval(mut self, interval: Duration) -> Self { + self.ping_interval = interval; + self + } + /// Build the client with given transport. /// /// ## Panics @@ -127,9 +140,10 @@ impl ClientBuilder { let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_tx, err_rx) = oneshot::channel(); let max_notifs_per_subscription = self.max_notifs_per_subscription; + let ping_interval = self.ping_interval; tokio::spawn(async move { - background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription).await; + background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, ping_interval).await; }); Client { to_back, @@ -382,6 +396,7 @@ async fn background_task( mut frontend: mpsc::Receiver, front_error: oneshot::Sender, max_notifs_per_subscription: usize, + ping_interval: Duration, ) where S: TransportSenderT, R: TransportReceiverT, From 0d3534c9728f74920ee0963e6d90babc25ead746 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 17 May 2022 18:23:51 +0300 Subject: [PATCH 05/38] async_client: Submit ping requests Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index dd644240a9..4ae70b6f02 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -4,7 +4,6 @@ mod helpers; mod manager; use core::time::Duration; - use crate::client::{async_client::helpers::process_subscription_close_response, BatchMessage, ClientT, ReceivedMessage, RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT}; use helpers::{ build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification, @@ -20,6 +19,7 @@ use futures_timer::Delay; use futures_util::future::{self, Either}; use futures_util::select; use futures_util::sink::SinkExt; +use futures_util::FutureExt; use futures_util::stream::StreamExt; use jsonrpsee_types::{ response::SubscriptionError, ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response, @@ -415,7 +415,18 @@ async fn background_task( let next_backend = backend_event.next(); futures_util::pin_mut!(next_frontend, next_backend); + let mut submit_ping = Delay::new(ping_interval).fuse(); + select! { + _ = submit_ping => { + tracing::trace!("[backend]: submit ping"); + if let Err(e) = sender.send_ping(&[]).await { + tracing::warn!("[backend]: client send ping failed: {:?}", e); + let _ = front_error.send(Error::Custom("Could not send ping frame".into())); + break; + } + }, + frontend_value = next_frontend => match frontend_value { // User dropped the sender side of the channel. // There is nothing to do just terminate. From 23ad9b9adc3003d2534799c04cc53e61ba140f97 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 17 May 2022 18:35:19 +0300 Subject: [PATCH 06/38] async_client: Handle pong replies Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 4ae70b6f02..aa585a202e 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -403,6 +403,11 @@ async fn background_task( { let mut manager = RequestManager::new(); + // Flag has the following meaning: + // - true if the ping was submitted. + // - false if the ping was not submitted, or a pong reply was received. + let mut ping_submitted = false; + let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async { let res = receiver.receive().await; Some((res, receiver)) @@ -419,12 +424,21 @@ async fn background_task( select! { _ = submit_ping => { + // Ping was already submitted. + // No activity from frontend, backend (replies or pong) for a duration of `ping_interval`. + if ping_submitted { + let _ = front_error.send(Error::Custom("Did not receive a pong or activity in due time".into())); + break; + } + tracing::trace!("[backend]: submit ping"); if let Err(e) = sender.send_ping(&[]).await { tracing::warn!("[backend]: client send ping failed: {:?}", e); let _ = front_error.send(Error::Custom("Could not send ping frame".into())); break; } + + ping_submitted = true; }, frontend_value = next_frontend => match frontend_value { @@ -515,7 +529,11 @@ async fn background_task( backend_value = next_backend => match backend_value { Some(Ok(ReceivedMessage::Pong(pong_data))) => { + // From WebSocket RFC:https://www.rfc-editor.org/rfc/rfc6455#section-5.5.3 + // A `Pong` frame may be send unsolicited. + // Set just the ping submitted state to allow further pinging. tracing::debug!("[backend]: recv pong {:?}", pong_data); + ping_submitted = false; } Some(Ok(ReceivedMessage::Data(raw))) => { // Single response to a request. From 48b5db993f7c3a8696b50fa5fbcfe26aae1a52cf Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 19 May 2022 18:53:04 +0300 Subject: [PATCH 07/38] client: Handle frontend messages to dedicated fn Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 195 +++++++++++++++------------- 1 file changed, 106 insertions(+), 89 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index aa585a202e..75ac5ad49e 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -66,7 +66,7 @@ pub struct ClientBuilder { max_concurrent_requests: usize, max_notifs_per_subscription: usize, id_kind: IdKind, - ping_interval: Duration, + ping_interval: Option, } impl Default for ClientBuilder { @@ -76,7 +76,7 @@ impl Default for ClientBuilder { max_concurrent_requests: 256, max_notifs_per_subscription: 1024, id_kind: IdKind::Number, - ping_interval: Duration::from_secs(300), + ping_interval: None, } } } @@ -121,7 +121,7 @@ impl ClientBuilder { /// - received backend reply /// - submitted ping pub fn ping_interval(mut self, interval: Duration) -> Self { - self.ping_interval = interval; + self.ping_interval = Some(interval); self } @@ -389,6 +389,104 @@ impl SubscriptionClientT for Client { } } +/// Handle frontend messages. +/// +/// Returns `true` if the main background loop should be terminated. +async fn handle_frontend_messages( + message: Option, + manager: &mut RequestManager, + sender: &mut S, + max_notifs_per_subscription: usize) -> bool { + match message { + // User dropped the sender side of the channel. + // There is nothing to do just terminate. + None => { + tracing::trace!("[backend]: frontend dropped; terminate client"); + return true; + } + + Some(FrontToBack::Batch(batch)) => { + tracing::trace!("[backend]: client prepares to send batch request: {:?}", batch.raw); + // NOTE(niklasad1): annoying allocation. + if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) { + tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids); + let _ = send_back.send(Err(Error::InvalidRequestId)); + return false; + } + + if let Err(e) = sender.send(batch.raw).await { + tracing::warn!("[backend]: client batch request failed: {:?}", e); + manager.complete_pending_batch(batch.ids); + } + } + // User called `notification` on the front-end + Some(FrontToBack::Notification(notif)) => { + tracing::trace!("[backend]: client prepares to send notification: {:?}", notif); + if let Err(e) = sender.send(notif).await { + tracing::warn!("[backend]: client notif failed: {:?}", e); + } + } + // User called `request` on the front-end + Some(FrontToBack::Request(request)) => { + tracing::trace!("[backend]: client prepares to send request={:?}", request); + match sender.send(request.raw).await { + Ok(_) => manager + .insert_pending_call(request.id, request.send_back) + .expect("ID unused checked above; qed"), + Err(e) => { + tracing::warn!("[backend]: client request failed: {:?}", e); + let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into())))); + } + } + } + // User called `subscribe` on the front-end. + Some(FrontToBack::Subscribe(sub)) => match sender.send(sub.raw).await { + Ok(_) => manager + .insert_pending_subscription( + sub.subscribe_id, + sub.unsubscribe_id, + sub.send_back, + sub.unsubscribe_method, + ) + .expect("Request ID unused checked above; qed"), + Err(e) => { + tracing::warn!("[backend]: client subscription failed: {:?}", e); + let _ = sub.send_back.send(Err(Error::Transport(e.into()))); + } + } + // User dropped a subscription. + Some(FrontToBack::SubscriptionClosed(sub_id)) => { + tracing::trace!("Closing subscription: {:?}", sub_id); + // NOTE: The subscription may have been closed earlier if + // the channel was full or disconnected. + if let Some(unsub) = manager + .get_request_id_by_subscription_id(&sub_id) + .and_then(|req_id| build_unsubscribe_message(manager, req_id, sub_id)) + { + stop_subscription(sender, manager, unsub).await; + } + } + // User called `register_notification` on the front-end. + Some(FrontToBack::RegisterNotification(reg)) => { + tracing::trace!("[backend] registering notification handler: {:?}", reg.method); + let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription); + + if manager.insert_notification_handler(®.method, subscribe_tx).is_ok() { + let _ = reg.send_back.send(Ok((subscribe_rx, reg.method))); + } else { + let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method))); + } + } + // User dropped the notificationHandler for this method + Some(FrontToBack::UnregisterNotification(method)) => { + tracing::trace!("[backend] unregistering notification handler: {:?}", method); + let _ = manager.remove_notification_handler(method); + } + } + + return false; +} + /// Function being run in the background that processes messages from the frontend. async fn background_task( mut sender: S, @@ -396,7 +494,7 @@ async fn background_task( mut frontend: mpsc::Receiver, front_error: oneshot::Sender, max_notifs_per_subscription: usize, - ping_interval: Duration, + ping_interval: Option, ) where S: TransportSenderT, R: TransportReceiverT, @@ -420,7 +518,8 @@ async fn background_task( let next_backend = backend_event.next(); futures_util::pin_mut!(next_frontend, next_backend); - let mut submit_ping = Delay::new(ping_interval).fuse(); + let mut submit_ping = Delay::new(ping_interval.map_or(Duration::from_secs(10), |f| f)).fuse(); + let mut should_stop = false; select! { _ = submit_ping => { @@ -441,90 +540,8 @@ async fn background_task( ping_submitted = true; }, - frontend_value = next_frontend => match frontend_value { - // User dropped the sender side of the channel. - // There is nothing to do just terminate. - None => { - tracing::trace!("[backend]: frontend dropped; terminate client"); - break; - } - Some(FrontToBack::Batch(batch)) => { - tracing::trace!("[backend]: client prepares to send batch request: {:?}", batch.raw); - // NOTE(niklasad1): annoying allocation. - if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) { - tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids); - let _ = send_back.send(Err(Error::InvalidRequestId)); - continue; - } - - if let Err(e) = sender.send(batch.raw).await { - tracing::warn!("[backend]: client batch request failed: {:?}", e); - manager.complete_pending_batch(batch.ids); - } - } - // User called `notification` on the front-end - Some(FrontToBack::Notification(notif)) => { - tracing::trace!("[backend]: client prepares to send notification: {:?}", notif); - if let Err(e) = sender.send(notif).await { - tracing::warn!("[backend]: client notif failed: {:?}", e); - } - } - // User called `request` on the front-end - Some(FrontToBack::Request(request)) => { - tracing::trace!("[backend]: client prepares to send request={:?}", request); - match sender.send(request.raw).await { - Ok(_) => manager - .insert_pending_call(request.id, request.send_back) - .expect("ID unused checked above; qed"), - Err(e) => { - tracing::warn!("[backend]: client request failed: {:?}", e); - let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into())))); - } - } - } - // User called `subscribe` on the front-end. - Some(FrontToBack::Subscribe(sub)) => match sender.send(sub.raw).await { - Ok(_) => manager - .insert_pending_subscription( - sub.subscribe_id, - sub.unsubscribe_id, - sub.send_back, - sub.unsubscribe_method, - ) - .expect("Request ID unused checked above; qed"), - Err(e) => { - tracing::warn!("[backend]: client subscription failed: {:?}", e); - let _ = sub.send_back.send(Err(Error::Transport(e.into()))); - } - } - // User dropped a subscription. - Some(FrontToBack::SubscriptionClosed(sub_id)) => { - tracing::trace!("Closing subscription: {:?}", sub_id); - // NOTE: The subscription may have been closed earlier if - // the channel was full or disconnected. - if let Some(unsub) = manager - .get_request_id_by_subscription_id(&sub_id) - .and_then(|req_id| build_unsubscribe_message(&mut manager, req_id, sub_id)) - { - stop_subscription(&mut sender, &mut manager, unsub).await; - } - } - // User called `register_notification` on the front-end. - Some(FrontToBack::RegisterNotification(reg)) => { - tracing::trace!("[backend] registering notification handler: {:?}", reg.method); - let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription); - - if manager.insert_notification_handler(®.method, subscribe_tx).is_ok() { - let _ = reg.send_back.send(Ok((subscribe_rx, reg.method))); - } else { - let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method))); - } - } - // User dropped the notificationHandler for this method - Some(FrontToBack::UnregisterNotification(method)) => { - tracing::trace!("[backend] unregistering notification handler: {:?}", method); - let _ = manager.remove_notification_handler(method); - } + frontend_value = next_frontend => { + should_stop = handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await; }, backend_value = next_backend => match backend_value { From c86b08e9c7f3cce462bbf96e7f1fc2bee19133eb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 19 May 2022 19:19:06 +0300 Subject: [PATCH 08/38] client: Handle backend messages in dedicated fn Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 185 +++++++++++++++------------- 1 file changed, 102 insertions(+), 83 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 75ac5ad49e..d6ac9d3d0f 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -3,8 +3,12 @@ mod helpers; mod manager; +use crate::client::{ + async_client::helpers::process_subscription_close_response, BatchMessage, ClientT, ReceivedMessage, + RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, + SubscriptionMessage, TransportReceiverT, TransportSenderT, +}; use core::time::Duration; -use crate::client::{async_client::helpers::process_subscription_close_response, BatchMessage, ClientT, ReceivedMessage, RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT}; use helpers::{ build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification, process_single_response, process_subscription_response, stop_subscription, @@ -19,8 +23,8 @@ use futures_timer::Delay; use futures_util::future::{self, Either}; use futures_util::select; use futures_util::sink::SinkExt; -use futures_util::FutureExt; use futures_util::stream::StreamExt; +use futures_util::FutureExt; use jsonrpsee_types::{ response::SubscriptionError, ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response, SubscriptionResponse, @@ -389,6 +393,89 @@ impl SubscriptionClientT for Client { } } +/// Handle backend messages. +/// +/// Returns error if the main background loop should be terminated. +async fn handle_backend_messages( + message: Option>, + ping_submitted: &mut bool, + manager: &mut RequestManager, + sender: &mut S, + max_notifs_per_subscription: usize, +) -> Result<(), Error> { + match message { + Some(Ok(ReceivedMessage::Pong(pong_data))) => { + // From WebSocket RFC:https://www.rfc-editor.org/rfc/rfc6455#section-5.5.3 + // A `Pong` frame may be send unsolicited. + // Set just the ping submitted state to allow further pinging. + tracing::debug!("[backend]: recv pong {:?}", pong_data); + *ping_submitted = false; + } + Some(Ok(ReceivedMessage::Data(raw))) => { + // Single response to a request. + if let Ok(single) = serde_json::from_str::>(&raw) { + tracing::debug!("[backend]: recv method_call {:?}", single); + match process_single_response(manager, single, max_notifs_per_subscription) { + Ok(Some(unsub)) => { + stop_subscription(sender, manager, unsub).await; + } + Ok(None) => (), + Err(err) => return Err(err), + } + } + // Subscription response. + else if let Ok(response) = serde_json::from_str::>(&raw) { + tracing::debug!("[backend]: recv subscription {:?}", response); + if let Err(Some(unsub)) = process_subscription_response(manager, response) { + let _ = stop_subscription(sender, manager, unsub).await; + } + } + // Subscription error response. + else if let Ok(response) = serde_json::from_str::>(&raw) { + tracing::debug!("[backend]: recv subscription closed {:?}", response); + let _ = process_subscription_close_response(manager, response); + } + // Incoming Notification + else if let Ok(notif) = serde_json::from_str::>(&raw) { + tracing::debug!("[backend]: recv notification {:?}", notif); + let _ = process_notification(manager, notif); + } + // Batch response. + else if let Ok(batch) = serde_json::from_str::>>(&raw) { + tracing::debug!("[backend]: recv batch {:?}", batch); + if let Err(e) = process_batch_response(manager, batch) { + return Err(e); + } + } + // Error response + else if let Ok(err) = serde_json::from_str::(&raw) { + tracing::debug!("[backend]: recv error response {:?}", err); + if let Err(e) = process_error_response(manager, err) { + return Err(e); + } + } + // Unparsable response + else { + tracing::debug!( + "[backend]: recv unparseable message: {:?}", + serde_json::from_str::(&raw) + ); + return Err(Error::Custom("Unparsable response".into())); + } + } + Some(Err(e)) => { + tracing::error!("Error: {:?} terminating client", e); + return Err(Error::Transport(e.into())); + } + None => { + tracing::error!("[backend]: WebSocket receiver dropped; terminate client"); + return Err(Error::Custom("WebSocket receiver dropped".into())); + } + } + + return Ok(()); +} + /// Handle frontend messages. /// /// Returns `true` if the main background loop should be terminated. @@ -396,7 +483,8 @@ async fn handle_frontend_messages( message: Option, manager: &mut RequestManager, sender: &mut S, - max_notifs_per_subscription: usize) -> bool { + max_notifs_per_subscription: usize, +) -> bool { match message { // User dropped the sender side of the channel. // There is nothing to do just terminate. @@ -430,9 +518,9 @@ async fn handle_frontend_messages( Some(FrontToBack::Request(request)) => { tracing::trace!("[backend]: client prepares to send request={:?}", request); match sender.send(request.raw).await { - Ok(_) => manager - .insert_pending_call(request.id, request.send_back) - .expect("ID unused checked above; qed"), + Ok(_) => { + manager.insert_pending_call(request.id, request.send_back).expect("ID unused checked above; qed") + } Err(e) => { tracing::warn!("[backend]: client request failed: {:?}", e); let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into())))); @@ -453,7 +541,7 @@ async fn handle_frontend_messages( tracing::warn!("[backend]: client subscription failed: {:?}", e); let _ = sub.send_back.send(Err(Error::Transport(e.into()))); } - } + }, // User dropped a subscription. Some(FrontToBack::SubscriptionClosed(sub_id)) => { tracing::trace!("Closing subscription: {:?}", sub_id); @@ -519,7 +607,6 @@ async fn background_task( futures_util::pin_mut!(next_frontend, next_backend); let mut submit_ping = Delay::new(ping_interval.map_or(Duration::from_secs(10), |f| f)).fuse(); - let mut should_stop = false; select! { _ = submit_ping => { @@ -541,83 +628,15 @@ async fn background_task( }, frontend_value = next_frontend => { - should_stop = handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await; - }, - - backend_value = next_backend => match backend_value { - Some(Ok(ReceivedMessage::Pong(pong_data))) => { - // From WebSocket RFC:https://www.rfc-editor.org/rfc/rfc6455#section-5.5.3 - // A `Pong` frame may be send unsolicited. - // Set just the ping submitted state to allow further pinging. - tracing::debug!("[backend]: recv pong {:?}", pong_data); - ping_submitted = false; - } - Some(Ok(ReceivedMessage::Data(raw))) => { - // Single response to a request. - if let Ok(single) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv method_call {:?}", single); - match process_single_response(&mut manager, single, max_notifs_per_subscription) { - Ok(Some(unsub)) => { - stop_subscription(&mut sender, &mut manager, unsub).await; - } - Ok(None) => (), - Err(err) => { - let _ = front_error.send(err); - break; - } - } - } - // Subscription response. - else if let Ok(response) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv subscription {:?}", response); - if let Err(Some(unsub)) = process_subscription_response(&mut manager, response) { - let _ = stop_subscription(&mut sender, &mut manager, unsub).await; - } - } - // Subscription error response. - else if let Ok(response) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv subscription closed {:?}", response); - let _ = process_subscription_close_response(&mut manager, response); - } - // Incoming Notification - else if let Ok(notif) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv notification {:?}", notif); - let _ = process_notification(&mut manager, notif); - } - // Batch response. - else if let Ok(batch) = serde_json::from_str::>>(&raw) { - tracing::debug!("[backend]: recv batch {:?}", batch); - if let Err(e) = process_batch_response(&mut manager, batch) { - let _ = front_error.send(e); - break; - } - } - // Error response - else if let Ok(err) = serde_json::from_str::(&raw) { - tracing::debug!("[backend]: recv error response {:?}", err); - if let Err(e) = process_error_response(&mut manager, err) { - let _ = front_error.send(e); - break; - } - } - // Unparsable response - else { - tracing::debug!( - "[backend]: recv unparseable message: {:?}", - serde_json::from_str::(&raw) - ); - let _ = front_error.send(Error::Custom("Unparsable response".into())); - break; - } - } - Some(Err(e)) => { - tracing::error!("Error: {:?} terminating client", e); - let _ = front_error.send(Error::Transport(e.into())); + if handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await { break; } - None => { - tracing::error!("[backend]: WebSocket receiver dropped; terminate client"); - let _ = front_error.send(Error::Custom("WebSocket receiver dropped".into())); + }, + backend_value = next_backend => { + if let Err(err) = handle_backend_messages::( + backend_value, &mut ping_submitted, &mut manager, &mut sender, max_notifs_per_subscription + ).await { + let _ = front_error.send(err); break; } }, From ae5bf7534b63ec30abe5446a5c941d2e4abab665 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 19 May 2022 20:03:00 +0300 Subject: [PATCH 09/38] client: Add terminated fuse for opt-out pings Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index d6ac9d3d0f..6ae8e432b8 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -20,7 +20,7 @@ use async_lock::Mutex; use async_trait::async_trait; use futures_channel::{mpsc, oneshot}; use futures_timer::Delay; -use futures_util::future::{self, Either}; +use futures_util::future::{self, Either, Fuse}; use futures_util::select; use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; @@ -606,7 +606,11 @@ async fn background_task( let next_backend = backend_event.next(); futures_util::pin_mut!(next_frontend, next_backend); - let mut submit_ping = Delay::new(ping_interval.map_or(Duration::from_secs(10), |f| f)).fuse(); + let mut submit_ping = if let Some(duration) = ping_interval { + Delay::new(duration).fuse() + } else { + Fuse::::terminated() + }; select! { _ = submit_ping => { From 0ee78c5316b3906e3c43f3d667a44268a45475c6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 19 May 2022 20:08:56 +0300 Subject: [PATCH 10/38] Set opt-out behavior for client pings Signed-off-by: Alexandru Vasile --- client/ws-client/src/lib.rs | 20 ++++++++++++-------- core/src/client/async_client/mod.rs | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/client/ws-client/src/lib.rs b/client/ws-client/src/lib.rs index 701e8033da..e026bb83de 100644 --- a/client/ws-client/src/lib.rs +++ b/client/ws-client/src/lib.rs @@ -73,7 +73,7 @@ pub struct WsClientBuilder<'a> { max_request_body_size: u32, request_timeout: Duration, connection_timeout: Duration, - ping_interval: Duration, + ping_interval: Option, headers: Vec>, max_concurrent_requests: usize, max_notifs_per_subscription: usize, @@ -88,7 +88,7 @@ impl<'a> Default for WsClientBuilder<'a> { max_request_body_size: TEN_MB_SIZE_BYTES, request_timeout: Duration::from_secs(60), connection_timeout: Duration::from_secs(10), - ping_interval: Duration::from_secs(300), + ping_interval: None, headers: Vec::new(), max_concurrent_requests: 256, max_notifs_per_subscription: 1024, @@ -123,9 +123,9 @@ impl<'a> WsClientBuilder<'a> { self } - /// See documentation [`ClientBuilder::ping_interval`] (default is 5 minutes). + /// See documentation [`ClientBuilder::ping_interval`] (disabled by default). pub fn ping_interval(mut self, interval: Duration) -> Self { - self.ping_interval = interval; + self.ping_interval = Some(interval); self } @@ -177,12 +177,16 @@ impl<'a> WsClientBuilder<'a> { let uri: Uri = url.as_ref().parse().map_err(|e: InvalidUri| Error::Transport(e.into()))?; let (sender, receiver) = transport_builder.build(uri).await.map_err(|e| Error::Transport(e.into()))?; - Ok(ClientBuilder::default() + let mut client = ClientBuilder::default() .max_notifs_per_subscription(self.max_notifs_per_subscription) .request_timeout(self.request_timeout) .max_concurrent_requests(self.max_concurrent_requests) - .id_format(self.id_kind) - .ping_interval(self.ping_interval) - .build_with_tokio(sender, receiver)) + .id_format(self.id_kind); + + if let Some(interval) = self.ping_interval { + client = client.ping_interval(interval); + } + + Ok(client.build_with_tokio(sender, receiver)) } } diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 6ae8e432b8..506dddfc52 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -118,7 +118,7 @@ impl ClientBuilder { self } - /// Set the interval at which pings are submitted (default is 5 minutes). + /// Set the interval at which pings are submitted (disabled by default). /// /// Note: The interval duration is restarted when /// - submitted frontend command From e7c6edb75ab72ceadd04b7b4a725f0883acbb0ab Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 12:25:35 +0300 Subject: [PATCH 11/38] client: Move imports Signed-off-by: Alexandru Vasile --- client/transport/src/ws/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/transport/src/ws/mod.rs b/client/transport/src/ws/mod.rs index 0c3e126a43..8568194279 100644 --- a/client/transport/src/ws/mod.rs +++ b/client/transport/src/ws/mod.rs @@ -34,6 +34,7 @@ use futures_util::io::{BufReader, BufWriter}; use jsonrpsee_core::client::{CertificateStore, ReceivedMessage, TransportReceiverT, TransportSenderT}; use jsonrpsee_core::TEN_MB_SIZE_BYTES; use jsonrpsee_core::{async_trait, Cow}; +use soketto::data::ByteSlice125; use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; use soketto::{connection, Incoming}; use stream::EitherStream; @@ -41,7 +42,6 @@ use thiserror::Error; use tokio::net::TcpStream; pub use http::{uri::InvalidUri, Uri}; -use soketto::data::ByteSlice125; pub use soketto::handshake::client::Header; /// Sending end of WebSocket transport. From 9c5f2352e2aa9494344d45a1f2dbc6a57b5d6dae Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 13:00:04 +0300 Subject: [PATCH 12/38] client: Handle handle_frontend_messages errors Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 506dddfc52..e1028e1cb2 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -478,19 +478,18 @@ async fn handle_backend_messages( /// Handle frontend messages. /// -/// Returns `true` if the main background loop should be terminated. +/// Returns error if the main background loop should be terminated. async fn handle_frontend_messages( message: Option, manager: &mut RequestManager, sender: &mut S, max_notifs_per_subscription: usize, -) -> bool { +) -> Result<(), Error> { match message { // User dropped the sender side of the channel. // There is nothing to do just terminate. None => { - tracing::trace!("[backend]: frontend dropped; terminate client"); - return true; + return Err(Error::Custom("[backend]: frontend dropped; terminate client".into())); } Some(FrontToBack::Batch(batch)) => { @@ -499,7 +498,7 @@ async fn handle_frontend_messages( if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) { tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids); let _ = send_back.send(Err(Error::InvalidRequestId)); - return false; + return Ok(()); } if let Err(e) = sender.send(batch.raw).await { @@ -572,7 +571,7 @@ async fn handle_frontend_messages( } } - return false; + Ok(()) } /// Function being run in the background that processes messages from the frontend. @@ -632,7 +631,9 @@ async fn background_task( }, frontend_value = next_frontend => { - if handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await { + if let Err(err) = handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await { + tracing::warn!("{:?}", err); + let _ = front_error.send(err); break; } }, @@ -640,6 +641,7 @@ async fn background_task( if let Err(err) = handle_backend_messages::( backend_value, &mut ping_submitted, &mut manager, &mut sender, max_notifs_per_subscription ).await { + tracing::warn!("{:?}", err); let _ = front_error.send(err); break; } From 8e3ff4028c458cfb1c29123801510980818d0b26 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 13:52:01 +0300 Subject: [PATCH 13/38] client: Add custom error related to byteslice conversions Signed-off-by: Alexandru Vasile --- client/transport/src/ws/mod.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/client/transport/src/ws/mod.rs b/client/transport/src/ws/mod.rs index 8568194279..2e44e81d9b 100644 --- a/client/transport/src/ws/mod.rs +++ b/client/transport/src/ws/mod.rs @@ -30,10 +30,12 @@ use std::io; use std::net::{SocketAddr, ToSocketAddrs}; use std::time::Duration; +use crate::ws::WsError::Custom; use futures_util::io::{BufReader, BufWriter}; use jsonrpsee_core::client::{CertificateStore, ReceivedMessage, TransportReceiverT, TransportSenderT}; use jsonrpsee_core::TEN_MB_SIZE_BYTES; use jsonrpsee_core::{async_trait, Cow}; +use soketto::connection::Error::Utf8; use soketto::data::ByteSlice125; use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; use soketto::{connection, Incoming}; @@ -181,6 +183,9 @@ pub enum WsError { /// Error in the WebSocket connection. #[error("WebSocket connection error: {0}")] Connection(#[source] soketto::connection::Error), + /// Custom error that originated before or after interacting with the WebSocket. + #[error("Custom error: {0}")] + Custom(String), } #[async_trait] @@ -199,8 +204,10 @@ impl TransportSenderT for Sender { /// Sends out a ping request. Returns a `Future` that finishes when the request has been /// successfully sent. async fn send_ping(&mut self, data: &[u8]) -> Result<(), Self::Error> { - tracing::debug!("send ping: {:?}", data); - let byte_slice = ByteSlice125::try_from(data).expect("Found invalid ping slice"); + tracing::debug!("send ping"); + // Byte slice fails if the provided slice is larger than 125 bytes. + let byte_slice = ByteSlice125::try_from(data).map_err(|err| Custom(err.clone().to_string().into()))?; + self.inner.send_ping(byte_slice).await?; self.inner.flush().await?; Ok(()) @@ -224,7 +231,7 @@ impl TransportReceiverT for Receiver { let recv = self.inner.receive(&mut message).await?; if let Incoming::Data(_) = recv { - let s = String::from_utf8(message).expect("Found invalid UTF-8"); + let s = String::from_utf8(message).map_err(|err| WsError::Connection(Utf8(err.utf8_error())))?; return Ok(ReceivedMessage::Data(s)); } else if let Incoming::Pong(pong_data) = recv { return Ok(ReceivedMessage::Pong(Vec::from(pong_data))); From 7d39b6ffc47f446d1084c7d23f47a041133cdee2 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 13:59:12 +0300 Subject: [PATCH 14/38] client: Modify `send_ping` to send empty slices Signed-off-by: Alexandru Vasile --- client/transport/src/ws/mod.rs | 6 ++++-- core/src/client/async_client/mod.rs | 2 +- core/src/client/mod.rs | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/client/transport/src/ws/mod.rs b/client/transport/src/ws/mod.rs index 2e44e81d9b..e3fba72ca7 100644 --- a/client/transport/src/ws/mod.rs +++ b/client/transport/src/ws/mod.rs @@ -203,10 +203,12 @@ impl TransportSenderT for Sender { /// Sends out a ping request. Returns a `Future` that finishes when the request has been /// successfully sent. - async fn send_ping(&mut self, data: &[u8]) -> Result<(), Self::Error> { + async fn send_ping(&mut self) -> Result<(), Self::Error> { tracing::debug!("send ping"); + // Submit empty slice as "optional" parameter. + let slice: &[u8] = &[]; // Byte slice fails if the provided slice is larger than 125 bytes. - let byte_slice = ByteSlice125::try_from(data).map_err(|err| Custom(err.clone().to_string().into()))?; + let byte_slice = ByteSlice125::try_from(slice).map_err(|err| Custom(err.clone().to_string().into()))?; self.inner.send_ping(byte_slice).await?; self.inner.flush().await?; diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index e1028e1cb2..9daf09c678 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -621,7 +621,7 @@ async fn background_task( } tracing::trace!("[backend]: submit ping"); - if let Err(e) = sender.send_ping(&[]).await { + if let Err(e) = sender.send_ping().await { tracing::warn!("[backend]: client send ping failed: {:?}", e); let _ = front_error.send(Error::Custom("Could not send ping frame".into())); break; diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 0acbbe5641..6a0c5ba6bf 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -133,7 +133,7 @@ pub trait TransportSenderT: MaybeSend + 'static { async fn send(&mut self, msg: String) -> Result<(), Self::Error>; /// Send ping frame (opcode of 0x9). - async fn send_ping(&mut self, data: &[u8]) -> Result<(), Self::Error>; + async fn send_ping(&mut self) -> Result<(), Self::Error>; /// If the transport supports sending customized close messages. async fn close(&mut self) -> Result<(), Self::Error> { From 64a7b99634414bbe1f189685c1cd5b7814d07e1b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 14:36:35 +0300 Subject: [PATCH 15/38] Fix `cargo hack check` and use `select_biased` Signed-off-by: Alexandru Vasile --- core/Cargo.toml | 2 ++ core/src/client/async_client/mod.rs | 4 ++-- jsonrpsee/Cargo.toml | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 31690b12ed..f46f578b56 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -54,6 +54,7 @@ async-client = [ "tokio/sync", "tracing", "futures-timer", + "futures-util/async-await-macro", ] async-wasm-client = [ "async-lock", @@ -62,6 +63,7 @@ async-wasm-client = [ "rustc-hash/std", "futures-timer/wasm-bindgen", "tracing", + "futures-util/async-await-macro", ] [dev-dependencies] diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 9daf09c678..aa3d65caa7 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -21,7 +21,7 @@ use async_trait::async_trait; use futures_channel::{mpsc, oneshot}; use futures_timer::Delay; use futures_util::future::{self, Either, Fuse}; -use futures_util::select; +use futures_util::select_biased; use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use futures_util::FutureExt; @@ -611,7 +611,7 @@ async fn background_task( Fuse::::terminated() }; - select! { + select_biased! { _ = submit_ping => { // Ping was already submitted. // No activity from frontend, backend (replies or pong) for a duration of `ping_interval`. diff --git a/jsonrpsee/Cargo.toml b/jsonrpsee/Cargo.toml index 4cd40d0687..59eb13139e 100644 --- a/jsonrpsee/Cargo.toml +++ b/jsonrpsee/Cargo.toml @@ -30,7 +30,7 @@ async-client = ["jsonrpsee-core/async-client"] http-client = ["jsonrpsee-http-client", "jsonrpsee-types", "jsonrpsee-core"] http-server = ["jsonrpsee-http-server", "jsonrpsee-types", "jsonrpsee-core"] wasm-client = ["jsonrpsee-wasm-client", "jsonrpsee-types", "jsonrpsee-core"] -ws-client = ["jsonrpsee-ws-client", "jsonrpsee-types", "jsonrpsee-core"] +ws-client = ["jsonrpsee-ws-client", "jsonrpsee-types", "jsonrpsee-core", "jsonrpsee-core/async-wasm-client"] ws-server = ["jsonrpsee-ws-server", "jsonrpsee-types", "jsonrpsee-core"] macros = ["jsonrpsee-proc-macros", "jsonrpsee-types", "jsonrpsee-core/client", "tracing"] From 15595546322a402015cb5a3a70dff325b4e9e77b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 14:37:29 +0300 Subject: [PATCH 16/38] Handle sending pings with lowest priority Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 35 ++++++++++++++--------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index aa3d65caa7..be37ac71bc 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -612,24 +612,6 @@ async fn background_task( }; select_biased! { - _ = submit_ping => { - // Ping was already submitted. - // No activity from frontend, backend (replies or pong) for a duration of `ping_interval`. - if ping_submitted { - let _ = front_error.send(Error::Custom("Did not receive a pong or activity in due time".into())); - break; - } - - tracing::trace!("[backend]: submit ping"); - if let Err(e) = sender.send_ping().await { - tracing::warn!("[backend]: client send ping failed: {:?}", e); - let _ = front_error.send(Error::Custom("Could not send ping frame".into())); - break; - } - - ping_submitted = true; - }, - frontend_value = next_frontend => { if let Err(err) = handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await { tracing::warn!("{:?}", err); @@ -646,6 +628,23 @@ async fn background_task( break; } }, + _ = submit_ping => { + // Ping was already submitted. + // No activity from frontend, backend (replies or pong) for a duration of `ping_interval`. + if ping_submitted { + let _ = front_error.send(Error::Custom("Did not receive a pong or activity in due time".into())); + break; + } + + tracing::trace!("[backend]: submit ping"); + if let Err(e) = sender.send_ping().await { + tracing::warn!("[backend]: client send ping failed: {:?}", e); + let _ = front_error.send(Error::Custom("Could not send ping frame".into())); + break; + } + + ping_submitted = true; + }, } } // Send close message to the server. From 5ba4d0e696aa393e812fb16155a3aacc0726f83a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 14:57:26 +0300 Subject: [PATCH 17/38] core: Add proper number of params to `background_task` Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index be37ac71bc..1e8d4f94f7 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -170,7 +170,7 @@ impl ClientBuilder { let max_notifs_per_subscription = self.max_notifs_per_subscription; wasm_bindgen_futures::spawn_local(async move { - background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription).await; + background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, None).await; }); Client { to_back, From aed7d2659dec425ab795e66330127123ccd31fd6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 18:51:35 +0300 Subject: [PATCH 18/38] Fix wasm client Signed-off-by: Alexandru Vasile --- wasm-tests/tests/wasm.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/wasm-tests/tests/wasm.rs b/wasm-tests/tests/wasm.rs index 14a22df510..9b5c0326d7 100644 --- a/wasm-tests/tests/wasm.rs +++ b/wasm-tests/tests/wasm.rs @@ -2,7 +2,7 @@ use jsonrpsee_client_transport::web::*; use jsonrpsee_core::{ - client::{ClientT, Subscription, SubscriptionClientT, TransportReceiverT, TransportSenderT}, + client::{ClientT, ReceivedMessage, Subscription, SubscriptionClientT, TransportReceiverT, TransportSenderT}, rpc_params, }; use jsonrpsee_wasm_client::WasmClientBuilder; @@ -26,9 +26,12 @@ async fn wasm_ws_transport_works() { let exp = r#"{"jsonrpc":"2.0","result":"Substrate Node","id":1}"#; tx.send(req.to_string()).await.unwrap(); - let rp = rx.receive().await.unwrap(); + let rp: ReceivedMessage = rx.receive().await.unwrap(); - assert_eq!(exp, &rp); + match rp { + ReceivedMessage::Data(str) => assert_eq!(exp, &str), + _ => assert!(false, "Expected string message"), + }; } #[wasm_bindgen_test] From 48d6eec87028ed7555e8c46f63380e8b9dc6ed8d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 18:51:52 +0300 Subject: [PATCH 19/38] Handle raw bytes and string received messages Signed-off-by: Alexandru Vasile --- client/transport/src/web.rs | 19 +++-- core/src/client/async_client/mod.rs | 115 ++++++++++++++++------------ core/src/client/mod.rs | 2 + 3 files changed, 78 insertions(+), 58 deletions(-) diff --git a/client/transport/src/web.rs b/client/transport/src/web.rs index 2eab606b68..80fa779abf 100644 --- a/client/transport/src/web.rs +++ b/client/transport/src/web.rs @@ -5,7 +5,7 @@ use futures_util::sink::SinkExt; use futures_util::stream::{SplitSink, SplitStream, StreamExt}; use gloo_net::websocket::{futures::WebSocket, Message, WebSocketError}; use jsonrpsee_core::async_trait; -use jsonrpsee_core::client::{TransportReceiverT, TransportSenderT}; +use jsonrpsee_core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}; /// Web-sys transport error that can occur. #[derive(Debug, thiserror::Error)] @@ -52,6 +52,11 @@ impl TransportSenderT for Sender { Ok(()) } + async fn send_ping(&mut self) -> Result<(), Self::Error> { + tracing::trace!("send ping - not implemented for wasm"); + Ok(()) + } + async fn close(&mut self) -> Result<(), Error> { Ok(()) } @@ -61,17 +66,15 @@ impl TransportSenderT for Sender { impl TransportReceiverT for Receiver { type Error = Error; - async fn receive(&mut self) -> Result { + async fn receive(&mut self) -> Result { match self.0.next().await { Some(Ok(msg)) => { tracing::trace!("rx: {:?}", msg); - let txt = match msg { - Message::Bytes(bytes) => String::from_utf8(bytes).expect("WebSocket message is valid utf8; qed"), - Message::Text(txt) => txt, - }; - - Ok(txt) + match msg { + Message::Bytes(bytes) => Ok(ReceivedMessage::Bytes(bytes)), + Message::Text(txt) => Ok(ReceivedMessage::Data(txt)), + } } Some(Err(err)) => Err(Error::WebSocket(err)), None => Err(Error::SenderDisconnected), diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 1e8d4f94f7..8427100022 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -403,6 +403,67 @@ async fn handle_backend_messages( sender: &mut S, max_notifs_per_subscription: usize, ) -> Result<(), Error> { + + // Handle raw messages of form `ReceivedMessage::Bytes` (Vec) or ReceivedMessage::Data` (String). + async fn handle_recv_message( + raw: &[u8], + manager: &mut RequestManager, + sender: &mut S, + max_notifs_per_subscription: usize + ) -> Result<(), Error> { + // Single response to a request. + if let Ok(single) = serde_json::from_slice::>(&raw) { + tracing::debug!("[backend]: recv method_call {:?}", single); + match process_single_response(manager, single, max_notifs_per_subscription) { + Ok(Some(unsub)) => { + stop_subscription(sender, manager, unsub).await; + } + Ok(None) => (), + Err(err) => return Err(err), + } + } + // Subscription response. + else if let Ok(response) = serde_json::from_slice::>(&raw) { + tracing::debug!("[backend]: recv subscription {:?}", response); + if let Err(Some(unsub)) = process_subscription_response(manager, response) { + let _ = stop_subscription(sender, manager, unsub).await; + } + } + // Subscription error response. + else if let Ok(response) = serde_json::from_slice::>(&raw) { + tracing::debug!("[backend]: recv subscription closed {:?}", response); + let _ = process_subscription_close_response(manager, response); + } + // Incoming Notification + else if let Ok(notif) = serde_json::from_slice::>(&raw) { + tracing::debug!("[backend]: recv notification {:?}", notif); + let _ = process_notification(manager, notif); + } + // Batch response. + else if let Ok(batch) = serde_json::from_slice::>>(&raw) { + tracing::debug!("[backend]: recv batch {:?}", batch); + if let Err(e) = process_batch_response(manager, batch) { + return Err(e); + } + } + // Error response + else if let Ok(err) = serde_json::from_slice::(&raw) { + tracing::debug!("[backend]: recv error response {:?}", err); + if let Err(e) = process_error_response(manager, err) { + return Err(e); + } + } + // Unparsable response + else { + tracing::debug!( + "[backend]: recv unparseable message: {:?}", + serde_json::from_slice::(&raw) + ); + return Err(Error::Custom("Unparsable response".into())); + } + Ok(()) + } + match message { Some(Ok(ReceivedMessage::Pong(pong_data))) => { // From WebSocket RFC:https://www.rfc-editor.org/rfc/rfc6455#section-5.5.3 @@ -411,57 +472,11 @@ async fn handle_backend_messages( tracing::debug!("[backend]: recv pong {:?}", pong_data); *ping_submitted = false; } + Some(Ok(ReceivedMessage::Bytes(raw))) => { + handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?; + } Some(Ok(ReceivedMessage::Data(raw))) => { - // Single response to a request. - if let Ok(single) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv method_call {:?}", single); - match process_single_response(manager, single, max_notifs_per_subscription) { - Ok(Some(unsub)) => { - stop_subscription(sender, manager, unsub).await; - } - Ok(None) => (), - Err(err) => return Err(err), - } - } - // Subscription response. - else if let Ok(response) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv subscription {:?}", response); - if let Err(Some(unsub)) = process_subscription_response(manager, response) { - let _ = stop_subscription(sender, manager, unsub).await; - } - } - // Subscription error response. - else if let Ok(response) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv subscription closed {:?}", response); - let _ = process_subscription_close_response(manager, response); - } - // Incoming Notification - else if let Ok(notif) = serde_json::from_str::>(&raw) { - tracing::debug!("[backend]: recv notification {:?}", notif); - let _ = process_notification(manager, notif); - } - // Batch response. - else if let Ok(batch) = serde_json::from_str::>>(&raw) { - tracing::debug!("[backend]: recv batch {:?}", batch); - if let Err(e) = process_batch_response(manager, batch) { - return Err(e); - } - } - // Error response - else if let Ok(err) = serde_json::from_str::(&raw) { - tracing::debug!("[backend]: recv error response {:?}", err); - if let Err(e) = process_error_response(manager, err) { - return Err(e); - } - } - // Unparsable response - else { - tracing::debug!( - "[backend]: recv unparseable message: {:?}", - serde_json::from_str::(&raw) - ); - return Err(Error::Custom("Unparsable response".into())); - } + handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?; } Some(Err(e)) => { tracing::error!("Error: {:?} terminating client", e); diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 6a0c5ba6bf..4cca63d324 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -147,6 +147,8 @@ pub trait TransportSenderT: MaybeSend + 'static { pub enum ReceivedMessage { /// Incoming packet contains plain `String` data. Data(String), + /// Incoming packet contains bytes. + Bytes(Vec), /// Incoming packet is a `Pong` frame sent in response to a `Ping`. Pong(Vec), } From cda4c09621605c0b01ef55ae16622632252fb3f0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 19:03:40 +0300 Subject: [PATCH 20/38] Fix Cargo.toml feature Signed-off-by: Alexandru Vasile --- jsonrpsee/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jsonrpsee/Cargo.toml b/jsonrpsee/Cargo.toml index 59eb13139e..4cd40d0687 100644 --- a/jsonrpsee/Cargo.toml +++ b/jsonrpsee/Cargo.toml @@ -30,7 +30,7 @@ async-client = ["jsonrpsee-core/async-client"] http-client = ["jsonrpsee-http-client", "jsonrpsee-types", "jsonrpsee-core"] http-server = ["jsonrpsee-http-server", "jsonrpsee-types", "jsonrpsee-core"] wasm-client = ["jsonrpsee-wasm-client", "jsonrpsee-types", "jsonrpsee-core"] -ws-client = ["jsonrpsee-ws-client", "jsonrpsee-types", "jsonrpsee-core", "jsonrpsee-core/async-wasm-client"] +ws-client = ["jsonrpsee-ws-client", "jsonrpsee-types", "jsonrpsee-core"] ws-server = ["jsonrpsee-ws-server", "jsonrpsee-types", "jsonrpsee-core"] macros = ["jsonrpsee-proc-macros", "jsonrpsee-types", "jsonrpsee-core/client", "tracing"] From 13ee47a66e505a49f93d2a1db4e434f59b5f93a6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 20 May 2022 19:06:30 +0300 Subject: [PATCH 21/38] Panic when empty slice does not fit into `ByteSlice125` Signed-off-by: Alexandru Vasile --- client/transport/src/ws/mod.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/client/transport/src/ws/mod.rs b/client/transport/src/ws/mod.rs index e3fba72ca7..ffc510b2de 100644 --- a/client/transport/src/ws/mod.rs +++ b/client/transport/src/ws/mod.rs @@ -30,7 +30,6 @@ use std::io; use std::net::{SocketAddr, ToSocketAddrs}; use std::time::Duration; -use crate::ws::WsError::Custom; use futures_util::io::{BufReader, BufWriter}; use jsonrpsee_core::client::{CertificateStore, ReceivedMessage, TransportReceiverT, TransportSenderT}; use jsonrpsee_core::TEN_MB_SIZE_BYTES; @@ -183,9 +182,6 @@ pub enum WsError { /// Error in the WebSocket connection. #[error("WebSocket connection error: {0}")] Connection(#[source] soketto::connection::Error), - /// Custom error that originated before or after interacting with the WebSocket. - #[error("Custom error: {0}")] - Custom(String), } #[async_trait] @@ -208,7 +204,7 @@ impl TransportSenderT for Sender { // Submit empty slice as "optional" parameter. let slice: &[u8] = &[]; // Byte slice fails if the provided slice is larger than 125 bytes. - let byte_slice = ByteSlice125::try_from(slice).map_err(|err| Custom(err.clone().to_string().into()))?; + let byte_slice = ByteSlice125::try_from(slice).expect("Empty slice should fit into ByteSlice125"); self.inner.send_ping(byte_slice).await?; self.inner.flush().await?; From 6b40519cfc8e3fdb2f45d9398dccc5cbe65e6f92 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 May 2022 14:53:44 +0300 Subject: [PATCH 22/38] wasm: Add operation not supported for pings Signed-off-by: Alexandru Vasile --- client/transport/src/web.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/transport/src/web.rs b/client/transport/src/web.rs index 80fa779abf..7ddad1dc97 100644 --- a/client/transport/src/web.rs +++ b/client/transport/src/web.rs @@ -22,6 +22,9 @@ pub enum Error { /// WebSocket error #[error("WebSocket Error: {0:?}")] WebSocket(WebSocketError), + /// Operation not supported + #[error("Operation not supported")] + NotSupported, } /// Sender. @@ -54,7 +57,7 @@ impl TransportSenderT for Sender { async fn send_ping(&mut self) -> Result<(), Self::Error> { tracing::trace!("send ping - not implemented for wasm"); - Ok(()) + Err(Error::NotSupported) } async fn close(&mut self) -> Result<(), Error> { From 537627f3f5e7be81e7e2357e363bd3fe704c8749 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 May 2022 14:55:41 +0300 Subject: [PATCH 23/38] Rename `ReceivedMessage` from Data to Text Signed-off-by: Alexandru Vasile --- client/transport/src/web.rs | 2 +- client/transport/src/ws/mod.rs | 2 +- core/src/client/async_client/mod.rs | 2 +- core/src/client/mod.rs | 10 +++++----- wasm-tests/tests/wasm.rs | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/client/transport/src/web.rs b/client/transport/src/web.rs index 7ddad1dc97..7ac7fd228f 100644 --- a/client/transport/src/web.rs +++ b/client/transport/src/web.rs @@ -76,7 +76,7 @@ impl TransportReceiverT for Receiver { match msg { Message::Bytes(bytes) => Ok(ReceivedMessage::Bytes(bytes)), - Message::Text(txt) => Ok(ReceivedMessage::Data(txt)), + Message::Text(txt) => Ok(ReceivedMessage::Text(txt)), } } Some(Err(err)) => Err(Error::WebSocket(err)), diff --git a/client/transport/src/ws/mod.rs b/client/transport/src/ws/mod.rs index ffc510b2de..5c440204b8 100644 --- a/client/transport/src/ws/mod.rs +++ b/client/transport/src/ws/mod.rs @@ -230,7 +230,7 @@ impl TransportReceiverT for Receiver { if let Incoming::Data(_) = recv { let s = String::from_utf8(message).map_err(|err| WsError::Connection(Utf8(err.utf8_error())))?; - return Ok(ReceivedMessage::Data(s)); + return Ok(ReceivedMessage::Text(s)); } else if let Incoming::Pong(pong_data) = recv { return Ok(ReceivedMessage::Pong(Vec::from(pong_data))); } diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 8427100022..2d598c70f1 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -475,7 +475,7 @@ async fn handle_backend_messages( Some(Ok(ReceivedMessage::Bytes(raw))) => { handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?; } - Some(Ok(ReceivedMessage::Data(raw))) => { + Some(Ok(ReceivedMessage::Text(raw))) => { handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?; } Some(Err(e)) => { diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 4cca63d324..3c2882a161 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -146,11 +146,11 @@ pub trait TransportSenderT: MaybeSend + 'static { #[derive(Debug)] pub enum ReceivedMessage { /// Incoming packet contains plain `String` data. - Data(String), - /// Incoming packet contains bytes. - Bytes(Vec), - /// Incoming packet is a `Pong` frame sent in response to a `Ping`. - Pong(Vec), + Text(String), + /// Incoming packet contains bytes. + Bytes(Vec), + /// Incoming packet is a `Pong` frame sent in response to a `Ping`. + Pong(Vec), } /// Transport interface to receive data asynchronous. diff --git a/wasm-tests/tests/wasm.rs b/wasm-tests/tests/wasm.rs index 9b5c0326d7..f761f5540a 100644 --- a/wasm-tests/tests/wasm.rs +++ b/wasm-tests/tests/wasm.rs @@ -29,7 +29,7 @@ async fn wasm_ws_transport_works() { let rp: ReceivedMessage = rx.receive().await.unwrap(); match rp { - ReceivedMessage::Data(str) => assert_eq!(exp, &str), + ReceivedMessage::Text(str) => assert_eq!(exp, &str), _ => assert!(false, "Expected string message"), }; } From 2a2787f30be8f02172d2634ae450040b18a2eeb7 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 May 2022 14:57:05 +0300 Subject: [PATCH 24/38] Rename test variable Signed-off-by: Alexandru Vasile --- wasm-tests/tests/wasm.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wasm-tests/tests/wasm.rs b/wasm-tests/tests/wasm.rs index f761f5540a..8d90ff687f 100644 --- a/wasm-tests/tests/wasm.rs +++ b/wasm-tests/tests/wasm.rs @@ -29,7 +29,7 @@ async fn wasm_ws_transport_works() { let rp: ReceivedMessage = rx.receive().await.unwrap(); match rp { - ReceivedMessage::Text(str) => assert_eq!(exp, &str), + ReceivedMessage::Text(s) => assert_eq!(exp, &s), _ => assert!(false, "Expected string message"), }; } From a9bed1e76f8747ffbbb1d7f65483c3d3d7fe00f0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 May 2022 15:08:53 +0300 Subject: [PATCH 25/38] Add documentation Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 2d598c70f1..d33dab9005 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -120,6 +120,8 @@ impl ClientBuilder { /// Set the interval at which pings are submitted (disabled by default). /// + /// The Ping interval should be larger than the time expected for receiving a Pong frame. + /// /// Note: The interval duration is restarted when /// - submitted frontend command /// - received backend reply @@ -620,9 +622,12 @@ async fn background_task( let next_backend = backend_event.next(); futures_util::pin_mut!(next_frontend, next_backend); + // Create either a valid delay fuse triggered every provided `duration`, + // or create a terminated fuse that's never selected if the provided `duration` is None. let mut submit_ping = if let Some(duration) = ping_interval { Delay::new(duration).fuse() } else { + // The select macro bypasses terminated futures, and the `submit_ping` branch is never selected. Fuse::::terminated() }; From fb7ea4d17fae68008e3aa8f0053d2af4755cbe13 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 26 May 2022 19:24:57 +0300 Subject: [PATCH 26/38] client: Use `future::select` for cancel safety Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 48 ++++++++++++++--------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index d33dab9005..1565d8f2e2 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -21,7 +21,6 @@ use async_trait::async_trait; use futures_channel::{mpsc, oneshot}; use futures_timer::Delay; use futures_util::future::{self, Either, Fuse}; -use futures_util::select_biased; use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use futures_util::FutureExt; @@ -614,32 +613,38 @@ async fn background_task( let res = receiver.receive().await; Some((res, receiver)) }); - futures_util::pin_mut!(backend_event); - loop { - let next_frontend = frontend.next(); - let next_backend = backend_event.next(); - futures_util::pin_mut!(next_frontend, next_backend); + // Place frontend and backend messages into their own select. + // This implies that either messages are received (both front or backend), + // or submit ping timer expires (if provided). + let next_frontend = frontend.next(); + let next_backend = backend_event.next(); + let mut message_fut = future::select(next_frontend, next_backend); + loop { // Create either a valid delay fuse triggered every provided `duration`, // or create a terminated fuse that's never selected if the provided `duration` is None. - let mut submit_ping = if let Some(duration) = ping_interval { + let submit_ping = if let Some(duration) = ping_interval { Delay::new(duration).fuse() } else { // The select macro bypasses terminated futures, and the `submit_ping` branch is never selected. Fuse::::terminated() }; - select_biased! { - frontend_value = next_frontend => { + match future::select(message_fut, submit_ping).await { + // Message received from the frontend. + Either::Left((Either::Left((frontend_value, backend)), _)) => { if let Err(err) = handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await { tracing::warn!("{:?}", err); let _ = front_error.send(err); break; } - }, - backend_value = next_backend => { + // Advance frontend, save backend. + message_fut = future::select(frontend.next(), backend); + } + // Message received from the backend. + Either::Left((Either::Right((backend_value, frontend )), _))=> { if let Err(err) = handle_backend_messages::( backend_value, &mut ping_submitted, &mut manager, &mut sender, max_notifs_per_subscription ).await { @@ -647,25 +652,20 @@ async fn background_task( let _ = front_error.send(err); break; } - }, - _ = submit_ping => { - // Ping was already submitted. - // No activity from frontend, backend (replies or pong) for a duration of `ping_interval`. - if ping_submitted { - let _ = front_error.send(Error::Custom("Did not receive a pong or activity in due time".into())); - break; - } - + // Advance backend, save frontend. + message_fut = future::select(frontend, backend_event.next()); + } + // Submit ping interval was triggered. + Either::Right((_, next_message_fut)) => { tracing::trace!("[backend]: submit ping"); if let Err(e) = sender.send_ping().await { tracing::warn!("[backend]: client send ping failed: {:?}", e); let _ = front_error.send(Error::Custom("Could not send ping frame".into())); break; } - - ping_submitted = true; - }, - } + message_fut = next_message_fut; + } + }; } // Send close message to the server. let _ = sender.close().await; From 736da11e879eee1cc32381b275d528428cf0c738 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 26 May 2022 19:37:25 +0300 Subject: [PATCH 27/38] client: Remove `pong` handling logic Signed-off-by: Alexandru Vasile --- client/transport/src/ws/mod.rs | 13 +++++-------- core/src/client/async_client/mod.rs | 15 +-------------- core/src/client/mod.rs | 4 +--- 3 files changed, 7 insertions(+), 25 deletions(-) diff --git a/client/transport/src/ws/mod.rs b/client/transport/src/ws/mod.rs index 5c440204b8..cc7dc32b20 100644 --- a/client/transport/src/ws/mod.rs +++ b/client/transport/src/ws/mod.rs @@ -37,7 +37,7 @@ use jsonrpsee_core::{async_trait, Cow}; use soketto::connection::Error::Utf8; use soketto::data::ByteSlice125; use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; -use soketto::{connection, Incoming}; +use soketto::{connection, Data}; use stream::EitherStream; use thiserror::Error; use tokio::net::TcpStream; @@ -225,15 +225,12 @@ impl TransportReceiverT for Receiver { async fn receive(&mut self) -> Result { let mut message = Vec::new(); - loop { - let recv = self.inner.receive(&mut message).await?; - - if let Incoming::Data(_) = recv { + match self.inner.receive_data(&mut message).await? { + Data::Text(_) => { let s = String::from_utf8(message).map_err(|err| WsError::Connection(Utf8(err.utf8_error())))?; - return Ok(ReceivedMessage::Text(s)); - } else if let Incoming::Pong(pong_data) = recv { - return Ok(ReceivedMessage::Pong(Vec::from(pong_data))); + Ok(ReceivedMessage::Text(s)) } + Data::Binary(_) => Ok(ReceivedMessage::Bytes(message)), } } } diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 1565d8f2e2..5a284f2f87 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -399,7 +399,6 @@ impl SubscriptionClientT for Client { /// Returns error if the main background loop should be terminated. async fn handle_backend_messages( message: Option>, - ping_submitted: &mut bool, manager: &mut RequestManager, sender: &mut S, max_notifs_per_subscription: usize, @@ -466,13 +465,6 @@ async fn handle_backend_messages( } match message { - Some(Ok(ReceivedMessage::Pong(pong_data))) => { - // From WebSocket RFC:https://www.rfc-editor.org/rfc/rfc6455#section-5.5.3 - // A `Pong` frame may be send unsolicited. - // Set just the ping submitted state to allow further pinging. - tracing::debug!("[backend]: recv pong {:?}", pong_data); - *ping_submitted = false; - } Some(Ok(ReceivedMessage::Bytes(raw))) => { handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?; } @@ -604,11 +596,6 @@ async fn background_task( { let mut manager = RequestManager::new(); - // Flag has the following meaning: - // - true if the ping was submitted. - // - false if the ping was not submitted, or a pong reply was received. - let mut ping_submitted = false; - let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async { let res = receiver.receive().await; Some((res, receiver)) @@ -646,7 +633,7 @@ async fn background_task( // Message received from the backend. Either::Left((Either::Right((backend_value, frontend )), _))=> { if let Err(err) = handle_backend_messages::( - backend_value, &mut ping_submitted, &mut manager, &mut sender, max_notifs_per_subscription + backend_value, &mut manager, &mut sender, max_notifs_per_subscription ).await { tracing::warn!("{:?}", err); let _ = front_error.send(err); diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 3c2882a161..2141d1306a 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -142,15 +142,13 @@ pub trait TransportSenderT: MaybeSend + 'static { } /// Message type received from the RPC server. -/// It can either be plain `String` data, or a `Pong` reply to a previously submitted `Ping`. +/// It can either be plain text data, or bytes. #[derive(Debug)] pub enum ReceivedMessage { /// Incoming packet contains plain `String` data. Text(String), /// Incoming packet contains bytes. Bytes(Vec), - /// Incoming packet is a `Pong` frame sent in response to a `Ping`. - Pong(Vec), } /// Transport interface to receive data asynchronous. From b262442bb39e0171ae3f3f173923eae87d2a5d4d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 26 May 2022 20:28:56 +0300 Subject: [PATCH 28/38] client: Update ping documentation Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 5a284f2f87..b4cd1d4a7a 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -117,14 +117,18 @@ impl ClientBuilder { self } - /// Set the interval at which pings are submitted (disabled by default). + /// Set the interval at which pings frames are submitted (disabled by default). /// - /// The Ping interval should be larger than the time expected for receiving a Pong frame. + /// Periodically submitting pings at a defined interval has mainly two benefits: + /// - Directly, it acts as a "keep-alive" alternative in the WebSocket world. + /// - Indirectly by inspecting trace logs, ensures that the endpoint is still responding to messages. + /// + /// The underlying implementation does not make any assumptions about at which intervals pongs are received. /// /// Note: The interval duration is restarted when - /// - submitted frontend command - /// - received backend reply - /// - submitted ping + /// - a frontend command is submitted + /// - a reply is received from the backend + /// - the interval duration expires pub fn ping_interval(mut self, interval: Duration) -> Self { self.ping_interval = Some(interval); self From 2d6b04c33e657486a85dc6585b5981afab09593b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 27 May 2022 12:34:16 +0300 Subject: [PATCH 29/38] Update core/src/client/async_client/mod.rs Co-authored-by: Tarik Gul <47201679+TarikGul@users.noreply.github.com> --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index b4cd1d4a7a..b617c2766e 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -121,7 +121,7 @@ impl ClientBuilder { /// /// Periodically submitting pings at a defined interval has mainly two benefits: /// - Directly, it acts as a "keep-alive" alternative in the WebSocket world. - /// - Indirectly by inspecting trace logs, ensures that the endpoint is still responding to messages. + /// - Indirectly by inspecting trace logs, it ensures that the endpoint is still responding to messages. /// /// The underlying implementation does not make any assumptions about at which intervals pongs are received. /// From 24b5d67fe959b6a331c93ed8066ca4fbf62db8ad Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 27 May 2022 12:34:23 +0300 Subject: [PATCH 30/38] Update core/src/client/async_client/mod.rs Co-authored-by: Tarik Gul <47201679+TarikGul@users.noreply.github.com> --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index b617c2766e..64aa0f13db 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -400,7 +400,7 @@ impl SubscriptionClientT for Client { /// Handle backend messages. /// -/// Returns error if the main background loop should be terminated. +/// Returns an error if the main background loop should be terminated. async fn handle_backend_messages( message: Option>, manager: &mut RequestManager, From 839ec65427d41556ce61055280d90df595d28394 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 27 May 2022 12:34:28 +0300 Subject: [PATCH 31/38] Update core/src/client/async_client/mod.rs Co-authored-by: Tarik Gul <47201679+TarikGul@users.noreply.github.com> --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 64aa0f13db..1319611631 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -490,7 +490,7 @@ async fn handle_backend_messages( /// Handle frontend messages. /// -/// Returns error if the main background loop should be terminated. +/// Returns an error if the main background loop should be terminated. async fn handle_frontend_messages( message: Option, manager: &mut RequestManager, From 8a1462ab69fbdd65102dbf5f3d3e176241318bbd Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 27 May 2022 12:35:00 +0300 Subject: [PATCH 32/38] Update core/src/client/async_client/mod.rs Co-authored-by: Tarik Gul <47201679+TarikGul@users.noreply.github.com> --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 1319611631..503dee6752 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -608,7 +608,7 @@ async fn background_task( // Place frontend and backend messages into their own select. // This implies that either messages are received (both front or backend), - // or submit ping timer expires (if provided). + // or the submitted ping timer expires (if provided). let next_frontend = frontend.next(); let next_backend = backend_event.next(); let mut message_fut = future::select(next_frontend, next_backend); From ba14af9513c06585318e775b7b4cfb53ded23b87 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 27 May 2022 12:39:57 +0300 Subject: [PATCH 33/38] Update core/src/client/async_client/mod.rs Co-authored-by: Tarik Gul <47201679+TarikGul@users.noreply.github.com> --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 503dee6752..cf48ef7ccd 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -635,7 +635,7 @@ async fn background_task( message_fut = future::select(frontend.next(), backend); } // Message received from the backend. - Either::Left((Either::Right((backend_value, frontend )), _))=> { + Either::Left((Either::Right((backend_value, frontend)), _))=> { if let Err(err) = handle_backend_messages::( backend_value, &mut manager, &mut sender, max_notifs_per_subscription ).await { From b79c64fc904afaff8603fc61e62afbb19fee5367 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 27 May 2022 12:52:00 +0300 Subject: [PATCH 34/38] Update core/Cargo.toml Co-authored-by: Niklas Adolfsson --- core/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index f46f578b56..9556600b0a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -63,7 +63,6 @@ async-wasm-client = [ "rustc-hash/std", "futures-timer/wasm-bindgen", "tracing", - "futures-util/async-await-macro", ] [dev-dependencies] From e615af583c020c56577ccbff332047ff25b76981 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 27 May 2022 12:52:09 +0300 Subject: [PATCH 35/38] Update core/Cargo.toml Co-authored-by: Niklas Adolfsson --- core/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 9556600b0a..31690b12ed 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -54,7 +54,6 @@ async-client = [ "tokio/sync", "tracing", "futures-timer", - "futures-util/async-await-macro", ] async-wasm-client = [ "async-lock", From baab48f78dcc02e02ffa3c1b50fa10e0578eab9a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 27 May 2022 13:03:46 +0300 Subject: [PATCH 36/38] logs: Keep debug log for submitting `Ping` frames Signed-off-by: Alexandru Vasile --- core/src/client/async_client/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index cf48ef7ccd..1a9270e580 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -646,9 +646,8 @@ async fn background_task( // Advance backend, save frontend. message_fut = future::select(frontend, backend_event.next()); } - // Submit ping interval was triggered. + // Submit ping interval was triggered if enabled. Either::Right((_, next_message_fut)) => { - tracing::trace!("[backend]: submit ping"); if let Err(e) = sender.send_ping().await { tracing::warn!("[backend]: client send ping failed: {:?}", e); let _ = front_error.send(Error::Custom("Could not send ping frame".into())); From 1738722a1d349c23e72fc8f44fb26d280acafaab Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 27 May 2022 13:13:34 +0300 Subject: [PATCH 37/38] Print debug logs when receiving `Pong` frames Signed-off-by: Alexandru Vasile --- client/transport/src/ws/mod.rs | 21 +++++++++++++-------- core/src/client/async_client/mod.rs | 5 ++++- core/src/client/mod.rs | 4 +++- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/client/transport/src/ws/mod.rs b/client/transport/src/ws/mod.rs index cc7dc32b20..195f5aa200 100644 --- a/client/transport/src/ws/mod.rs +++ b/client/transport/src/ws/mod.rs @@ -37,7 +37,7 @@ use jsonrpsee_core::{async_trait, Cow}; use soketto::connection::Error::Utf8; use soketto::data::ByteSlice125; use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; -use soketto::{connection, Data}; +use soketto::{connection, Data, Incoming}; use stream::EitherStream; use thiserror::Error; use tokio::net::TcpStream; @@ -223,14 +223,19 @@ impl TransportReceiverT for Receiver { /// Returns a `Future` resolving when the server sent us something back. async fn receive(&mut self) -> Result { - let mut message = Vec::new(); - - match self.inner.receive_data(&mut message).await? { - Data::Text(_) => { - let s = String::from_utf8(message).map_err(|err| WsError::Connection(Utf8(err.utf8_error())))?; - Ok(ReceivedMessage::Text(s)) + loop { + let mut message = Vec::new(); + let recv = self.inner.receive(&mut message).await?; + + match recv { + Incoming::Data(Data::Text(_)) => { + let s = String::from_utf8(message).map_err(|err| WsError::Connection(Utf8(err.utf8_error())))?; + break Ok(ReceivedMessage::Text(s)); + } + Incoming::Data(Data::Binary(_)) => break Ok(ReceivedMessage::Bytes(message)), + Incoming::Pong(_) => break Ok(ReceivedMessage::Pong), + _ => continue, } - Data::Binary(_) => Ok(ReceivedMessage::Bytes(message)), } } } diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 1a9270e580..2592996f10 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -121,7 +121,7 @@ impl ClientBuilder { /// /// Periodically submitting pings at a defined interval has mainly two benefits: /// - Directly, it acts as a "keep-alive" alternative in the WebSocket world. - /// - Indirectly by inspecting trace logs, it ensures that the endpoint is still responding to messages. + /// - Indirectly by inspecting debug logs, it ensures that the endpoint is still responding to messages. /// /// The underlying implementation does not make any assumptions about at which intervals pongs are received. /// @@ -469,6 +469,9 @@ async fn handle_backend_messages( } match message { + Some(Ok(ReceivedMessage::Pong)) => { + tracing::debug!("recv pong"); + } Some(Ok(ReceivedMessage::Bytes(raw))) => { handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?; } diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 2141d1306a..6af11027d4 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -142,13 +142,15 @@ pub trait TransportSenderT: MaybeSend + 'static { } /// Message type received from the RPC server. -/// It can either be plain text data, or bytes. +/// It can either be plain text data, bytes, or `Pong` messages. #[derive(Debug)] pub enum ReceivedMessage { /// Incoming packet contains plain `String` data. Text(String), /// Incoming packet contains bytes. Bytes(Vec), + /// Incoming `Pong` frame as a reply to a previously submitted `Ping` frame. + Pong, } /// Transport interface to receive data asynchronous. From c6610654d9ca6d142eb68a1eb5733eff5d5516bb Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 1 Jun 2022 15:36:22 +0200 Subject: [PATCH 38/38] Update core/src/client/async_client/mod.rs --- core/src/client/async_client/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 2592996f10..5376364b7e 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -509,7 +509,6 @@ async fn handle_frontend_messages( Some(FrontToBack::Batch(batch)) => { tracing::trace!("[backend]: client prepares to send batch request: {:?}", batch.raw); - // NOTE(niklasad1): annoying allocation. if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) { tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids); let _ = send_back.send(Err(Error::InvalidRequestId));