Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ping-pong for WebSocket clients #772

Merged
merged 40 commits into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
1162d3e
ws: Implement ping for `TransportSenderT` trait
lexnv May 17, 2022
2a76701
ws/client: Receive pong frames
lexnv May 17, 2022
80af45a
core/client: Use `select!` macro for the background task
lexnv May 17, 2022
63ba7e7
client: Propagate ping interval to background task
lexnv May 17, 2022
0d3534c
async_client: Submit ping requests
lexnv May 17, 2022
23ad9b9
async_client: Handle pong replies
lexnv May 17, 2022
48b5db9
client: Handle frontend messages to dedicated fn
lexnv May 19, 2022
c86b08e
client: Handle backend messages in dedicated fn
lexnv May 19, 2022
ae5bf75
client: Add terminated fuse for opt-out pings
lexnv May 19, 2022
0ee78c5
Set opt-out behavior for client pings
lexnv May 19, 2022
8e82ad1
Merge remote-tracking branch 'origin/master' into 738_ping_pong
lexnv May 19, 2022
e7c6edb
client: Move imports
lexnv May 20, 2022
9c5f235
client: Handle handle_frontend_messages errors
lexnv May 20, 2022
8e3ff40
client: Add custom error related to byteslice conversions
lexnv May 20, 2022
7d39b6f
client: Modify `send_ping` to send empty slices
lexnv May 20, 2022
64a7b99
Fix `cargo hack check` and use `select_biased`
lexnv May 20, 2022
1559554
Handle sending pings with lowest priority
lexnv May 20, 2022
5ba4d0e
core: Add proper number of params to `background_task`
lexnv May 20, 2022
aed7d26
Fix wasm client
lexnv May 20, 2022
48d6eec
Handle raw bytes and string received messages
lexnv May 20, 2022
cda4c09
Fix Cargo.toml feature
lexnv May 20, 2022
13ee47a
Panic when empty slice does not fit into `ByteSlice125`
lexnv May 20, 2022
6b40519
wasm: Add operation not supported for pings
lexnv May 24, 2022
537627f
Rename `ReceivedMessage` from Data to Text
lexnv May 24, 2022
2a2787f
Rename test variable
lexnv May 24, 2022
a9bed1e
Add documentation
lexnv May 24, 2022
4861c14
Merge remote-tracking branch 'origin/master' into 738_ping_pong
lexnv May 24, 2022
fb7ea4d
client: Use `future::select` for cancel safety
lexnv May 26, 2022
736da11
client: Remove `pong` handling logic
lexnv May 26, 2022
b262442
client: Update ping documentation
lexnv May 26, 2022
2d6b04c
Update core/src/client/async_client/mod.rs
lexnv May 27, 2022
24b5d67
Update core/src/client/async_client/mod.rs
lexnv May 27, 2022
839ec65
Update core/src/client/async_client/mod.rs
lexnv May 27, 2022
8a1462a
Update core/src/client/async_client/mod.rs
lexnv May 27, 2022
ba14af9
Update core/src/client/async_client/mod.rs
lexnv May 27, 2022
b79c64f
Update core/Cargo.toml
lexnv May 27, 2022
e615af5
Update core/Cargo.toml
lexnv May 27, 2022
baab48f
logs: Keep debug log for submitting `Ping` frames
lexnv May 27, 2022
1738722
Print debug logs when receiving `Pong` frames
lexnv May 27, 2022
c661065
Update core/src/client/async_client/mod.rs
niklasad1 Jun 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions client/transport/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures_util::sink::SinkExt;
use futures_util::stream::{SplitSink, SplitStream, StreamExt};
use gloo_net::websocket::{futures::WebSocket, Message, WebSocketError};
use jsonrpsee_core::async_trait;
use jsonrpsee_core::client::{TransportReceiverT, TransportSenderT};
use jsonrpsee_core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};

/// Web-sys transport error that can occur.
#[derive(Debug, thiserror::Error)]
Expand All @@ -22,6 +22,9 @@ pub enum Error {
/// WebSocket error
#[error("WebSocket Error: {0:?}")]
WebSocket(WebSocketError),
/// Operation not supported
#[error("Operation not supported")]
NotSupported,
}

/// Sender.
Expand Down Expand Up @@ -52,6 +55,11 @@ impl TransportSenderT for Sender {
Ok(())
}

async fn send_ping(&mut self) -> Result<(), Self::Error> {
tracing::trace!("send ping - not implemented for wasm");
Err(Error::NotSupported)
}

