Skip to content

Commit

Permalink
Deliver chainHead_storage items progressively (#1605)
Browse files Browse the repository at this point in the history
* Temporarily rename `SyncService::storage_query`

* Add a replacement `storage_query`

* Provide the request index

* Update json_rpc_service::storage_query

* Update `ChainHeadFollowTask`

* Update `state_getKeys`

* Changelog

* Update `state_getKeysPaged`

* Update storage subscriptions

* Fix Wasm build

* Update `pinned_runtime_and_block_info`

* Update runtime service runtime download

* Remove `storage_query2`

* Comments and tweaks

* Buffer items in chainHead_storage

* Add TODO

* PR link

* Typo in CHANGELOG

* Fix warning

* Rustfmt
  • Loading branch information
tomaka committed Jan 24, 2024
1 parent 88d813e commit 6b62f40
Show file tree
Hide file tree
Showing 7 changed files with 731 additions and 523 deletions.
155 changes: 82 additions & 73 deletions light-base/src/json_rpc_service/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use alloc::{
format,
string::{String, ToString as _},
sync::Arc,
vec,
vec::Vec,
};
use async_lock::Mutex;
Expand Down Expand Up @@ -738,13 +739,16 @@ impl<TPlat: PlatformRef> Background<TPlat> {
}
};

let result = self
// TODO: weird that keys is an iterator; revisit
let mut results = vec![None; keys.clone().count()];

let mut query = self
.sync_service
.clone()
.storage_query(
block_number,
hash,
&state_trie_root_hash,
*hash,
state_trie_root_hash,
keys.clone().map(|key| sync_service::StorageRequestItem {
key: key.as_ref().to_vec(), // TODO: overhead
ty: sync_service::StorageRequestItemTy::Value,
Expand All @@ -753,26 +757,27 @@ impl<TPlat: PlatformRef> Background<TPlat> {
timeout_per_request,
max_parallel,
)
.await
.map_err(StorageQueryError::StorageRetrieval)?;

let result = keys
.map(|key| {
result
.iter()
.find_map(|entry| match entry {
sync_service::StorageResultItem::Value { key: k, value }
if k == key.as_ref() =>
{
Some(value.clone()) // TODO: overhead
}
_ => None,
})
.unwrap()
})
.collect();
.advance()
.await;

Ok(result)
loop {
match query {
sync_service::StorageQueryProgress::Progress {
query: next,
request_index,
item: sync_service::StorageResultItem::Value { value, .. },
..
} => {
results[request_index] = value.clone();
query = next.advance().await;
}
sync_service::StorageQueryProgress::Progress { .. } => unreachable!(),
sync_service::StorageQueryProgress::Error(error) => {
return Err(StorageQueryError::StorageRetrieval(error))
}
sync_service::StorageQueryProgress::Finished => return Ok(results),
}
}
}

/// Obtain a pin of the runtime of the given block against the runtime service, plus the
Expand Down Expand Up @@ -830,13 +835,18 @@ impl<TPlat: PlatformRef> Background<TPlat> {
// Download the runtime of this block. This takes a long time as the runtime is rather
// big (around 1MiB in general).
let (storage_code, storage_heap_pages, code_merkle_value, code_closest_ancestor_excluding) = {
let entries = self
let mut storage_code = None;
let mut storage_heap_pages = None;
let mut code_merkle_value = None;
let mut code_closest_ancestor_excluding = None;

let mut query = self
.sync_service
.clone()
.storage_query(
block_number,
block_hash,
&state_trie_root_hash,
*block_hash,
state_trie_root_hash,
[
sync_service::StorageRequestItem {
key: b":code".to_vec(),
Expand All @@ -856,56 +866,55 @@ impl<TPlat: PlatformRef> Background<TPlat> {
Duration::from_secs(20),
NonZeroU32::new(1).unwrap(),
)
.await
.map_err(RuntimeCallError::StorageQuery)?;
// TODO: not elegant
let heap_pages = entries
.iter()
.find_map(|entry| match entry {
sync_service::StorageResultItem::Value { key, value }
if key == b":heappages" =>
{
Some(value.clone()) // TODO: overhead
.advance()
.await;

loop {
match query {
sync_service::StorageQueryProgress::Finished => {
break (
storage_code,
storage_heap_pages,
code_merkle_value,
code_closest_ancestor_excluding,
)
}
_ => None,
})
.unwrap();
let code = entries
.iter()
.find_map(|entry| match entry {
sync_service::StorageResultItem::Value { key, value } if key == b":code" => {
Some(value.clone()) // TODO: overhead
sync_service::StorageQueryProgress::Progress {
request_index: 0,
item:
sync_service::StorageResultItem::ClosestDescendantMerkleValue {
closest_descendant_merkle_value,
found_closest_ancestor_excluding,
..
},
query: next,
} => {
code_merkle_value = closest_descendant_merkle_value;
code_closest_ancestor_excluding = found_closest_ancestor_excluding;
query = next.advance().await;
}
_ => None,
})
.unwrap();
let (code_merkle_value, code_closest_ancestor_excluding) = if code.is_some() {
entries
.iter()
.find_map(|entry| match entry {
sync_service::StorageResultItem::ClosestDescendantMerkleValue {
requested_key,
closest_descendant_merkle_value,
found_closest_ancestor_excluding,
} if requested_key == b":code" => {
Some((
closest_descendant_merkle_value.clone(),
found_closest_ancestor_excluding.clone(),
)) // TODO overhead
}
_ => None,
})
.unwrap()
} else {
(None, None)
};

(
code,
heap_pages,
code_merkle_value,
code_closest_ancestor_excluding,
)
sync_service::StorageQueryProgress::Progress {
request_index: 1,
item: sync_service::StorageResultItem::Value { value, .. },
query: next,
} => {
storage_code = value;
query = next.advance().await;
}
sync_service::StorageQueryProgress::Progress {
request_index: 2,
item: sync_service::StorageResultItem::Value { value, .. },
query: next,
} => {
storage_heap_pages = value;
query = next.advance().await;
}
sync_service::StorageQueryProgress::Progress { .. } => unreachable!(),
sync_service::StorageQueryProgress::Error(error) => {
return Err(RuntimeCallError::StorageQuery(error))
}
}
}
};

// Give the code and heap pages to the runtime service. The runtime service will
Expand Down
Loading

0 comments on commit 6b62f40

Please sign in to comment.