feat(websocket client): WIP V2 ring buffer channel #165

Closed · wants to merge 3 commits

1 change: 1 addition & 0 deletions Cargo.toml
@@ -19,6 +19,7 @@ parking_lot = "0.11.1"
pin-project = "1.0.1"
jsonrpsee-proc-macros = { path = "proc-macros" }
rand = "0.7.3"
ring-channel = { git = "https://github.com/niklasad1/ring-channel.git", branch = "na-atomicptr" }
Contributor: Were you planning to open a PR for this?

niklasad1 (Member Author): It's up to the author of the library; he doesn't seem very keen on using AtomicPtr instead of NonNullPtr, so let's see how it goes. However, the idea is to split this PR into two parts once it works properly:

  1. Extract the try_send/send.now_or_never changes into a separate PR and handle the errors without the ring-channel crate (a sketch follows below).
  2. Add support for the ring channel.
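
A minimal sketch of what part 1 could look like, assuming a plain bounded futures mpsc channel and no ring-channel dependency (the helper name and error strings are illustrative, not part of this PR):

```rust
use futures::channel::mpsc;

// Illustrative only: send without blocking, surfacing a full or
// disconnected queue as an explicit error instead of silently dropping.
fn send_non_blocking<T>(tx: &mut mpsc::Sender<T>, msg: T) -> Result<(), &'static str> {
    tx.try_send(msg).map_err(|e| {
        if e.is_full() {
            "channel full: log an error or close the connection"
        } else {
            "channel disconnected"
        }
    })
}
```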

serde = { version = "1.0.117", default-features = false, features = ["derive"] }
serde_json = "1.0.59"
smallvec = { version = "1.5.0", default-features = false }
4 changes: 2 additions & 2 deletions benches/benches.rs
@@ -1,7 +1,7 @@
use async_std::task::block_on;
use criterion::*;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::{HttpClient, HttpConfig, WsClient};
use jsonrpsee::client::{HttpClient, HttpConfig, WsClient, WsConfig};
use jsonrpsee::http::HttpServer;
use jsonrpsee::types::jsonrpc::{JsonValue, Params};
use jsonrpsee::ws::WsServer;
@@ -77,7 +77,7 @@ pub fn websocket_requests(c: &mut criterion::Criterion) {
let (tx_addr, rx_addr) = oneshot::channel::<SocketAddr>();
async_std::task::spawn(ws_server(tx_addr));
let server_addr = block_on(rx_addr).unwrap();
let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr))).unwrap());
let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr), WsConfig::default())).unwrap());

c.bench_function("synchronous WebSocket round trip", |b| {
b.iter(|| {
5 changes: 3 additions & 2 deletions examples/subscription.rs
@@ -26,9 +26,10 @@

use async_std::task;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::{WsClient, WsSubscription};
use jsonrpsee::client::{WsClient, WsConfig, WsSubscription};
use jsonrpsee::types::jsonrpc::{JsonValue, Params};
use jsonrpsee::ws::WsServer;
use std::num::NonZeroUsize;

const SOCK_ADDR: &str = "127.0.0.1:9966";
const SERVER_URI: &str = "ws://localhost:9966";
@@ -44,7 +45,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

server_started_rx.await?;
let client = WsClient::new(SERVER_URI).await?;
let client = WsClient::new(SERVER_URI, WsConfig::default()).await?;
let mut subscribe_hello: WsSubscription<JsonValue> =
client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await?;

5 changes: 3 additions & 2 deletions examples/ws.rs
@@ -26,9 +26,10 @@

use async_std::task;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::WsClient;
use jsonrpsee::client::{WsClient, WsConfig};
use jsonrpsee::types::jsonrpc::{JsonValue, Params};
use jsonrpsee::ws::WsServer;
use std::num::NonZeroUsize;

const SOCK_ADDR: &str = "127.0.0.1:9944";
const SERVER_URI: &str = "ws://localhost:9944";
@@ -43,7 +44,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

server_started_rx.await?;
let client = WsClient::new(SERVER_URI).await?;
let client = WsClient::new(SERVER_URI, WsConfig::default()).await?;
let response: JsonValue = client.request("say_hello", Params::None).await?;
println!("r: {:?}", response);

4 changes: 3 additions & 1 deletion src/client/mod.rs
@@ -9,4 +9,6 @@ mod ws;
#[cfg(feature = "http")]
pub use http::{HttpClient, HttpConfig, HttpTransportClient};
#[cfg(feature = "ws")]
pub use ws::{Client as WsClient, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient};
pub use ws::{
Client as WsClient, Config as WsConfig, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient,
};
207 changes: 159 additions & 48 deletions src/client/ws/client.rs
@@ -25,8 +25,11 @@
// DEALINGS IN THE SOFTWARE.

use crate::client::ws::{RawClient, RawClientEvent, RawClientRequestId, WsTransportClient};
use crate::types::error::Error;
use crate::types::error::{Error, SenderError};
use crate::types::jsonrpc::{self, JsonValue};
use std::num::NonZeroUsize;

use ring_channel::{ring_channel, RingReceiver, RingSender};

use futures::{
channel::{mpsc, oneshot},
@@ -36,6 +39,56 @@ use futures::{
};
use std::{collections::HashMap, io, marker::PhantomData};

enum InternalChannelSender<T> {
AllowLosses(RingSender<T>),
BlockWhenFull(mpsc::Sender<T>),
}

impl<T> InternalChannelSender<T> {
fn send(self, data: T) -> Result<(), Error> {
niklasad1 (Member Author), Nov 25, 2020: TODO: rename to send_non_blocking and provide an additional method send_async or something.
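
As a rough illustration of the send_async idea from that TODO (the name comes from the comment; no such method exists yet), the mpsc flavor could await capacity via SinkExt::send instead of failing fast on a full queue:

```rust
use futures::{channel::mpsc, SinkExt};

// Sketch only: where `try_send` errors when the bounded queue is full,
// `SinkExt::send` waits until capacity frees up before completing.
async fn send_async<T>(tx: &mut mpsc::Sender<T>, msg: T) -> Result<(), mpsc::SendError> {
    tx.send(msg).await
}
```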

match self {
// Fails only when the channel is disconnected.
Self::AllowLosses(mut inner) => inner.send(data).map_err(|_| Error::Internal(SenderError::Disconnected)),
// NOTE: we don't want to block here when the queue is full;
// either close the connection or just log an error.
Self::BlockWhenFull(mut inner) => {
inner.try_send(data).map_err(|e| Error::Internal(e.into_send_error().into()))
}
}
}

fn is_closed(self, dummy: T) -> bool {
match self {
// Send a dummy value just to check whether the channel is still alive.
Self::AllowLosses(mut inner) => inner.send(dummy).is_err(),
Self::BlockWhenFull(inner) => inner.is_closed(),
}
}
}

impl<T> Clone for InternalChannelSender<T> {
fn clone(&self) -> Self {
match self {
Self::AllowLosses(inner) => Self::AllowLosses(inner.clone()),
Self::BlockWhenFull(inner) => Self::BlockWhenFull(inner.clone()),
}
}
}

enum InternalChannelReceiver<T> {
AllowLosses(RingReceiver<T>),
BlockWhenFull(mpsc::Receiver<T>),
}

impl<T> InternalChannelReceiver<T> {
async fn next(&mut self) -> Option<T> {
match self {
Self::AllowLosses(inner) => inner.next().await,
Self::BlockWhenFull(inner) => inner.next().await,
}
}
}
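
For context, a self-contained sketch of the behavioral difference these two variants wrap (assuming ring-channel's documented overwrite-oldest semantics and its futures Stream support, both relied on elsewhere in this diff):

```rust
use futures::{channel::mpsc, StreamExt};
use ring_channel::ring_channel;
use std::num::NonZeroUsize;

async fn demo() {
    // Bounded mpsc: capacity is `buffer` plus one slot per sender, and
    // `try_send` fails once that capacity is reached.
    let (mut tx, _rx) = mpsc::channel::<u32>(0);
    tx.try_send(1).unwrap();
    assert!(tx.try_send(2).is_err()); // full: the caller must handle it

    // Ring channel: sending never fails while connected; the oldest
    // buffered message is overwritten instead.
    let (mut rtx, mut rrx) = ring_channel::<u32>(NonZeroUsize::new(1).unwrap());
    rtx.send(1).unwrap();
    rtx.send(2).unwrap(); // silently drops the 1
    assert_eq!(rrx.next().await, Some(2));
}
```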

/// Client that can be cloned.
///
/// > **Note**: This struct is designed to be easy to use, but it works by maintaining a background
@@ -44,17 +97,46 @@ use std::{collections::HashMap, io, marker::PhantomData};
#[derive(Clone)]
pub struct Client {
/// Channel to send requests to the background task.
to_back: mpsc::Sender<FrontToBack>,
to_back: InternalChannelSender<FrontToBack>,
/// Config.
config: Config,
}

#[derive(Copy, Clone, Debug)]
/// Configuration.
pub struct Config {
/// Backend channel for serving requests and notifications.
request_channel_capacity: usize,
/// Backend channel for each unique subscription.
subscription_channel_capacity: usize,
/// Allow losses when the subscription channel gets full.
allow_subscription_losses: bool,
/// Allow losses when the request/notification channel gets full.
allow_request_losses: bool,
/// Max request body size.
max_request_body_size: usize,
}

impl Default for Config {
fn default() -> Self {
Self {
request_channel_capacity: 16,
subscription_channel_capacity: 16,
allow_subscription_losses: true,
allow_request_losses: false,
max_request_body_size: 10 * 1024 * 1024,
}
}
}
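
The call sites updated in this PR always pass the defaults; a minimal usage sketch (the return type and error bound are assumptions based on the examples in this diff):

```rust
use jsonrpsee::client::{WsClient, WsConfig};

// Mirrors the updated examples/ws.rs in this PR. Per-field overrides would
// require the Config fields (private as written) to be exposed somehow.
async fn connect() -> Result<WsClient, Box<dyn std::error::Error>> {
    Ok(WsClient::new("ws://localhost:9944", WsConfig::default()).await?)
}
```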

/// Active subscription on a [`Client`].
pub struct Subscription<Notif> {
/// Channel to send requests to the background task.
to_back: mpsc::Sender<FrontToBack>,
/// Channel from which we receive notifications from the server, as undecoded `JsonValue`s.
notifs_rx: mpsc::Receiver<JsonValue>,
to_back: InternalChannelSender<FrontToBack>,
/// Channel from which we receive notifications from the server, as un-decoded `JsonValue`s.
notifs_rx: InternalChannelReceiver<JsonValue>,
/// Marker in order to pin the `Notif` parameter.
marker: PhantomData<mpsc::Receiver<Notif>>,
marker: PhantomData<Notif>,
}

/// Message that the [`Client`] can send to the background task.
@@ -88,7 +170,7 @@ enum FrontToBack {
/// When we get a response from the server about that subscription, we send the result on
/// this channel. If the subscription succeeds, we return a `Receiver` that will receive
/// notifications.
send_back: oneshot::Sender<Result<mpsc::Receiver<JsonValue>, Error>>,
send_back: oneshot::Sender<Result<InternalChannelReceiver<JsonValue>, Error>>,
},

/// When a request or subscription channel is closed, we send this message to the background
Expand All @@ -104,14 +186,20 @@ impl Client {
/// Initializes a new WebSocket client
///
/// Fails when the URL is invalid.
pub async fn new(target: &str) -> Result<Self, Error> {
pub async fn new(target: impl AsRef<str>, config: Config) -> Result<Self, Error> {
let transport = WsTransportClient::new(target).await.map_err(|e| Error::TransportError(Box::new(e)))?;
let client = RawClient::new(transport);
let (to_back, from_front) = mpsc::channel(16);

let (to_back, from_front) = if config.allow_request_losses {
allow_losses_channel(config.request_channel_capacity)
} else {
blocking_channel(config.request_channel_capacity)
};

async_std::task::spawn(async move {
background_task(client, from_front).await;
background_task(client, from_front, config).await;
});
Ok(Client { to_back })
Ok(Client { to_back, config })
}

/// Send a notification to the server.
@@ -123,7 +211,7 @@
let method = method.into();
let params = params.into();
log::trace!("[frontend]: send notification: method={:?}, params={:?}", method, params);
self.to_back.clone().send(FrontToBack::Notification { method, params }).await.map_err(Error::Internal)
self.to_back.clone().send(FrontToBack::Notification { method, params })
}

/// Perform a request towards the server.
@@ -139,13 +227,9 @@
let params = params.into();
log::trace!("[frontend]: send request: method={:?}, params={:?}", method, params);
let (send_back_tx, send_back_rx) = oneshot::channel();
self.to_back
.clone()
.send(FrontToBack::StartRequest { method, params, send_back: send_back_tx })
.await
.map_err(Error::Internal)?;

// TODO: send a `ChannelClosed` message if we close the channel unexpectedly
// -> it is impossible to do without another channel.
self.to_back.clone().send(FrontToBack::StartRequest { method, params, send_back: send_back_tx })?;

let json_value = match send_back_rx.await {
Ok(Ok(v)) => v,
@@ -177,16 +261,12 @@

log::trace!("[frontend]: subscribe: {:?}, unsubscribe: {:?}", subscribe_method, unsubscribe_method);
let (send_back_tx, send_back_rx) = oneshot::channel();
self.to_back
.clone()
.send(FrontToBack::Subscribe {
subscribe_method,
unsubscribe_method,
params: params.into(),
send_back: send_back_tx,
})
.await
.map_err(Error::Internal)?;
self.to_back.clone().send(FrontToBack::Subscribe {
subscribe_method,
unsubscribe_method,
params: params.into(),
send_back: send_back_tx,
})?;

let notifs_rx = match send_back_rx.await {
Ok(Ok(v)) => v,
@@ -225,20 +305,20 @@ where
impl<Notif> Drop for Subscription<Notif> {
fn drop(&mut self) {
// We can't actually guarantee that this goes through. If the background task is busy, then
// the channel's buffer will be full, and our unsubscription request will never make it.
// the channel's buffer will be full, and our un-subscription request will never make it.
// However, when a notification arrives, the background task will realize that the channel
// to the `Subscription` has been closed, and will perform the unsubscribe.
let _ = self.to_back.send(FrontToBack::ChannelClosed).now_or_never();
let _ = self.to_back.clone().send(FrontToBack::ChannelClosed);
}
}

/// Function being run in the background that processes messages from the frontend.
async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<FrontToBack>) {
async fn background_task(mut client: RawClient, mut from_front: InternalChannelReceiver<FrontToBack>, config: Config) {
// List of subscription requests that have been sent to the server, with the method name to
// unsubscribe.
let mut pending_subscriptions: HashMap<RawClientRequestId, (oneshot::Sender<_>, _)> = HashMap::new();
// List of subscription that are active on the server, with the method name to unsubscribe.
let mut active_subscriptions: HashMap<RawClientRequestId, (mpsc::Sender<jsonrpc::JsonValue>, _)> = HashMap::new();
let mut active_subscriptions: HashMap<RawClientRequestId, (InternalChannelSender<JsonValue>, _)> = HashMap::new();
// List of requests that the server must answer.
let mut ongoing_requests: HashMap<RawClientRequestId, oneshot::Sender<Result<_, _>>> = HashMap::new();

@@ -303,10 +383,21 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F
Either::Left(Some(FrontToBack::ChannelClosed)) => {
// TODO: there's no way to cancel pending subscriptions and requests, otherwise
// we should clean them up as well
while let Some(rq_id) = active_subscriptions.iter().find(|(_, (v, _))| v.is_closed()).map(|(k, _)| *k) {
let (_, unsubscribe) = active_subscriptions.remove(&rq_id).unwrap();
client.subscription_by_id(rq_id).unwrap().into_active().unwrap().close(unsubscribe).await.unwrap();

// TODO(niklasad1): perf use mem::replace trick here instead of Vec.
let mut remove = Vec::new();

for (req_id, (sender, _unsubscribe)) in &active_subscriptions {
if sender.clone().is_closed(JsonValue::Null) {
remove.push(*req_id);
}
}

for req_id in remove {
let (_, unsubscribe) = active_subscriptions.remove(&req_id).unwrap();
client.subscription_by_id(req_id).unwrap().into_active().unwrap().close(unsubscribe).await.unwrap();
}
return;
}
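
One way to read the mem::replace TODO above (illustrative, not part of this diff): take ownership of the whole map in a single move, reinsert the live entries, and hand back the closed ones, avoiding the Vec of ids plus the second round of map lookups:

```rust
use std::collections::HashMap;
use std::mem;

// Sketch of the trick: one pass over the map, no repeated key lookups.
fn take_closed<K, V>(map: &mut HashMap<K, V>, mut closed: impl FnMut(&V) -> bool) -> Vec<(K, V)>
where
    K: std::hash::Hash + Eq,
{
    let mut dead = Vec::new();
    for (k, v) in mem::take(map) {
        if closed(&v) {
            dead.push((k, v));
        } else {
            map.insert(k, v);
        }
    }
    dead
}
```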

// Received a response to a request from the server.
Expand All @@ -317,13 +408,17 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F

// Receive a response from the server about a subscription.
Either::Right(Ok(RawClientEvent::SubscriptionResponse { request_id, result })) => {
log::trace!("[backend]: client received response to subscription: {:?}", result);
log::info!("[backend]: client received response to subscription: {:?}", result);
let (send_back, unsubscribe) = pending_subscriptions.remove(&request_id).unwrap();
if let Err(err) = result {
let _ = send_back.send(Err(Error::Request(err)));
} else {
// TODO: what's a good limit here? way more tricky than it looks
let (notifs_tx, notifs_rx) = mpsc::channel(4);
let (notifs_tx, notifs_rx) = if config.allow_subscription_losses {
allow_losses_channel(config.subscription_channel_capacity)
} else {
blocking_channel(config.subscription_channel_capacity)
};

if send_back.send(Ok(notifs_rx)).is_ok() {
active_subscriptions.insert(request_id, (notifs_tx, unsubscribe));
} else {
@@ -342,16 +437,22 @@ async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<F
Either::Right(Ok(RawClientEvent::SubscriptionNotif { request_id, result })) => {
// TODO: unsubscribe if channel is closed
let (notifs_tx, _) = active_subscriptions.get_mut(&request_id).unwrap();
if notifs_tx.send(result).await.is_err() {
let (_, unsubscribe) = active_subscriptions.remove(&request_id).unwrap();
client
.subscription_by_id(request_id)
.unwrap()
.into_active()
.unwrap()
.close(unsubscribe)
.await
.unwrap();

match notifs_tx.clone().send(result) {
Err(Error::Internal(SenderError::Disconnected)) => {
let (_, unsubscribe) = active_subscriptions.remove(&request_id).unwrap();
let _ = client
.subscription_by_id(request_id)
.unwrap()
.into_active()
.unwrap()
.close(unsubscribe)
.await;
}
Err(Error::Internal(SenderError::Full)) => {
// TODO: err log, close subscription or close connection?!
}
_ => (),
}
}

@@ -365,3 +466,13 @@
}
}
}

fn blocking_channel<T>(capacity: usize) -> (InternalChannelSender<T>, InternalChannelReceiver<T>) {
let (tx, rx) = mpsc::channel(capacity);
(InternalChannelSender::BlockWhenFull(tx), InternalChannelReceiver::BlockWhenFull(rx))
}

fn allow_losses_channel<T>(capacity: usize) -> (InternalChannelSender<T>, InternalChannelReceiver<T>) {
let (tx, rx) = ring_channel(NonZeroUsize::new(capacity).unwrap());
(InternalChannelSender::AllowLosses(tx), InternalChannelReceiver::AllowLosses(rx))
}
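
Note that allow_losses_channel panics if it is ever called with capacity == 0, since NonZeroUsize::new(0) returns None. A defensive variant might clamp first (illustrative, not in this diff):

```rust
use std::num::NonZeroUsize;

// Sketch: clamp to at least 1 so the NonZeroUsize construction cannot fail.
fn ring_capacity(capacity: usize) -> NonZeroUsize {
    NonZeroUsize::new(capacity.max(1)).expect("max(1) is non-zero")
}
```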