From 1d5d4a484021ede73152bf71af37718fa38bb72b Mon Sep 17 00:00:00 2001
From: Julian Eager
Date: Wed, 29 Nov 2023 14:46:10 +0800
Subject: [PATCH] Enable parallel key scraping (#1985)

closes #174

---------

Co-authored-by: Liam Aharon
Co-authored-by: Oliver Tale-Yazdi
---
 .../frame/remote-externalities/src/lib.rs     | 193 ++++++++++++++----
 1 file changed, 156 insertions(+), 37 deletions(-)

diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs
index 71e9320ebeeb..5c7a36867ff6 100644
--- a/substrate/utils/frame/remote-externalities/src/lib.rs
+++ b/substrate/utils/frame/remote-externalities/src/lib.rs
@@ -47,6 +47,7 @@ use std::{
 	fs,
 	ops::{Deref, DerefMut},
 	path::{Path, PathBuf},
+	sync::Arc,
 	time::{Duration, Instant},
 };
 use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
@@ -298,6 +299,7 @@ impl Default for SnapshotConfig {
 }
 
 /// Builder for remote-externalities.
+#[derive(Clone)]
 pub struct Builder {
 	/// Custom key-pairs to be injected into the final externalities. The *hashed* keys and values
 	/// must be given.
@@ -400,41 +402,134 @@ where
 		})
 	}
 
-	/// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods.
-	async fn rpc_get_keys_paged(
+	/// Get keys with `prefix` at `block`, scraping up to `parallel` key ranges concurrently.
+	async fn rpc_get_keys_parallel(
 		&self,
-		prefix: StorageKey,
-		at: B::Hash,
+		prefix: &StorageKey,
+		block: B::Hash,
+		parallel: usize,
+	) -> Result<Vec<StorageKey>, &'static str> {
+		/// Divide the workload and return the start key of each chunk. Guaranteed to return a
+		/// non-empty list.
+		fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
+			let mut prefix = prefix.as_ref().to_vec();
+			let scale = 32usize.saturating_sub(prefix.len());
+
+			// no need to divide workload
+			if scale < 9 {
+				prefix.extend(vec![0; scale]);
+				return vec![StorageKey(prefix)]
+			}
+
+			let chunks = 16;
+			let step = 0x10000 / chunks;
+			let ext = scale - 2;
+
+			(0..chunks)
+				.map(|i| {
+					let mut key = prefix.clone();
+					let start = i * step;
+					key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
+					key.extend(vec![0; ext]);
+					StorageKey(key)
+				})
+				.collect()
+		}
+
+		let start_keys = gen_start_keys(prefix);
+		let mut start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
+		let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
+		end_keys.push(None);
+		// The start key is exclusive, so let the first chunk begin at the prefix itself;
+		// otherwise keys not greater than the zero-padded start key would be skipped.
+		start_keys[0] = None;
+
+		// use a semaphore to limit max scraping tasks; zero permits would never be acquired
+		let parallel = Arc::new(tokio::sync::Semaphore::new(parallel.max(1)));
+		let builder = Arc::new(self.clone());
+		let mut handles = vec![];
+
+		for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
+			let permit = parallel
+				.clone()
+				.acquire_owned()
+				.await
+				.expect("semaphore is not closed until the end of loop");
+
+			let builder = builder.clone();
+			let prefix = prefix.clone();
+			let start_key = start_key.cloned();
+			let end_key = end_key.cloned();
+
+			let handle = tokio::spawn(async move {
+				let res = builder
+					.rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
+					.await;
+				drop(permit);
+				res
+			});
+
+			handles.push(handle);
+		}
+
+		parallel.close();
+
+		// A failed or panicked task leaves a hole in the key set. Propagate the failure
+		// rather than handing back a silently incomplete list.
+		let mut keys = vec![];
+		for res in futures::future::join_all(handles).await {
+			keys.extend(res.map_err(|_| "failed to join a key scraping task")??);
+		}
+
+		Ok(keys)
+	}
+
+	/// Get all keys with `prefix` within the given range at `block`.
+	/// Both `start_key` and `end_key` are optional if you want an open-ended range.
+	async fn rpc_get_keys_in_range(
+		&self,
+		prefix: &StorageKey,
+		block: B::Hash,
+		start_key: Option<&StorageKey>,
+		end_key: Option<&StorageKey>,
 	) -> Result<Vec<StorageKey>, &'static str> {
-		let mut last_key: Option<StorageKey> = None;
-		let mut all_keys: Vec<StorageKey> = vec![];
-		let keys = loop {
+		let mut last_key: Option<&StorageKey> = start_key;
+		let mut keys: Vec<StorageKey> = vec![];
+
+		loop {
 			// This loop can hit the node with very rapid requests, occasionally causing it to
 			// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
 			let retry_strategy =
 				FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
 			let get_page_closure =
-				|| self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), at);
-			let page = Retry::spawn(retry_strategy, get_page_closure).await?;
-			let page_len = page.len();
+				|| self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
+			let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;
 
