Skip to content

Commit

Permalink
Add support for all the request types for chainHead_unstable_storage (
Browse files Browse the repository at this point in the history
#813)

* Make sync_service::storage_query more generic

* Remove `storage_prefix_keys_query`

* Add TODO

* Add support for all types for `chainHead_storage`

* Field rename

* More documentation to storage_query

* Remove wrong comment

* Link to PR

* Docfix
  • Loading branch information
tomaka authored Jun 25, 2023
1 parent 8fddb06 commit fb82b15
Show file tree
Hide file tree
Showing 10 changed files with 625 additions and 197 deletions.
35 changes: 24 additions & 11 deletions lib/src/json_rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,8 @@ define_methods! {
chainHead_unstable_storage(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString,
key: HexString,
items: Vec<ChainHeadStorageRequestItem>,
#[rename = "childTrie"] child_trie: Option<HexString>,
#[rename = "type"] ty: ChainHeadStorageType,
#[rename = "networkConfig"] network_config: Option<NetworkConfig>
) -> Cow<'a, str>,
chainHead_unstable_storageContinue(
Expand Down Expand Up @@ -734,6 +733,26 @@ pub enum ChainHeadCallEvent<'a> {
Disjoint {},
}

/// One item of a `chainHead_unstable_storage` JSON-RPC request: a storage key
/// together with the kind of information requested about that key.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChainHeadStorageRequestItem {
/// Storage key the request concerns.
pub key: HexString,
/// Kind of query to perform for `key`. Serialized as `type` on the wire;
/// renamed because `type` is a reserved keyword in Rust.
#[serde(rename = "type")]
pub ty: ChainHeadStorageType,
}

/// One item of a `chainHead_unstable_storage` JSON-RPC response.
///
/// Which of the optional fields is set depends on the type of the request
/// item being answered: `value` for value queries, `hash` for hash queries,
/// and the two `merkle-value*` fields for closest-ancestor-Merkle-value
/// queries. Unset fields are omitted from the serialized JSON.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChainHeadStorageResponseItem {
/// Storage key this item concerns.
pub key: HexString,
/// Storage value, for value-type and descendants-values queries.
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<HexString>,
/// Hash of the storage value, for hash-type and descendants-hashes queries.
#[serde(skip_serializing_if = "Option::is_none")]
pub hash: Option<HexString>,
/// Key whose Merkle value is reported in `merkle_value`.
#[serde(rename = "merkle-value-key", skip_serializing_if = "Option::is_none")]
pub merkle_value_key: Option<String>, // TODO: `String` rather than `HexString` because the key is a list of nibbles and the number of hex digits can therefore be odd
/// Merkle value of the closest ancestor, for closest-ancestor-Merkle-value queries.
#[serde(rename = "merkle-value", skip_serializing_if = "Option::is_none")]
pub merkle_value: Option<HexString>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ChainHeadStorageType {
#[serde(rename = "value")]
Expand All @@ -751,15 +770,9 @@ pub enum ChainHeadStorageType {
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "event")]
pub enum ChainHeadStorageEvent<'a> {
#[serde(rename = "item")]
Item {
key: HexString,
#[serde(skip_serializing_if = "Option::is_none")]
value: Option<HexString>,
#[serde(skip_serializing_if = "Option::is_none")]
hash: Option<HexString>,
#[serde(rename = "merkle-value", skip_serializing_if = "Option::is_none")]
merkle_value: Option<HexString>,
#[serde(rename = "items")]
Items {
items: Vec<ChainHeadStorageResponseItem>,
},
#[serde(rename = "done")]
Done,
Expand Down
3 changes: 3 additions & 0 deletions lib/src/trie/prefix_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ impl PrefixScan {
if next.is_empty() && self.next_queries.is_empty() {
return Ok(ResumeOutcome::Success {
entries: self.final_result,
full_storage_values_required: self.full_storage_values_required,
});
}

Expand Down Expand Up @@ -303,6 +304,8 @@ pub enum ResumeOutcome {
Success {
/// List of entries whose key starts with the requested prefix.
entries: Vec<(Vec<u8>, StorageValue)>,
/// Value that was passed as [`Config::full_storage_values_required`].
full_storage_values_required: bool,
},
}

Expand Down
2 changes: 1 addition & 1 deletion lib/src/trie/prefix_proof/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14500,7 +14500,7 @@ fn regression_test_174() {
prefix_scan = scan;
continue;
}
Ok(ResumeOutcome::Success { mut entries }) => {
Ok(ResumeOutcome::Success { mut entries, .. }) => {
let mut expected = EXPECTED.to_owned();
expected.sort();
entries.sort_by(|(key1, _), (key2, _)| key1.cmp(&key2));
Expand Down
29 changes: 29 additions & 0 deletions lib/src/trie/proof_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,35 @@ impl<T: AsRef<[u8]>> DecodedTrieProof<T> {
}
}
}

/// Returns the key and Merkle value of the closest ancestor to the given key.
///
/// Returns `None` if the key has no ancestor within the trie.
/// Returns the key and Merkle value of the closest ancestor to the given key.
///
/// Returns `None` if the key has no ancestor within the trie.
pub fn closest_ancestor_merkle_value<'a>(
    &'a self,
    trie_root_merkle_value: &[u8; 32],
    key: &[nibble::Nibble],
) -> Result<Option<(&'a [nibble::Nibble], trie_node::MerkleValueOutput)>, IncompleteProofError>
{
    // Delegate the actual trie walk to `closest_ancestor`, propagating an
    // incomplete-proof error and bailing out early when no ancestor exists.
    let (full_key, (_, node_value_range, _)) =
        match self.closest_ancestor(trie_root_merkle_value, key)? {
            Some(ancestor) => ancestor,
            None => return Ok(None),
        };

    let node_value = &self.proof.as_ref()[node_value_range.clone()];

    // A node value shorter than 32 bytes is used verbatim as the Merkle
    // value; otherwise the Merkle value is its blake2b-256 hash.
    let merkle_value = if node_value.len() < 32 {
        trie_node::MerkleValueOutput::from_bytes(node_value)
    } else {
        let hashed = blake2_rfc::blake2b::blake2b(32, &[], node_value);
        trie_node::MerkleValueOutput::from_bytes(hashed.as_bytes())
    };

    Ok(Some((full_key, merkle_value)))
}
}

/// Proof doesn't contain enough information to answer the request.
Expand Down
60 changes: 55 additions & 5 deletions light-base/src/json_rpc_service/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,14 +948,33 @@ impl<TPlat: PlatformRef> Background<TPlat> {
block_number,
hash,
&state_trie_root_hash,
keys,
keys.clone().map(|key| sync_service::StorageRequestItem {
key: key.as_ref().to_vec(), // TODO: overhead
ty: sync_service::StorageRequestItemTy::Value,
}),
total_attempts,
timeout_per_request,
max_parallel,
)
.await
.map_err(StorageQueryError::StorageRetrieval)?;

let result = keys
.map(|key| {
result
.iter()
.find_map(|entry| match entry {
sync_service::StorageResultItem::Value { key: k, value }
if k == key.as_ref() =>
{
Some(value.clone()) // TODO: overhead
}
_ => None,
})
.unwrap()
})
.collect();

Ok(result)
}

Expand Down Expand Up @@ -1000,23 +1019,54 @@ impl<TPlat: PlatformRef> Background<TPlat> {
// Download the runtime of this block. This takes a long time as the runtime is rather
// big (around 1MiB in general).
let (storage_code, storage_heap_pages) = {
let mut code_query_result = self
let entries = self
.sync_service
.clone()
.storage_query(
block_number,
block_hash,
&state_trie_root_hash,
iter::once(&b":code"[..]).chain(iter::once(&b":heappages"[..])),
[
sync_service::StorageRequestItem {
key: b":code".to_vec(),
ty: sync_service::StorageRequestItemTy::Value,
},
sync_service::StorageRequestItem {
key: b":heappages".to_vec(),
ty: sync_service::StorageRequestItemTy::Value,
},
]
.into_iter(),
3,
Duration::from_secs(20),
NonZeroU32::new(1).unwrap(),
)
.await
.map_err(runtime_service::RuntimeCallError::StorageQuery)
.map_err(RuntimeCallError::Call)?;
let heap_pages = code_query_result.pop().unwrap();
let code = code_query_result.pop().unwrap();
// TODO: not elegant
let heap_pages = entries
.iter()
.find_map(|entry| match entry {
sync_service::StorageResultItem::Value { key, value }
if key == b":heappages" =>
{
Some(value.clone()) // TODO: overhead
}
_ => None,
})
.unwrap();
let code = entries
.iter()
.find_map(|entry| match entry {
sync_service::StorageResultItem::Value { key, value }
if key == b":code" =>
{
Some(value.clone()) // TODO: overhead
}
_ => None,
})
.unwrap();
(code, heap_pages)
};

Expand Down
123 changes: 82 additions & 41 deletions light-base/src/json_rpc_service/background/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,9 +838,8 @@ impl<TPlat: PlatformRef> ChainHeadFollowTask<TPlat> {
async fn start_chain_head_storage(&mut self, request: service::SubscriptionStartProcess) {
let methods::MethodCall::chainHead_unstable_storage {
hash,
key,
items,
child_trie,
ty,
network_config,
..
} = request.request()
Expand Down Expand Up @@ -877,26 +876,6 @@ impl<TPlat: PlatformRef> ChainHeadFollowTask<TPlat> {
return;
}

let is_hash = match ty {
methods::ChainHeadStorageType::Value => false,
methods::ChainHeadStorageType::Hash => true,
methods::ChainHeadStorageType::DescendantsValues
| methods::ChainHeadStorageType::DescendantsHashes
| methods::ChainHeadStorageType::ClosestAncestorMerkleValue => {
// TODO: implement this
request.fail(json_rpc::parse::ErrorResponse::ServerError(
-32000,
"Child key storage queries not supported yet",
));
log::warn!(
target: &self.log_target,
"chainHead_unstable_storage has been called with a type other than value or hash. \
This isn't supported by smoldot yet."
);
return;
}
};

let mut subscription = request.accept();
let subscription_id = subscription.subscription_id().to_owned();

Expand Down Expand Up @@ -927,11 +906,36 @@ impl<TPlat: PlatformRef> ChainHeadFollowTask<TPlat> {
}
};

// Perform some API conversions.
let queries = items
.into_iter()
.map(|item| sync_service::StorageRequestItem {
key: item.key.0,
ty: match item.ty {
methods::ChainHeadStorageType::Value => {
sync_service::StorageRequestItemTy::Value
}
methods::ChainHeadStorageType::Hash => {
sync_service::StorageRequestItemTy::Hash
}
methods::ChainHeadStorageType::ClosestAncestorMerkleValue => {
sync_service::StorageRequestItemTy::ClosestAncestorMerkleValue
}
methods::ChainHeadStorageType::DescendantsValues => {
sync_service::StorageRequestItemTy::DescendantsValues
}
methods::ChainHeadStorageType::DescendantsHashes => {
sync_service::StorageRequestItemTy::DescendantsHashes
}
},
})
.collect::<Vec<_>>();

let future = sync_service.clone().storage_query(
decoded_header.number,
&hash.0,
decoded_header.state_root,
iter::once(key.0.clone()), // TODO: clone :-/
queries.into_iter(),
cmp::min(10, network_config.total_attempts),
Duration::from_millis(u64::from(cmp::min(
20000,
Expand All @@ -952,29 +956,66 @@ impl<TPlat: PlatformRef> ChainHeadFollowTask<TPlat> {
};

match outcome {
Ok(values) => {
// `storage_query` returns a list of values because it can perform
// multiple queries at once. In our situation, we only start one query
// and as such the outcome only ever contains one element.
debug_assert_eq!(values.len(), 1);
let value = values.into_iter().next().unwrap();
if let Some(mut value) = value {
if is_hash {
value = blake2_rfc::blake2b::blake2b(8, &[], &value)
.as_bytes()
.to_vec();
}
Ok(entries) => {
// Perform some API conversions.
let items = entries
.into_iter()
.filter_map(|item| match item {
sync_service::StorageResultItem::Value { key, value } => {
Some(methods::ChainHeadStorageResponseItem {
key: methods::HexString(key),
value: Some(methods::HexString(value?)),
hash: None,
merkle_value: None,
merkle_value_key: None,
})
}
sync_service::StorageResultItem::Hash { key, hash } => {
Some(methods::ChainHeadStorageResponseItem {
key: methods::HexString(key),
value: None,
hash: Some(methods::HexString(hash?.to_vec())),
merkle_value: None,
merkle_value_key: None,
})
}
sync_service::StorageResultItem::DescendantValue { key, value, .. } => {
Some(methods::ChainHeadStorageResponseItem {
key: methods::HexString(key),
value: Some(methods::HexString(value)),
hash: None,
merkle_value: None,
merkle_value_key: None,
})
}
sync_service::StorageResultItem::DescendantHash { key, hash, .. } => {
Some(methods::ChainHeadStorageResponseItem {
key: methods::HexString(key),
value: None,
hash: Some(methods::HexString(hash.to_vec())),
merkle_value: None,
merkle_value_key: None,
})
}
sync_service::StorageResultItem::ClosestAncestorMerkleValue { requested_key, merkle_value } => {
let (merkle_value_of, merkle_value) = merkle_value?;
Some(methods::ChainHeadStorageResponseItem {
key: methods::HexString(requested_key),
value: None,
hash: None,
merkle_value: Some(methods::HexString(merkle_value)),
merkle_value_key: Some(format!("0x{}", merkle_value_of.iter().map(|n| format!("{:x}", n)).collect::<String>())),
})
}
})
.collect::<Vec<_>>();

if !items.is_empty() {
subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_storageEvent {
subscription: (&subscription_id).into(),
result: methods::ChainHeadStorageEvent::Item {
key,
value: Some(methods::HexString(value)),
hash: None,
merkle_value: None,
},
result: methods::ChainHeadStorageEvent::Items { items },
},
)
.await;
Expand Down
Loading

0 comments on commit fb82b15

Please sign in to comment.