Skip to content

Commit

Permalink
improve SubscriptionClosed error
Browse files Browse the repository at this point in the history
Closing #485
  • Loading branch information
niklasad1 committed Oct 6, 2021
1 parent 94c881b commit 38d3662
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 23 deletions.
18 changes: 15 additions & 3 deletions types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,23 @@ impl Error {
#[derive(Deserialize, Serialize, Debug, PartialEq)]
pub struct SubscriptionClosedError {
subscription_closed: String,
id: u64,
}

impl From<String> for SubscriptionClosedError {
fn from(msg: String) -> Self {
Self { subscription_closed: msg }
impl SubscriptionClosedError {
/// Create a new subscription closed error.
pub fn new(reason: impl Into<String>, id: u64) -> Self {
Self { subscription_closed: reason.into(), id }
}

/// Get the reason why the subscription was closed.
pub fn close_reason(&self) -> &str {
&self.subscription_closed
}

/// Get the subscription ID.
pub fn subscription_id(&self) -> u64 {
self.id
}
}

Expand Down
46 changes: 26 additions & 20 deletions utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,45 +531,50 @@ impl SubscriptionSink {
}

fn inner_send(&mut self, msg: String) -> Result<(), Error> {
let res = if let Some(conn) = self.is_connected.as_ref() {
if !conn.is_canceled() {
let res = match self.is_connected.as_ref() {
Some(conn) if !conn.is_canceled() => {
// unbounded send only fails if the receiver has been dropped.
self.inner.unbounded_send(msg).map_err(|_| subscription_closed_err(self.uniq_sub.sub_id))
} else {
Err(subscription_closed_err(self.uniq_sub.sub_id))
self.inner.unbounded_send(msg).map_err(|_| {
Some(SubscriptionClosedError::new("Closed by the client (connection reset)", self.uniq_sub.sub_id))
})
}
} else {
Err(subscription_closed_err(self.uniq_sub.sub_id))
Some(_) => Err(Some(SubscriptionClosedError::new("Closed by unsubscribe call", self.uniq_sub.sub_id))),
// NOTE(niklasad1): this should be unreachble, after the first error is detected the subscription is closed.
None => Err(None),
};

if let Err(e) = &res {
self.close(e.to_string());
if let Err(Some(e)) = &res {
self.inner_close(e);
}

res
res.map_err(|e| {
let err = e.unwrap_or_else(|| SubscriptionClosedError::new("Close reason unknown", self.uniq_sub.sub_id));
Error::SubscriptionClosed(err)
})
}

/// Close the subscription sink with a customized error message.
pub fn close(&mut self, close_reason: String) {
pub fn close(&mut self, msg: &str) {
let err = SubscriptionClosedError::new(msg, self.uniq_sub.sub_id);
self.inner_close(&err);
}

fn inner_close(&mut self, err: &SubscriptionClosedError) {
self.is_connected.take();
if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) {
let msg =
self.build_message(&SubscriptionClosedError::from(close_reason)).expect("valid json infallible; qed");
let msg = self.build_message(err).expect("valid json infallible; qed");
let _ = sink.unbounded_send(msg);
}
}
}

impl Drop for SubscriptionSink {
fn drop(&mut self) {
self.close(format!("Subscription: {} closed", self.uniq_sub.sub_id));
let err = SubscriptionClosedError::new("Closed by the server", self.uniq_sub.sub_id);
self.inner_close(&err);
}
}

fn subscription_closed_err(sub_id: u64) -> Error {
Error::SubscriptionClosed(format!("Subscription {} closed", sub_id).into())
}

/// Wrapper struct that maintains a subscription for testing.
#[derive(Debug)]
pub struct TestSubscription {
Expand Down Expand Up @@ -793,7 +798,8 @@ mod tests {
}

// The subscription is now closed
let (val, _) = my_sub.next::<SubscriptionClosedError>().await;
assert_eq!(val, format!("Subscription: {} closed", my_sub.subscription_id()).into());
let (sub_closed_err, _) = my_sub.next::<SubscriptionClosedError>().await;
assert_eq!(sub_closed_err.subscription_id(), my_sub.subscription_id());
assert_eq!(sub_closed_err.close_reason(), "Closed by the server");
}
}

0 comments on commit 38d3662

Please sign in to comment.