From a38045ce263b69219bac7e2bd46a86cf8c666ea1 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 26 Apr 2022 16:47:28 +0200 Subject: [PATCH 01/12] feat: limit the number of subscriptions Closing #729 --- core/src/server/helpers.rs | 49 ++++++++++++++++++++++++++++++++++++++ ws-server/src/server.rs | 45 +++++++++++++++++++++------------- 2 files changed, 78 insertions(+), 16 deletions(-) diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index 2fb7bb319d..82fd33376e 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -25,6 +25,7 @@ // DEALINGS IN THE SOFTWARE. use std::io; +use std::sync::Arc; use crate::{to_json_raw_value, Error}; use futures_channel::mpsc; @@ -32,6 +33,7 @@ use futures_util::StreamExt; use jsonrpsee_types::error::{ErrorCode, ErrorObject, ErrorResponse, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG}; use jsonrpsee_types::{Id, InvalidRequest, Response}; use serde::Serialize; +use tokio::sync::Notify; /// Bounded writer that allows writing at most `max_len` bytes. /// @@ -196,8 +198,41 @@ pub async fn collect_batch_response(rx: mpsc::UnboundedReceiver) -> Stri buf } +/// Wrapper over [`tokio::sync::Notify`] with bounds check. +#[derive(Debug)] +pub struct BoundedSubscriptions { + inner: Arc, + max_subscriptions: u32, +} + +impl BoundedSubscriptions { + /// Create a new bounded subscription. + pub fn new(max_subscriptions: u32) -> Self { + Self { inner: Arc::new(Notify::new()), max_subscriptions } + } + + /// The get a handle to a subscription + /// + /// Fails if `max_subscriptions` have been exceeded. + pub fn get(&self) -> Option> { + // The type itself increases the strong count by + if Arc::strong_count(&self.inner) as u32 > self.max_subscriptions { + None + } else { + Some(self.inner.clone()) + } + } + + /// Close all subscriptions. + pub fn close(&self) { + self.inner.notify_waiters(); + } +} + #[cfg(test)] mod tests { + use crate::server::helpers::BoundedSubscriptions; + use super::{BoundedWriter, Id, Response}; #[test] @@ -215,4 +250,18 @@ mod tests { // NOTE: `"` is part of the serialization so 101 characters. assert!(serde_json::to_writer(&mut writer, &"x".repeat(99)).is_err()); } + + #[test] + fn bounded_subscriptions_work() { + let subs = Arc::new(BoundedSubscriptions::new(5)); + let mut handles = Vec::new(); + + for _ in 0..5 { + handles.push(subs.get().unwrap()); + } + + assert!(subs.get().is_none()); + handles.swap_remove(0); + assert!(subs.get().is_some()); + } } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 5e1b2872bd..136dc008fe 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -39,7 +39,7 @@ use futures_util::io::{BufReader, BufWriter}; use futures_util::stream::StreamExt; use jsonrpsee_core::id_providers::RandomIntegerIdProvider; use jsonrpsee_core::middleware::Middleware; -use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, MethodSink}; +use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, BoundedSubscriptions, MethodSink}; use jsonrpsee_core::server::resource_limiting::Resources; use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodKind, Methods}; use jsonrpsee_core::traits::IdProvider; @@ -49,7 +49,6 @@ use soketto::connection::Error as SokettoError; use soketto::handshake::{server::Response, Server as SokettoServer}; use soketto::Sender; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; -use tokio::sync::Notify; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; /// Default maximum connections allowed. @@ -271,6 +270,7 @@ where resources.clone(), cfg.max_request_body_size, cfg.max_response_body_size, + BoundedSubscriptions::new(cfg.max_subscriptions_per_connection), stop_monitor.clone(), middleware, id_provider, @@ -292,6 +292,7 @@ async fn background_task( resources: Resources, max_request_body_size: u32, max_response_body_size: u32, + bounded_subscriptions: BoundedSubscriptions, stop_server: StopMonitor, middleware: impl Middleware, id_provider: Arc, @@ -301,8 +302,8 @@ async fn background_task( builder.set_max_message_size(max_request_body_size as usize); let (mut sender, mut receiver) = builder.finish(); let (tx, mut rx) = mpsc::unbounded::(); - let close_notify = Arc::new(Notify::new()); - let close_notify_server_stop = close_notify.clone(); + let bounded_subscriptions = Arc::new(bounded_subscriptions); + let bounded_subscriptions2 = bounded_subscriptions.clone(); let stop_server2 = stop_server.clone(); let sink = MethodSink::new_with_limit(tx, max_response_body_size); @@ -327,7 +328,7 @@ async fn background_task( let _ = sender.close().await; // Notify all listeners and close down associated tasks. - close_notify_server_stop.notify_waiters(); + bounded_subscriptions2.close(); }); // Buffer for incoming data. @@ -436,11 +437,14 @@ async fn background_task( }, MethodKind::Subscription(callback) => match method.claim(&req.method, &resources) { Ok(guard) => { - let cn = close_notify.clone(); - let conn_state = - ConnState { conn_id, close_notify: cn, id_provider: &*id_provider }; - - let result = callback(id, params, &sink, conn_state); + let result = if let Some(cn) = bounded_subscriptions.get() { + let conn_state = + ConnState { conn_id, close_notify: cn, id_provider: &*id_provider }; + callback(id, params, &sink, conn_state) + } else { + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + false + }; middleware.on_result(name, result, request_start); middleware.on_response(request_start); drop(guard); @@ -470,7 +474,7 @@ async fn background_task( let methods = &methods; let sink = sink.clone(); let id_provider = id_provider.clone(); - let close_notify2 = close_notify.clone(); + let bounded_subscriptions2 = bounded_subscriptions.clone(); let fut = async move { // Batch responses must be sent back as a single message so we read the results from each @@ -537,11 +541,17 @@ async fn background_task( MethodKind::Subscription(callback) => { match method_callback.claim(&req.method, resources) { Ok(guard) => { - let close_notify = close_notify2.clone(); - let conn_state = - ConnState { conn_id, close_notify, id_provider: &*id_provider }; - - let result = callback(id, params, &sink_batch, conn_state); + let result = if let Some(cn) = bounded_subscriptions2.get() { + let conn_state = ConnState { + conn_id, + close_notify: cn, + id_provider: &*id_provider, + }; + callback(id, params, &sink_batch, conn_state) + } else { + sink_batch.send_error(req.id, ErrorCode::ServerIsBusy.into()); + false + }; middleware.on_result(&req.method, result, request_start); drop(guard); None @@ -629,6 +639,8 @@ struct Settings { max_response_body_size: u32, /// Maximum number of incoming connections allowed. max_connections: u64, + /// Maximum number of subscriptions per connection. + max_subscriptions_per_connection: u32, /// Policy by which to accept or deny incoming requests based on the `Origin` header. allowed_origins: AllowedValue, /// Policy by which to accept or deny incoming requests based on the `Host` header. @@ -642,6 +654,7 @@ impl Default for Settings { Self { max_request_body_size: TEN_MB_SIZE_BYTES, max_response_body_size: TEN_MB_SIZE_BYTES, + max_subscriptions_per_connection: 1024, max_connections: MAX_CONNECTIONS, allowed_origins: AllowedValue::Any, allowed_hosts: AllowedValue::Any, From f61ac0ea23c6d5097502e1320aa1a431028dc7a6 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 27 Apr 2022 16:14:15 +0200 Subject: [PATCH 02/12] fix nit --- core/src/server/helpers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index 82fd33376e..951ed5eea5 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -253,7 +253,7 @@ mod tests { #[test] fn bounded_subscriptions_work() { - let subs = Arc::new(BoundedSubscriptions::new(5)); + let subs = BoundedSubscriptions::new(5); let mut handles = Vec::new(); for _ in 0..5 { From 761eea480e55db440de1b4625d8c7969a39240e5 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 29 Apr 2022 17:05:10 +0200 Subject: [PATCH 03/12] Update core/src/server/helpers.rs --- core/src/server/helpers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index 951ed5eea5..44701e7dfc 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -215,7 +215,7 @@ impl BoundedSubscriptions { /// /// Fails if `max_subscriptions` have been exceeded. pub fn get(&self) -> Option> { - // The type itself increases the strong count by + // The type itself increases the strong count by 1 by having an `Arc` if Arc::strong_count(&self.inner) as u32 > self.max_subscriptions { None } else { From d78029e3f112e7dd270edbd86e079db1706a92e4 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 30 Apr 2022 00:13:07 +0200 Subject: [PATCH 04/12] add integration tests + some fixes so it works --- core/src/server/helpers.rs | 46 ++++++++++++++++---------- core/src/server/rpc_module.rs | 21 ++++++------ tests/tests/integration_tests.rs | 55 ++++++++++++++++++++++++++++++++ ws-server/src/server.rs | 11 +++++-- 4 files changed, 103 insertions(+), 30 deletions(-) diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index 44701e7dfc..55358a3a47 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -33,7 +33,7 @@ use futures_util::StreamExt; use jsonrpsee_types::error::{ErrorCode, ErrorObject, ErrorResponse, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG}; use jsonrpsee_types::{Id, InvalidRequest, Response}; use serde::Serialize; -use tokio::sync::Notify; +use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; /// Bounded writer that allows writing at most `max_len` bytes. /// @@ -198,34 +198,46 @@ pub async fn collect_batch_response(rx: mpsc::UnboundedReceiver) -> Stri buf } -/// Wrapper over [`tokio::sync::Notify`] with bounds check. +/// A permitted subscription. #[derive(Debug)] +pub struct SubscriptionPermit { + _permit: OwnedSemaphorePermit, + resource: Arc, +} + +impl SubscriptionPermit { + /// Get the handle to [`tokio::sync::Notify`]. + pub fn handle(&self) -> Arc { + self.resource.clone() + } +} + +/// Wrapper over [`tokio::sync::Notify`] with bounds check. +#[derive(Debug, Clone)] pub struct BoundedSubscriptions { - inner: Arc, - max_subscriptions: u32, + resource: Arc, + guard: Arc, } impl BoundedSubscriptions { /// Create a new bounded subscription. pub fn new(max_subscriptions: u32) -> Self { - Self { inner: Arc::new(Notify::new()), max_subscriptions } + Self { resource: Arc::new(Notify::new()), guard: Arc::new(Semaphore::new(max_subscriptions as usize)) } } - /// The get a handle to a subscription + /// Attempts to acquire a subscription slot. /// /// Fails if `max_subscriptions` have been exceeded. - pub fn get(&self) -> Option> { - // The type itself increases the strong count by 1 by having an `Arc` - if Arc::strong_count(&self.inner) as u32 > self.max_subscriptions { - None - } else { - Some(self.inner.clone()) - } + pub fn acquire(&self) -> Option { + Arc::clone(&self.guard) + .try_acquire_owned() + .ok() + .map(|p| SubscriptionPermit { _permit: p, resource: self.resource.clone() }) } /// Close all subscriptions. pub fn close(&self) { - self.inner.notify_waiters(); + self.resource.notify_waiters(); } } @@ -257,11 +269,11 @@ mod tests { let mut handles = Vec::new(); for _ in 0..5 { - handles.push(subs.get().unwrap()); + handles.push(subs.acquire().unwrap()); } - assert!(subs.get().is_none()); + assert!(subs.acquire().is_none()); handles.swap_remove(0); - assert!(subs.get().is_some()); + assert!(subs.acquire().is_some()); } } diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 3c19ffe651..7225b0567f 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -30,6 +30,7 @@ use std::future::Future; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +use super::helpers::{BoundedSubscriptions, SubscriptionPermit}; use crate::error::{Error, SubscriptionClosed}; use crate::id_providers::RandomIntegerIdProvider; use crate::server::helpers::MethodSink; @@ -48,7 +49,6 @@ use jsonrpsee_types::{ use parking_lot::Mutex; use rustc_hash::FxHashMap; use serde::{de::DeserializeOwned, Serialize}; -use tokio::sync::Notify; /// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request, /// implemented as a function pointer to a `Fn` function taking four arguments: @@ -71,14 +71,14 @@ pub type ConnectionId = usize; /// - Call result as a `String`, /// - a [`mpsc::UnboundedReceiver`] to receive future subscription results /// - a [`tokio::sync::Notify`] to allow subscribers to notify their [`SubscriptionSink`] when they disconnect. -pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, Arc); +pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, SubscriptionPermit); /// Helper struct to manage subscriptions. pub struct ConnState<'a> { /// Connection ID pub conn_id: ConnectionId, /// Get notified when the connection to subscribers is closed. - pub close_notify: Arc, + pub close_notify: SubscriptionPermit, /// ID provider. pub id_provider: &'a dyn IdProvider, } @@ -393,14 +393,15 @@ impl Methods { let sink = MethodSink::new(tx_sink); let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); - let notify = Arc::new(Notify::new()); + let bounded_subs = BoundedSubscriptions::new(u32::MAX); + let close_notify = bounded_subs.acquire().unwrap(); + let notify = bounded_subs.acquire().unwrap(); let _result = match self.method(&req.method).map(|c| &c.callback) { None => sink.send_error(req.id, ErrorCode::MethodNotFound.into()), Some(MethodKind::Sync(cb)) => (cb)(id, params, &sink), Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), sink, 0, None).await, Some(MethodKind::Subscription(cb)) => { - let close_notify = notify.clone(); let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider }; (cb)(id, params, &sink, conn_state) } @@ -757,7 +758,7 @@ struct InnerPendingSubscription { /// Sink. sink: MethodSink, /// Get notified when subscribers leave so we can exit - close_notify: Option>, + close_notify: Option, /// MethodCallback. method: &'static str, /// Unique subscription. @@ -819,7 +820,7 @@ pub struct SubscriptionSink { /// Sink. inner: MethodSink, /// Get notified when subscribers leave so we can exit - close_notify: Option>, + close_notify: Option, /// MethodCallback. method: &'static str, /// Unique subscription. @@ -891,7 +892,7 @@ impl SubscriptionSink { T: Serialize, E: std::fmt::Display, { - let close_notify = match self.close_notify.clone() { + let close_notify = match self.close_notify.as_ref().map(|cn| cn.handle()) { Some(close_notify) => close_notify, None => return SubscriptionClosed::RemotePeerAborted, }; @@ -1033,7 +1034,7 @@ impl Drop for SubscriptionSink { /// Wrapper struct that maintains a subscription "mainly" for testing. #[derive(Debug)] pub struct Subscription { - close_notify: Option>, + close_notify: Option, rx: mpsc::UnboundedReceiver, sub_id: RpcSubscriptionId<'static>, } @@ -1043,7 +1044,7 @@ impl Subscription { pub fn close(&mut self) { tracing::trace!("[Subscription::close] Notifying"); if let Some(n) = self.close_notify.take() { - n.notify_one() + n.handle().notify_one() } } /// Get the subscription ID diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 15a808971c..43b1ea1f17 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -545,6 +545,61 @@ async fn ws_batch_works() { assert_eq!(responses, vec!["hello".to_string(), "hello".to_string()]); } +#[tokio::test] +async fn ws_server_limit_subs_per_conn_works() { + use futures::StreamExt; + use jsonrpsee::types::error::{CallError, SERVER_IS_BUSY_CODE, SERVER_IS_BUSY_MSG}; + use jsonrpsee::{ws_server::WsServerBuilder, RpcModule}; + + let server = WsServerBuilder::default().max_subscriptions_per_connection(10).build("127.0.0.1:0").await.unwrap(); + let server_url = format!("ws://{}", server.local_addr().unwrap()); + + let mut module = RpcModule::new(()); + + module + .register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, pending, _| { + let mut sink = match pending.accept() { + Some(sink) => sink, + _ => return, + }; + + tokio::spawn(async move { + let interval = interval(Duration::from_millis(50)); + let stream = IntervalStream::new(interval).map(move |_| 0_usize); + + match sink.pipe_from_stream(stream).await { + SubscriptionClosed::Success => { + sink.close(SubscriptionClosed::Success); + } + _ => unreachable!(), + }; + }); + }) + .unwrap(); + server.start(module).unwrap(); + + let c1 = WsClientBuilder::default().build(&server_url).await.unwrap(); + let c2 = WsClientBuilder::default().build(&server_url).await.unwrap(); + + let mut subs1 = Vec::new(); + let mut subs2 = Vec::new(); + + for _ in 0..10 { + subs1.push(c1.subscribe::("subscribe_forever", None, "unsubscribe_forever").await.unwrap()); + subs2.push(c2.subscribe::("subscribe_forever", None, "unsubscribe_forever").await.unwrap()); + } + + let err1 = c1.subscribe::("subscribe_forever", None, "unsubscribe_forever").await; + let err2 = c1.subscribe::("subscribe_forever", None, "unsubscribe_forever").await; + + assert!( + matches!(err1, Err(Error::Call(CallError::Custom(err))) if err.code() == SERVER_IS_BUSY_CODE && err.message() == SERVER_IS_BUSY_MSG) + ); + assert!( + matches!(err2, Err(Error::Call(CallError::Custom(err))) if err.code() == SERVER_IS_BUSY_CODE && err.message() == SERVER_IS_BUSY_MSG) + ); +} + #[tokio::test] async fn http_unsupported_methods_dont_work() { use hyper::{Body, Client, Method, Request}; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 136dc008fe..824aee527a 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -302,7 +302,6 @@ async fn background_task( builder.set_max_message_size(max_request_body_size as usize); let (mut sender, mut receiver) = builder.finish(); let (tx, mut rx) = mpsc::unbounded::(); - let bounded_subscriptions = Arc::new(bounded_subscriptions); let bounded_subscriptions2 = bounded_subscriptions.clone(); let stop_server2 = stop_server.clone(); @@ -437,7 +436,7 @@ async fn background_task( }, MethodKind::Subscription(callback) => match method.claim(&req.method, &resources) { Ok(guard) => { - let result = if let Some(cn) = bounded_subscriptions.get() { + let result = if let Some(cn) = bounded_subscriptions.acquire() { let conn_state = ConnState { conn_id, close_notify: cn, id_provider: &*id_provider }; callback(id, params, &sink, conn_state) @@ -541,7 +540,7 @@ async fn background_task( MethodKind::Subscription(callback) => { match method_callback.claim(&req.method, resources) { Ok(guard) => { - let result = if let Some(cn) = bounded_subscriptions2.get() { + let result = if let Some(cn) = bounded_subscriptions2.acquire() { let conn_state = ConnState { conn_id, close_notify: cn, @@ -709,6 +708,12 @@ impl Builder { self } + /// Set the maximum number of connections allowed. Default is 1024. + pub fn max_subscriptions_per_connection(mut self, max: u32) -> Self { + self.settings.max_subscriptions_per_connection = max; + self + } + /// Register a new resource kind. Errors if `label` is already registered, or if the number of /// registered resources on this server instance would exceed 8. /// From fdbffe9f289ddacba29a1847b83df99bf8abf228 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sun, 1 May 2022 10:37:41 +0200 Subject: [PATCH 05/12] cargo fmt --- core/src/server/rpc_module.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 3925944194..cd634bd348 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -32,7 +32,7 @@ use std::sync::Arc; use crate::error::{Error, SubscriptionClosed}; use crate::id_providers::RandomIntegerIdProvider; -use crate::server::helpers::{BoundedSubscriptions, SubscriptionPermit, MethodSink}; +use crate::server::helpers::{BoundedSubscriptions, MethodSink, SubscriptionPermit}; use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources}; use crate::traits::{IdProvider, ToRpcParams}; use futures_channel::mpsc; @@ -48,7 +48,7 @@ use jsonrpsee_types::{ use parking_lot::Mutex; use rustc_hash::FxHashMap; use serde::{de::DeserializeOwned, Serialize}; -use tokio::sync::{watch}; +use tokio::sync::watch; /// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request, /// implemented as a function pointer to a `Fn` function taking four arguments: From 96189b0f2e1d4c23c41014f4f35cbc4b1849375f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 2 May 2022 17:38:40 +0200 Subject: [PATCH 06/12] fix doc links --- core/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index cd634bd348..fbf7b11ba6 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -70,7 +70,7 @@ pub type ConnectionId = usize; /// A 3-tuple containing: /// - Call result as a `String`, /// - a [`mpsc::UnboundedReceiver`] to receive future subscription results -/// - a [`crate::servers::helpers::SubscriptionPermit`] to allow subscribers to notify their [`SubscriptionSink`] when they disconnect. +/// - a [`crate::server::helpers::SubscriptionPermit`] to allow subscribers to notify their [`SubscriptionSink`] when they disconnect. pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, SubscriptionPermit); /// Helper struct to manage subscriptions. From 7ac1fd66008e6ae3ba2d9d1c8aec670b3f9e1f55 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 3 May 2022 11:55:36 +0100 Subject: [PATCH 07/12] Unsubscribe calls should avoid subscription limits Point to Tokio 1.16 (we use a method from it), and a little special treatment for unsubscribe methods --- benches/Cargo.toml | 2 +- client/http-client/Cargo.toml | 4 ++-- core/Cargo.toml | 4 ++-- core/src/server/rpc_module.rs | 22 ++++++++++++++++----- examples/Cargo.toml | 2 +- http-server/Cargo.toml | 2 +- http-server/src/server.rs | 4 ++-- proc-macros/Cargo.toml | 2 +- test-utils/Cargo.toml | 2 +- tests/Cargo.toml | 2 +- ws-server/Cargo.toml | 2 +- ws-server/src/server.rs | 37 +++++++++++++++++++++++++++++++++++ 12 files changed, 67 insertions(+), 18 deletions(-) diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 4c68bfa962..ec9a39e4ab 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -18,7 +18,7 @@ jsonrpc-http-server = { version = "18.0.0", optional = true } jsonrpc-pubsub = { version = "18.0.0", optional = true } num_cpus = "1" serde_json = "1" -tokio = { version = "1.8", features = ["rt-multi-thread"] } +tokio = { version = "1.16", features = ["rt-multi-thread"] } [[bench]] name = "bench" diff --git a/client/http-client/Cargo.toml b/client/http-client/Cargo.toml index dbd87f7469..091dabf867 100644 --- a/client/http-client/Cargo.toml +++ b/client/http-client/Cargo.toml @@ -19,12 +19,12 @@ jsonrpsee-core = { path = "../../core", version = "0.11.0", features = ["client" serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -tokio = { version = "1.8", features = ["time"] } +tokio = { version = "1.16", features = ["time"] } tracing = "0.1" [dev-dependencies] jsonrpsee-test-utils = { path = "../../test-utils" } -tokio = { version = "1.8", features = ["net", "rt-multi-thread", "macros"] } +tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros"] } [features] default = ["tls"] diff --git a/core/Cargo.toml b/core/Cargo.toml index afd301d116..ffbbc03693 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -27,7 +27,7 @@ rustc-hash = { version = "1", optional = true } rand = { version = "0.8", optional = true } soketto = { version = "0.7.1", optional = true } parking_lot = { version = "0.12", optional = true } -tokio = { version = "1.8", optional = true } +tokio = { version = "1.16", optional = true } wasm-bindgen-futures = { version = "0.4.19", optional = true } futures-timer = { version = "3", optional = true } @@ -66,5 +66,5 @@ async-wasm-client = [ [dev-dependencies] serde_json = "1.0" -tokio = { version = "1.8", features = ["macros", "rt"] } +tokio = { version = "1.16", features = ["macros", "rt"] } jsonrpsee = { path = "../jsonrpsee", features = ["server", "macros"] } diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index fbf7b11ba6..b949f44934 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -61,6 +61,8 @@ pub type AsyncMethod<'a> = Arc< >; /// Method callback for subscriptions. pub type SubscriptionMethod = Arc bool>; +// Method callback to unsubscribe. +type UnsubscriptionMethod = Arc bool>; /// Connection ID, used for stateful protocol such as WebSockets. /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. @@ -77,7 +79,7 @@ pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, Subscription pub struct ConnState<'a> { /// Connection ID pub conn_id: ConnectionId, - /// Get notified when the connection to subscribers is closed. + /// Get notified when the connection to subscribers is closed.1 pub close_notify: SubscriptionPermit, /// ID provider. pub id_provider: &'a dyn IdProvider, @@ -114,8 +116,10 @@ pub enum MethodKind { Sync(SyncMethod), /// Asynchronous method handler. Async(AsyncMethod<'static>), - /// Subscription method handler + /// Subscription method handler. Subscription(SubscriptionMethod), + /// Unsubscription method handler. + Unsubscription(UnsubscriptionMethod), } /// Information about resources the method uses during its execution. Initialized when the the server starts. @@ -189,6 +193,13 @@ impl MethodCallback { } } + fn new_unsubscription(callback: UnsubscriptionMethod) -> Self { + MethodCallback { + callback: MethodKind::Unsubscription(callback), + resources: MethodResources::Uninitialized([].into()), + } + } + /// Attempt to claim resources prior to executing a method. On success returns a guard that releases /// claimed resources when dropped. pub fn claim(&self, name: &str, resources: &Resources) -> Result { @@ -210,6 +221,7 @@ impl Debug for MethodKind { Self::Async(_) => write!(f, "Async"), Self::Sync(_) => write!(f, "Sync"), Self::Subscription(_) => write!(f, "Subscription"), + Self::Unsubscription(_) => write!(f, "Unsubscription"), } } } @@ -405,6 +417,7 @@ impl Methods { let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider }; (cb)(id, params, &sink, conn_state) } + Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, &sink, 0), }; let resp = rx_sink.next().await.expect("tx and rx still alive; qed"); @@ -708,7 +721,7 @@ impl RpcModule { { self.methods.mut_callbacks().insert( unsubscribe_method_name, - MethodCallback::new_subscription(Arc::new(move |id, params, sink, conn| { + MethodCallback::new_unsubscription(Arc::new(move |id, params, sink, conn_id| { let sub_id = match params.one::() { Ok(sub_id) => sub_id, Err(_) => { @@ -723,8 +736,7 @@ impl RpcModule { }; let sub_id = sub_id.into_owned(); - let result = - subscribers.lock().remove(&SubscriptionKey { conn_id: conn.conn_id, sub_id }).is_some(); + let result = subscribers.lock().remove(&SubscriptionKey { conn_id, sub_id }).is_some(); sink.send_response(id, result) })), diff --git a/examples/Cargo.toml b/examples/Cargo.toml index ca0cedad10..da794b7003 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -13,7 +13,7 @@ futures = "0.3" jsonrpsee = { path = "../jsonrpsee", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3.3", features = ["env-filter"] } -tokio = { version = "1.8", features = ["full"] } +tokio = { version = "1.16", features = ["full"] } tokio-stream = { version = "0.1", features = ["sync"] } serde_json = { version = "1" } diff --git a/http-server/Cargo.toml b/http-server/Cargo.toml index 50bff3149c..908c79ba5a 100644 --- a/http-server/Cargo.toml +++ b/http-server/Cargo.toml @@ -19,7 +19,7 @@ globset = "0.4" lazy_static = "1.4" tracing = "0.1" serde_json = "1" -tokio = { version = "1.8", features = ["rt-multi-thread", "macros"] } +tokio = { version = "1.16", features = ["rt-multi-thread", "macros"] } unicase = "2.6.0" [dev-dependencies] diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 7caf660867..df66cbd8a0 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -554,7 +554,7 @@ async fn process_validated_request( false } }, - MethodKind::Subscription(_) => { + MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { tracing::error!("Subscriptions not supported on HTTP"); sink.send_error(req.id, ErrorCode::InternalError.into()); false @@ -622,7 +622,7 @@ async fn process_validated_request( None } }, - MethodKind::Subscription(_) => { + MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { tracing::error!("Subscriptions not supported on HTTP"); sink.send_error(req.id, ErrorCode::InternalError.into()); middleware.on_result(&req.method, false, request_start); diff --git a/proc-macros/Cargo.toml b/proc-macros/Cargo.toml index 30a48d51d7..15545a0633 100644 --- a/proc-macros/Cargo.toml +++ b/proc-macros/Cargo.toml @@ -21,6 +21,6 @@ proc-macro-crate = "1" [dev-dependencies] jsonrpsee = { path = "../jsonrpsee", features = ["full"] } trybuild = "1.0" -tokio = { version = "1.8", features = ["rt", "macros"] } +tokio = { version = "1.16", features = ["rt", "macros"] } futures-channel = { version = "0.3.14", default-features = false } futures-util = { version = "0.3.14", default-features = false } diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 4fb1336ef8..1f43fcf736 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -16,5 +16,5 @@ tracing = "0.1" serde = { version = "1", default-features = false, features = ["derive"] } serde_json = "1" soketto = { version = "0.7.1", features = ["http"] } -tokio = { version = "1.8", features = ["net", "rt-multi-thread", "macros", "time"] } +tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros", "time"] } tokio-util = { version = "0.7", features = ["compat"] } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 42204bd613..6b3e9ca25f 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -12,7 +12,7 @@ env_logger = "0.9" beef = { version = "0.5.1", features = ["impl_serde"] } futures = { version = "0.3.14", default-features = false, features = ["std"] } jsonrpsee = { path = "../jsonrpsee", features = ["full"] } -tokio = { version = "1.8", features = ["full"] } +tokio = { version = "1.16", features = ["full"] } tracing = "0.1" serde = "1" serde_json = "1" diff --git a/ws-server/Cargo.toml b/ws-server/Cargo.toml index 17f4bfff05..131ca500a1 100644 --- a/ws-server/Cargo.toml +++ b/ws-server/Cargo.toml @@ -17,7 +17,7 @@ jsonrpsee-core = { path = "../core", version = "0.11.0", features = ["server", " tracing = "0.1" serde_json = { version = "1", features = ["raw_value"] } soketto = "0.7.1" -tokio = { version = "1.8", features = ["net", "rt-multi-thread", "macros", "time"] } +tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros", "time"] } tokio-util = { version = "0.7", features = ["compat"] } [dev-dependencies] diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 824aee527a..a580f5244d 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -458,6 +458,23 @@ async fn background_task( middleware.on_response(request_start); } }, + MethodKind::Unsubscription(callback) => match method.claim(&req.method, &resources) { + Ok(guard) => { + let result = callback(id, params, &sink, conn_id); + middleware.on_result(name, result, request_start); + middleware.on_response(request_start); + drop(guard); + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(name, false, request_start); + middleware.on_response(request_start); + } + }, }, } } else { @@ -561,6 +578,26 @@ async fn background_task( err ); + sink_batch.send_error(req.id, ErrorCode::ServerIsBusy.into()); + middleware.on_result(&req.method, false, request_start); + None + } + } + } + MethodKind::Unsubscription(callback) => { + match method_callback.claim(&req.method, resources) { + Ok(guard) => { + let result = callback(id, params, &sink_batch, conn_id); + middleware.on_result(&req.method, result, request_start); + drop(guard); + None + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {:?}", + err + ); + sink_batch.send_error(req.id, ErrorCode::ServerIsBusy.into()); middleware.on_result(&req.method, false, request_start); None From 5aad4f4b43c197d3d3d7709041246664aa928d53 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 3 May 2022 11:59:55 +0100 Subject: [PATCH 08/12] No resource limiting for Unsubscribe calls --- ws-server/src/server.rs | 45 +++++++++-------------------------------- 1 file changed, 10 insertions(+), 35 deletions(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index a580f5244d..5d548be88d 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -458,23 +458,12 @@ async fn background_task( middleware.on_response(request_start); } }, - MethodKind::Unsubscription(callback) => match method.claim(&req.method, &resources) { - Ok(guard) => { - let result = callback(id, params, &sink, conn_id); - middleware.on_result(name, result, request_start); - middleware.on_response(request_start); - drop(guard); - } - Err(err) => { - tracing::error!( - "[Methods::execute_with_resources] failed to lock resources: {:?}", - err - ); - sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); - middleware.on_result(name, false, request_start); - middleware.on_response(request_start); - } - }, + MethodKind::Unsubscription(callback) => { + // Don't adhere to any resource or subscription limits; always let unsubscribing happen! + let result = callback(id, params, &sink, conn_id); + middleware.on_result(name, result, request_start); + middleware.on_response(request_start); + } }, } } else { @@ -585,24 +574,10 @@ async fn background_task( } } MethodKind::Unsubscription(callback) => { - match method_callback.claim(&req.method, resources) { - Ok(guard) => { - let result = callback(id, params, &sink_batch, conn_id); - middleware.on_result(&req.method, result, request_start); - drop(guard); - None - } - Err(err) => { - tracing::error!( - "[Methods::execute_with_resources] failed to lock resources: {:?}", - err - ); - - sink_batch.send_error(req.id, ErrorCode::ServerIsBusy.into()); - middleware.on_result(&req.method, false, request_start); - None - } - } + // Don't adhere to any resource or subscription limits; always let unsubscribing happen! + let result = callback(id, params, &sink_batch, conn_id); + middleware.on_result(&req.method, result, request_start); + None } }, } From 79d05bfa9d9fda1b0c4a704815d2d4410903ab28 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 3 May 2022 13:16:46 +0100 Subject: [PATCH 09/12] Test that we can still unsubscribe after hitting a limit --- core/src/client/mod.rs | 5 +++ tests/tests/integration_tests.rs | 56 ++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index f48373a0c4..ea8cf7d7ae 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -207,6 +207,11 @@ impl Subscription { ) -> Self { Self { to_back, notifs_rx, kind, marker: PhantomData } } + + /// Return the subscription type and, if applicable, ID. + pub fn kind(&self) -> &SubscriptionKind { + &self.kind + } } /// Batch request message. diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index ac982f1a58..cd06a9826c 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -547,6 +547,62 @@ async fn ws_server_limit_subs_per_conn_works() { ); } +#[tokio::test] +async fn ws_server_unsub_methods_should_ignore_sub_limit() { + use futures::StreamExt; + use jsonrpsee::core::client::SubscriptionKind; + use jsonrpsee::{ws_server::WsServerBuilder, RpcModule}; + + let server = WsServerBuilder::default().max_subscriptions_per_connection(10).build("127.0.0.1:0").await.unwrap(); + let server_url = format!("ws://{}", server.local_addr().unwrap()); + + let mut module = RpcModule::new(()); + + module + .register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, pending, _| { + let mut sink = match pending.accept() { + Some(sink) => sink, + _ => return, + }; + + tokio::spawn(async move { + let interval = interval(Duration::from_millis(50)); + let stream = IntervalStream::new(interval).map(move |_| 0_usize); + + match sink.pipe_from_stream(stream).await { + SubscriptionClosed::RemotePeerAborted => { + sink.close(SubscriptionClosed::RemotePeerAborted); + } + _ => unreachable!(), + }; + }); + }) + .unwrap(); + server.start(module).unwrap(); + + let client = WsClientBuilder::default().build(&server_url).await.unwrap(); + + // Add 10 subscriptions (this should fill our subscrition limit for this connection): + let mut subs = Vec::new(); + for _ in 0..10 { + subs.push(client.subscribe::("subscribe_forever", None, "unsubscribe_forever").await.unwrap()); + } + + // Get the ID of one of them: + let last_sub = subs.pop().unwrap(); + let last_sub_id = match last_sub.kind() { + SubscriptionKind::Subscription(id) => id.clone(), + _ => panic!("Expected a subscription Id to be present"), + }; + + // Manually call the unsubscribe function for this subscription: + let res: Result = client.request("unsubscribe_forever", rpc_params![last_sub_id]).await; + + // This should not hit any limits, and unsubscription should have worked: + assert!(res.is_ok(), "Unsubscription method was successfully called"); + assert_eq!(res.unwrap(), true, "Unsubscription was successful"); +} + #[tokio::test] async fn http_unsupported_methods_dont_work() { use hyper::{Body, Client, Method, Request}; From a7049297a79792fc6d6a08cb394545affb80d359 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 3 May 2022 14:22:15 +0100 Subject: [PATCH 10/12] Fix a comment typo Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> --- core/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index b949f44934..cf63fb0020 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -79,7 +79,7 @@ pub type RawRpcResponse = (String, mpsc::UnboundedReceiver, Subscription pub struct ConnState<'a> { /// Connection ID pub conn_id: ConnectionId, - /// Get notified when the connection to subscribers is closed.1 + /// Get notified when the connection to subscribers is closed. pub close_notify: SubscriptionPermit, /// ID provider. pub id_provider: &'a dyn IdProvider, From 21009a948309e79c7c1efbc526317e8fd607133f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 3 May 2022 15:39:35 +0200 Subject: [PATCH 11/12] Update core/src/server/rpc_module.rs --- core/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index cf63fb0020..d804f05c96 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -407,7 +407,7 @@ impl Methods { let params = Params::new(req.params.map(|params| params.get())); let bounded_subs = BoundedSubscriptions::new(u32::MAX); let close_notify = bounded_subs.acquire().unwrap(); - let notify = bounded_subs.acquire().unwrap(); + let notify = bounded_subs.acquire().expect("u32::MAX permits is sufficient; qed"); let _result = match self.method(&req.method).map(|c| &c.callback) { None => sink.send_error(req.id, ErrorCode::MethodNotFound.into()), From dfedaa74f43bef4f5206e954b536a704c24bf584 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 3 May 2022 15:39:46 +0200 Subject: [PATCH 12/12] Update core/src/server/rpc_module.rs --- core/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index d804f05c96..1c63446e5d 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -406,7 +406,7 @@ impl Methods { let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); let bounded_subs = BoundedSubscriptions::new(u32::MAX); - let close_notify = bounded_subs.acquire().unwrap(); + let close_notify = bounded_subs.acquire().expect("u32::MAX permits is sufficient; qed"); let notify = bounded_subs.acquire().expect("u32::MAX permits is sufficient; qed"); let _result = match self.method(&req.method).map(|c| &c.callback) {