Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return error from subscription callbacks #799

Merged
merged 72 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from 66 commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
107a4e9
subscription: Allow errors in subscription callbacks
lexnv Jun 17, 2022
e05f5a8
subscription: Remove the need to own the error
lexnv Jun 17, 2022
866595f
error: Build `ErrorObject` from `CallError` for improved ergonomics
lexnv Jun 20, 2022
52a9c41
Update examples for the new subscription API
lexnv Jun 20, 2022
6d25ec3
Add alias for subscription result
lexnv Jun 20, 2022
d5e0fc8
macros: Render server subscription method with `ResultSubscription`
lexnv Jun 20, 2022
fb6268d
Port `proc_macro` example to new API
lexnv Jun 20, 2022
924c0ee
Rename `ResultSubscription` to `ReturnTypeSubscription` to avoid conf…
lexnv Jun 20, 2022
60058eb
Port all tests to new subscription API
lexnv Jun 20, 2022
5ee4ee1
Update documentation
lexnv Jun 20, 2022
50525f5
Port benches
lexnv Jun 20, 2022
eca7ce6
Replace tabs with spaces & add documentation
lexnv Jun 20, 2022
ab3ab52
Add dummy error for subscriptions
lexnv Jun 21, 2022
efe48fe
Implement `From` for `SubscriptionError`
lexnv Jun 21, 2022
2089161
Return `SubscriptionError` when parsing params
lexnv Jun 21, 2022
710ee3c
Rename `SubscriptionError` to `SubscriptionEmptyError`
lexnv Jun 21, 2022
95245d9
Change `accept` signature
lexnv Jun 21, 2022
00a91f4
Port tests to new `accept` api
lexnv Jun 21, 2022
acb65d4
Implement `pipe_from_try_stream` and `pipe_from_stream` for `PendingS…
lexnv Jun 21, 2022
bf167f8
Modify examples to ilustrate new API
lexnv Jun 21, 2022
f8cd8b4
Fix docs tests
lexnv Jun 21, 2022
70b9d8e
Rename previously `SubscriptionResult` -> `InnerSubscriptionResult`
lexnv Jun 21, 2022
4efef2d
Rename `ReturnTypeSubscription` -> `SubscriptionResult`
lexnv Jun 21, 2022
c2c29a2
Remove documentation line
lexnv Jun 21, 2022
368d803
Implement `PipeFromStreamResult`
lexnv Jun 21, 2022
9a21c54
Add comment for empty error
lexnv Jun 21, 2022
1065fd1
Update proc-macros/src/lib.rs
lexnv Jun 22, 2022
2a0a89c
Update proc-macros/src/lib.rs
lexnv Jun 22, 2022
9cb9e1a
Update proc-macros/src/lib.rs
lexnv Jun 22, 2022
96dda16
Change `ReturnTypeSubscription` -> `SubscriptionResult`
lexnv Jun 22, 2022
bab6eaf
Add `ResultConsumed` for `PipeFromStreamResult`
lexnv Jun 22, 2022
ae59750
Update examples to use `PipeFromStreamResult`
lexnv Jun 22, 2022
549e6ba
Replace ConsumedResult with Options
lexnv Jun 22, 2022
61b1e0e
Merge remote-tracking branch 'origin/master' into 734_error_subscript…
lexnv Jun 22, 2022
e6ed17b
Log warning when subscription callback fails
lexnv Jun 22, 2022
f95403e
Change ubuntu test names
lexnv Jun 22, 2022
8d8858c
server: Make `pipe` methods of `SubscriptionSink` private
lexnv Jun 24, 2022
d3c3ce9
server: Remove `pipe_from_stream` method of `SubscriptionSink`
lexnv Jun 24, 2022
0b1c927
server: Update PipeFromStreamResult documentation
lexnv Jun 24, 2022
8cf401c
Adjust tests to `SubscriptionSink::pipe_from_stream` private interface
lexnv Jun 24, 2022
348f366
Add `accept-reject` API on `SubscriptionSink`
lexnv Jun 27, 2022
15f18c0
Make `pipe_from_try_stream` public
lexnv Jun 27, 2022
5d21441
Maybe accept the subscription
lexnv Jun 27, 2022
277bbd7
Revert "server: Remove `pipe_from_stream` method of `SubscriptionSink`"
lexnv Jun 27, 2022
64f45e9
Make `unsubscribe` channel optional on accepting the connection
lexnv Jun 27, 2022
712112b
Pass `SubscriptionSink` to subscription callbacks
lexnv Jun 27, 2022
1f66edf
Implement subscription sink state
lexnv Jun 27, 2022
99ffeb7
Submit `InvalidParams` if sink was never accepted
lexnv Jun 27, 2022
7657202
Handle rejected sinks
lexnv Jun 27, 2022
24fcf3e
Remove `PendingSubscription`
lexnv Jun 27, 2022
f30d9d0
Fix doc tests
lexnv Jun 27, 2022
c968cac
macro: Make subscription sink mutable
lexnv Jun 27, 2022
38f3a9f
Fix tests and examples
lexnv Jun 27, 2022
53eeff9
macro: Return `sink.reject()` result
lexnv Jun 28, 2022
c7b2867
tests: Add test for `SubscriptionSinkState`
lexnv Jun 28, 2022
9e8f649
Test internal subscription sink state
lexnv Jun 28, 2022
18d8169
Fix `send_error` to not always return `false`
lexnv Jun 28, 2022
1d983c7
Fix benches
lexnv Jun 28, 2022
639e524
Remove `PipeFromStreamResult`
lexnv Jun 28, 2022
45cfdf0
Use valid Json-RPC return code for test errors
lexnv Jun 28, 2022
abbea2c
Remove `SubscriptionSinkState`"
lexnv Jun 28, 2022
8d3f7e5
Remodel state machine using `Option`s for `SubscriptionSink`s
lexnv Jun 28, 2022
f5952de
tests: Double accept / reject API for `SubscriptionSink`
lexnv Jun 28, 2022
8b04cd1
Implement `SubscriptionAcceptRejectError` for error propagation
lexnv Jun 28, 2022
61a03eb
Remove `maybe_accept` wrapper
lexnv Jun 28, 2022
2a36c26
Update comments and documentation
lexnv Jun 28, 2022
13de232
Update core/src/server/rpc_module.rs
lexnv Jun 29, 2022
9df5b42
Update core/src/server/rpc_module.rs
lexnv Jun 29, 2022
77ab7de
rpc_server: Add type alias for unsubscription calls
lexnv Jun 29, 2022
0f255ad
rpc_server: Improve comment regarding dropped error
lexnv Jun 29, 2022
997706a
style: Single line return errors
lexnv Jun 29, 2022
c4f0f49
Make comment more verbose
jsdw Jun 29, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ jobs:
run: cargo hack check --workspace --each-feature --all-targets

tests_ubuntu:
name: Run nextests on Ubuntu
name: Run tests Ubuntu
runs-on: ubuntu-latest
steps:
- name: Checkout sources
Expand Down
7 changes: 2 additions & 5 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,10 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
let mut module = gen_rpc_module();

module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, pending, _ctx| {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();

Expand Down
5 changes: 3 additions & 2 deletions core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@ impl MethodSink {

if let Err(err) = self.send_raw(json) {
tracing::warn!("Error sending response {:?}", err);
false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a simple two-variant enum could help understand what the bool returned from send_error specifically means

Copy link
Member

@niklasad1 niklasad1 Jun 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree I think it was designed like that to be easy convert to "success/or not" in the middleware but it's unrelated this PR.

Let's tackle it another PR.

} else {
true
}

false
}

/// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client
Expand Down
201 changes: 99 additions & 102 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ use futures_channel::mpsc;
use futures_util::future::Either;
use futures_util::pin_mut;
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
use jsonrpsee_types::error::{CallError, ErrorCode, ErrorObject, ErrorObjectOwned, SUBSCRIPTION_CLOSED_WITH_ERROR};
use jsonrpsee_types::error::{
CallError, ErrorCode, ErrorObject, ErrorObjectOwned, INTERNAL_ERROR_CODE,
SUBSCRIPTION_CLOSED_WITH_ERROR, SubscriptionAcceptRejectError
};
use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError};
use jsonrpsee_types::{
ErrorResponse, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
SubscriptionResponse,
ErrorResponse, Id, Params, Request, Response, SubscriptionResult,
SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse
};
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
Expand Down Expand Up @@ -87,7 +90,7 @@ pub struct ConnState<'a> {

/// Outcome of a successful terminated subscription.
#[derive(Debug)]
pub enum SubscriptionResult {
pub enum InnerSubscriptionResult {
/// The subscription stream was executed successfully.
Success,
/// The subscription was aborted by the remote peer.
Expand Down Expand Up @@ -382,8 +385,9 @@ impl Methods {
/// use futures_util::StreamExt;
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap();
/// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap();
/// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap();
/// let resp = serde_json::from_str::<Response<u64>>(&resp).unwrap();
Expand Down Expand Up @@ -443,8 +447,9 @@ impl Methods {
/// use jsonrpsee::{RpcModule, types::EmptyParams};
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap();
/// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap();
///
/// let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap();
Expand Down Expand Up @@ -653,27 +658,22 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// use jsonrpsee_core::Error;
///
/// let mut ctx = RpcModule::new(99_usize);
/// ctx.register_subscription("sub", "notif_name", "unsub", |params, pending, ctx| {
/// ctx.register_subscription("sub", "notif_name", "unsub", |params, mut sink, ctx| {
/// let x = match params.one::<usize>() {
/// Ok(x) => x,
/// Err(e) => {
/// let err: Error = e.into();
/// pending.reject(err);
/// return;
/// }
/// };
///
/// let mut sink = match pending.accept() {
/// Some(sink) => sink,
/// _ => {
/// return;
/// sink.reject(err);
/// return Ok(());
/// }
/// };
///
/// // Sink is accepted on the first `send` call.
/// std::thread::spawn(move || {
/// let sum = x + (*ctx);
/// let _ = sink.send(&sum);
/// });
///
/// Ok(())
/// });
/// ```
pub fn register_subscription<F>(
Expand All @@ -685,7 +685,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
) -> Result<MethodResourcesBuilder, Error>
where
Context: Send + Sync + 'static,
F: Fn(Params, PendingSubscription, Arc<Context>) + Send + Sync + 'static,
F: Fn(Params, SubscriptionSink, Arc<Context>) -> SubscriptionResult + Send + Sync + 'static,
{
if subscribe_method_name == unsubscribe_method_name {
return Err(Error::SubscriptionNameConflict(subscribe_method_name.into()));
Expand Down Expand Up @@ -740,17 +740,21 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| {
let sub_id: RpcSubscriptionId = conn.id_provider.next_id();

let sink = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink.clone(),
let sink = SubscriptionSink {
inner: method_sink.clone(),
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
id: id.clone().into_owned(),
claimed,
}));
id: Some(id.clone().into_owned()),
unsubscribe: None,
_claimed: claimed,
};

