From 9ed2a3df76c4cd9a98a621bc354f1f2aee29809f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Fri, 2 Sep 2022 10:13:20 +0200 Subject: [PATCH 01/24] Refactor RPC module --- bin/node/cli/tests/common.rs | 4 +- .../frame/remote-externalities/src/rpc_api.rs | 172 ++++++++++++------ .../cli/src/commands/execute_block.rs | 3 +- .../cli/src/commands/follow_chain.rs | 4 +- .../cli/src/commands/offchain_worker.rs | 3 +- utils/frame/try-runtime/cli/src/lib.rs | 3 +- 6 files changed, 124 insertions(+), 65 deletions(-) diff --git a/bin/node/cli/tests/common.rs b/bin/node/cli/tests/common.rs index 9c739c2cf2d28..9e72266f06dd6 100644 --- a/bin/node/cli/tests/common.rs +++ b/bin/node/cli/tests/common.rs @@ -33,6 +33,7 @@ use std::{ time::Duration, }; use tokio::time::timeout; +use remote_externalities::rpc_api::RpcService; /// Wait for the given `child` the given number of `secs`. /// @@ -71,9 +72,10 @@ pub async fn wait_n_finalized_blocks( pub async fn wait_n_finalized_blocks_from(n: usize, url: &str) { let mut built_blocks = std::collections::HashSet::new(); let mut interval = tokio::time::interval(Duration::from_secs(2)); + let mut rpc_service = RpcService::new(url, false); loop { - if let Ok(block) = rpc_api::get_finalized_head::(url.to_string()).await { + if let Ok(block) = rpc_service.get_finalized_head::().await { built_blocks.insert(block); if built_blocks.len() > n { break diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 37555de480d4c..85069dd946ed7 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -19,82 +19,134 @@ // TODO: Consolidate one off RPC calls https://github.com/paritytech/substrate/issues/8988 use jsonrpsee::{ - core::client::ClientT, + core::client::{Client, ClientT}, rpc_params, + types::ParamsSer, ws_client::{WsClient, WsClientBuilder}, }; +use serde::de::DeserializeOwned; use sp_runtime::{ generic::SignedBlock, traits::{Block as BlockT, Header as HeaderT}, }; -/// Get the header of the block identified by `at` -pub async fn get_header(from: S, at: Block::Hash) -> Result -where - Block: BlockT, - Block::Header: serde::de::DeserializeOwned, - S: AsRef, -{ - let client = build_client(from).await?; - - client - .request::("chain_getHeader", rpc_params!(at)) - .await - .map_err(|e| format!("chain_getHeader request failed: {:?}", e)) +enum RpcCall { + GetHeader, + GetFinalizedHead, + GetBlock, + GetRuntimeVersion, } -/// Get the finalized head -pub async fn get_finalized_head(from: S) -> Result -where - Block: BlockT, - S: AsRef, -{ - let client = build_client(from).await?; +impl RpcCall { + fn as_str(&self) -> &'static str { + match self { + RpcCall::GetHeader => "chain_getHeader", + RpcCall::GetFinalizedHead => "chain_getFinalizedHead", + RpcCall::GetBlock => "chain_getBlock", + RpcCall::GetRuntimeVersion => "state_getRuntimeVersion", + } + } +} +/// General purpose method for making RPC calls. +async fn make_request<'a, T: DeserializeOwned>( + client: &Client, + call: RpcCall, + params: Option>, +) -> Result { client - .request::("chain_getFinalizedHead", None) + .request::(call.as_str(), params) .await - .map_err(|e| format!("chain_getFinalizedHead request failed: {:?}", e)) + .map_err(|e| format!("{} request failed: {:?}", call.as_str(), e)) } -/// Get the signed block identified by `at`. -pub async fn get_block(from: S, at: Block::Hash) -> Result -where - S: AsRef, - Block: BlockT + serde::de::DeserializeOwned, - Block::Header: HeaderT, -{ - let client = build_client(from).await?; - let signed_block = client - .request::>("chain_getBlock", rpc_params!(at)) - .await - .map_err(|e| format!("chain_getBlock request failed: {:?}", e))?; - - Ok(signed_block.block) +/// Simple RPC service that is capable of keeping the connection. +/// +/// Service will connect to `uri` for the first time during the first request. Instantiation +/// does not trigger connecting. +pub struct RpcService { + uri: String, + client: Option, + keep_connection: bool, } -/// Build a websocket client that connects to `from`. -async fn build_client>(from: S) -> Result { - WsClientBuilder::default() - .max_request_body_size(u32::MAX) - .build(from.as_ref()) - .await - .map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e)) -} +impl RpcService { + /// Creates a new RPC service. + /// + /// Does not connect yet. + pub fn new>(uri: S, keep_connection: bool) -> Self { + Self { uri: uri.as_ref().to_string(), client: None, keep_connection } + } -/// Get the runtime version of a given chain. -pub async fn get_runtime_version( - from: S, - at: Option, -) -> Result -where - S: AsRef, - Block: BlockT + serde::de::DeserializeOwned, - Block::Header: HeaderT, -{ - let client = build_client(from).await?; - client - .request::("state_getRuntimeVersion", rpc_params!(at)) - .await - .map_err(|e| format!("state_getRuntimeVersion request failed: {:?}", e)) + /// Returns the address at which requests are sent. + pub fn uri(&self) -> String { + self.uri.clone() + } + + /// Whether to keep and reuse a single connection. + pub fn keep_connection(&self) -> bool { + self.keep_connection + } + + /// Build a websocket client that connects to `self.uri`. + async fn build_client(&self) -> Result { + WsClientBuilder::default() + .max_request_body_size(u32::MAX) + .build(&self.uri) + .await + .map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e)) + } + + /// Generic method for making RPC requests. + async fn make_request<'a, T: DeserializeOwned>( + &mut self, + call: RpcCall, + params: Option>, + ) -> Result { + match &self.client { + // `self.keep_connection` must be `true. + Some(ref client) => make_request(client, call, params).await, + None => { + let client = self.build_client().await?; + let result = make_request(&client, call, params).await; + if self.keep_connection { + self.client = Some(client) + }; + result + }, + } + } + + /// Get the header of the block identified by `at`. + pub async fn get_header(&mut self, at: Block::Hash) -> Result + where + Block: BlockT, + Block::Header: DeserializeOwned, + { + self.make_request(RpcCall::GetHeader, rpc_params!(at)).await + } + + /// Get the finalized head. + pub async fn get_finalized_head(&mut self) -> Result { + self.make_request(RpcCall::GetFinalizedHead, None).await + } + + /// Get the signed block identified by `at`. + pub async fn get_block( + &mut self, + at: Block::Hash, + ) -> Result { + Ok(self + .make_request::>(RpcCall::GetBlock, rpc_params!(at)) + .await? + .block) + } + + /// Get the runtime version of a given chain. + pub async fn get_runtime_version( + &mut self, + at: Option, + ) -> Result { + self.make_request(RpcCall::GetRuntimeVersion, rpc_params!(at)).await + } } diff --git a/utils/frame/try-runtime/cli/src/commands/execute_block.rs b/utils/frame/try-runtime/cli/src/commands/execute_block.rs index 6a3ef24ff3771..5f2163a87df82 100644 --- a/utils/frame/try-runtime/cli/src/commands/execute_block.rs +++ b/utils/frame/try-runtime/cli/src/commands/execute_block.rs @@ -148,7 +148,8 @@ where let block_ws_uri = command.block_ws_uri::(); let block_at = command.block_at::(block_ws_uri.clone()).await?; - let block: Block = rpc_api::get_block::(block_ws_uri.clone(), block_at).await?; + let mut rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false); + let block: Block = rpc_service.get_block::(block_at).await?; let parent_hash = block.header().parent_hash(); log::info!( target: LOG_TARGET, diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index 01fc1dae15a05..97584240595ec 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -90,6 +90,8 @@ where let executor = build_executor::(&shared, &config); let execution = shared.execution; + let mut rpc_service = rpc_api::RpcService::new(&command.uri, false); + loop { let header = match subscription.next().await { Some(Ok(header)) => header, @@ -106,7 +108,7 @@ where let hash = header.hash(); let number = header.number(); - let block = rpc_api::get_block::(&command.uri, hash).await.unwrap(); + let block = rpc_service.get_block::(hash).await.unwrap(); log::debug!( target: LOG_TARGET, diff --git a/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs b/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs index 11ceb0a81cf37..bc92cfd5eee14 100644 --- a/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs +++ b/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs @@ -119,7 +119,8 @@ where let header_at = command.header_at::()?; let header_ws_uri = command.header_ws_uri::(); - let header = rpc_api::get_header::(header_ws_uri.clone(), header_at).await?; + let mut rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false); + let header = rpc_service.get_header::(header_at).await?; log::info!( target: LOG_TARGET, "fetched header from {:?}, block number: {:?}", diff --git a/utils/frame/try-runtime/cli/src/lib.rs b/utils/frame/try-runtime/cli/src/lib.rs index 76679c43f7f14..95d8669d55958 100644 --- a/utils/frame/try-runtime/cli/src/lib.rs +++ b/utils/frame/try-runtime/cli/src/lib.rs @@ -632,7 +632,8 @@ pub(crate) async fn ensure_matching_spec(uri.clone(), None) + let mut rpc_service = remote_externalities::rpc_api::RpcService::new(uri.clone(), false); + match rpc_service.get_runtime_version::(None) .await .map(|version| (String::from(version.spec_name.clone()), version.spec_version)) .map(|(spec_name, spec_version)| (spec_name.to_lowercase(), spec_version)) From c2e700921f74b6eb45d19a7ed8c751e081157d94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Fri, 2 Sep 2022 11:31:26 +0200 Subject: [PATCH 02/24] Add flag to `follow-chain` --- utils/frame/try-runtime/cli/src/commands/follow_chain.rs | 6 +++++- utils/frame/try-runtime/cli/src/lib.rs | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index 97584240595ec..d49a23471a595 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -56,6 +56,10 @@ pub struct FollowChainCmd { /// round-robin fashion. #[clap(long, default_value = "none")] try_state: frame_try_runtime::TryStateSelect, + + /// If present, a single connection to a node will be kept and reused for fetching blocks. + #[clap(long)] + keep_connection: bool, } pub(crate) async fn follow_chain( @@ -90,7 +94,7 @@ where let executor = build_executor::(&shared, &config); let execution = shared.execution; - let mut rpc_service = rpc_api::RpcService::new(&command.uri, false); + let mut rpc_service = rpc_api::RpcService::new(&command.uri, command.keep_connection); loop { let header = match subscription.next().await { diff --git a/utils/frame/try-runtime/cli/src/lib.rs b/utils/frame/try-runtime/cli/src/lib.rs index 95d8669d55958..6f2993e892736 100644 --- a/utils/frame/try-runtime/cli/src/lib.rs +++ b/utils/frame/try-runtime/cli/src/lib.rs @@ -633,7 +633,8 @@ pub(crate) async fn ensure_matching_spec(None) + match rpc_service + .get_runtime_version::(None) .await .map(|version| (String::from(version.spec_name.clone()), version.spec_version)) .map(|(spec_name, spec_version)| (spec_name.to_lowercase(), spec_version)) From ba6c503b90691a48c64bceebca9041e5a48b9d30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Fri, 2 Sep 2022 11:59:09 +0200 Subject: [PATCH 03/24] Multithreading remark --- utils/frame/remote-externalities/src/rpc_api.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 85069dd946ed7..810662532c68b 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -64,6 +64,8 @@ async fn make_request<'a, T: DeserializeOwned>( /// /// Service will connect to `uri` for the first time during the first request. Instantiation /// does not trigger connecting. +/// +/// Be careful with reusing the connection in a multithreaded environment. pub struct RpcService { uri: String, client: Option, From 469f60ca74ec0a65a6c4f5357fe3ead3aba87a7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Fri, 2 Sep 2022 12:21:22 +0200 Subject: [PATCH 04/24] fmt --- bin/node/cli/tests/common.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bin/node/cli/tests/common.rs b/bin/node/cli/tests/common.rs index 9e72266f06dd6..d85c7473a82d6 100644 --- a/bin/node/cli/tests/common.rs +++ b/bin/node/cli/tests/common.rs @@ -24,7 +24,7 @@ use nix::{ unistd::Pid, }; use node_primitives::Block; -use remote_externalities::rpc_api; +use remote_externalities::{rpc_api, rpc_api::RpcService}; use std::{ io::{BufRead, BufReader, Read}, ops::{Deref, DerefMut}, @@ -33,7 +33,6 @@ use std::{ time::Duration, }; use tokio::time::timeout; -use remote_externalities::rpc_api::RpcService; /// Wait for the given `child` the given number of `secs`. /// From 9a4dbf9c9aba25d6eee895d50c27e6b7123c6040 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Fri, 2 Sep 2022 12:26:26 +0200 Subject: [PATCH 05/24] O_O --- utils/frame/try-runtime/cli/src/commands/execute_block.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/utils/frame/try-runtime/cli/src/commands/execute_block.rs b/utils/frame/try-runtime/cli/src/commands/execute_block.rs index 5f2163a87df82..2bc7b7d5566b4 100644 --- a/utils/frame/try-runtime/cli/src/commands/execute_block.rs +++ b/utils/frame/try-runtime/cli/src/commands/execute_block.rs @@ -89,6 +89,8 @@ impl ExecuteBlockCmd { Block::Hash: FromStr, ::Err: Debug, { + let mut rpc_service = rpc_api::RpcService::new(ws_uri, false); + match (&self.block_at, &self.state) { (Some(block_at), State::Snap { .. }) => hash_of::(block_at), (Some(block_at), State::Live { .. }) => { @@ -100,9 +102,7 @@ impl ExecuteBlockCmd { target: LOG_TARGET, "No --block-at or --at provided, using the latest finalized block instead" ); - remote_externalities::rpc_api::get_finalized_head::(ws_uri) - .await - .map_err(Into::into) + rpc_service.get_finalized_head::().await.map_err(Into::into) }, (None, State::Live { at: Some(at), .. }) => hash_of::(at), _ => { From d358bfe0bc1b251d21c31e0abc3c9ce5de8e5be8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Fri, 2 Sep 2022 12:29:41 +0200 Subject: [PATCH 06/24] unused import --- utils/frame/remote-externalities/src/rpc_api.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 810662532c68b..6078f8fc38e13 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -25,10 +25,7 @@ use jsonrpsee::{ ws_client::{WsClient, WsClientBuilder}, }; use serde::de::DeserializeOwned; -use sp_runtime::{ - generic::SignedBlock, - traits::{Block as BlockT, Header as HeaderT}, -}; +use sp_runtime::{generic::SignedBlock, traits::Block as BlockT}; enum RpcCall { GetHeader, From 33991b3514e49631a092ffe360e6db5e53b086fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Fri, 2 Sep 2022 12:35:54 +0200 Subject: [PATCH 07/24] cmon --- bin/node/cli/tests/common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/node/cli/tests/common.rs b/bin/node/cli/tests/common.rs index d85c7473a82d6..d4f334d37f387 100644 --- a/bin/node/cli/tests/common.rs +++ b/bin/node/cli/tests/common.rs @@ -24,7 +24,7 @@ use nix::{ unistd::Pid, }; use node_primitives::Block; -use remote_externalities::{rpc_api, rpc_api::RpcService}; +use remote_externalities::rpc_api::RpcService; use std::{ io::{BufRead, BufReader, Read}, ops::{Deref, DerefMut}, From b903d97bfea5f743291cfa8ef42e37c324ba24f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 10:57:41 +0200 Subject: [PATCH 08/24] accidental removal reverted --- utils/frame/try-runtime/cli/src/commands/follow_chain.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index e088083e2a6d9..04a1b8759a660 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -235,6 +235,14 @@ where let mut rpc_service = rpc_api::RpcService::new(&command.uri, command.keep_connection); + let header_provider: RpcHeaderProvider = + RpcHeaderProvider { uri: command.uri.clone(), _phantom: PhantomData {} }; + let mut finalized_headers: FinalizedHeaders< + Block, + RpcHeaderProvider, + Subscription, + > = FinalizedHeaders::new(header_provider, subscription); + while let Some(header) = finalized_headers.next().await { let hash = header.hash(); let number = header.number(); From 9a1862c4e0dfc59e81aa4605921c956b9cdef1ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 12:10:43 +0200 Subject: [PATCH 09/24] remove RpcHeaderProvider --- .../cli/src/commands/follow_chain.rs | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index 04a1b8759a660..fbde6d212f854 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -34,6 +34,7 @@ use serde::de::DeserializeOwned; use sp_core::H256; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use std::{collections::VecDeque, fmt::Debug, marker::PhantomData, str::FromStr}; +use remote_externalities::rpc_api::RpcService; const SUB: &str = "chain_subscribeFinalizedHeads"; const UN_SUB: &str = "chain_unsubscribeFinalizedHeads"; @@ -100,18 +101,13 @@ where async fn get_header(&mut self, hash: Block::Hash) -> Block::Header; } -struct RpcHeaderProvider { - uri: String, - _phantom: PhantomData, -} - #[async_trait] -impl HeaderProvider for RpcHeaderProvider +impl HeaderProvider for RpcService where Block::Header: DeserializeOwned, { async fn get_header(&mut self, hash: Block::Hash) -> Block::Header { - rpc_api::get_header::(&self.uri, hash).await.unwrap() + self.get_header::(hash).await.unwrap() } } @@ -152,19 +148,19 @@ where /// /// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of /// them lack justification). -struct FinalizedHeaders, HS: HeaderSubscription> { - header_provider: HP, +struct FinalizedHeaders<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> { + header_provider: &'a HP, subscription: HS, fetched_headers: VecDeque, last_returned: Option<::Hash>, } -impl, HS: HeaderSubscription> +impl<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> FinalizedHeaders where ::Header: DeserializeOwned, { - pub fn new(header_provider: HP, subscription: HS) -> Self { + pub fn new(header_provider: &'a HP, subscription: HS) -> Self { Self { header_provider, subscription, @@ -233,15 +229,13 @@ where let executor = build_executor::(&shared, &config); let execution = shared.execution; - let mut rpc_service = rpc_api::RpcService::new(&command.uri, command.keep_connection); + let mut rpc_service = RpcService::new(&command.uri, command.keep_connection); - let header_provider: RpcHeaderProvider = - RpcHeaderProvider { uri: command.uri.clone(), _phantom: PhantomData {} }; let mut finalized_headers: FinalizedHeaders< Block, RpcHeaderProvider, Subscription, - > = FinalizedHeaders::new(header_provider, subscription); + > = FinalizedHeaders::new(&rpc_service, subscription); while let Some(header) = finalized_headers.next().await { let hash = header.hash(); @@ -380,7 +374,7 @@ mod tests { let provider = MockHeaderProvider(vec![].into()); let subscription = MockHeaderSubscription(heights.clone().into()); - let mut headers = FinalizedHeaders::new(provider, subscription); + let mut headers = FinalizedHeaders::new(&provider, subscription); for h in heights { assert_eq!(h, headers.next().await.unwrap().number); @@ -397,7 +391,7 @@ mod tests { let provider = MockHeaderProvider(heights_not_in_subscription.into()); let subscription = MockHeaderSubscription(heights_in_subscription.into()); - let mut headers = FinalizedHeaders::new(provider, subscription); + let mut headers = FinalizedHeaders::new(&provider, subscription); for h in all_heights { assert_eq!(h, headers.next().await.unwrap().number); From 3d0191b79fe953ea4e68c21f16c3b19abdf960a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 12:34:38 +0200 Subject: [PATCH 10/24] mut refs --- .../try-runtime/cli/src/commands/follow_chain.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index fbde6d212f854..e347419f3ed68 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -149,7 +149,7 @@ where /// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of /// them lack justification). struct FinalizedHeaders<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> { - header_provider: &'a HP, + header_provider: &'a mut HP, subscription: HS, fetched_headers: VecDeque, last_returned: Option<::Hash>, @@ -160,7 +160,7 @@ impl<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription where ::Header: DeserializeOwned, { - pub fn new(header_provider: &'a HP, subscription: HS) -> Self { + pub fn new(header_provider: &'a mut HP, subscription: HS) -> Self { Self { header_provider, subscription, @@ -235,7 +235,7 @@ where Block, RpcHeaderProvider, Subscription, - > = FinalizedHeaders::new(&rpc_service, subscription); + > = FinalizedHeaders::new(&mut rpc_service, subscription); while let Some(header) = finalized_headers.next().await { let hash = header.hash(); @@ -372,9 +372,9 @@ mod tests { async fn finalized_headers_works_when_every_block_comes_from_subscription() { let heights = vec![4, 5, 6, 7]; - let provider = MockHeaderProvider(vec![].into()); + let mut provider = MockHeaderProvider(vec![].into()); let subscription = MockHeaderSubscription(heights.clone().into()); - let mut headers = FinalizedHeaders::new(&provider, subscription); + let mut headers = FinalizedHeaders::new(&mut provider, subscription); for h in heights { assert_eq!(h, headers.next().await.unwrap().number); @@ -389,9 +389,9 @@ mod tests { // Consecutive headers will be requested in the reversed order. let heights_not_in_subscription = vec![5, 9, 8, 7]; - let provider = MockHeaderProvider(heights_not_in_subscription.into()); + let mut provider = MockHeaderProvider(heights_not_in_subscription.into()); let subscription = MockHeaderSubscription(heights_in_subscription.into()); - let mut headers = FinalizedHeaders::new(&provider, subscription); + let mut headers = FinalizedHeaders::new(&mut provider, subscription); for h in all_heights { assert_eq!(h, headers.next().await.unwrap().number); From 6fd11dad781f1866265daa839c7a5df7d87c806b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 12:40:28 +0200 Subject: [PATCH 11/24] fmt --- .../cli/src/commands/follow_chain.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index e347419f3ed68..ca5e7139fa6ac 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -27,14 +27,13 @@ use jsonrpsee::{ ws_client::WsClientBuilder, }; use parity_scale_codec::{Decode, Encode}; -use remote_externalities::{rpc_api, Builder, Mode, OnlineConfig}; +use remote_externalities::{rpc_api::RpcService, Builder, Mode, OnlineConfig}; use sc_executor::NativeExecutionDispatch; use sc_service::Configuration; use serde::de::DeserializeOwned; use sp_core::H256; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use std::{collections::VecDeque, fmt::Debug, marker::PhantomData, str::FromStr}; -use remote_externalities::rpc_api::RpcService; +use std::{collections::VecDeque, fmt::Debug, str::FromStr}; const SUB: &str = "chain_subscribeFinalizedHeads"; const UN_SUB: &str = "chain_unsubscribeFinalizedHeads"; @@ -148,7 +147,8 @@ where /// /// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of /// them lack justification). -struct FinalizedHeaders<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> { +struct FinalizedHeaders<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> +{ header_provider: &'a mut HP, subscription: HS, fetched_headers: VecDeque, @@ -156,7 +156,7 @@ struct FinalizedHeaders<'a, Block: BlockT, HP: HeaderProvider, HS: Header } impl<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> - FinalizedHeaders + FinalizedHeaders<'a, Block, HP, HS> where ::Header: DeserializeOwned, { @@ -231,11 +231,8 @@ where let mut rpc_service = RpcService::new(&command.uri, command.keep_connection); - let mut finalized_headers: FinalizedHeaders< - Block, - RpcHeaderProvider, - Subscription, - > = FinalizedHeaders::new(&mut rpc_service, subscription); + let mut finalized_headers: FinalizedHeaders> = + FinalizedHeaders::new(&mut rpc_service, subscription); while let Some(header) = finalized_headers.next().await { let hash = header.hash(); From 53607bcc799183ffab1e37091620175bd80c4da9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 13:10:32 +0200 Subject: [PATCH 12/24] no mutability --- .../frame/remote-externalities/src/rpc_api.rs | 19 +++++++------- .../cli/src/commands/follow_chain.rs | 25 ++++++++++--------- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 6078f8fc38e13..c18957074d42f 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -26,6 +26,7 @@ use jsonrpsee::{ }; use serde::de::DeserializeOwned; use sp_runtime::{generic::SignedBlock, traits::Block as BlockT}; +use std::cell::Cell; enum RpcCall { GetHeader, @@ -65,7 +66,7 @@ async fn make_request<'a, T: DeserializeOwned>( /// Be careful with reusing the connection in a multithreaded environment. pub struct RpcService { uri: String, - client: Option, + client: Cell>, keep_connection: bool, } @@ -74,7 +75,7 @@ impl RpcService { /// /// Does not connect yet. pub fn new>(uri: S, keep_connection: bool) -> Self { - Self { uri: uri.as_ref().to_string(), client: None, keep_connection } + Self { uri: uri.as_ref().to_string(), client: Cell::new(None), keep_connection } } /// Returns the address at which requests are sent. @@ -98,18 +99,18 @@ impl RpcService { /// Generic method for making RPC requests. async fn make_request<'a, T: DeserializeOwned>( - &mut self, + &self, call: RpcCall, params: Option>, ) -> Result { - match &self.client { + match &self.client.get() { // `self.keep_connection` must be `true. Some(ref client) => make_request(client, call, params).await, None => { let client = self.build_client().await?; let result = make_request(&client, call, params).await; if self.keep_connection { - self.client = Some(client) + self.client.set(Some(client)) }; result }, @@ -117,7 +118,7 @@ impl RpcService { } /// Get the header of the block identified by `at`. - pub async fn get_header(&mut self, at: Block::Hash) -> Result + pub async fn get_header(&self, at: Block::Hash) -> Result where Block: BlockT, Block::Header: DeserializeOwned, @@ -126,13 +127,13 @@ impl RpcService { } /// Get the finalized head. - pub async fn get_finalized_head(&mut self) -> Result { + pub async fn get_finalized_head(&self) -> Result { self.make_request(RpcCall::GetFinalizedHead, None).await } /// Get the signed block identified by `at`. pub async fn get_block( - &mut self, + &self, at: Block::Hash, ) -> Result { Ok(self @@ -143,7 +144,7 @@ impl RpcService { /// Get the runtime version of a given chain. pub async fn get_runtime_version( - &mut self, + &self, at: Option, ) -> Result { self.make_request(RpcCall::GetRuntimeVersion, rpc_params!(at)).await diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index ca5e7139fa6ac..922b80301c2c6 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -97,7 +97,7 @@ where Block::Header: HeaderT, { /// Awaits for the header of the block with hash `hash`. - async fn get_header(&mut self, hash: Block::Hash) -> Block::Header; + async fn get_header(&self, hash: Block::Hash) -> Block::Header; } #[async_trait] @@ -105,7 +105,7 @@ impl HeaderProvider for RpcService where Block::Header: DeserializeOwned, { - async fn get_header(&mut self, hash: Block::Hash) -> Block::Header { + async fn get_header(&self, hash: Block::Hash) -> Block::Header { self.get_header::(hash).await.unwrap() } } @@ -149,7 +149,7 @@ where /// them lack justification). struct FinalizedHeaders<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription> { - header_provider: &'a mut HP, + header_provider: &'a HP, subscription: HS, fetched_headers: VecDeque, last_returned: Option<::Hash>, @@ -160,7 +160,7 @@ impl<'a, Block: BlockT, HP: HeaderProvider, HS: HeaderSubscription where ::Header: DeserializeOwned, { - pub fn new(header_provider: &'a mut HP, subscription: HS) -> Self { + pub fn new(header_provider: &'a HP, subscription: HS) -> Self { Self { header_provider, subscription, @@ -232,7 +232,7 @@ where let mut rpc_service = RpcService::new(&command.uri, command.keep_connection); let mut finalized_headers: FinalizedHeaders> = - FinalizedHeaders::new(&mut rpc_service, subscription); + FinalizedHeaders::new(&rpc_service, subscription); while let Some(header) = finalized_headers.next().await { let hash = header.hash(); @@ -330,12 +330,13 @@ where mod tests { use super::*; use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header}; + use std::cell::Cell; type Block = TBlock>; type BlockNumber = u64; type Hash = H256; - struct MockHeaderProvider(pub VecDeque); + struct MockHeaderProvider(pub Cell>); fn headers() -> Vec
{ let mut headers = vec![Header::new_from_number(0)]; @@ -350,8 +351,8 @@ mod tests { #[async_trait] impl HeaderProvider for MockHeaderProvider { - async fn get_header(&mut self, _hash: Hash) -> Header { - let height = self.0.pop_front().unwrap(); + async fn get_header(&self, _hash: Hash) -> Header { + let height = self.0.get().pop_front().unwrap(); headers()[height as usize].clone() } } @@ -369,9 +370,9 @@ mod tests { async fn finalized_headers_works_when_every_block_comes_from_subscription() { let heights = vec![4, 5, 6, 7]; - let mut provider = MockHeaderProvider(vec![].into()); + let provider = MockHeaderProvider(Cell::new(vec![].into())); let subscription = MockHeaderSubscription(heights.clone().into()); - let mut headers = FinalizedHeaders::new(&mut provider, subscription); + let mut headers = FinalizedHeaders::new(&provider, subscription); for h in heights { assert_eq!(h, headers.next().await.unwrap().number); @@ -386,9 +387,9 @@ mod tests { // Consecutive headers will be requested in the reversed order. let heights_not_in_subscription = vec![5, 9, 8, 7]; - let mut provider = MockHeaderProvider(heights_not_in_subscription.into()); + let provider = MockHeaderProvider(Cell::new(heights_not_in_subscription.into())); let subscription = MockHeaderSubscription(heights_in_subscription.into()); - let mut headers = FinalizedHeaders::new(&mut provider, subscription); + let mut headers = FinalizedHeaders::new(&provider, subscription); for h in all_heights { assert_eq!(h, headers.next().await.unwrap().number); From 848003fc98e8a2d9dd4bebcf19604691f4f0a854 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 13:37:27 +0200 Subject: [PATCH 13/24] now? --- utils/frame/remote-externalities/src/rpc_api.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index c18957074d42f..18910a21c7eb0 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -26,7 +26,7 @@ use jsonrpsee::{ }; use serde::de::DeserializeOwned; use sp_runtime::{generic::SignedBlock, traits::Block as BlockT}; -use std::cell::Cell; +use std::cell::RefCell; enum RpcCall { GetHeader, @@ -66,7 +66,7 @@ async fn make_request<'a, T: DeserializeOwned>( /// Be careful with reusing the connection in a multithreaded environment. pub struct RpcService { uri: String, - client: Cell>, + client: RefCell>, keep_connection: bool, } @@ -75,7 +75,7 @@ impl RpcService { /// /// Does not connect yet. pub fn new>(uri: S, keep_connection: bool) -> Self { - Self { uri: uri.as_ref().to_string(), client: Cell::new(None), keep_connection } + Self { uri: uri.as_ref().to_string(), client: RefCell::new(None), keep_connection } } /// Returns the address at which requests are sent. @@ -103,14 +103,15 @@ impl RpcService { call: RpcCall, params: Option>, ) -> Result { - match &self.client.get() { + let maybe_client = self.client.borrow_mut(); + match maybe_client { // `self.keep_connection` must be `true. Some(ref client) => make_request(client, call, params).await, None => { let client = self.build_client().await?; let result = make_request(&client, call, params).await; if self.keep_connection { - self.client.set(Some(client)) + *maybe_client = Some(client) }; result }, From 6bcfbd75fb42b2ed5f7d682c24955597627debf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 13:45:28 +0200 Subject: [PATCH 14/24] now? --- utils/frame/remote-externalities/src/rpc_api.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 18910a21c7eb0..216a73ab33b2e 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -103,8 +103,8 @@ impl RpcService { call: RpcCall, params: Option>, ) -> Result { - let maybe_client = self.client.borrow_mut(); - match maybe_client { + let mut maybe_client = self.client.borrow_mut(); + match *maybe_client { // `self.keep_connection` must be `true. Some(ref client) => make_request(client, call, params).await, None => { From 0648e6faf49705caf2ac3d2019b56260020852cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 14:01:03 +0200 Subject: [PATCH 15/24] arc mutex --- utils/frame/remote-externalities/src/rpc_api.rs | 8 ++++---- utils/frame/try-runtime/cli/src/commands/execute_block.rs | 4 ++-- utils/frame/try-runtime/cli/src/commands/follow_chain.rs | 2 +- .../frame/try-runtime/cli/src/commands/offchain_worker.rs | 2 +- utils/frame/try-runtime/cli/src/lib.rs | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 216a73ab33b2e..3dca206012064 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -26,7 +26,7 @@ use jsonrpsee::{ }; use serde::de::DeserializeOwned; use sp_runtime::{generic::SignedBlock, traits::Block as BlockT}; -use std::cell::RefCell; +use std::sync::{Arc, Mutex}; enum RpcCall { GetHeader, @@ -66,7 +66,7 @@ async fn make_request<'a, T: DeserializeOwned>( /// Be careful with reusing the connection in a multithreaded environment. pub struct RpcService { uri: String, - client: RefCell>, + client: Arc>>, keep_connection: bool, } @@ -75,7 +75,7 @@ impl RpcService { /// /// Does not connect yet. pub fn new>(uri: S, keep_connection: bool) -> Self { - Self { uri: uri.as_ref().to_string(), client: RefCell::new(None), keep_connection } + Self { uri: uri.as_ref().to_string(), client: Arc::new(Mutex::new(None)), keep_connection } } /// Returns the address at which requests are sent. @@ -103,7 +103,7 @@ impl RpcService { call: RpcCall, params: Option>, ) -> Result { - let mut maybe_client = self.client.borrow_mut(); + let mut maybe_client = self.client.lock().unwrap(); match *maybe_client { // `self.keep_connection` must be `true. Some(ref client) => make_request(client, call, params).await, diff --git a/utils/frame/try-runtime/cli/src/commands/execute_block.rs b/utils/frame/try-runtime/cli/src/commands/execute_block.rs index 2bc7b7d5566b4..f3a0a40fbef2b 100644 --- a/utils/frame/try-runtime/cli/src/commands/execute_block.rs +++ b/utils/frame/try-runtime/cli/src/commands/execute_block.rs @@ -89,7 +89,7 @@ impl ExecuteBlockCmd { Block::Hash: FromStr, ::Err: Debug, { - let mut rpc_service = rpc_api::RpcService::new(ws_uri, false); + let rpc_service = rpc_api::RpcService::new(ws_uri, false); match (&self.block_at, &self.state) { (Some(block_at), State::Snap { .. }) => hash_of::(block_at), @@ -148,7 +148,7 @@ where let block_ws_uri = command.block_ws_uri::(); let block_at = command.block_at::(block_ws_uri.clone()).await?; - let mut rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false); + let rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false); let block: Block = rpc_service.get_block::(block_at).await?; let parent_hash = block.header().parent_hash(); log::info!( diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index 922b80301c2c6..8d0be0d7a68f9 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -229,7 +229,7 @@ where let executor = build_executor::(&shared, &config); let execution = shared.execution; - let mut rpc_service = RpcService::new(&command.uri, command.keep_connection); + let rpc_service = RpcService::new(&command.uri, command.keep_connection); let mut finalized_headers: FinalizedHeaders> = FinalizedHeaders::new(&rpc_service, subscription); diff --git a/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs b/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs index bc92cfd5eee14..b269649442ab7 100644 --- a/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs +++ b/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs @@ -119,7 +119,7 @@ where let header_at = command.header_at::()?; let header_ws_uri = command.header_ws_uri::(); - let mut rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false); + let rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false); let header = rpc_service.get_header::(header_at).await?; log::info!( target: LOG_TARGET, diff --git a/utils/frame/try-runtime/cli/src/lib.rs b/utils/frame/try-runtime/cli/src/lib.rs index 6f2993e892736..13257bcbe6982 100644 --- a/utils/frame/try-runtime/cli/src/lib.rs +++ b/utils/frame/try-runtime/cli/src/lib.rs @@ -632,7 +632,7 @@ pub(crate) async fn ensure_matching_spec(None) .await From 757eedd4ac48a938ac4d839b25881078a8fab7d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 14:43:52 +0200 Subject: [PATCH 16/24] async mutex --- Cargo.lock | 1 + utils/frame/remote-externalities/Cargo.toml | 1 + utils/frame/remote-externalities/src/rpc_api.rs | 5 +++-- .../frame/try-runtime/cli/src/commands/follow_chain.rs | 10 +++++----- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6c2df81d6ee1..6e377a3518e1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7445,6 +7445,7 @@ version = "0.10.0-dev" dependencies = [ "env_logger", "frame-support", + "futures", "jsonrpsee", "log", "pallet-elections-phragmen", diff --git a/utils/frame/remote-externalities/Cargo.toml b/utils/frame/remote-externalities/Cargo.toml index 3121157df68d8..9f22617b020e3 100644 --- a/utils/frame/remote-externalities/Cargo.toml +++ b/utils/frame/remote-externalities/Cargo.toml @@ -15,6 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] codec = { package = "parity-scale-codec", version = "3.0.0" } env_logger = "0.9" +futures = "0.3.21" jsonrpsee = { version = "0.15.1", features = ["ws-client", "macros"] } log = "0.4.17" serde = "1.0.136" diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 3dca206012064..8573120c04274 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -26,7 +26,8 @@ use jsonrpsee::{ }; use serde::de::DeserializeOwned; use sp_runtime::{generic::SignedBlock, traits::Block as BlockT}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use futures::lock::Mutex; enum RpcCall { GetHeader, @@ -103,7 +104,7 @@ impl RpcService { call: RpcCall, params: Option>, ) -> Result { - let mut maybe_client = self.client.lock().unwrap(); + let mut maybe_client = self.client.lock().await; match *maybe_client { // `self.keep_connection` must be `true. Some(ref client) => make_request(client, call, params).await, diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index 8d0be0d7a68f9..3e165de69941d 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -330,13 +330,13 @@ where mod tests { use super::*; use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header}; - use std::cell::Cell; + use std::cell::RefCell; type Block = TBlock>; type BlockNumber = u64; type Hash = H256; - struct MockHeaderProvider(pub Cell>); + struct MockHeaderProvider(pub RefCell>); fn headers() -> Vec
{ let mut headers = vec![Header::new_from_number(0)]; @@ -352,7 +352,7 @@ mod tests { #[async_trait] impl HeaderProvider for MockHeaderProvider { async fn get_header(&self, _hash: Hash) -> Header { - let height = self.0.get().pop_front().unwrap(); + let height = self.0.borrow().pop_front().unwrap(); headers()[height as usize].clone() } } @@ -370,7 +370,7 @@ mod tests { async fn finalized_headers_works_when_every_block_comes_from_subscription() { let heights = vec![4, 5, 6, 7]; - let provider = MockHeaderProvider(Cell::new(vec![].into())); + let provider = MockHeaderProvider(RefCell::new(vec![].into())); let subscription = MockHeaderSubscription(heights.clone().into()); let mut headers = FinalizedHeaders::new(&provider, subscription); @@ -387,7 +387,7 @@ mod tests { // Consecutive headers will be requested in the reversed order. let heights_not_in_subscription = vec![5, 9, 8, 7]; - let provider = MockHeaderProvider(Cell::new(heights_not_in_subscription.into())); + let provider = MockHeaderProvider(RefCell::new(heights_not_in_subscription.into())); let subscription = MockHeaderSubscription(heights_in_subscription.into()); let mut headers = FinalizedHeaders::new(&provider, subscription); From 6c248df5e4c7f96757b69f2c2d9e6371d1be5e5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 15:09:14 +0200 Subject: [PATCH 17/24] async mutex --- utils/frame/remote-externalities/src/rpc_api.rs | 2 +- .../try-runtime/cli/src/commands/follow_chain.rs | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 8573120c04274..2822e5625306f 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -18,6 +18,7 @@ //! WS RPC API for one off RPC calls to a substrate node. // TODO: Consolidate one off RPC calls https://github.com/paritytech/substrate/issues/8988 +use futures::lock::Mutex; use jsonrpsee::{ core::client::{Client, ClientT}, rpc_params, @@ -27,7 +28,6 @@ use jsonrpsee::{ use serde::de::DeserializeOwned; use sp_runtime::{generic::SignedBlock, traits::Block as BlockT}; use std::sync::Arc; -use futures::lock::Mutex; enum RpcCall { GetHeader, diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index 3e165de69941d..695ff8324ec64 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -330,13 +330,14 @@ where mod tests { use super::*; use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header}; - use std::cell::RefCell; + use std::sync::Arc; + use tokio::sync::Mutex; type Block = TBlock>; type BlockNumber = u64; type Hash = H256; - struct MockHeaderProvider(pub RefCell>); + struct MockHeaderProvider(pub Arc>>); fn headers() -> Vec
{ let mut headers = vec![Header::new_from_number(0)]; @@ -352,7 +353,7 @@ mod tests { #[async_trait] impl HeaderProvider for MockHeaderProvider { async fn get_header(&self, _hash: Hash) -> Header { - let height = self.0.borrow().pop_front().unwrap(); + let height = self.0.lock().await.pop_front().unwrap(); headers()[height as usize].clone() } } @@ -370,7 +371,7 @@ mod tests { async fn finalized_headers_works_when_every_block_comes_from_subscription() { let heights = vec![4, 5, 6, 7]; - let provider = MockHeaderProvider(RefCell::new(vec![].into())); + let provider = MockHeaderProvider(Default::default()); let subscription = MockHeaderSubscription(heights.clone().into()); let mut headers = FinalizedHeaders::new(&provider, subscription); @@ -387,7 +388,7 @@ mod tests { // Consecutive headers will be requested in the reversed order. let heights_not_in_subscription = vec![5, 9, 8, 7]; - let provider = MockHeaderProvider(RefCell::new(heights_not_in_subscription.into())); + let provider = MockHeaderProvider(Arc::new(Mutex::new(heights_not_in_subscription.into()))); let subscription = MockHeaderSubscription(heights_in_subscription.into()); let mut headers = FinalizedHeaders::new(&provider, subscription); From 0276f84ef76fa2e21428b40d9a5240dd29ed5847 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 15:14:45 +0200 Subject: [PATCH 18/24] uhm --- bin/node/cli/tests/common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/node/cli/tests/common.rs b/bin/node/cli/tests/common.rs index d4f334d37f387..69f7f04de2d15 100644 --- a/bin/node/cli/tests/common.rs +++ b/bin/node/cli/tests/common.rs @@ -71,7 +71,7 @@ pub async fn wait_n_finalized_blocks( pub async fn wait_n_finalized_blocks_from(n: usize, url: &str) { let mut built_blocks = std::collections::HashSet::new(); let mut interval = tokio::time::interval(Duration::from_secs(2)); - let mut rpc_service = RpcService::new(url, false); + let rpc_service = RpcService::new(url, false); loop { if let Ok(block) = rpc_service.get_finalized_head::().await { From 707fbbbe9caa0068a1ff56bcf51b23fab4fb2842 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 15:49:32 +0200 Subject: [PATCH 19/24] connect in constructor --- bin/node/cli/tests/common.rs | 2 +- .../frame/remote-externalities/src/rpc_api.rs | 36 +++++++------------ .../cli/src/commands/execute_block.rs | 4 +-- .../cli/src/commands/follow_chain.rs | 2 +- .../cli/src/commands/offchain_worker.rs | 2 +- utils/frame/try-runtime/cli/src/lib.rs | 2 +- 6 files changed, 18 insertions(+), 30 deletions(-) diff --git a/bin/node/cli/tests/common.rs b/bin/node/cli/tests/common.rs index 69f7f04de2d15..df04da2b809dd 100644 --- a/bin/node/cli/tests/common.rs +++ b/bin/node/cli/tests/common.rs @@ -71,7 +71,7 @@ pub async fn wait_n_finalized_blocks( pub async fn wait_n_finalized_blocks_from(n: usize, url: &str) { let mut built_blocks = std::collections::HashSet::new(); let mut interval = tokio::time::interval(Duration::from_secs(2)); - let rpc_service = RpcService::new(url, false); + let rpc_service = RpcService::new(url, false).await; loop { if let Ok(block) = rpc_service.get_finalized_head::().await { diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 2822e5625306f..438d715909c67 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -61,22 +61,20 @@ async fn make_request<'a, T: DeserializeOwned>( /// Simple RPC service that is capable of keeping the connection. /// -/// Service will connect to `uri` for the first time during the first request. Instantiation -/// does not trigger connecting. +/// Service will connect to `uri` for the first time already during initialization. /// /// Be careful with reusing the connection in a multithreaded environment. pub struct RpcService { uri: String, - client: Arc>>, - keep_connection: bool, + client: Arc>, } impl RpcService { - /// Creates a new RPC service. - /// - /// Does not connect yet. - pub fn new>(uri: S, keep_connection: bool) -> Self { - Self { uri: uri.as_ref().to_string(), client: Arc::new(Mutex::new(None)), keep_connection } + /// Creates a new RPC service. Connects to `uri` right away. + pub async fn new>(uri: S, keep_connection: bool) -> Self { + let maybe_client = keep_connection + .then_some(Self::build_client(uri).await.expect("`RpcService` failed to connect")); + Self { uri: uri.as_ref().to_string(), client: Arc::new(maybe_client) } } /// Returns the address at which requests are sent. @@ -84,16 +82,11 @@ impl RpcService { self.uri.clone() } - /// Whether to keep and reuse a single connection. - pub fn keep_connection(&self) -> bool { - self.keep_connection - } - /// Build a websocket client that connects to `self.uri`. - async fn build_client(&self) -> Result { + async fn build_client>(uri: S) -> Result { WsClientBuilder::default() .max_request_body_size(u32::MAX) - .build(&self.uri) + .build(uri) .await .map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e)) } @@ -104,17 +97,12 @@ impl RpcService { call: RpcCall, params: Option>, ) -> Result { - let mut maybe_client = self.client.lock().await; - match *maybe_client { - // `self.keep_connection` must be `true. + match self.client { + // `self.keep_connection` must have been `true`. Some(ref client) => make_request(client, call, params).await, None => { let client = self.build_client().await?; - let result = make_request(&client, call, params).await; - if self.keep_connection { - *maybe_client = Some(client) - }; - result + make_request(&client, call, params).await }, } } diff --git a/utils/frame/try-runtime/cli/src/commands/execute_block.rs b/utils/frame/try-runtime/cli/src/commands/execute_block.rs index f3a0a40fbef2b..850b2f67ad968 100644 --- a/utils/frame/try-runtime/cli/src/commands/execute_block.rs +++ b/utils/frame/try-runtime/cli/src/commands/execute_block.rs @@ -89,7 +89,7 @@ impl ExecuteBlockCmd { Block::Hash: FromStr, ::Err: Debug, { - let rpc_service = rpc_api::RpcService::new(ws_uri, false); + let rpc_service = rpc_api::RpcService::new(ws_uri, false).await; match (&self.block_at, &self.state) { (Some(block_at), State::Snap { .. }) => hash_of::(block_at), @@ -148,7 +148,7 @@ where let block_ws_uri = command.block_ws_uri::(); let block_at = command.block_at::(block_ws_uri.clone()).await?; - let rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false); + let rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false).await; let block: Block = rpc_service.get_block::(block_at).await?; let parent_hash = block.header().parent_hash(); log::info!( diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index 695ff8324ec64..4f8c66291b8f0 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -229,7 +229,7 @@ where let executor = build_executor::(&shared, &config); let execution = shared.execution; - let rpc_service = RpcService::new(&command.uri, command.keep_connection); + let rpc_service = RpcService::new(&command.uri, command.keep_connection).await; let mut finalized_headers: FinalizedHeaders> = FinalizedHeaders::new(&rpc_service, subscription); diff --git a/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs b/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs index b269649442ab7..b5816358676ab 100644 --- a/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs +++ b/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs @@ -119,7 +119,7 @@ where let header_at = command.header_at::()?; let header_ws_uri = command.header_ws_uri::(); - let rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false); + let rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false).await; let header = rpc_service.get_header::(header_at).await?; log::info!( target: LOG_TARGET, diff --git a/utils/frame/try-runtime/cli/src/lib.rs b/utils/frame/try-runtime/cli/src/lib.rs index 13257bcbe6982..1f1303c772039 100644 --- a/utils/frame/try-runtime/cli/src/lib.rs +++ b/utils/frame/try-runtime/cli/src/lib.rs @@ -632,7 +632,7 @@ pub(crate) async fn ensure_matching_spec(None) .await From 96bd04756a1b97bb271982a18bb24299afa4d827 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 15:51:57 +0200 Subject: [PATCH 20/24] remove dep --- Cargo.lock | 1 - utils/frame/remote-externalities/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e377a3518e1c..f6c2df81d6ee1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7445,7 +7445,6 @@ version = "0.10.0-dev" dependencies = [ "env_logger", "frame-support", - "futures", "jsonrpsee", "log", "pallet-elections-phragmen", diff --git a/utils/frame/remote-externalities/Cargo.toml b/utils/frame/remote-externalities/Cargo.toml index 9f22617b020e3..3121157df68d8 100644 --- a/utils/frame/remote-externalities/Cargo.toml +++ b/utils/frame/remote-externalities/Cargo.toml @@ -15,7 +15,6 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] codec = { package = "parity-scale-codec", version = "3.0.0" } env_logger = "0.9" -futures = "0.3.21" jsonrpsee = { version = "0.15.1", features = ["ws-client", "macros"] } log = "0.4.17" serde = "1.0.136" From c7d22f21e900c1b2083076da95fd0e2f0a380984 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 15:54:33 +0200 Subject: [PATCH 21/24] old import --- utils/frame/remote-externalities/src/rpc_api.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 438d715909c67..99537a2e69144 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -18,7 +18,6 @@ //! WS RPC API for one off RPC calls to a substrate node. // TODO: Consolidate one off RPC calls https://github.com/paritytech/substrate/issues/8988 -use futures::lock::Mutex; use jsonrpsee::{ core::client::{Client, ClientT}, rpc_params, From b93e4a69c352c1aa8088094980ca81de000c1ab6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 16:11:37 +0200 Subject: [PATCH 22/24] another take --- bin/node/cli/tests/common.rs | 2 +- .../frame/remote-externalities/src/rpc_api.rs | 30 ++++++++++++------- .../cli/src/commands/execute_block.rs | 4 +-- .../cli/src/commands/follow_chain.rs | 2 +- .../cli/src/commands/offchain_worker.rs | 2 +- utils/frame/try-runtime/cli/src/lib.rs | 4 ++- 6 files changed, 27 insertions(+), 17 deletions(-) diff --git a/bin/node/cli/tests/common.rs b/bin/node/cli/tests/common.rs index df04da2b809dd..3b83f4339611d 100644 --- a/bin/node/cli/tests/common.rs +++ b/bin/node/cli/tests/common.rs @@ -71,7 +71,7 @@ pub async fn wait_n_finalized_blocks( pub async fn wait_n_finalized_blocks_from(n: usize, url: &str) { let mut built_blocks = std::collections::HashSet::new(); let mut interval = tokio::time::interval(Duration::from_secs(2)); - let rpc_service = RpcService::new(url, false).await; + let rpc_service = RpcService::new(url, false).await.unwrap(); loop { if let Ok(block) = rpc_service.get_finalized_head::().await { diff --git a/utils/frame/remote-externalities/src/rpc_api.rs b/utils/frame/remote-externalities/src/rpc_api.rs index 99537a2e69144..3ea30a30221a2 100644 --- a/utils/frame/remote-externalities/src/rpc_api.rs +++ b/utils/frame/remote-externalities/src/rpc_api.rs @@ -48,7 +48,7 @@ impl RpcCall { /// General purpose method for making RPC calls. async fn make_request<'a, T: DeserializeOwned>( - client: &Client, + client: &Arc, call: RpcCall, params: Option>, ) -> Result { @@ -58,6 +58,11 @@ async fn make_request<'a, T: DeserializeOwned>( .map_err(|e| format!("{} request failed: {:?}", call.as_str(), e)) } +enum ConnectionPolicy { + Reuse(Arc), + Reconnect, +} + /// Simple RPC service that is capable of keeping the connection. /// /// Service will connect to `uri` for the first time already during initialization. @@ -65,15 +70,18 @@ async fn make_request<'a, T: DeserializeOwned>( /// Be careful with reusing the connection in a multithreaded environment. pub struct RpcService { uri: String, - client: Arc>, + policy: ConnectionPolicy, } impl RpcService { - /// Creates a new RPC service. Connects to `uri` right away. - pub async fn new>(uri: S, keep_connection: bool) -> Self { - let maybe_client = keep_connection - .then_some(Self::build_client(uri).await.expect("`RpcService` failed to connect")); - Self { uri: uri.as_ref().to_string(), client: Arc::new(maybe_client) } + /// Creates a new RPC service. If `keep_connection`, then connects to `uri` right away. + pub async fn new>(uri: S, keep_connection: bool) -> Result { + let policy = if keep_connection { + ConnectionPolicy::Reuse(Arc::new(Self::build_client(uri.as_ref()).await?)) + } else { + ConnectionPolicy::Reconnect + }; + Ok(Self { uri: uri.as_ref().to_string(), policy }) } /// Returns the address at which requests are sent. @@ -96,11 +104,11 @@ impl RpcService { call: RpcCall, params: Option>, ) -> Result { - match self.client { + match self.policy { // `self.keep_connection` must have been `true`. - Some(ref client) => make_request(client, call, params).await, - None => { - let client = self.build_client().await?; + ConnectionPolicy::Reuse(ref client) => make_request(client, call, params).await, + ConnectionPolicy::Reconnect => { + let client = Arc::new(Self::build_client(&self.uri).await?); make_request(&client, call, params).await }, } diff --git a/utils/frame/try-runtime/cli/src/commands/execute_block.rs b/utils/frame/try-runtime/cli/src/commands/execute_block.rs index 850b2f67ad968..70ba3615f874a 100644 --- a/utils/frame/try-runtime/cli/src/commands/execute_block.rs +++ b/utils/frame/try-runtime/cli/src/commands/execute_block.rs @@ -89,7 +89,7 @@ impl ExecuteBlockCmd { Block::Hash: FromStr, ::Err: Debug, { - let rpc_service = rpc_api::RpcService::new(ws_uri, false).await; + let rpc_service = rpc_api::RpcService::new(ws_uri, false).await?; match (&self.block_at, &self.state) { (Some(block_at), State::Snap { .. }) => hash_of::(block_at), @@ -148,7 +148,7 @@ where let block_ws_uri = command.block_ws_uri::(); let block_at = command.block_at::(block_ws_uri.clone()).await?; - let rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false).await; + let rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false).await?; let block: Block = rpc_service.get_block::(block_at).await?; let parent_hash = block.header().parent_hash(); log::info!( diff --git a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index 4f8c66291b8f0..f493d5c10cd29 100644 --- a/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -229,7 +229,7 @@ where let executor = build_executor::(&shared, &config); let execution = shared.execution; - let rpc_service = RpcService::new(&command.uri, command.keep_connection).await; + let rpc_service = RpcService::new(&command.uri, command.keep_connection).await?; let mut finalized_headers: FinalizedHeaders> = FinalizedHeaders::new(&rpc_service, subscription); diff --git a/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs b/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs index b5816358676ab..a579692abd9e2 100644 --- a/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs +++ b/utils/frame/try-runtime/cli/src/commands/offchain_worker.rs @@ -119,7 +119,7 @@ where let header_at = command.header_at::()?; let header_ws_uri = command.header_ws_uri::(); - let rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false).await; + let rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false).await?; let header = rpc_service.get_header::(header_at).await?; log::info!( target: LOG_TARGET, diff --git a/utils/frame/try-runtime/cli/src/lib.rs b/utils/frame/try-runtime/cli/src/lib.rs index 1f1303c772039..717e571583007 100644 --- a/utils/frame/try-runtime/cli/src/lib.rs +++ b/utils/frame/try-runtime/cli/src/lib.rs @@ -632,7 +632,9 @@ pub(crate) async fn ensure_matching_spec(None) .await From 9c0ce68ab40897e7cbacbf3d9da034129c943b71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 16:44:05 +0200 Subject: [PATCH 23/24] trigger polkadot pipeline --- utils/frame/try-runtime/cli/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/utils/frame/try-runtime/cli/src/lib.rs b/utils/frame/try-runtime/cli/src/lib.rs index 717e571583007..734da7ccda6fe 100644 --- a/utils/frame/try-runtime/cli/src/lib.rs +++ b/utils/frame/try-runtime/cli/src/lib.rs @@ -541,8 +541,8 @@ impl State { impl TryRuntimeCmd { pub async fn run(&self, config: Configuration) -> sc_cli::Result<()> where - Block: BlockT + serde::de::DeserializeOwned, - Block::Header: serde::de::DeserializeOwned, + Block: BlockT + DeserializeOwned, + Block::Header: DeserializeOwned, Block::Hash: FromStr, ::Err: Debug, NumberFor: FromStr, @@ -626,7 +626,7 @@ where /// /// If the spec names don't match, if `relaxed`, then it emits a warning, else it panics. /// If the spec versions don't match, it only ever emits a warning. -pub(crate) async fn ensure_matching_spec( +pub(crate) async fn ensure_matching_spec( uri: String, expected_spec_name: String, expected_spec_version: u32, From 305318c31d0d47eba096ae130877b93b681c5072 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Miko=C5=82ajczyk?= Date: Mon, 5 Sep 2022 18:21:19 +0200 Subject: [PATCH 24/24] trigger pipeline --- utils/frame/try-runtime/cli/src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/utils/frame/try-runtime/cli/src/lib.rs b/utils/frame/try-runtime/cli/src/lib.rs index 734da7ccda6fe..c71496e0b850c 100644 --- a/utils/frame/try-runtime/cli/src/lib.rs +++ b/utils/frame/try-runtime/cli/src/lib.rs @@ -267,7 +267,8 @@ use parity_scale_codec::Decode; use remote_externalities::{ - Builder, Mode, OfflineConfig, OnlineConfig, SnapshotConfig, TestExternalities, + rpc_api::RpcService, Builder, Mode, OfflineConfig, OnlineConfig, SnapshotConfig, + TestExternalities, }; use sc_chain_spec::ChainSpec; use sc_cli::{ @@ -632,9 +633,7 @@ pub(crate) async fn ensure_matching_spec( expected_spec_version: u32, relaxed: bool, ) { - let rpc_service = remote_externalities::rpc_api::RpcService::new(uri.clone(), false) - .await - .unwrap(); + let rpc_service = RpcService::new(uri.clone(), false).await.unwrap(); match rpc_service .get_runtime_version::(None) .await