Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

try-runtime::follow-chain - keep connection #12167

Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bin/node/cli/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use nix::{
unistd::Pid,
};
use node_primitives::Block;
use remote_externalities::rpc_api;
use remote_externalities::rpc_api::RpcService;
use std::{
io::{BufRead, BufReader, Read},
ops::{Deref, DerefMut},
Expand Down Expand Up @@ -71,9 +71,10 @@ pub async fn wait_n_finalized_blocks(
pub async fn wait_n_finalized_blocks_from(n: usize, url: &str) {
let mut built_blocks = std::collections::HashSet::new();
let mut interval = tokio::time::interval(Duration::from_secs(2));
let mut rpc_service = RpcService::new(url, false);

loop {
if let Ok(block) = rpc_api::get_finalized_head::<Block, _>(url.to_string()).await {
if let Ok(block) = rpc_service.get_finalized_head::<Block>().await {
built_blocks.insert(block);
if built_blocks.len() > n {
break
Expand Down
179 changes: 115 additions & 64 deletions utils/frame/remote-externalities/src/rpc_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,82 +19,133 @@
// TODO: Consolidate one off RPC calls https://github.com/paritytech/substrate/issues/8988

use jsonrpsee::{
core::client::ClientT,
core::client::{Client, ClientT},
rpc_params,
types::ParamsSer,
ws_client::{WsClient, WsClientBuilder},
};
use sp_runtime::{
generic::SignedBlock,
traits::{Block as BlockT, Header as HeaderT},
};

/// Get the header of the block identified by `at`
pub async fn get_header<Block, S>(from: S, at: Block::Hash) -> Result<Block::Header, String>
where
Block: BlockT,
Block::Header: serde::de::DeserializeOwned,
S: AsRef<str>,
{
let client = build_client(from).await?;
use serde::de::DeserializeOwned;
use sp_runtime::{generic::SignedBlock, traits::Block as BlockT};

client
.request::<Block::Header>("chain_getHeader", rpc_params!(at))
.await
.map_err(|e| format!("chain_getHeader request failed: {:?}", e))
enum RpcCall {
GetHeader,
GetFinalizedHead,
GetBlock,
GetRuntimeVersion,
}

/// Get the finalized head
pub async fn get_finalized_head<Block, S>(from: S) -> Result<Block::Hash, String>
where
Block: BlockT,
S: AsRef<str>,
{
let client = build_client(from).await?;
impl RpcCall {
fn as_str(&self) -> &'static str {
match self {
RpcCall::GetHeader => "chain_getHeader",
RpcCall::GetFinalizedHead => "chain_getFinalizedHead",
RpcCall::GetBlock => "chain_getBlock",
RpcCall::GetRuntimeVersion => "state_getRuntimeVersion",
}
}
}

/// General purpose method for making RPC calls.
async fn make_request<'a, T: DeserializeOwned>(
client: &Client,
call: RpcCall,
params: Option<ParamsSer<'a>>,
) -> Result<T, String> {
client
.request::<Block::Hash>("chain_getFinalizedHead", None)
.request::<T>(call.as_str(), params)
.await
.map_err(|e| format!("chain_getFinalizedHead request failed: {:?}", e))
.map_err(|e| format!("{} request failed: {:?}", call.as_str(), e))
}

/// Get the signed block identified by `at`.
pub async fn get_block<Block, S>(from: S, at: Block::Hash) -> Result<Block, String>
where
S: AsRef<str>,
Block: BlockT + serde::de::DeserializeOwned,
Block::Header: HeaderT,
{
let client = build_client(from).await?;
let signed_block = client
.request::<SignedBlock<Block>>("chain_getBlock", rpc_params!(at))
.await
.map_err(|e| format!("chain_getBlock request failed: {:?}", e))?;

Ok(signed_block.block)
/// Simple RPC service that is capable of keeping the connection.
///
/// Service will connect to `uri` for the first time during the first request. Instantiation
/// does not trigger connecting.
///
/// Be careful with reusing the connection in a multithreaded environment.
pub struct RpcService {
uri: String,
client: Option<Client>,
keep_connection: bool,
}

/// Build a websocket client that connects to `from`.
async fn build_client<S: AsRef<str>>(from: S) -> Result<WsClient, String> {
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.build(from.as_ref())
.await
.map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
}
impl RpcService {
/// Creates a new RPC service.
///
/// Does not connect yet.
pub fn new<S: AsRef<str>>(uri: S, keep_connection: bool) -> Self {
Self { uri: uri.as_ref().to_string(), client: None, keep_connection }
}

/// Get the runtime version of a given chain.
pub async fn get_runtime_version<Block, S>(
from: S,
at: Option<Block::Hash>,
) -> Result<sp_version::RuntimeVersion, String>
where
S: AsRef<str>,
Block: BlockT + serde::de::DeserializeOwned,
Block::Header: HeaderT,
{
let client = build_client(from).await?;
client
.request::<sp_version::RuntimeVersion>("state_getRuntimeVersion", rpc_params!(at))
.await
.map_err(|e| format!("state_getRuntimeVersion request failed: {:?}", e))
/// Returns the address at which requests are sent.
pub fn uri(&self) -> String {
self.uri.clone()
}

/// Whether to keep and reuse a single connection.
pub fn keep_connection(&self) -> bool {
self.keep_connection
}

/// Build a websocket client that connects to `self.uri`.
async fn build_client(&self) -> Result<WsClient, String> {
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.build(&self.uri)
.await
.map_err(|e| format!("`WsClientBuilder` failed to build: {:?}", e))
}

/// Generic method for making RPC requests.
async fn make_request<'a, T: DeserializeOwned>(
&mut self,
call: RpcCall,
params: Option<ParamsSer<'a>>,
) -> Result<T, String> {
match &self.client {
// `self.keep_connection` must be `true.
Some(ref client) => make_request(client, call, params).await,
None => {
let client = self.build_client().await?;
let result = make_request(&client, call, params).await;
if self.keep_connection {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
self.client = Some(client)
};
result
},
}
}

/// Get the header of the block identified by `at`.
pub async fn get_header<Block>(&mut self, at: Block::Hash) -> Result<Block::Header, String>
where
Block: BlockT,
Block::Header: DeserializeOwned,
{
self.make_request(RpcCall::GetHeader, rpc_params!(at)).await
}

/// Get the finalized head.
pub async fn get_finalized_head<Block: BlockT>(&mut self) -> Result<Block::Hash, String> {
self.make_request(RpcCall::GetFinalizedHead, None).await
}

/// Get the signed block identified by `at`.
pub async fn get_block<Block: BlockT + DeserializeOwned>(
&mut self,
at: Block::Hash,
) -> Result<Block, String> {
Ok(self
.make_request::<SignedBlock<Block>>(RpcCall::GetBlock, rpc_params!(at))
.await?
.block)
}

/// Get the runtime version of a given chain.
pub async fn get_runtime_version<Block: BlockT + DeserializeOwned>(
&mut self,
at: Option<Block::Hash>,
) -> Result<sp_version::RuntimeVersion, String> {
self.make_request(RpcCall::GetRuntimeVersion, rpc_params!(at)).await
}
}
9 changes: 5 additions & 4 deletions utils/frame/try-runtime/cli/src/commands/execute_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ impl ExecuteBlockCmd {
Block::Hash: FromStr,
<Block::Hash as FromStr>::Err: Debug,
{
let mut rpc_service = rpc_api::RpcService::new(ws_uri, false);

match (&self.block_at, &self.state) {
(Some(block_at), State::Snap { .. }) => hash_of::<Block>(block_at),
(Some(block_at), State::Live { .. }) => {
Expand All @@ -100,9 +102,7 @@ impl ExecuteBlockCmd {
target: LOG_TARGET,
"No --block-at or --at provided, using the latest finalized block instead"
);
remote_externalities::rpc_api::get_finalized_head::<Block, _>(ws_uri)
.await
.map_err(Into::into)
rpc_service.get_finalized_head::<Block>().await.map_err(Into::into)
},
(None, State::Live { at: Some(at), .. }) => hash_of::<Block>(at),
_ => {
Expand Down Expand Up @@ -148,7 +148,8 @@ where

let block_ws_uri = command.block_ws_uri::<Block>();
let block_at = command.block_at::<Block>(block_ws_uri.clone()).await?;
let block: Block = rpc_api::get_block::<Block, _>(block_ws_uri.clone(), block_at).await?;
let mut rpc_service = rpc_api::RpcService::new(block_ws_uri.clone(), false);
let block: Block = rpc_service.get_block::<Block>(block_at).await?;
let parent_hash = block.header().parent_hash();
log::info!(
target: LOG_TARGET,
Expand Down
8 changes: 7 additions & 1 deletion utils/frame/try-runtime/cli/src/commands/follow_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub struct FollowChainCmd {
/// round-robin fashion.
#[clap(long, default_value = "none")]
try_state: frame_try_runtime::TryStateSelect,

/// If present, a single connection to a node will be kept and reused for fetching blocks.
#[clap(long)]
keep_connection: bool,
}

pub(crate) async fn follow_chain<Block, ExecDispatch>(
Expand Down Expand Up @@ -90,6 +94,8 @@ where
let executor = build_executor::<ExecDispatch>(&shared, &config);
let execution = shared.execution;

let mut rpc_service = rpc_api::RpcService::new(&command.uri, command.keep_connection);

loop {
let header = match subscription.next().await {
Some(Ok(header)) => header,
Expand All @@ -106,7 +112,7 @@ where
let hash = header.hash();
let number = header.number();

let block = rpc_api::get_block::<Block, _>(&command.uri, hash).await.unwrap();
let block = rpc_service.get_block::<Block>(hash).await.unwrap();

log::debug!(
target: LOG_TARGET,
Expand Down
3 changes: 2 additions & 1 deletion utils/frame/try-runtime/cli/src/commands/offchain_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ where
let header_at = command.header_at::<Block>()?;
let header_ws_uri = command.header_ws_uri::<Block>();

let header = rpc_api::get_header::<Block, _>(header_ws_uri.clone(), header_at).await?;
let mut rpc_service = rpc_api::RpcService::new(header_ws_uri.clone(), false);
let header = rpc_service.get_header::<Block>(header_at).await?;
log::info!(
target: LOG_TARGET,
"fetched header from {:?}, block number: {:?}",
Expand Down
4 changes: 3 additions & 1 deletion utils/frame/try-runtime/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,9 @@ pub(crate) async fn ensure_matching_spec<Block: BlockT + serde::de::DeserializeO
expected_spec_version: u32,
relaxed: bool,
) {
match remote_externalities::rpc_api::get_runtime_version::<Block, _>(uri.clone(), None)
let mut rpc_service = remote_externalities::rpc_api::RpcService::new(uri.clone(), false);
match rpc_service
.get_runtime_version::<Block>(None)
.await
.map(|version| (String::from(version.spec_name.clone()), version.spec_version))
.map(|(spec_name, spec_version)| (spec_name.to_lowercase(), spec_version))
Expand Down