Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clients): add explicit unsubscribe API #789

Merged
merged 4 commits into from
Jun 21, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ macro_rules! rpc_params {
}

/// Subscription kind
#[derive(Debug)]
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum SubscriptionKind {
/// Get notifications based on Subscription ID.
Expand All @@ -187,7 +187,7 @@ pub struct Subscription<Notif> {
/// Channel from which we receive notifications from the server, as encoded `JsonValue`s.
notifs_rx: mpsc::Receiver<JsonValue>,
/// Callback kind.
kind: SubscriptionKind,
kind: Option<SubscriptionKind>,
/// Marker in order to pin the `Notif` parameter.
marker: PhantomData<Notif>,
}
Expand All @@ -203,12 +203,25 @@ impl<Notif> Subscription<Notif> {
notifs_rx: mpsc::Receiver<JsonValue>,
kind: SubscriptionKind,
) -> Self {
Self { to_back, notifs_rx, kind, marker: PhantomData }
Self { to_back, notifs_rx, kind: Some(kind), marker: PhantomData }
}

/// Return the subscription type and, if applicable, ID.
pub fn kind(&self) -> &SubscriptionKind {
&self.kind
self.kind.as_ref().expect("only None after unsubscribe; qed")
}

/// Unsubscribe and consume the subscription.
pub async fn unsubscribe(mut self) -> Result<(), Error> {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let msg = match self.kind.take().expect("only None after unsubscribe; qed") {
SubscriptionKind::Method(notif) => FrontToBack::UnregisterNotification(notif),
SubscriptionKind::Subscription(sub_id) => FrontToBack::SubscriptionClosed(sub_id),
};
self.to_back.send(msg).await?;

// wait until notif channel is closed then the subscription was closed.
while self.notifs_rx.next().await.is_some() {}
Ok(())
}
}

Expand Down Expand Up @@ -323,11 +336,11 @@ impl<Notif> Drop for Subscription<Notif> {
// the channel's buffer will be full.
// However, when a notification arrives, the background task will realize that the channel
// to the `Callback` has been closed.
let kind = std::mem::replace(&mut self.kind, SubscriptionKind::Subscription(SubscriptionId::Num(0)));

let msg = match kind {
SubscriptionKind::Method(notif) => FrontToBack::UnregisterNotification(notif),
SubscriptionKind::Subscription(sub_id) => FrontToBack::SubscriptionClosed(sub_id),
let msg = match self.kind.take() {
Some(SubscriptionKind::Method(notif)) => FrontToBack::UnregisterNotification(notif),
Some(SubscriptionKind::Subscription(sub_id)) => FrontToBack::SubscriptionClosed(sub_id),
None => return,
};
let _ = self.to_back.send(msg).now_or_never();
}
Expand Down