Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

new remote-ext mode OfflineOrElseOnline #10192

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions utils/frame/remote-externalities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ sp-version = { version = "4.0.0-dev", path = "../../../primitives/version" }
[dev-dependencies]
tokio = { version = "1.10", features = ["macros", "rt-multi-thread"] }
pallet-elections-phragmen = { path = "../../../frame/elections-phragmen", version = "5.0.0-dev" }
frame-support = { path = "../../../frame/support", version = "4.0.0-dev" }

[features]
remote-test = []
122 changes: 92 additions & 30 deletions utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

use codec::{Decode, Encode};
use jsonrpsee_ws_client::{types::v2::params::JsonRpcParams, WsClient, WsClientBuilder};
use log::*;
use sp_core::{
hashing::twox_128,
hexdisplay::HexDisplay,
Expand Down Expand Up @@ -62,10 +61,12 @@ jsonrpsee_proc_macros::rpc_client_api! {
/// The execution mode.
#[derive(Clone)]
pub enum Mode<B: BlockT> {
/// Online.
/// Online. Potentially writes to a cache file.
Online(OnlineConfig<B>),
/// Offline. Uses a state snapshot file and needs not any client config.
Offline(OfflineConfig),
/// Prefer using a cache file if it exists, else use a remote server.
OfflineOrElseOnline(OfflineConfig, OnlineConfig<B>),
}

impl<B: BlockT> Default for Mode<B> {
Expand All @@ -83,6 +84,12 @@ pub struct OfflineConfig {
pub state_snapshot: SnapshotConfig,
}

impl<P: Into<PathBuf>> From<P> for SnapshotConfig {
fn from(p: P) -> Self {
Self { path: p.into() }
}
}

/// Description of the transport protocol (for online execution).
#[derive(Debug)]
pub struct Transport {
Expand Down Expand Up @@ -193,13 +200,15 @@ impl<B: BlockT> Builder<B> {
fn as_online(&self) -> &OnlineConfig<B> {
match &self.mode {
Mode::Online(config) => &config,
Mode::OfflineOrElseOnline(_, config) => &config,
_ => panic!("Unexpected mode: Online"),
}
}

fn as_online_mut(&mut self) -> &mut OnlineConfig<B> {
match &mut self.mode {
Mode::Online(config) => config,
Mode::OfflineOrElseOnline(_, config) => config,
_ => panic!("Unexpected mode: Online"),
}
}
Expand All @@ -212,19 +221,19 @@ impl<B: BlockT> Builder<B> {
key: StorageKey,
maybe_at: Option<B::Hash>,
) -> Result<StorageData, &'static str> {
trace!(target: LOG_TARGET, "rpc: get_storage");
log::trace!(target: LOG_TARGET, "rpc: get_storage");
RpcApi::<B>::get_storage(self.as_online().rpc_client(), key, maybe_at)
.await
.map_err(|e| {
error!("Error = {:?}", e);
log::error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc get_storage failed."
})
}
/// Get the latest finalized head.
async fn rpc_get_head(&self) -> Result<B::Hash, &'static str> {
trace!(target: LOG_TARGET, "rpc: finalized_head");
log::trace!(target: LOG_TARGET, "rpc: finalized_head");
RpcApi::<B>::finalized_head(self.as_online().rpc_client()).await.map_err(|e| {
error!("Error = {:?}", e);
log::error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc finalized_head failed."
})
}
Expand All @@ -248,19 +257,19 @@ impl<B: BlockT> Builder<B> {
)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
log::error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc get_keys failed"
})?;
let page_len = page.len();
all_keys.extend(page);

if page_len < PAGE as usize {
debug!(target: LOG_TARGET, "last page received: {}", page_len);
log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
break all_keys
} else {
let new_last_key =
all_keys.last().expect("all_keys is populated; has .last(); qed");
debug!(
log::debug!(
target: LOG_TARGET,
"new total = {}, full page received: {:?}",
all_keys.len(),
Expand All @@ -286,7 +295,7 @@ impl<B: BlockT> Builder<B> {
use serde_json::to_value;
let keys = self.get_keys_paged(prefix, at).await?;
let keys_count = keys.len();
debug!(target: LOG_TARGET, "Querying a total of {} keys", keys.len());
log::debug!(target: LOG_TARGET, "Querying a total of {} keys", keys.len());

let mut key_values: Vec<KeyPair> = vec![];
let client = self.as_online().rpc_client();
Expand Down Expand Up @@ -323,7 +332,7 @@ impl<B: BlockT> Builder<B> {
key_values.push((key.clone(), value));
if key_values.len() % (10 * BATCH_SIZE) == 0 {
let ratio: f64 = key_values.len() as f64 / keys_count as f64;
debug!(
log::debug!(
target: LOG_TARGET,
"progress = {:.2} [{} / {}]",
ratio,
Expand All @@ -342,14 +351,14 @@ impl<B: BlockT> Builder<B> {
impl<B: BlockT> Builder<B> {
/// Save the given data as state snapshot.
fn save_state_snapshot(&self, data: &[KeyPair], path: &Path) -> Result<(), &'static str> {
debug!(target: LOG_TARGET, "writing to state snapshot file {:?}", path);
log::debug!(target: LOG_TARGET, "writing to state snapshot file {:?}", path);
fs::write(path, data.encode()).map_err(|_| "fs::write failed.")?;
Ok(())
}

/// initialize `Self` from state snapshot. Panics if the file does not exist.
fn load_state_snapshot(&self, path: &Path) -> Result<Vec<KeyPair>, &'static str> {
info!(target: LOG_TARGET, "scraping key-pairs from state snapshot {:?}", path);
log::info!(target: LOG_TARGET, "scraping key-pairs from state snapshot {:?}", path);
let bytes = fs::read(path).map_err(|_| "fs::read failed.")?;
Decode::decode(&mut &*bytes).map_err(|_| "decode failed")
}
Expand All @@ -362,14 +371,14 @@ impl<B: BlockT> Builder<B> {
.at
.expect("online config must be initialized by this point; qed.")
.clone();
info!(target: LOG_TARGET, "scraping key-pairs from remote @ {:?}", at);
log::info!(target: LOG_TARGET, "scraping key-pairs from remote @ {:?}", at);

let mut keys_and_values = if config.pallets.len() > 0 {
let mut filtered_kv = vec![];
for f in config.pallets.iter() {
let hashed_prefix = StorageKey(twox_128(f.as_bytes()).to_vec());
let module_kv = self.rpc_get_pairs_paged(hashed_prefix.clone(), at).await?;
info!(
log::info!(
target: LOG_TARGET,
"downloaded data for module {} (count: {} / prefix: {:?}).",
f,
Expand All @@ -380,12 +389,12 @@ impl<B: BlockT> Builder<B> {
}
filtered_kv
} else {
info!(target: LOG_TARGET, "downloading data for all pallets.");
log::info!(target: LOG_TARGET, "downloading data for all pallets.");
self.rpc_get_pairs_paged(StorageKey(vec![]), at).await?
};

for prefix in &self.hashed_prefixes {
info!(
log::info!(
target: LOG_TARGET,
"adding data for hashed prefix: {:?}",
HexDisplay::from(prefix)
Expand All @@ -397,7 +406,11 @@ impl<B: BlockT> Builder<B> {

for key in &self.hashed_keys {
let key = StorageKey(key.to_vec());
info!(target: LOG_TARGET, "adding data for hashed key: {:?}", HexDisplay::from(&key));
log::info!(
target: LOG_TARGET,
"adding data for hashed key: {:?}",
HexDisplay::from(&key)
);
let value = self.rpc_get_storage(key.clone(), Some(at)).await?;
keys_and_values.push((key, value));
}
Expand All @@ -407,7 +420,7 @@ impl<B: BlockT> Builder<B> {

pub(crate) async fn init_remote_client(&mut self) -> Result<(), &'static str> {
let mut online = self.as_online_mut();
debug!(target: LOG_TARGET, "initializing remote client to {:?}", online.transport.uri);
log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", online.transport.uri);

// First, initialize the ws client.
let ws_client = WsClientBuilder::default()
Expand Down Expand Up @@ -437,11 +450,23 @@ impl<B: BlockT> Builder<B> {
}
kp
},
Mode::OfflineOrElseOnline(offline_config, online_config) => {
if let Ok(kv) = self.load_state_snapshot(&offline_config.state_snapshot.path) {
kv
} else {
self.init_remote_client().await?;
let kp = self.load_remote().await?;
if let Some(c) = online_config.state_snapshot {
self.save_state_snapshot(&kp, &c.path)?;
}
kp
}
},
};

// inject manual key values.
if !self.hashed_key_values.is_empty() {
debug!(
log::debug!(
target: LOG_TARGET,
"extending externalities with {} manually injected key-values",
self.hashed_key_values.len()
Expand All @@ -451,7 +476,7 @@ impl<B: BlockT> Builder<B> {

// exclude manual key values.
if !self.hashed_blacklist.is_empty() {
debug!(
log::debug!(
target: LOG_TARGET,
"excluding externalities from {} keys",
self.hashed_blacklist.len()
Expand Down Expand Up @@ -522,7 +547,7 @@ impl<B: BlockT> Builder<B> {
let kv = self.pre_build().await?;
let mut ext = TestExternalities::new_empty();

info!(target: LOG_TARGET, "injecting a total of {} keys", kv.len());
log::info!(target: LOG_TARGET, "injecting a total of {} keys", kv.len());
for (k, v) in kv {
let (k, v) = (k.0, v.0);
// Insert the key,value pair into the test trie backend
Expand Down Expand Up @@ -603,12 +628,51 @@ mod remote_tests {

const REMOTE_INACCESSIBLE: &'static str = "Can't reach the remote node. Is it running?";

#[tokio::test]
async fn offline_else_online_works() {
init_logger();
// this shows that in the second run, we use the remote and create a cache.
Builder::<Block>::new()
.mode(Mode::OfflineOrElseOnline(
OfflineConfig {
state_snapshot: SnapshotConfig::new("test_snapshot_to_remove.bin"),
},
OnlineConfig {
pallets: vec!["Proxy".to_owned()],
state_snapshot: Some(SnapshotConfig::new("test_snapshot_to_remove.bin")),
..Default::default()
},
))
.build()
.await
.expect(REMOTE_INACCESSIBLE)
.execute_with(|| {});

// this shows that in the second run, we are not using the remote
Builder::<Block>::new()
.mode(Mode::OfflineOrElseOnline(
OfflineConfig {
state_snapshot: SnapshotConfig::new("test_snapshot_to_remove.bin"),
},
OnlineConfig {
pallets: vec!["Proxy".to_owned()],
state_snapshot: Some(SnapshotConfig::new("test_snapshot_to_remove.bin")),
transport: "ws://non-existent:666".to_owned().into(),
..Default::default()
},
))
.build()
.await
.expect(REMOTE_INACCESSIBLE)
.execute_with(|| {});
}

#[tokio::test]
async fn can_build_one_pallet() {
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
pallets: vec!["System".to_owned()],
pallets: vec!["Proxy".to_owned()],
..Default::default()
}))
.build()
Expand All @@ -622,11 +686,7 @@ mod remote_tests {
init_logger();
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
pallets: vec![
"Proxy".to_owned(),
"Multisig".to_owned(),
"PhragmenElection".to_owned(),
],
pallets: vec!["Proxy".to_owned(), "Multisig".to_owned()],
..Default::default()
}))
.build()
Expand All @@ -639,6 +699,7 @@ mod remote_tests {
async fn sanity_check_decoding() {
use pallet_elections_phragmen::SeatHolder;
use sp_core::crypto::Ss58Codec;

type AccountId = sp_runtime::AccountId32;
type Balance = u128;
frame_support::generate_storage_alias!(
Expand Down Expand Up @@ -676,15 +737,15 @@ mod remote_tests {
Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig {
state_snapshot: Some(SnapshotConfig::new("test_snapshot_to_remove.bin")),
pallets: vec!["Balances".to_owned()],
pallets: vec!["Proxy".to_owned()],
..Default::default()
}))
.build()
.await
.expect(REMOTE_INACCESSIBLE)
.execute_with(|| {});

let to_delete = std::fs::read_dir(SnapshotConfig::default().path)
let to_delete = std::fs::read_dir(Path::new("."))
.unwrap()
.into_iter()
.map(|d| d.unwrap())
Expand All @@ -699,6 +760,7 @@ mod remote_tests {
}

#[tokio::test]
#[ignore = "takes too much time on average."]
async fn can_fetch_all() {
init_logger();
Builder::<Block>::new()
Expand Down