callback(params, sink, ctx.clone());
// The callback returns an empty `SubscriptionError` for improved API ergonomics.
lexnv marked this conversation as resolved.
Show resolved Hide resolved
if let Err(err) = callback(params, sink, ctx.clone()) {
tracing::warn!("subscribe call `{}` failed with err={:?}", subscribe_method_name, err);
lexnv marked this conversation as resolved.
Show resolved Hide resolved
}

true
})),
Expand All @@ -775,101 +779,73 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
}
}

/// Represent a pending subscription which waits to be accepted or rejected.
///
/// Note: you need to call either `PendingSubscription::accept` or `PendingSubscription::reject` otherwise
/// the subscription will be dropped with an `InvalidParams` error.
/// Represents a single subscription.
#[derive(Debug)]
struct InnerPendingSubscription {
pub struct SubscriptionSink {
/// Sink.
sink: MethodSink,
inner: MethodSink,
/// Get notified when subscribers leave so we can exit
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
method: &'static str,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Shared Mutex of subscriptions
subscribers: Subscribers,
/// Request ID.
id: Id<'static>,
/// Id of the subscription.
lexnv marked this conversation as resolved.
Show resolved Hide resolved
///
/// *Note*: Having some value means the subscription was not accepted or rejected yet.
id: Option<Id<'static>>,
/// Returns when the unsubscribe method has been called.
///
/// *Note*: Have some values means the subscription was accepted.
unsubscribe: Option<watch::Receiver<()>>,
lexnv marked this conversation as resolved.
Show resolved Hide resolved
/// Claimed resources.
claimed: Option<ResourceGuard>,
_claimed: Option<ResourceGuard>,
}

/// Represent a pending subscription which waits until it's either accepted or rejected.
///
/// This type implements `Drop` for ease of use, e.g. when dropped in error short circuiting via `map_err()?`.
#[derive(Debug)]
pub struct PendingSubscription(Option<InnerPendingSubscription>);

impl PendingSubscription {
impl SubscriptionSink {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(mut self, err: impl Into<ErrorObjectOwned>) -> bool {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { sink, id, .. } = inner;
sink.send_error(id, err.into())
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionAcceptRejectError> {
let id = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;

if self.inner.send_error(id, err.into()) {
Ok(())
} else {
false
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}

/// Attempt to accept the subscription and respond the subscription method call.
///
/// Fails if the connection was closed
pub fn accept(mut self) -> Option<SubscriptionSink> {
let inner = self.0.take()?;

let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id, claimed } = inner;
/// Fails if the connection was closed, or if called multiple times.
pub fn accept(&mut self) -> Result<(), SubscriptionAcceptRejectError> {
let id = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;

if sink.send_response(id, &uniq_sub.sub_id) {
if self.inner.send_response(id, &self.uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
Some(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx, _claimed: claimed })
self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
self.unsubscribe = Some(rx);
Ok(())
} else {
None
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}
}

// When dropped it returns an [`InvalidParams`] error to the subscriber
impl Drop for PendingSubscription {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { sink, id, .. } = inner;
sink.send_error(id, ErrorCode::InvalidParams.into());
}
}
}

/// Represents a single subscription.
#[derive(Debug)]
pub struct SubscriptionSink {
/// Sink.
inner: MethodSink,
/// Get notified when subscribers leave so we can exit
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
method: &'static str,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Future that returns when the unsubscribe method has been called.
unsubscribe: watch::Receiver<()>,
/// Claimed resources.
_claimed: Option<ResourceGuard>,
}

impl SubscriptionSink {
/// Send a message back to subscribers.
///
/// Returns `Ok(true)` if the message could be send
/// Returns `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated)
/// Return `Err(err)` if the message could not be serialized.
///
/// Returns
/// - `Ok(true)` if the message could be send.
/// - `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated),
/// or the subscription could not be accepted.
/// - `Err(err)` if the message could not be serialized.
pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> {
// only possible to trigger when the connection is dropped.
// Cannot accept the subscription.
if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
return Ok(false);
}

// Only possible to trigger when the connection is dropped.
if self.is_closed() {
return Ok(false);
}
Expand All @@ -889,14 +865,13 @@ impl SubscriptionSink {
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionResult};
/// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use jsonrpsee_core::error::{Error, SubscriptionClosed};
/// use jsonrpsee_types::ErrorObjectOwned;
/// use anyhow::anyhow;
lexnv marked this conversation as resolved.
Show resolved Hide resolved
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let mut sink = pending.accept().unwrap();
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated.
Expand All @@ -915,6 +890,7 @@ impl SubscriptionSink {
/// }
/// }
/// });
/// Ok(())
/// });
/// ```
pub async fn pipe_from_try_stream<S, T, E>(&mut self, mut stream: S) -> SubscriptionClosed
Expand All @@ -923,14 +899,29 @@ impl SubscriptionSink {
T: Serialize,
E: std::fmt::Display,
{
if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
return SubscriptionClosed::RemotePeerAborted;
}

let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) {
Some(cn) => cn,
None => {
return SubscriptionClosed::RemotePeerAborted;
}
};

let mut sub_closed = self.unsubscribe.clone();
let mut sub_closed = match self.unsubscribe.as_ref() {
Some(rx) => rx.clone(),
_ => {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
let err = ErrorObject::owned(
INTERNAL_ERROR_CODE,
"Unsubscribe watcher not set after accepting the subscription".to_string(),
None::<()>
);
return SubscriptionClosed::Failed(err);
}
};

let sub_closed_fut = sub_closed.changed();

let conn_closed_fut = conn_closed.notified();
Expand Down Expand Up @@ -983,10 +974,10 @@ impl SubscriptionSink {
/// use jsonrpsee_core::server::rpc_module::RpcModule;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let mut sink = pending.accept().unwrap();
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// tokio::spawn(async move { sink.pipe_from_stream(stream).await; });
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -1003,7 +994,10 @@ impl SubscriptionSink {
}

fn is_active_subscription(&self) -> bool {
!self.unsubscribe.has_changed().is_err()
match self.unsubscribe.as_ref() {
Some(unsubscribe) => !unsubscribe.has_changed().is_err(),
_ => false,
}
}

fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
Expand Down Expand Up @@ -1057,7 +1051,10 @@ impl SubscriptionSink {

impl Drop for SubscriptionSink {
fn drop(&mut self) {
if self.is_active_subscription() {
// Subscription was never accepted.
if let Some(id) = self.id.take() {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
self.inner.send_error(id, ErrorCode::InvalidParams.into());
} else if self.is_active_subscription() {
self.subscribers.lock().remove(&self.uniq_sub);
}
}
Expand Down
Loading