Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

try-runtime::follow-chain - keep connection #12167

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bin/node/cli/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use nix::{
unistd::Pid,
};
use node_primitives::Block;
use remote_externalities::rpc_api;
use remote_externalities::rpc_api::RpcService;
use std::{
io::{BufRead, BufReader, Read},
ops::{Deref, DerefMut},
Expand Down Expand Up @@ -71,9 +71,10 @@ pub async fn wait_n_finalized_blocks(
pub async fn wait_n_finalized_blocks_from(n: usize, url: &str) {
let mut built_blocks = std::collections::HashSet::new();
let mut interval = tokio::time::interval(Duration::from_secs(2));
let rpc_service = RpcService::new(url, false).await.unwrap();

loop {
if let Ok(block) = rpc_api::get_finalized_head::<Block, _>(url.to_string()).await {
if let Ok(block) = rpc_service.get_finalized_head::<Block>().await {
built_blocks.insert(block);
if built_blocks.len() > n {
break
Expand Down
175 changes: 112 additions & 63 deletions utils/frame/remote-externalities/src/rpc_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,82 +19,131 @@
// TODO: Consolidate one off RPC calls https://github.com/paritytech/substrate/issues/8988

use jsonrpsee::{
core::client::ClientT,
core::client::{Client, ClientT},
rpc_params,
types::ParamsSer,
ws_client::{WsClient, WsClientBuilder},
};
use sp_runtime::{
generic::SignedBlock,
traits::{Block as BlockT, Header as HeaderT},
};

/// Get the header of the block identified by `at`
pub async fn get_header<Block, S>(from: S, at: Block::Hash) -> Result<Block::Header, String>
where
Block: BlockT,
Block::Header: serde::de::DeserializeOwned,
S: AsRef<str>,
{
let client = build_client(from).await?;
use serde::de::DeserializeOwned;
use sp_runtime::{generic::SignedBlock, traits::Block as BlockT};
use std::sync::Arc;

client
.request::<Block::Header>("chain_getHeader", rpc_params!(at))
.await
.map_err(|e| format!("chain_getHeader request failed: {:?}", e))
enum RpcCall {
GetHeader,
GetFinalizedHead,
GetBlock,
GetRuntimeVersion,
}

/// Get the finalized head
pub async fn get_finalized_head<Block, S>(from: S) -> Result<Block::Hash, String>
where
Block: BlockT,
S: AsRef<str>,
{
let client = build_client(from).await?;
impl RpcCall {
fn as_str(&self) -> &'static str {
match self {
RpcCall::GetHeader => "chain_getHeader",
RpcCall::GetFinalizedHead => "chain_getFinalizedHead",
RpcCall::GetBlock => "chain_getBlock",
RpcCall::GetRuntimeVersion => "state_getRuntimeVersion",
}
}
}

/// General purpose method for making RPC calls.
async fn make_request<'a, T: DeserializeOwned>(
client: &Arc<Client>,
call: RpcCall,
params: Option<ParamsSer<'a>>,
) -> Result<T, String> {
client
.request::<Block::Hash>("chain_getFinalizedHead", None)
.request::<T>(call.as_str(), params)
.await
.map_err(|e| format!("chain_getFinalizedHead request failed: {:?}", e))
.map_err(|e| format!("{} request failed: {:?}", call.as_str(), e))
}

/// Get the signed block identified by `at`.
pub async fn get_block<Block, S>(from: S, at: Block::Hash) -> Result<Block, String>
where
S: AsRef<str>,
Block: BlockT + serde::de::DeserializeOwned,
Block::Header: HeaderT,
{
let client = build_client(from).await?;
let signed_block = client
.request::<SignedBlock<Block>>("chain_getBlock", rpc_params!(at))
.await
.map_err(|e| format!("chain_getBlock request failed: {:?}", e))?;

Ok(signed_block.block)
enum ConnectionPolicy {
Reuse(Arc<Client>),
Reconnect,
}

/// Build a websocket client that connects to `from`.
async fn build_client<S: AsRef<str>>(from: S) -> Result<WsClient, String> {
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.build(from.as_ref())
.await
.map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
/// Simple RPC service that is capable of keeping the connection.
///
/// Service will connect to `uri` for the first time already during initialization.
///
/// Be careful with reusing the connection in a multithreaded environment.
pub struct RpcService {
uri: String,
policy: ConnectionPolicy,
}

/// Get the runtime version of a given chain.
pub async fn get_runtime_version<Block, S>(
from: S,
at: Option<Block::Hash>,
) -> Result<sp_version::RuntimeVersion, String>
where
S: AsRef<str>,
Block: BlockT + serde::de::DeserializeOwned,
Block::Header: HeaderT,
{
let client = build_client(from).await?;
client
.request::<sp_version::RuntimeVersion>("state_getRuntimeVersion", rpc_params!(at))
.await
.map_err(|e| format!("state_getRuntimeVersion request failed: {:?}", e))
impl RpcService {
/// Creates a new RPC service. If `keep_connection`, then connects to `uri` right away.
pub async fn new<S: AsRef<str>>(uri: S, keep_connection: bool) -> Result<Self, String> {
let policy = if keep_connection {
ConnectionPolicy::Reuse(Arc::new(Self::build_client(uri.as_ref()).await?))
} else {
ConnectionPolicy::Reconnect
};
Ok(Self { uri: uri.as_ref().to_string(), policy })
}

/// Returns the address at which requests are sent.
pub fn uri(&self) -> String {
self.uri.clone()
}

/// Build a websocket client that connects to `self.uri`.
async fn build_client<S: AsRef<str>>(uri: S) -> Result<WsClient, String> {
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.build(uri)
.await
.map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
}

/// Generic method for making RPC requests.
async fn make_request<'a, T: DeserializeOwned>(
&self,
call: RpcCall,
params: Option<ParamsSer<'a>>,
) -> Result<T, String> {
match self.policy {
// `self.keep_connection` must have been `true`.
ConnectionPolicy::Reuse(ref client) => make_request(client, call, params).await,
ConnectionPolicy::Reconnect => {
let client = Arc::new(Self::build_client(&self.uri).await?);
make_request(&client, call, params).await
},
}
}

/// Get the header of the block identified by `at`.
pub async fn get_header<Block>(&self, at: Block::Hash) -> Result<Block::Header, String>
where
Block: BlockT,
Block::Header: DeserializeOwned,
{
self.make_request(RpcCall::GetHeader, rpc_params!(at)).await
}

/// Get the finalized head.
pub async fn get_finalized_head<Block: BlockT>(&self) -> Result<Block::Hash, String> {
self.make_request(RpcCall::GetFinalizedHead, None).await
}

/// Get the signed block identified by `at`.
pub async fn get_block<Block: BlockT + DeserializeOwned>(
&self,
at: Block::Hash,
) -> Result<Block, String> {
Ok(self
.make_request::<SignedBlock<Block>>(RpcCall::GetBlock, rpc_params!(at))
.await?
.block)
}

/// Get the runtime version of a given chain.
pub async fn get_runtime_version<Block: BlockT + DeserializeOwned>(
&self,
at: Option<Block::Hash>,
) -> Result<sp_version::RuntimeVersion, String> {
self.make_request(RpcCall::GetRuntimeVersion, rpc_params!(at)).await
}
}
9 changes: 5 additions & 4 deletions utils/frame/try-runtime/cli/src/commands/execute_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ impl ExecuteBlockCmd {
Block::Hash: FromStr,
<Block::Hash as FromStr>::Err: Debug,
{
let rpc_service = rpc_api::RpcService::new(ws_uri, false).await?;

match (&self.block_at, &self.state) {
(Some(block_at), State::Snap { .. }) => hash_of::<Block>(block_at),
(Some(block_at), State::Live { .. }) => {
Expand All @@ -100,9 +102,7 @@ impl ExecuteBlockCmd {
target: LOG_TARGET,
"No --block-at or --at provided, using the latest finalized block instead"
);
remote_externalities::rpc_api::get_finalized_head::<Block, _>(ws_uri)
.await
.map_err(Into::into)
rpc_service.get_finalized_head::<Block>().await.map_err(Into::into)
},
(None, State::Live { at: Some(at), .. }) => hash_of::<Block>(at),
_ => {
Expand Down Expand Up @@ -148,7 +148,8 @@ where

let block_ws_uri = command.block_ws_uri::<Block>();
let block_at = command.block_at::<Block>(block_ws_uri.clone()).await?;
let block: Block = rpc_api::get_block::<Block, _>(block_ws_uri.clone(), block_at).await?;
let rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false).await?;
let block: Block = rpc_service.get_block::<Block>(block_at).await?;
let parent_hash = block.header().parent_hash();
log::info!(
target: LOG_TARGET,
Expand Down
Loading