Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor SubscriptionClosed #612

Merged
merged 8 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ error: Method argument names must be valid Rust identifiers; got `_` instead
--> $DIR/method_ignored_arguments.rs:6:20
|
6 | async fn a(&self, _: Vec<u8>);
| ^^^^^^^^^^
| ^
6 changes: 4 additions & 2 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use jsonrpsee::{
http_client::HttpClientBuilder,
rpc_params,
types::{
error::SubscriptionClosedError,
error::SubscriptionClosedReason,
traits::{Client, SubscriptionClient},
Error, JsonValue, Subscription,
},
Expand Down Expand Up @@ -356,8 +356,10 @@ async fn ws_server_should_stop_subscription_after_client_drop() {

assert_eq!(res, 1);
drop(client);
let close_err = rx.next().await.unwrap();

// assert that the server received `SubscriptionClosed` after the client was dropped.
assert!(matches!(rx.next().await.unwrap(), SubscriptionClosedError { .. }));
assert!(matches!(close_err.close_reason(), &SubscriptionClosedReason::ConnectionReset));
}

#[tokio::test]
Expand Down
8 changes: 5 additions & 3 deletions types/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{error::SubscriptionClosedError, v2::SubscriptionId, Error};
use crate::{error::SubscriptionClosed, v2::SubscriptionId, Error};
use core::marker::PhantomData;
use futures_channel::{mpsc, oneshot};
use futures_util::{
Expand Down Expand Up @@ -53,9 +53,11 @@ pub enum SubscriptionKind {
/// the server was a valid notification or should be treated as an error.
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
enum NotifResponse<Notif> {
pub enum NotifResponse<Notif> {
/// Successful response
Ok(Notif),
Err(SubscriptionClosedError),
/// Subscription was closed.
Err(SubscriptionClosed),
}

/// Active subscription on the client.
Expand Down
73 changes: 58 additions & 15 deletions types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub enum Error {
SubscriptionNameConflict(String),
/// Subscription got closed.
#[error("Subscription closed: {0:?}")]
SubscriptionClosed(SubscriptionClosedError),
SubscriptionClosed(SubscriptionClosed),
/// Request timeout
#[error("Request timeout")]
RequestTimeout,
Expand Down Expand Up @@ -177,32 +177,48 @@ impl Error {
}
}

/// Error type with a special `subscription_closed` field to detect that
/// A type with a special `subscription_closed` field to detect that
/// a subscription has been closed to distinguish valid items produced
/// by the server on the subscription stream from an error.
///
/// This is included in the `result field` of the SubscriptionResponse
/// when an error is reported by the server.
#[derive(Deserialize, Serialize, Debug, PartialEq)]
pub struct SubscriptionClosedError {
subscription_closed: String,
id: u64,
#[serde(deny_unknown_fields)]
pub struct SubscriptionClosed {
reason: SubscriptionClosedReason,
}

impl SubscriptionClosedError {
/// Create a new subscription closed error.
pub fn new(reason: impl Into<String>, id: u64) -> Self {
Self { subscription_closed: reason.into(), id }
impl From<SubscriptionClosedReason> for SubscriptionClosed {
fn from(reason: SubscriptionClosedReason) -> Self {
Self::new(reason)
}
}

/// Get the reason why the subscription was closed.
pub fn close_reason(&self) -> &str {
&self.subscription_closed
impl SubscriptionClosed {
/// Create a new [`SubscriptionClosed`].
pub fn new(reason: SubscriptionClosedReason) -> Self {
Self { reason }
}

/// Get the subscription ID.
pub fn subscription_id(&self) -> u64 {
self.id
/// Get the close reason.
pub fn close_reason(&self) -> &SubscriptionClosedReason {
&self.reason
}
}

/// A type to represent when a subscription gets closed
/// by either the server or client side.
#[derive(Deserialize, Serialize, Debug, PartialEq)]
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
pub enum SubscriptionClosedReason {
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
/// The subscription was closed by calling the unsubscribe method.
Unsubscribed,
/// The client closed the connection.
ConnectionReset,
/// The server closed the subscription, providing a description of the reason as a `String`.
Server(String),
}

/// Generic transport error.
#[derive(Debug, thiserror::Error)]
pub enum GenericTransportError<T: std::error::Error + Send + Sync> {
Expand Down Expand Up @@ -240,3 +256,30 @@ impl From<hyper::Error> for Error {
Error::Transport(hyper_err.into())
}
}

#[cfg(test)]
mod tests {
use super::{SubscriptionClosed, SubscriptionClosedReason};

#[test]
fn subscription_closed_ser_deser_works() {
let items: Vec<(&str, SubscriptionClosed)> = vec![
(r#"{"reason":"Unsubscribed"}"#, SubscriptionClosedReason::Unsubscribed.into()),
(r#"{"reason":"ConnectionReset"}"#, SubscriptionClosedReason::ConnectionReset.into()),
(r#"{"reason":{"Server":"hoho"}}"#, SubscriptionClosedReason::Server("hoho".into()).into()),
];

for (s, d) in items {
let dsr: SubscriptionClosed = serde_json::from_str(s).unwrap();
assert_eq!(dsr, d);
let ser = serde_json::to_string(&d).unwrap();
assert_eq!(ser, s);
}
}

#[test]
fn subscription_closed_deny_unknown_field() {
let ser = r#"{"reason":"Unsubscribed","deny":1}"#;
assert!(serde_json::from_str::<SubscriptionClosed>(ser).is_err());
}
}
5 changes: 5 additions & 0 deletions utils/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ impl MethodSink {
MethodSink { tx, max_response_size }
}

/// Returns whether this channel is closed without needing a context.
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}

/// Send a JSON-RPC response to the client. If the serialization of `result` exceeds `max_response_size`,
/// an error will be sent instead.
pub fn send_response(&self, id: Id, result: impl Serialize) -> bool {
Expand Down
71 changes: 42 additions & 29 deletions utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec
use beef::Cow;
use futures_channel::{mpsc, oneshot};
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
use jsonrpsee_types::error::{SubscriptionClosed, SubscriptionClosedReason};
use jsonrpsee_types::to_json_raw_value;
use jsonrpsee_types::v2::error::{invalid_subscription_err, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::{
error::{Error, SubscriptionClosedError},
error::Error,
traits::ToRpcParams,
v2::{
ErrorCode, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
Expand Down Expand Up @@ -768,10 +769,18 @@ pub struct SubscriptionSink {
impl SubscriptionSink {
/// Send a message back to subscribers.
pub fn send<T: Serialize>(&mut self, result: &T) -> Result<(), Error> {
if self.is_closed() {
return Err(Error::SubscriptionClosed(SubscriptionClosedReason::ConnectionReset.into()));
}
let msg = self.build_message(result)?;
self.inner_send(msg).map_err(Into::into)
}

/// Returns whether this channel is closed without needing a context.
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}

fn build_message<T: Serialize>(&self, result: &T) -> Result<String, Error> {
serde_json::to_string(&SubscriptionResponse::new(
self.method.into(),
Expand All @@ -784,45 +793,47 @@ impl SubscriptionSink {
let res = match self.is_connected.as_ref() {
Some(conn) if !conn.is_canceled() => {
// unbounded send only fails if the receiver has been dropped.
self.inner.send_raw(msg).map_err(|_| {
Some(SubscriptionClosedError::new("Closed by the client (connection reset)", self.uniq_sub.sub_id))
})
self.inner.send_raw(msg).map_err(|_| Some(SubscriptionClosedReason::ConnectionReset))
}
Some(_) => Err(Some(SubscriptionClosedError::new("Closed by unsubscribe call", self.uniq_sub.sub_id))),
Some(_) => Err(Some(SubscriptionClosedReason::Unsubscribed)),
// NOTE(niklasad1): this should be unreachble, after the first error is detected the subscription is closed.
None => Err(None),
};

if let Err(Some(e)) = &res {
self.inner_close(e);
// The subscription was already closed by the client
// Close down the subscription but don't send a message to the client.
if res.is_err() {
self.inner_close(None);
}

res.map_err(|e| {
let err = e.unwrap_or_else(|| SubscriptionClosedError::new("Close reason unknown", self.uniq_sub.sub_id));
Error::SubscriptionClosed(err)
let err = e.unwrap_or_else(|| SubscriptionClosedReason::Server("Close reason unknown".to_string()));
Error::SubscriptionClosed(err.into())
})
}

/// Close the subscription sink with a customized error message.
pub fn close(&mut self, msg: &str) {
let err = SubscriptionClosedError::new(msg, self.uniq_sub.sub_id);
self.inner_close(&err);
let close_reason = SubscriptionClosedReason::Server(msg.to_string()).into();
self.inner_close(Some(&close_reason));
}

fn inner_close(&mut self, err: &SubscriptionClosedError) {
fn inner_close(&mut self, close_reason: Option<&SubscriptionClosed>) {
self.is_connected.take();
if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) {
tracing::debug!("Closing subscription: {:?}", self.uniq_sub.sub_id);
let msg = self.build_message(err).expect("valid json infallible; qed");
let _ = sink.send_raw(msg);
tracing::debug!("Closing subscription: {:?} reason: {:?}", self.uniq_sub.sub_id, close_reason);
if let Some(close_reason) = close_reason {
let msg = self.build_message(close_reason).expect("valid json infallible; qed");
let _ = sink.send_raw(msg);
}
}
}
}

impl Drop for SubscriptionSink {
fn drop(&mut self) {
let err = SubscriptionClosedError::new("Closed by the server", self.uniq_sub.sub_id);
self.inner_close(&err);
let err = SubscriptionClosedReason::Server("No close reason provided".into()).into();
self.inner_close(Some(&err));
}
}

Expand Down Expand Up @@ -855,9 +866,13 @@ impl Subscription {
&mut self,
) -> Option<Result<(T, jsonrpsee_types::v2::SubscriptionId<'static>), Error>> {
let raw = self.rx.next().await?;
let res = serde_json::from_str::<SubscriptionResponse<T>>(&raw)
.map(|v| (v.params.result, v.params.subscription.into_owned()))
.map_err(Into::into);
let res = match serde_json::from_str::<SubscriptionResponse<T>>(&raw) {
Ok(r) => Ok((r.params.result, r.params.subscription.into_owned())),
Err(_) => match serde_json::from_str::<SubscriptionResponse<SubscriptionClosed>>(&raw) {
Ok(e) => Err(Error::SubscriptionClosed(e.params.result)),
Err(e) => Err(e.into()),
},
};
Some(res)
}
}
Expand Down Expand Up @@ -1028,16 +1043,14 @@ mod tests {
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
let mut stream_data = vec!['0', '1', '2'];
std::thread::spawn(move || loop {
tracing::debug!("This is your friendly subscription sending data.");
if let Some(letter) = stream_data.pop() {
std::thread::spawn(move || {
while let Some(letter) = stream_data.pop() {
tracing::debug!("This is your friendly subscription sending data.");
if let Err(Error::SubscriptionClosed(_)) = sink.send(&letter) {
return;
}
} else {
return;
std::thread::sleep(std::time::Duration::from_millis(500));
}
std::thread::sleep(std::time::Duration::from_millis(500));
});
Ok(())
})
Expand All @@ -1050,10 +1063,10 @@ mod tests {
assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));
}

let sub_err = my_sub.next::<char>().await.unwrap().unwrap_err();

// The subscription is now closed by the server.
let (sub_closed_err, _) = my_sub.next::<SubscriptionClosedError>().await.unwrap().unwrap();
assert_eq!(sub_closed_err.subscription_id(), my_sub.subscription_id());
assert_eq!(sub_closed_err.close_reason(), "Closed by the server");
assert!(matches!(sub_err, Error::SubscriptionClosed(_)));
}

#[tokio::test]
Expand Down