async fn close(&mut self) -> Result<(), Error> {
Ok(())
}
Expand All @@ -61,17 +69,15 @@ impl TransportSenderT for Sender {
impl TransportReceiverT for Receiver {
type Error = Error;

async fn receive(&mut self) -> Result<String, Self::Error> {
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
match self.0.next().await {
Some(Ok(msg)) => {
tracing::trace!("rx: {:?}", msg);

let txt = match msg {
Message::Bytes(bytes) => String::from_utf8(bytes).expect("WebSocket message is valid utf8; qed"),
Message::Text(txt) => txt,
};

Ok(txt)
match msg {
Message::Bytes(bytes) => Ok(ReceivedMessage::Bytes(bytes)),
Message::Text(txt) => Ok(ReceivedMessage::Text(txt)),
}
}
Some(Err(err)) => Err(Error::WebSocket(err)),
None => Err(Error::SenderDisconnected),
Expand Down
40 changes: 33 additions & 7 deletions client/transport/src/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;

use futures_util::io::{BufReader, BufWriter};
use jsonrpsee_core::client::{CertificateStore, TransportReceiverT, TransportSenderT};
use jsonrpsee_core::client::{CertificateStore, ReceivedMessage, TransportReceiverT, TransportSenderT};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_core::{async_trait, Cow};
use soketto::connection;
use soketto::connection::Error::Utf8;
use soketto::data::ByteSlice125;
use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse};
use soketto::{connection, Data, Incoming};
use stream::EitherStream;
use thiserror::Error;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -195,6 +197,20 @@ impl TransportSenderT for Sender {
Ok(())
}

/// Sends out a ping request. Returns a `Future` that finishes when the request has been
/// successfully sent.
Copy link
Collaborator

@jsdw jsdw May 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we've been talking about cancel safety a bit, and not really relaetd to this PR, but do you reckon it is worth adding an issue to add docs to functions to note when they aren't cancel safe @niklasad1?

(or maybe almost no methods are cancel safe, and we state that up front and then note which ones are, perhaps)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds like a good idea.

async fn send_ping(&mut self) -> Result<(), Self::Error> {
tracing::debug!("send ping");
// Submit empty slice as "optional" parameter.
let slice: &[u8] = &[];
// Byte slice fails if the provided slice is larger than 125 bytes.
let byte_slice = ByteSlice125::try_from(slice).expect("Empty slice should fit into ByteSlice125");

self.inner.send_ping(byte_slice).await?;
self.inner.flush().await?;
Ok(())
}

/// Send a close message and close the connection.
async fn close(&mut self) -> Result<(), WsError> {
self.inner.close().await.map_err(Into::into)
Expand All @@ -206,11 +222,21 @@ impl TransportReceiverT for Receiver {
type Error = WsError;

/// Returns a `Future` resolving when the server sent us something back.
async fn receive(&mut self) -> Result<String, Self::Error> {
let mut message = Vec::new();
self.inner.receive_data(&mut message).await?;
let s = String::from_utf8(message).expect("Found invalid UTF-8");
Ok(s)
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
loop {
let mut message = Vec::new();
let recv = self.inner.receive(&mut message).await?;

match recv {
Incoming::Data(Data::Text(_)) => {
let s = String::from_utf8(message).map_err(|err| WsError::Connection(Utf8(err.utf8_error())))?;
break Ok(ReceivedMessage::Text(s));
}
Incoming::Data(Data::Binary(_)) => break Ok(ReceivedMessage::Bytes(message)),
Incoming::Pong(_) => break Ok(ReceivedMessage::Pong),
_ => continue,
}
}
}
}

Expand Down
19 changes: 16 additions & 3 deletions client/ws-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub struct WsClientBuilder<'a> {
max_request_body_size: u32,
request_timeout: Duration,
connection_timeout: Duration,
ping_interval: Option<Duration>,
headers: Vec<Header<'a>>,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
Expand All @@ -87,6 +88,7 @@ impl<'a> Default for WsClientBuilder<'a> {
max_request_body_size: TEN_MB_SIZE_BYTES,
request_timeout: Duration::from_secs(60),
connection_timeout: Duration::from_secs(10),
ping_interval: None,
headers: Vec::new(),
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
Expand Down Expand Up @@ -121,6 +123,12 @@ impl<'a> WsClientBuilder<'a> {
self
}

/// See documentation [`ClientBuilder::ping_interval`] (disabled by default).
pub fn ping_interval(mut self, interval: Duration) -> Self {
self.ping_interval = Some(interval);
self
}

/// See documentation [`WsTransportClientBuilder::add_header`] (default is none).
pub fn add_header(mut self, name: &'a str, value: &'a str) -> Self {
self.headers.push(Header { name, value: value.as_bytes() });
Expand Down Expand Up @@ -169,11 +177,16 @@ impl<'a> WsClientBuilder<'a> {
let uri: Uri = url.as_ref().parse().map_err(|e: InvalidUri| Error::Transport(e.into()))?;
let (sender, receiver) = transport_builder.build(uri).await.map_err(|e| Error::Transport(e.into()))?;

Ok(ClientBuilder::default()
let mut client = ClientBuilder::default()
.max_notifs_per_subscription(self.max_notifs_per_subscription)
.request_timeout(self.request_timeout)
.max_concurrent_requests(self.max_concurrent_requests)
.id_format(self.id_kind)
.build_with_tokio(sender, receiver))
.id_format(self.id_kind);

if let Some(interval) = self.ping_interval {
client = client.ping_interval(interval);
}

Ok(client.build_with_tokio(sender, receiver))
}
}
Loading