From a920e34c2019ad46a41bad4c6eb672a903e63095 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 8 Mar 2021 11:52:23 +0100 Subject: [PATCH] integration with jsonrpsee v2 (#214) * hacky integration with jsonrpsee v2 * stray todos * fmt * add http support * make test build compile * Update src/rpc.rs * bring back set_client * use crates.io version jsonrpsee * WIP: workaround for embedded subxt client (#236) * workaround for embedded subxt client Signed-off-by: Gregory Hill * increase default channel size on subxt client Signed-off-by: Gregory Hill * remove client tests due to inference problem on From Signed-off-by: Gregory Hill * add comments for missing impls * more verbose errors * make subscription notifs buffer bigger * fmt Co-authored-by: Greg Hill --- Cargo.toml | 11 +- client/Cargo.toml | 5 +- client/src/lib.rs | 404 ++++++++++++++++++++++++----------------- src/error.rs | 8 +- src/frame/contracts.rs | 11 +- src/lib.rs | 36 ++-- src/rpc.rs | 109 ++++++++++- src/subscription.rs | 12 +- 8 files changed, 391 insertions(+), 205 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 09b994b33c740..d39a17ba7b847 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,14 +20,13 @@ include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"] default = [] client = ["substrate-subxt-client"] -# enable this feature to run tests which require a local dev chain node -integration-tests = [] - [dependencies] log = "0.4.13" thiserror = "1.0.23" futures = "0.3.10" -jsonrpsee = { version = "0.1.0", features = ["ws"] } +jsonrpsee-types = "0.2.0-alpha" +jsonrpsee-ws-client = "0.2.0-alpha" +jsonrpsee-http-client = "0.2.0-alpha" num-traits = { version = "0.2.14", default-features = false } serde = { version = "1.0.119", features = ["derive"] } serde_json = "1.0.61" @@ -50,7 +49,7 @@ pallet-staking = "3.0.0" sp-rpc = { version = "3.0.0", package = "sp-rpc" } sp-core = { version = "3.0.0", package = "sp-core" } -substrate-subxt-client = { version = "0.6.0", path = "client", optional = true } +substrate-subxt-client = { path = "client", optional = true } substrate-subxt-proc-macro = { version = "0.14.0", path = "proc-macro" } [dev-dependencies] @@ -59,7 +58,7 @@ env_logger = "0.8.2" frame-system = "3.0.0" pallet-balances = "3.0.0" sp-keyring = "3.0.0" -substrate-subxt-client = { version = "0.6.0", path = "client" } +substrate-subxt-client = { path = "client" } tempdir = "0.3.7" test-node = { path = "test-node" } wabt = "0.10.0" diff --git a/client/Cargo.toml b/client/Cargo.toml index 17d05d68de4a9..d0488a3e8c370 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -15,7 +15,8 @@ keywords = ["parity", "substrate", "blockchain"] async-std = "1.8.0" futures = { version = "0.3.9", features = ["compat"], package = "futures" } futures01 = { package = "futures", version = "0.1.29" } -jsonrpsee = "0.1.0" +jsonrpsee-types = "0.2.0-alpha" +jsonrpsee-ws-client = "0.2.0-alpha" log = "0.4.13" sc-network = { version = "0.9.0", default-features = false } sc-client-db = "0.9.0" @@ -30,6 +31,6 @@ sc-service = { version = "0.9.0", default-features = false, features = ["wasmtim [dev-dependencies] async-std = { version = "1.8.0", features = ["attributes"] } env_logger = "0.8.2" -substrate-subxt = { path = ".." } +substrate-subxt = { path = "..", features = ["client"] } tempdir = "0.3.7" test-node = { path = "../test-node" } diff --git a/client/src/lib.rs b/client/src/lib.rs index 5699ec1842c1d..0f8946ae81962 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -20,12 +20,11 @@ use async_std::task; use futures::{ - channel::mpsc, - compat::{ - Compat01As03, - Sink01CompatExt, - Stream01CompatExt, + channel::{ + mpsc, + oneshot, }, + compat::Stream01CompatExt, future::{ select, FutureExt, @@ -34,12 +33,25 @@ use futures::{ stream::StreamExt, }; use futures01::sync::mpsc as mpsc01; -use jsonrpsee::{ - common::{ +use jsonrpsee_types::{ + client::{ + FrontToBack, + Subscription, + }, + error::Error as JsonRpseeError, + jsonrpc::{ + self, + Call, + DeserializeOwned, + Id, + MethodCall, + Notification, + Output, Request, - Response, + SubscriptionId, + SubscriptionNotif, + Version, }, - transport::TransportClient, }; use sc_network::config::TransportConfig; pub use sc_service::{ @@ -63,12 +75,11 @@ use sc_service::{ RpcSession, TaskManager, }; -use std::{ - future::Future, - pin::Pin, -}; +use std::marker::PhantomData; use thiserror::Error; +const DEFAULT_CHANNEL_SIZE: usize = 16; + /// Error thrown by the client. #[derive(Debug, Error)] pub enum SubxtClientError { @@ -81,28 +92,144 @@ pub enum SubxtClientError { } /// Client for an embedded substrate node. +#[derive(Clone)] pub struct SubxtClient { - to_back: mpsc::Sender, - from_back: Compat01As03>, + to_back: mpsc::Sender, } impl SubxtClient { /// Create a new client. pub fn new(mut task_manager: TaskManager, rpc: RpcHandlers) -> Self { - let (to_back, from_front) = mpsc::channel(4); - let (to_front, from_back) = mpsc01::channel(4); + let (to_back, from_front) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - let session = RpcSession::new(to_front.clone()); task::spawn( select( - Box::pin(from_front.for_each(move |message: String| { + Box::pin(from_front.for_each(move |message: FrontToBack| { let rpc = rpc.clone(); - let session = session.clone(); - let mut to_front = to_front.clone().sink_compat(); + let (to_front, from_back) = mpsc01::channel(DEFAULT_CHANNEL_SIZE); + let session = RpcSession::new(to_front.clone()); async move { - let response = rpc.rpc_query(&session, &message).await; - if let Some(response) = response { - to_front.send(response).await.ok(); + match message { + FrontToBack::Notification { method, params } => { + let request = + Request::Single(Call::Notification(Notification { + jsonrpc: Version::V2, + method, + params, + })); + if let Ok(message) = serde_json::to_string(&request) { + rpc.rpc_query(&session, &message).await; + } + } + + FrontToBack::StartRequest { + method, + params, + send_back, + } => { + let request = + Request::Single(Call::MethodCall(MethodCall { + jsonrpc: Version::V2, + method: method.into(), + params: params.into(), + id: Id::Num(0), + })); + if let Ok(message) = serde_json::to_string(&request) { + if let Some(response) = + rpc.rpc_query(&session, &message).await + { + let result = match serde_json::from_str::( + &response, + ) + .expect("failed to decode request response") + { + Output::Success(success) => { + Ok(success.result) + } + Output::Failure(failure) => { + Err(JsonRpseeError::Request( + failure.error, + )) + } + }; + + send_back + .send(result) + .expect("failed to send request response"); + } + } + } + + FrontToBack::Subscribe { + subscribe_method, + params, + unsubscribe_method: _, + send_back, + } => { + let request = + Request::Single(Call::MethodCall(MethodCall { + jsonrpc: Version::V2, + method: subscribe_method, + params, + id: Id::Num(0), + })); + + let (mut send_front_sub, send_back_sub) = + mpsc::channel(DEFAULT_CHANNEL_SIZE); + if let Ok(message) = serde_json::to_string(&request) { + if let Some(response) = + rpc.rpc_query(&session, &message).await + { + let result = match serde_json::from_str::( + &response, + ) + .expect("failed to decode subscription response") + { + Output::Success(_) => { + Ok(( + send_back_sub, + // NOTE: The ID is used to unsubscribe to specific subscription + // which the `SubxtClient` doesn't support so hardcoding it to `0` + // is fine. + SubscriptionId::Num(0), + )) + } + Output::Failure(failure) => { + Err(JsonRpseeError::Request( + failure.error, + )) + } + }; + + send_back.send(result).expect( + "failed to send subscription response", + ); + } + } + + task::spawn(async move { + let mut from_back = from_back.compat(); + let _session = session.clone(); + + while let Some(Ok(response)) = from_back.next().await + { + let notif = serde_json::from_str::< + SubscriptionNotif, + >( + &response + ) + .expect("failed to decode subscription notif"); + send_front_sub + .send(notif.params.result) + .await + .expect("failed to send subscription notif") + } + }); + } + + FrontToBack::SubscriptionClosed(_) => { + // NOTE: unsubscriptions are not supported by SubxtClient. + } } } })), @@ -113,10 +240,7 @@ impl SubxtClient { .map(drop), ); - Self { - to_back, - from_back: from_back.compat(), - } + Self { to_back } } /// Creates a new client from a config. @@ -128,41 +252,98 @@ impl SubxtClient { let (task_manager, rpc_handlers) = (builder)(config)?; Ok(Self::new(task_manager, rpc_handlers)) } -} -impl TransportClient for SubxtClient { - type Error = SubxtClientError; - - fn send_request<'a>( - &'a mut self, - request: Request, - ) -> Pin> + Send + 'a>> { - Box::pin(async move { - let request = serde_json::to_string(&request)?; - self.to_back.send(request).await?; - Ok(()) - }) + /// Send a JSONRPC notification. + pub async fn notification( + &self, + method: M, + params: P, + ) -> Result<(), JsonRpseeError> + where + M: Into + Send, + P: Into + Send, + { + self.to_back + .clone() + .send(FrontToBack::Notification { + method: method.into(), + params: params.into(), + }) + .await + .map_err(|e| JsonRpseeError::TransportError(Box::new(e))) } - fn next_response<'a>( - &'a mut self, - ) -> Pin> + Send + 'a>> { - Box::pin(async move { - let response = self - .from_back - .next() - .await - .expect("channel shouldn't close") - .unwrap(); - Ok(serde_json::from_str(&response)?) - }) + /// Send a JSONRPC request. + pub async fn request( + &self, + method: M, + params: P, + ) -> Result + where + T: DeserializeOwned, + M: Into + Send, + P: Into + Send, + { + let (send_back_tx, send_back_rx) = oneshot::channel(); + + self.to_back + .clone() + .send(FrontToBack::StartRequest { + method: method.into(), + params: params.into(), + send_back: send_back_tx, + }) + .await + .map_err(|e| JsonRpseeError::TransportError(Box::new(e)))?; + + let json_value = match send_back_rx.await { + Ok(Ok(v)) => v, + Ok(Err(err)) => return Err(err), + Err(err) => return Err(JsonRpseeError::TransportError(Box::new(err))), + }; + jsonrpc::from_value(json_value).map_err(JsonRpseeError::ParseError) } -} -impl From for jsonrpsee::Client { - fn from(client: SubxtClient) -> Self { - let client = jsonrpsee::raw::RawClient::new(client); - jsonrpsee::Client::new(client) + /// Send a subscription request to the server. + pub async fn subscribe( + &self, + subscribe_method: SM, + params: P, + unsubscribe_method: UM, + ) -> Result, JsonRpseeError> + where + SM: Into + Send, + UM: Into + Send, + P: Into + Send, + N: DeserializeOwned, + { + let subscribe_method = subscribe_method.into(); + let unsubscribe_method = unsubscribe_method.into(); + let params = params.into(); + + let (send_back_tx, send_back_rx) = oneshot::channel(); + self.to_back + .clone() + .send(FrontToBack::Subscribe { + subscribe_method, + unsubscribe_method, + params, + send_back: send_back_tx, + }) + .await + .map_err(JsonRpseeError::Internal)?; + + let (notifs_rx, id) = match send_back_rx.await { + Ok(Ok(val)) => val, + Ok(Err(err)) => return Err(err), + Err(err) => return Err(JsonRpseeError::TransportError(Box::new(err))), + }; + Ok(Subscription { + to_back: self.to_back.clone(), + notifs_rx, + marker: PhantomData, + id, + }) } } @@ -171,7 +352,7 @@ impl From for jsonrpsee::Client { pub enum Role { /// Light client. Light, - /// A full node (maninly used for testing purposes). + /// A full node (mainly used for testing purposes). Authority(sp_keyring::AccountKeyring), } @@ -311,108 +492,3 @@ impl SubxtClientConfig { service_config } } - -#[cfg(test)] -mod tests { - use super::*; - use async_std::path::Path; - use sp_keyring::AccountKeyring; - use substrate_subxt::{ - balances::TransferCallExt, - ClientBuilder, - NodeTemplateRuntime, - PairSigner, - }; - use tempdir::TempDir; - - #[async_std::test] - #[ignore] - async fn test_client() { - env_logger::try_init().ok(); - let client = ClientBuilder::::new() - .build() - .await - .unwrap(); - let signer = PairSigner::new(AccountKeyring::Alice.pair()); - let to = AccountKeyring::Bob.to_account_id().into(); - client - .transfer_and_watch(&signer, &to, 10_000) - .await - .unwrap(); - } - - #[async_std::test] - #[ignore] - async fn test_light_client() { - env_logger::try_init().ok(); - let chain_spec_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("dev-chain.json"); - let bytes = async_std::fs::read(chain_spec_path).await.unwrap(); - let chain_spec = - test_node::chain_spec::ChainSpec::from_json_bytes(bytes).unwrap(); - let tmp = TempDir::new("subxt-").expect("failed to create tempdir"); - let config = SubxtClientConfig { - // base_path: - impl_name: "substrate-subxt-light-client", - impl_version: "0.0.1", - author: "David Craven", - copyright_start_year: 2020, - db: DatabaseConfig::RocksDb { - path: tmp.path().into(), - cache_size: 64, - }, - keystore: KeystoreConfig::InMemory, - chain_spec, - role: Role::Light, - telemetry: None, - wasm_method: Default::default(), - }; - let client = ClientBuilder::::new() - .set_client( - SubxtClient::from_config(config, test_node::service::new_light).unwrap(), - ) - .build() - .await - .unwrap(); - let signer = PairSigner::new(AccountKeyring::Alice.pair()); - let to = AccountKeyring::Bob.to_account_id().into(); - client - .transfer_and_watch(&signer, &to, 10_000) - .await - .unwrap(); - } - - #[async_std::test] - async fn test_full_client() { - env_logger::try_init().ok(); - let tmp = TempDir::new("subxt-").expect("failed to create tempdir"); - let config = SubxtClientConfig { - impl_name: "substrate-subxt-full-client", - impl_version: "0.0.1", - author: "David Craven", - copyright_start_year: 2020, - db: DatabaseConfig::RocksDb { - path: tmp.path().into(), - cache_size: 128, - }, - keystore: KeystoreConfig::InMemory, - chain_spec: test_node::chain_spec::development_config().unwrap(), - role: Role::Authority(AccountKeyring::Alice), - telemetry: None, - wasm_method: Default::default(), - }; - let client = ClientBuilder::::new() - .set_client( - SubxtClient::from_config(config, test_node::service::new_full).unwrap(), - ) - .build() - .await - .unwrap(); - let signer = PairSigner::new(AccountKeyring::Alice.pair()); - let to = AccountKeyring::Bob.to_account_id().into(); - client - .transfer_and_watch(&signer, &to, 10_000) - .await - .unwrap(); - } -} diff --git a/src/error.rs b/src/error.rs index 65dacc2b601e0..7e05ef2f3840a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -14,10 +14,7 @@ // You should have received a copy of the GNU General Public License // along with substrate-subxt. If not, see . -use jsonrpsee::{ - client::RequestError, - transport::ws::WsNewDnsError, -}; +use jsonrpsee_types::error::Error as RequestError; use sp_core::crypto::SecretStringError; use sp_runtime::{ transaction_validity::TransactionValidityError, @@ -42,9 +39,6 @@ pub enum Error { /// Rpc error. #[error("Rpc error: {0}")] Rpc(#[from] RequestError), - /// Error that can happen during the initial websocket handshake - #[error("Rpc error: {0}")] - WsHandshake(#[from] WsNewDnsError), /// Serde serialization error #[error("Serde json error: {0}")] Serialization(#[from] serde_json::error::Error), diff --git a/src/frame/contracts.rs b/src/frame/contracts.rs index b224bdd1af199..c06436df1f90a 100644 --- a/src/frame/contracts.rs +++ b/src/frame/contracts.rs @@ -60,7 +60,7 @@ pub struct PutCodeCall<'a, T: Contracts> { /// - The contract is initialized. #[derive(Clone, Debug, Eq, PartialEq, Call, Encode)] pub struct InstantiateCall<'a, T: Contracts> { - /// Initial balance transfered to the contract. + /// Initial balance transferred to the contract. #[codec(compact)] pub endowment: ::Balance, /// Gas limit. @@ -188,7 +188,7 @@ mod tests { // fund the account let endowment = 200_000_000_000_000; let _ = client - .transfer_and_watch(stash, &new_account_id, endowment) + .transfer_and_watch(stash, &new_account_id.into(), endowment) .await .expect("New account balance transfer failed"); stash.increment_nonce(); @@ -291,8 +291,11 @@ mod tests { let ctx = TestContext::init().await; let code_stored = ctx.put_code().await.unwrap(); - let instantiated = ctx.instantiate(&code_stored.code_hash, &[]).await.unwrap(); - let executed = ctx.call(&instantiated.contract, &[]).await; + let instantiated = ctx + .instantiate(&code_stored.code_hash.into(), &[]) + .await + .unwrap(); + let executed = ctx.call(&instantiated.contract.into(), &[]).await; assert!( executed.is_ok(), diff --git a/src/lib.rs b/src/lib.rs index 5b95d83cd2585..b1b3c9c0a889b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,9 +43,6 @@ #[macro_use] extern crate substrate_subxt_proc_macro; -#[cfg(feature = "client")] -pub use substrate_subxt_client as client; - pub use sp_core; pub use sp_runtime; @@ -54,7 +51,15 @@ use codec::{ Decode, }; use futures::future; -use jsonrpsee::client::Subscription; +use jsonrpsee_http_client::{ + HttpClient, + HttpConfig, +}; +use jsonrpsee_ws_client::{ + WsClient, + WsConfig, + WsSubscription as Subscription, +}; use sp_core::{ storage::{ StorageChangeSet, @@ -65,7 +70,10 @@ use sp_core::{ }; pub use sp_runtime::traits::SignedExtension; pub use sp_version::RuntimeVersion; -use std::marker::PhantomData; +use std::{ + marker::PhantomData, + sync::Arc, +}; mod error; mod events; @@ -98,6 +106,7 @@ pub use crate::{ BlockNumber, ExtrinsicSuccess, ReadProof, + RpcClient, SystemProperties, }, runtimes::*, @@ -120,7 +129,7 @@ use crate::{ #[derive(Default)] pub struct ClientBuilder { url: Option, - client: Option, + client: Option, page_size: Option, event_type_registry: EventTypeRegistry, skip_type_sizes_check: bool, @@ -139,7 +148,7 @@ impl ClientBuilder { } /// Sets the jsonrpsee client. - pub fn set_client>(mut self, client: P) -> Self { + pub fn set_client>(mut self, client: C) -> Self { self.client = Some(client.into()); self } @@ -185,9 +194,13 @@ impl ClientBuilder { } else { let url = self.url.as_deref().unwrap_or("ws://127.0.0.1:9944"); if url.starts_with("ws://") || url.starts_with("wss://") { - jsonrpsee::ws_client(url).await? + let mut config = WsConfig::with_url(&url); + // max notifs per subscription capacity. + config.max_subscription_capacity = 4096; + RpcClient::WebSocket(WsClient::new(WsConfig::with_url(&url)).await?) } else { - jsonrpsee::http_client(url) + let client = HttpClient::new(url, HttpConfig::default())?; + RpcClient::Http(Arc::new(client)) } }; let rpc = Rpc::new(client); @@ -670,7 +683,6 @@ mod tests { .expect("Error creating client"); (client, tmp) } - pub(crate) async fn test_client() -> (Client, TempDir) { test_client_with(AccountKeyring::Alice).await } @@ -681,7 +693,7 @@ mod tests { let (client, _tmp) = test_client_with(AccountKeyring::Bob).await; let mut blocks = client.subscribe_blocks().await.unwrap(); // get the genesis block. - assert_eq!(blocks.next().await.number, 0); + assert_eq!(blocks.next().await.unwrap().number, 0); let public = AccountKeyring::Alice.public().as_array_ref().to_vec(); client .insert_key( @@ -696,7 +708,7 @@ mod tests { .await .unwrap()); // Alice is an authority, so blocks should be produced. - assert_eq!(blocks.next().await.number, 1); + assert_eq!(blocks.next().await.unwrap().number, 1); } #[async_std::test] diff --git a/src/rpc.rs b/src/rpc.rs index 6aa0e982c7703..4ee4ead50e6ec 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -19,6 +19,8 @@ // Related: https://github.com/paritytech/substrate-subxt/issues/66 #![allow(irrefutable_let_patterns)] +use std::sync::Arc; + use codec::{ Decode, Encode, @@ -29,13 +31,22 @@ use core::{ marker::PhantomData, }; use frame_metadata::RuntimeMetadataPrefixed; -use jsonrpsee::{ - client::Subscription, - common::{ +use jsonrpsee_http_client::HttpClient; +use jsonrpsee_types::{ + error::Error as RpcError, + jsonrpc::{ to_value as to_json_value, + DeserializeOwned, Params, }, - Client, + traits::{ + Client, + SubscriptionClient, + }, +}; +use jsonrpsee_ws_client::{ + WsClient, + WsSubscription as Subscription, }; use serde::{ Deserialize, @@ -142,6 +153,88 @@ pub enum TransactionStatus { Invalid, } +#[cfg(any(feature = "client", test))] +use substrate_subxt_client::SubxtClient; + +/// Rpc client wrapper. +/// This is workaround because adding generic types causes the macros to fail. +#[derive(Clone)] +pub enum RpcClient { + /// JSONRPC client WebSocket transport. + WebSocket(WsClient), + /// JSONRPC client HTTP transport. + // NOTE: Arc because `HttpClient` is not clone. + Http(Arc), + #[cfg(any(feature = "client", test))] + /// Embedded substrate node. + Subxt(SubxtClient), +} + +impl RpcClient { + async fn request( + &self, + method: &str, + params: Params, + ) -> Result { + match self { + Self::WebSocket(inner) => { + inner.request(method, params).await.map_err(Into::into) + } + Self::Http(inner) => inner.request(method, params).await.map_err(Into::into), + #[cfg(any(feature = "client", test))] + Self::Subxt(inner) => inner.request(method, params).await.map_err(Into::into), + } + } + + async fn subscribe( + &self, + subscribe_method: &str, + params: Params, + unsubscribe_method: &str, + ) -> Result, Error> { + match self { + Self::WebSocket(inner) => { + inner + .subscribe(subscribe_method, params, unsubscribe_method) + .await + .map_err(Into::into) + } + Self::Http(_) => { + Err(RpcError::Custom( + "Subscriptions not supported on HTTP transport".to_owned(), + ) + .into()) + } + #[cfg(any(feature = "client", test))] + Self::Subxt(inner) => { + inner + .subscribe(subscribe_method, params, unsubscribe_method) + .await + .map_err(Into::into) + } + } + } +} + +impl From for RpcClient { + fn from(client: WsClient) -> Self { + RpcClient::WebSocket(client) + } +} + +impl From for RpcClient { + fn from(client: HttpClient) -> Self { + RpcClient::Http(Arc::new(client)) + } +} + +#[cfg(any(feature = "client", test))] +impl From for RpcClient { + fn from(client: SubxtClient) -> Self { + RpcClient::Subxt(client) + } +} + /// ReadProof struct returned by the RPC /// /// # Note @@ -159,7 +252,7 @@ pub struct ReadProof { /// Client for substrate rpc interfaces pub struct Rpc { - client: Client, + client: RpcClient, marker: PhantomData, } @@ -173,7 +266,7 @@ impl Clone for Rpc { } impl Rpc { - pub fn new(client: Client) -> Self { + pub fn new(client: RpcClient) -> Self { Self { client, marker: PhantomData, @@ -434,7 +527,7 @@ impl Rpc { let events_sub = self.subscribe_events().await?; let mut xt_sub = self.watch_extrinsic(extrinsic).await?; - while let status = xt_sub.next().await { + while let Some(status) = xt_sub.next().await { // log::info!("received status {:?}", status); match status { // ignore in progress extrinsic for now @@ -497,7 +590,7 @@ impl Rpc { } } } - unreachable!() + Err(RpcError::Custom("RPC subscription dropped".into()).into()) } /// Insert a key into the keystore. diff --git a/src/subscription.rs b/src/subscription.rs index aa74852199ffb..4fc48dd72a7e5 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -14,7 +14,8 @@ // You should have received a copy of the GNU General Public License // along with substrate-subxt. If not, see . -use jsonrpsee::client::Subscription; +use jsonrpsee_types::error::Error as RpcError; +use jsonrpsee_ws_client::WsSubscription as Subscription; use sp_core::storage::StorageChangeSet; use std::collections::VecDeque; @@ -86,7 +87,14 @@ impl<'a, T: Runtime> EventSubscription<'a, T> { if self.finished { return None } - let change_set = self.subscription.next().await; + let change_set = match self.subscription.next().await { + Some(c) => c, + None => { + return Some(Err( + RpcError::Custom("RPC subscription dropped".into()).into() + )) + } + }; if let Some(hash) = self.block.as_ref() { if &change_set.block == hash { self.finished = true;