From 69ac2f50a1066531932964e9efea8576478e9085 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 31 Jan 2023 14:06:07 +0100 Subject: [PATCH 1/9] client: use tokio channels This PR replaces the future channels with tokio because the APIs fit our use-cases better. --- core/Cargo.toml | 9 ++- core/src/client/async_client/helpers.rs | 8 +-- core/src/client/async_client/manager.rs | 4 +- core/src/client/async_client/mod.rs | 85 +++++++++++++------------ core/src/client/mod.rs | 13 ++-- core/src/error.rs | 6 +- tests/tests/integration_tests.rs | 3 +- 7 files changed, 68 insertions(+), 60 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 24302a5057..e1234f94ae 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -10,7 +10,6 @@ license = "MIT" anyhow = "1" async-trait = "0.1" beef = { version = "0.5.1", features = ["impl_serde"] } -futures-channel = "0.3.14" jsonrpsee-types = { path = "../types", version = "0.16.2" } thiserror = "1" serde = { version = "1.0", default-features = false, features = ["derive"] } @@ -28,9 +27,11 @@ soketto = { version = "0.7.1", optional = true } parking_lot = { version = "0.12", optional = true } tokio = { version = "1.16", optional = true } wasm-bindgen-futures = { version = "0.4.19", optional = true } +futures-channel = { version = "0.3.14", optional = true } futures-timer = { version = "3", optional = true } globset = { version = "0.4", optional = true } http = { version = "0.2.7", optional = true } +tokio-stream = { version = "0.1", optional = true } [features] default = [] @@ -44,15 +45,16 @@ server = [ "rand", "tokio/rt", "tokio/sync", + "futures-channel", ] -client = ["futures-util/sink", "futures-channel/sink", "futures-channel/std"] +client = ["futures-util/sink", "tokio/sync"] async-client = [ "async-lock", "client", "rustc-hash", "tokio/macros", "tokio/rt", - "tokio/sync", + "tokio-stream", "futures-timer", ] async-wasm-client = [ @@ -61,6 +63,7 @@ async-wasm-client = [ "wasm-bindgen-futures", "rustc-hash/std", "futures-timer/wasm-bindgen", + "tokio-stream", ] [dev-dependencies] diff --git a/core/src/client/async_client/helpers.rs b/core/src/client/async_client/helpers.rs index dd8589300d..b9029935bf 100644 --- a/core/src/client/async_client/helpers.rs +++ b/core/src/client/async_client/helpers.rs @@ -30,9 +30,9 @@ use crate::params::ArrayParams; use crate::traits::ToRpcParams; use crate::Error; -use futures_channel::mpsc; use futures_timer::Delay; use futures_util::future::{self, Either}; +use tokio::sync::mpsc; use jsonrpsee_types::error::CallError; use jsonrpsee_types::response::SubscriptionError; @@ -155,7 +155,7 @@ pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notifica Err(err) => { tracing::error!("Error sending notification, dropping handler for {:?} error: {:?}", notif.method, err); let _ = manager.remove_notification_handler(notif.method.into_owned()); - Err(err.into_send_error().into()) + Err(Error::Custom(err.to_string())) } }, None => { @@ -274,8 +274,8 @@ pub(crate) fn process_error_response(manager: &mut RequestManager, err: ErrorRes /// Wait for a stream to complete within the given timeout. pub(crate) async fn call_with_timeout( timeout: std::time::Duration, - rx: futures_channel::oneshot::Receiver>, -) -> Result, futures_channel::oneshot::Canceled> { + rx: tokio::sync::oneshot::Receiver>, +) -> Result, tokio::sync::oneshot::error::RecvError> { match future::select(rx, Delay::new(timeout)).await { Either::Left((res, _)) => res, Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)), diff --git a/core/src/client/async_client/manager.rs b/core/src/client/async_client/manager.rs index 7e3da486e0..3fe01fbfcd 100644 --- a/core/src/client/async_client/manager.rs +++ b/core/src/client/async_client/manager.rs @@ -38,10 +38,10 @@ use std::{ }; use crate::{client::BatchEntry, Error}; -use futures_channel::{mpsc, oneshot}; use jsonrpsee_types::{Id, SubscriptionId}; use rustc_hash::FxHashMap; use serde_json::value::Value as JsonValue; +use tokio::sync::{mpsc, oneshot}; #[derive(Debug)] enum Kind { @@ -312,9 +312,9 @@ impl RequestManager { #[cfg(test)] mod tests { use super::{Error, RequestManager}; - use futures_channel::{mpsc, oneshot}; use jsonrpsee_types::{Id, SubscriptionId}; use serde_json::Value as JsonValue; + use tokio::sync::{mpsc, oneshot}; #[test] fn insert_remove_pending_request_works() { diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 26856fb94a..f8f9d034bc 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -26,10 +26,8 @@ use manager::RequestManager; use async_lock::Mutex; use async_trait::async_trait; -use futures_channel::{mpsc, oneshot}; use futures_timer::Delay; use futures_util::future::{self, Either, Fuse}; -use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use futures_util::FutureExt; use jsonrpsee_types::{ @@ -37,6 +35,7 @@ use jsonrpsee_types::{ SubscriptionResponse, }; use serde::de::DeserializeOwned; +use tokio::sync::{mpsc, oneshot}; use tracing::instrument; use super::{generate_batch_id_range, FrontToBack, IdKind, RequestIdManager}; @@ -169,7 +168,7 @@ impl ClientBuilder { let (err_tx, err_rx) = oneshot::channel(); let max_notifs_per_subscription = self.max_notifs_per_subscription; let ping_interval = self.ping_interval; - let (on_close_tx, on_close_rx) = oneshot::channel(); + let (on_exit_tx, on_exit_rx) = oneshot::channel(); tokio::spawn(async move { background_task( @@ -179,7 +178,7 @@ impl ClientBuilder { err_tx, max_notifs_per_subscription, ping_interval, - on_close_tx, + on_exit_rx, ) .await; }); @@ -189,7 +188,7 @@ impl ClientBuilder { error: Mutex::new(ErrorFromBack::Unread(err_rx)), id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind), max_log_length: self.max_log_length, - notify: Mutex::new(Some(on_close_rx)), + on_exit: Some(on_exit_tx), } } @@ -224,7 +223,7 @@ impl ClientBuilder { #[derive(Debug)] pub struct Client { /// Channel to send requests to the background task. - to_back: mpsc::Sender, + to_back: tokio::sync::mpsc::Sender, /// If the background thread terminates the error is sent to this channel. // NOTE(niklasad1): This is a Mutex to circumvent that the async fns takes immutable references. error: Mutex, @@ -236,10 +235,8 @@ pub struct Client { /// /// Entries bigger than this limit will be truncated. max_log_length: u32, - /// Notify when the client is disconnected or encountered an error. - // NOTE: Similar to error, the async fns use immutable references. The `Receiver` is wrapped - // into `Option` to ensure the `on_disconnect` awaits only once. - notify: Mutex>>, + /// When the client is dropped a message is sent to be background thread. + on_exit: Option>, } impl Client { @@ -264,17 +261,15 @@ impl Client { /// /// This method is cancel safe. pub async fn on_disconnect(&self) { - // Wait until the `background_task` exits. - let mut notify_lock = self.notify.lock().await; - if let Some(notify) = notify_lock.take() { - let _ = notify.await; - } + self.to_back.closed().await } } impl Drop for Client { fn drop(&mut self) { - self.to_back.close_channel(); + if let Some(e) = self.on_exit.take() { + let _ = e.send(()); + } } } @@ -293,9 +288,11 @@ impl ClientT for Client { let raw = serde_json::to_string(¬if).map_err(Error::ParseError)?; tx_log_from_str(&raw, self.max_log_length); - let mut sender = self.to_back.clone(); + let sender = self.to_back.clone(); let fut = sender.send(FrontToBack::Notification(raw)); + tokio::pin!(fut); + match future::select(fut, Delay::new(self.request_timeout)).await { Either::Left((Ok(()), _)) => Ok(()), Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await), @@ -434,7 +431,7 @@ impl SubscriptionClientT for Client { tx_log_from_str(&raw, self.max_log_length); - let (send_back_tx, send_back_rx) = oneshot::channel(); + let (send_back_tx, send_back_rx) = tokio::sync::oneshot::channel(); if self .to_back .clone() @@ -698,43 +695,46 @@ async fn handle_frontend_messages( async fn background_task( mut sender: S, receiver: R, - mut frontend: mpsc::Receiver, + frontend: mpsc::Receiver, front_error: oneshot::Sender, max_notifs_per_subscription: usize, ping_interval: Option, - on_close: oneshot::Sender<()>, + on_exit: oneshot::Receiver<()>, ) where S: TransportSenderT, R: TransportReceiverT, { + // Create either a valid delay fuse triggered every provided `duration`, + // or create a terminated fuse that's never selected if the provided `duration` is None. + fn ping_fut(ping_interval: Option) -> Fuse { + if let Some(duration) = ping_interval { + Delay::new(duration).fuse() + } else { + // The select macro bypasses terminated futures, and the `submit_ping` branch is never selected. + Fuse::::terminated() + } + } + let mut manager = RequestManager::new(); let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async { let res = receiver.receive().await; Some((res, receiver)) }); - futures_util::pin_mut!(backend_event); + let frontend = tokio_stream::wrappers::ReceiverStream::new(frontend); + + tokio::pin!(backend_event, frontend); // Place frontend and backend messages into their own select. // This implies that either messages are received (both front or backend), // or the submitted ping timer expires (if provided). - let next_frontend = frontend.next(); - let next_backend = backend_event.next(); - let mut message_fut = future::select(next_frontend, next_backend); + let mut message_fut = future::select(frontend.next(), backend_event.next()); + let mut exit_or_ping_fut = future::select(on_exit, ping_fut(ping_interval)); loop { - // Create either a valid delay fuse triggered every provided `duration`, - // or create a terminated fuse that's never selected if the provided `duration` is None. - let submit_ping = if let Some(duration) = ping_interval { - Delay::new(duration).fuse() - } else { - // The select macro bypasses terminated futures, and the `submit_ping` branch is never selected. - Fuse::::terminated() - }; - - match future::select(message_fut, submit_ping).await { + match future::select(message_fut, exit_or_ping_fut).await { // Message received from the frontend. - Either::Left((Either::Left((frontend_value, backend)), _)) => { + Either::Left((Either::Left((frontend_value, backend)), exit_or_ping)) => { let frontend_value = if let Some(value) = frontend_value { value } else { @@ -748,9 +748,10 @@ async fn background_task( // Advance frontend, save backend. message_fut = future::select(frontend.next(), backend); + exit_or_ping_fut = exit_or_ping; } // Message received from the backend. - Either::Left((Either::Right((backend_value, frontend)), _)) => { + Either::Left((Either::Right((backend_value, frontend)), exit_or_ping)) => { if let Err(err) = handle_backend_messages::( backend_value, &mut manager, @@ -765,21 +766,27 @@ async fn background_task( } // Advance backend, save frontend. message_fut = future::select(frontend, backend_event.next()); + exit_or_ping_fut = exit_or_ping; + } + // The client is closed. + Either::Right((Either::Left((_, _)), _)) => { + break; } // Submit ping interval was triggered if enabled. - Either::Right((_, next_message_fut)) => { + Either::Right((Either::Right((_, on_exit)), msg)) => { if let Err(err) = sender.send_ping().await { tracing::error!("[backend]: Could not send ping frame: {}", err); let _ = front_error.send(Error::Custom("Could not send ping frame".into())); break; } - message_fut = next_message_fut; + message_fut = msg; + exit_or_ping_fut = future::select(on_exit, ping_fut(ping_interval)); } }; } // Wake the `on_disconnect` method. - let _ = on_close.send(()); + let _ = frontend.close(); // Send close message to the server. let _ = sender.close().await; } diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 40aa5dc753..4df185873e 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -38,13 +38,12 @@ use crate::params::BatchRequestBuilder; use crate::traits::ToRpcParams; use async_trait::async_trait; use core::marker::PhantomData; -use futures_channel::{mpsc, oneshot}; use futures_util::future::FutureExt; -use futures_util::sink::SinkExt; use futures_util::stream::{Stream, StreamExt}; use jsonrpsee_types::{ErrorObject, Id, SubscriptionId}; use serde::de::DeserializeOwned; use serde_json::Value as JsonValue; +use tokio::sync::{mpsc, oneshot}; // Re-exports for the `rpc_params` macro. #[doc(hidden)] @@ -238,8 +237,8 @@ impl std::marker::Unpin for Subscription {} impl Subscription { /// Create a new subscription. pub fn new( - to_back: mpsc::Sender, - notifs_rx: mpsc::Receiver, + to_back: tokio::sync::mpsc::Sender, + notifs_rx: tokio::sync::mpsc::Receiver, kind: SubscriptionKind, ) -> Self { Self { to_back, notifs_rx, kind: Some(kind), marker: PhantomData } @@ -256,10 +255,10 @@ impl Subscription { SubscriptionKind::Method(notif) => FrontToBack::UnregisterNotification(notif), SubscriptionKind::Subscription(sub_id) => FrontToBack::SubscriptionClosed(sub_id), }; - self.to_back.send(msg).await?; + self.to_back.send(msg).await.unwrap(); // wait until notif channel is closed then the subscription was closed. - while self.notifs_rx.next().await.is_some() {} + while self.notifs_rx.recv().await.is_some() {} Ok(()) } } @@ -360,7 +359,7 @@ where { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll> { - let n = futures_util::ready!(self.notifs_rx.poll_next_unpin(cx)); + let n = futures_util::ready!(self.notifs_rx.poll_recv(cx)); let res = n.map(|n| match serde_json::from_value::(n) { Ok(parsed) => Ok(parsed), Err(e) => Err(Error::ParseError(e)), diff --git a/core/src/error.rs b/core/src/error.rs index 9b2b181996..8807237b02 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -63,9 +63,9 @@ pub enum Error { /// Networking error or error on the low-level protocol layer. #[error("Networking or low-level protocol error: {0}")] Transport(#[source] anyhow::Error), - /// Frontend/backend channel error. - #[error("Frontend/backend channel error: {0}")] - Internal(#[from] futures_channel::mpsc::SendError), + /// Send error. + #[error("Send error: {0}")] + SendError(String), /// Invalid response, #[error("Invalid response: {0}")] InvalidResponse(Mismatch), diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 402e6cd074..8069c1887d 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -261,8 +261,7 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() { // don't poll the subscription stream for 2 seconds, should be full now. tokio::time::sleep(Duration::from_secs(2)).await; - // Capacity is `num_sender` + `capacity` - for _ in 0..5 { + for _ in 0..4 { assert!(hello_sub.next().await.unwrap().is_ok()); } From ea413bb8ad85e0aae3f836433af838446ce9552b Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 31 Jan 2023 14:15:38 +0100 Subject: [PATCH 2/9] remove unused code --- core/src/error.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/error.rs b/core/src/error.rs index 8807237b02..33894e0ff8 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -63,9 +63,6 @@ pub enum Error { /// Networking error or error on the low-level protocol layer. #[error("Networking or low-level protocol error: {0}")] Transport(#[source] anyhow::Error), - /// Send error. - #[error("Send error: {0}")] - SendError(String), /// Invalid response, #[error("Invalid response: {0}")] InvalidResponse(Mismatch), From e076d5fd8b7bb142eadcfe72b166baa1908cc78c Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 31 Jan 2023 14:23:32 +0100 Subject: [PATCH 3/9] fix wasm build --- core/src/client/async_client/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index f8f9d034bc..b1350ca1c3 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -203,10 +203,10 @@ impl ClientBuilder { let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_tx, err_rx) = oneshot::channel(); let max_notifs_per_subscription = self.max_notifs_per_subscription; - let (on_close_tx, on_close_rx) = oneshot::channel(); + let (on_exit_tx, on_exit_rx) = oneshot::channel(); wasm_bindgen_futures::spawn_local(async move { - background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, None, on_close_tx).await; + background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, None, on_exit_rx).await; }); Client { to_back, @@ -214,7 +214,7 @@ impl ClientBuilder { error: Mutex::new(ErrorFromBack::Unread(err_rx)), id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind), max_log_length: self.max_log_length, - notify: Mutex::new(Some(on_close_rx)), + on_exit: Some(on_exit_tx), } } } From 56f95dea997c73cc85966064f3adb044aaeccfcc Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 31 Jan 2023 14:27:14 +0100 Subject: [PATCH 4/9] fix docs --- core/src/client/async_client/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index b1350ca1c3..6ac8d248c5 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -114,8 +114,10 @@ impl ClientBuilder { /// [`Subscription::next()`](../../jsonrpsee_core/client/struct.Subscription.html#method.next) such that /// it can keep with the rate as server produces new items on the subscription. /// - /// **Note**: The actual capacity is `num_senders + max_subscription_capacity` - /// because it is passed to [`futures_channel::mpsc::channel`]. + /// + /// # Panics + /// + /// This function panics if `max` is 0. pub fn max_notifs_per_subscription(mut self, max: usize) -> Self { self.max_notifs_per_subscription = max; self @@ -235,7 +237,7 @@ pub struct Client { /// /// Entries bigger than this limit will be truncated. max_log_length: u32, - /// When the client is dropped a message is sent to be background thread. + /// When the client is dropped a message is sent to the background thread. on_exit: Option>, } From b46214b4651158c330b629cc2e731c1656d82c70 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 31 Jan 2023 15:08:09 +0100 Subject: [PATCH 5/9] fix tests --- client/ws-client/src/tests.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/ws-client/src/tests.rs b/client/ws-client/src/tests.rs index a9d5cfe7cb..25ba4c076a 100644 --- a/client/ws-client/src/tests.rs +++ b/client/ws-client/src/tests.rs @@ -213,8 +213,7 @@ async fn notification_without_polling_doesnt_make_client_unuseable() { // don't poll the notification stream for 2 seconds, should be full now. tokio::time::sleep(std::time::Duration::from_secs(2)).await; - // Capacity is `num_sender` + `capacity` - for _ in 0..5 { + for _ in 0..4 { assert!(nh.next().with_default_timeout().await.unwrap().unwrap().is_ok()); } From 1334b55112648c439fafa57c58ad48a71986a759 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 31 Jan 2023 15:12:58 +0100 Subject: [PATCH 6/9] fix more nits --- core/src/client/async_client/helpers.rs | 6 +++--- core/src/client/async_client/mod.rs | 2 +- core/src/client/mod.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/client/async_client/helpers.rs b/core/src/client/async_client/helpers.rs index b9029935bf..8888f9b81f 100644 --- a/core/src/client/async_client/helpers.rs +++ b/core/src/client/async_client/helpers.rs @@ -32,7 +32,7 @@ use crate::Error; use futures_timer::Delay; use futures_util::future::{self, Either}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use jsonrpsee_types::error::CallError; use jsonrpsee_types::response::SubscriptionError; @@ -274,8 +274,8 @@ pub(crate) fn process_error_response(manager: &mut RequestManager, err: ErrorRes /// Wait for a stream to complete within the given timeout. pub(crate) async fn call_with_timeout( timeout: std::time::Duration, - rx: tokio::sync::oneshot::Receiver>, -) -> Result, tokio::sync::oneshot::error::RecvError> { + rx: oneshot::Receiver>, +) -> Result, oneshot::error::RecvError> { match future::select(rx, Delay::new(timeout)).await { Either::Left((res, _)) => res, Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)), diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 6ac8d248c5..244b9d3238 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -225,7 +225,7 @@ impl ClientBuilder { #[derive(Debug)] pub struct Client { /// Channel to send requests to the background task. - to_back: tokio::sync::mpsc::Sender, + to_back: mpsc::Sender, /// If the background thread terminates the error is sent to this channel. // NOTE(niklasad1): This is a Mutex to circumvent that the async fns takes immutable references. error: Mutex, diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 4df185873e..1fcff16697 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -237,8 +237,8 @@ impl std::marker::Unpin for Subscription {} impl Subscription { /// Create a new subscription. pub fn new( - to_back: tokio::sync::mpsc::Sender, - notifs_rx: tokio::sync::mpsc::Receiver, + to_back: mpsc::Sender, + notifs_rx: mpsc::Receiver, kind: SubscriptionKind, ) -> Self { Self { to_back, notifs_rx, kind: Some(kind), marker: PhantomData } From 80a83c16e3a83cb397eeb15dabd64703c32477bc Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 31 Jan 2023 15:17:06 +0100 Subject: [PATCH 7/9] Update core/src/client/async_client/mod.rs --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 244b9d3238..74ce0088f1 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -788,7 +788,7 @@ async fn background_task( } // Wake the `on_disconnect` method. - let _ = frontend.close(); + _ = frontend.close(); // Send close message to the server. let _ = sender.close().await; } From 9f987771e3e49e1b61c6dc16857eb3a042bb4dcf Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 31 Jan 2023 15:17:41 +0100 Subject: [PATCH 8/9] Update core/src/client/async_client/mod.rs --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 74ce0088f1..33c1a7273b 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -790,7 +790,7 @@ async fn background_task( // Wake the `on_disconnect` method. _ = frontend.close(); // Send close message to the server. - let _ = sender.close().await; + _ = sender.close().await; } fn unparse_error(raw: &[u8]) -> Error { From 16a0e3dd051948ebdcdf4b653fe018c8ba7e98be Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 31 Jan 2023 17:08:22 +0100 Subject: [PATCH 9/9] fix unwrap --- core/src/client/async_client/mod.rs | 2 +- core/src/client/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 244b9d3238..aa6a0ebb97 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -788,7 +788,7 @@ async fn background_task( } // Wake the `on_disconnect` method. - let _ = frontend.close(); + frontend.close(); // Send close message to the server. let _ = sender.close().await; } diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 1fcff16697..2b8bbaff82 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -255,7 +255,8 @@ impl Subscription { SubscriptionKind::Method(notif) => FrontToBack::UnregisterNotification(notif), SubscriptionKind::Subscription(sub_id) => FrontToBack::SubscriptionClosed(sub_id), }; - self.to_back.send(msg).await.unwrap(); + // If this fails the connection was already closed i.e, already "unsubscribed". + let _ = self.to_back.send(msg).await; // wait until notif channel is closed then the subscription was closed. while self.notifs_rx.recv().await.is_some() {}