-			all_keys.extend(page);
+			// keep keys up to and including `end`; the next range starts strictly after it
+			if let (Some(last), Some(end)) = (page.last(), end_key) {
+				if last > end {
+					page.retain(|key| key <= end);
+				}
+			}
 
+			let page_len = page.len();
+			keys.extend(page);
+			last_key = keys.last();
+
+			// scraping out of range or no more matches,
+			// we are done either way
 			if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
 				log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
-				break all_keys
-			} else {
-				let new_last_key =
-					all_keys.last().expect("all_keys is populated; has .last(); qed");
-				log::debug!(
-					target: LOG_TARGET,
-					"new total = {}, full page received: {}",
-					all_keys.len(),
-					HexDisplay::from(new_last_key)
-				);
-				last_key = Some(new_last_key.clone());
-			};
-		};
+				break
+			}
+
+			log::debug!(
+				target: LOG_TARGET,
+				"new total = {}, full page received: {}",
+				keys.len(),
+				HexDisplay::from(last_key.expect("full page received, cannot be None"))
+			);
+		}
 
 		Ok(keys)
 	}
@@ -529,7 +624,7 @@ where
 						"Batch request failed ({}/{} retries). Error: {}",
 						retries,
 						Self::MAX_RETRIES,
-						e.to_string()
+						e
 					);
 					// after 2 subsequent failures something very wrong is happening. log a warning
 					// and reset the batch size down to 1.
@@ -590,7 +685,7 @@ where
 	/// map them to values one by one.
 	///
 	/// This can work with public nodes. But, expect it to be darn slow.
-	pub(crate) async fn rpc_get_pairs_paged(
+	pub(crate) async fn rpc_get_pairs(
 		&self,
 		prefix: StorageKey,
 		at: B::Hash,
@@ -598,8 +693,10 @@ where
 	) -> Result, &'static str> {
 		let start = Instant::now();
 		let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
+		// TODO We could start downloading when having collected the first batch of keys
+		// https://github.com/paritytech/polkadot-sdk/issues/2494
 		let keys = self
-			.rpc_get_keys_paged(prefix.clone(), at)
+			.rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS)
 			.await?
 			.into_iter()
 			.collect::<Vec<_>>();
@@ -628,9 +725,9 @@ where
 				.unwrap()
 				.progress_chars("=>-"),
 		);
-		let payloads_chunked = payloads.chunks((&payloads.len() / Self::PARALLEL_REQUESTS).max(1));
+		let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
 		let requests = payloads_chunked.map(|payload_chunk| {
-			Self::get_storage_data_dynamic_batch_size(&client, payload_chunk.to_vec(), &bar)
+			Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
 		});
 		// Execute the requests and move the Result outside.
 		let storage_data_result: Result<Vec<_>, _> =
@@ -644,7 +741,7 @@ where
 			},
 		};
 		bar.finish_with_message("✅ Downloaded key values");
-		print!("\n");
+		println!();
 
 		// Check if we got responses for all submitted requests.
assert_eq!(keys.len(), storage_data.len()); @@ -778,8 +875,9 @@ where pending_ext: &mut TestExternalities>, ) -> Result { let child_roots = top_kv - .into_iter() - .filter_map(|(k, _)| is_default_child_storage_key(k.as_ref()).then(|| k.clone())) + .iter() + .filter(|(k, _)| is_default_child_storage_key(k.as_ref())) + .map(|(k, _)| k.clone()) .collect::>(); if child_roots.is_empty() { @@ -799,11 +897,10 @@ where let mut child_kv = vec![]; for prefixed_top_key in child_roots { let child_keys = - Self::rpc_child_get_keys(&client, &prefixed_top_key, StorageKey(vec![]), at) - .await?; + Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?; let child_kv_inner = - Self::rpc_child_get_storage_paged(&client, &prefixed_top_key, child_keys, at) + Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at) .await?; let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0); @@ -846,7 +943,7 @@ where for prefix in &config.hashed_prefixes { let now = std::time::Instant::now(); let additional_key_values = - self.rpc_get_pairs_paged(StorageKey(prefix.to_vec()), at, pending_ext).await?; + self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?; let elapsed = now.elapsed(); log::info!( target: LOG_TARGET, @@ -1110,7 +1207,7 @@ mod test_prelude { pub(crate) type Block = RawBlock>; pub(crate) fn init_logger() { - let _ = sp_tracing::try_init_simple(); + sp_tracing::try_init_simple(); } } @@ -1440,4 +1537,26 @@ mod remote_tests { .unwrap() .execute_with(|| {}); } + + #[tokio::test] + async fn can_fetch_in_parallel() { + init_logger(); + + let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443"); + let mut builder = Builder::::new() + .mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() })); + builder.init_remote_client().await.unwrap(); + + let at = builder.as_online().at.unwrap(); + + let prefix = StorageKey(vec![13]); + let paged = 
builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap(); + let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap(); + assert_eq!(paged, para); + + let prefix = StorageKey(vec![]); + let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap(); + let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap(); + assert_eq!(paged, para); + } }