diff --git a/Cargo.lock b/Cargo.lock index bd7a29278a3e..bcd7f6bca8dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6538,9 +6538,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.2" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hex" @@ -7050,7 +7050,7 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ - "hermit-abi 0.3.2", + "hermit-abi 0.3.9", "libc", "windows-sys 0.48.0", ] @@ -7085,7 +7085,7 @@ version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ - "hermit-abi 0.3.2", + "hermit-abi 0.3.9", "rustix 0.38.21", "windows-sys 0.48.0", ] @@ -8589,13 +8589,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ + "hermit-abi 0.3.9", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -9327,7 +9328,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.2", + "hermit-abi 0.3.9", "libc", ] @@ -21765,28 +21766,27 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.37.0" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2 0.5.7", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2 1.0.82", "quote 1.0.35", diff --git a/Cargo.toml b/Cargo.toml index 1d20bba853eb..79d197b29407 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1292,7 +1292,7 @@ tikv-jemalloc-ctl = { version = "0.5.0" } tikv-jemallocator = { version = "0.5.0" } time = { version = "0.3" } tiny-keccak = { version = "2.0.2" } -tokio = { version = "1.37.0", default-features = false } +tokio = { version = "1.40.0", default-features = false } tokio-retry = { version = "0.3.0" } tokio-stream = { version = "0.1.14" } tokio-test = { version = "0.4.2" } diff --git a/prdoc/pr_5741.prdoc b/prdoc/pr_5741.prdoc new file mode 100644 index 000000000000..5eafbc90ee85 --- /dev/null +++ b/prdoc/pr_5741.prdoc @@ -0,0 +1,25 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: make RPC endpoint `chainHead_v1_storage` faster + +doc: + - audience: Node Operator + description: | + The RPC endpoint `chainHead_v1_storage` now relies solely on backpressure to + determine how quickly to serve back values instead of handing back a fixed number + of entries and then expecting the client to ask for more. This should improve the + throughput for bigger storage queries significantly. + + Benchmarks using subxt on localhost: + - Iterate over 10 accounts on westend-dev -> ~2-3x faster + - Fetch 1024 storage values (i.e, not descedant values) -> ~50x faster + - Fetch 1024 descendant values -> ~500x faster + +crates: + - name: sc-rpc-spec-v2 + bump: major + - name: sc-rpc-server + bump: patch + - name: sc-service + bump: major diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index 51d1788d34db..da6b9a8c9f2f 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -35,8 +35,6 @@ sp-version.workspace = true sp-version.default-features = true sc-client-api.workspace = true sc-client-api.default-features = true -sc-utils.workspace = true -sc-utils.default-features = true sc-rpc.workspace = true sc-rpc.default-features = true codec = { workspace = true, default-features = true } @@ -68,3 +66,5 @@ sc-service = { features = ["test-helpers"], default-features = true, path = "../ assert_matches = { workspace = true } pretty_assertions = { workspace = true } sc-transaction-pool = { default-features = true, path = "../transaction-pool" } +sc-utils = { workspace = true } +sc-rpc = { workspace = true, features = ["test-helpers"] } \ No newline at end of file diff --git a/substrate/client/rpc-spec-v2/src/archive/archive.rs b/substrate/client/rpc-spec-v2/src/archive/archive.rs index 82c6b2cacc2f..dd6c566a76ed 100644 --- a/substrate/client/rpc-spec-v2/src/archive/archive.rs +++ b/substrate/client/rpc-spec-v2/src/archive/archive.rs @@ -275,6 +275,7 @@ where self.storage_max_descendant_responses, self.storage_max_queried_items, ); + Ok(storage_client.handle_query(hash, items, child_trie)) } } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index a056b4d437c8..279a7594fef9 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -27,14 +27,15 @@ use crate::{ api::ChainHeadApiServer, chain_head_follow::ChainHeadFollower, error::Error as ChainHeadRpcError, - event::{FollowEvent, MethodResponse, OperationError}, - subscription::{SubscriptionManagement, SubscriptionManagementError}, + event::{FollowEvent, MethodResponse, OperationError, OperationId, OperationStorageItems}, + subscription::{StopHandle, SubscriptionManagement, SubscriptionManagementError}, + FollowEventSendError, FollowEventSender, }, - common::events::StorageQuery, + common::{events::StorageQuery, storage::QueryResult}, hex_string, SubscriptionTaskExecutor, }; use codec::Encode; -use futures::{channel::oneshot, future::FutureExt}; +use futures::{channel::oneshot, future::FutureExt, SinkExt}; use jsonrpsee::{ core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions, MethodResponseFuture, PendingSubscriptionSink, SubscriptionSink, @@ -51,9 +52,16 @@ use sp_core::{traits::CallContext, Bytes}; use sp_rpc::list::ListOrValue; use sp_runtime::traits::Block as BlockT; use std::{marker::PhantomData, sync::Arc, time::Duration}; +use tokio::sync::mpsc; pub(crate) const LOG_TARGET: &str = "rpc-spec-v2"; +/// The buffer capacity for each storage query. +/// +/// This is small because the underlying JSON-RPC server has +/// its down buffer capacity per connection as well. +const STORAGE_QUERY_BUF: usize = 16; + /// The configuration of [`ChainHead`]. pub struct ChainHeadConfig { /// The maximum number of pinned blocks across all subscriptions. @@ -65,9 +73,6 @@ pub struct ChainHeadConfig { /// Stop all subscriptions if the distance between the leaves and the current finalized /// block is larger than this value. pub max_lagging_distance: usize, - /// The maximum number of items reported by the `chainHead_storage` before - /// pagination is required. - pub operation_max_storage_items: usize, /// The maximum number of `chainHead_follow` subscriptions per connection. pub max_follow_subscriptions_per_connection: usize, } @@ -87,10 +92,6 @@ const MAX_PINNED_DURATION: Duration = Duration::from_secs(60); /// Note: The lower limit imposed by the spec is 16. const MAX_ONGOING_OPERATIONS: usize = 16; -/// The maximum number of items the `chainHead_storage` can return -/// before paginations is required. -const MAX_STORAGE_ITER_ITEMS: usize = 5; - /// Stop all subscriptions if the distance between the leaves and the current finalized /// block is larger than this value. const MAX_LAGGING_DISTANCE: usize = 128; @@ -105,7 +106,6 @@ impl Default for ChainHeadConfig { subscription_max_pinned_duration: MAX_PINNED_DURATION, subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, - operation_max_storage_items: MAX_STORAGE_ITER_ITEMS, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, } } @@ -121,9 +121,6 @@ pub struct ChainHead, Block: BlockT, Client> { executor: SubscriptionTaskExecutor, /// Keep track of the pinned blocks for each subscription. subscriptions: SubscriptionManagement, - /// The maximum number of items reported by the `chainHead_storage` before - /// pagination is required. - operation_max_storage_items: usize, /// Stop all subscriptions if the distance between the leaves and the current finalized /// block is larger than this value. max_lagging_distance: usize, @@ -150,7 +147,6 @@ impl, Block: BlockT, Client> ChainHead { config.max_follow_subscriptions_per_connection, backend, ), - operation_max_storage_items: config.operation_max_storage_items, max_lagging_distance: config.max_lagging_distance, _phantom: PhantomData, } @@ -315,7 +311,7 @@ where }), }; - let (rp, rp_fut) = method_started_response(operation_id, None); + let (rp, rp_fut) = method_started_response(operation_id); let fut = async move { // Wait for the server to send out the response and if it produces an error no event // should be generated. @@ -323,7 +319,7 @@ where return; } - let _ = block_guard.response_sender().unbounded_send(event); + let _ = block_guard.response_sender().send(event).await; }; executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); @@ -427,20 +423,10 @@ where Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock), }; - let mut storage_client = ChainHeadStorage::::new( - self.client.clone(), - self.operation_max_storage_items, - ); - let operation = block_guard.operation(); - let operation_id = operation.operation_id(); + let mut storage_client = ChainHeadStorage::::new(self.client.clone()); - // The number of operations we are allowed to execute. - let num_operations = operation.num_reserved(); - let discarded = items.len().saturating_sub(num_operations); - let mut items = items; - items.truncate(num_operations); + let (rp, rp_fut) = method_started_response(block_guard.operation().operation_id()); - let (rp, rp_fut) = method_started_response(operation_id, Some(discarded)); let fut = async move { // Wait for the server to send out the response and if it produces an error no event // should be generated. @@ -448,10 +434,20 @@ where return; } - storage_client.generate_events(block_guard, hash, items, child_trie).await; + let (tx, rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF); + let operation_id = block_guard.operation().operation_id(); + let stop_handle = block_guard.operation().stop_handle().clone(); + let response_sender = block_guard.response_sender(); + + // May fail if the channel is closed or the connection is closed. + // which is okay to ignore. + let _ = futures::future::join( + storage_client.generate_events(hash, items, child_trie, tx), + process_storage_items(rx, response_sender, operation_id, &stop_handle), + ) + .await; }; - self.executor - .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); rp } @@ -504,7 +500,7 @@ where let operation_id = block_guard.operation().operation_id(); let client = self.client.clone(); - let (rp, rp_fut) = method_started_response(operation_id.clone(), None); + let (rp, rp_fut) = method_started_response(operation_id.clone()); let fut = async move { // Wait for the server to send out the response and if it produces an error no event // should be generated. @@ -528,7 +524,7 @@ where }) }); - let _ = block_guard.response_sender().unbounded_send(event); + let _ = block_guard.response_sender().send(event).await; }; self.executor .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); @@ -589,13 +585,9 @@ where return Ok(()) } - let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) - else { - return Ok(()) - }; - - if !operation.submit_continue() { - // Continue called without generating a `WaitingForContinue` event. + // WaitingForContinue event is never emitted, in such cases + // emit an `InvalidContinue error`. + if self.subscriptions.get_operation(&follow_subscription, &operation_id).is_some() { Err(ChainHeadRpcError::InvalidContinue.into()) } else { Ok(()) @@ -617,12 +609,13 @@ where return Ok(()) } - let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) + let Some(mut operation) = + self.subscriptions.get_operation(&follow_subscription, &operation_id) else { return Ok(()) }; - operation.stop_operation(); + operation.stop(); Ok(()) } @@ -630,9 +623,8 @@ where fn method_started_response( operation_id: String, - discarded_items: Option, ) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) { - let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items }); + let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None }); ResponsePayload::success(rp).notify_on_completion() } @@ -658,3 +650,46 @@ where rx } + +async fn process_storage_items( + mut storage_query_stream: mpsc::Receiver, + mut sender: FollowEventSender, + operation_id: String, + stop_handle: &StopHandle, +) -> Result<(), FollowEventSendError> { + loop { + tokio::select! { + _ = stop_handle.stopped() => { + break; + }, + + maybe_storage = storage_query_stream.recv() => { + let Some(storage) = maybe_storage else { + break; + }; + + let item = match storage { + QueryResult::Err(error) => { + return sender + .send(FollowEvent::OperationError(OperationError { operation_id, error })) + .await + } + QueryResult::Ok(Some(v)) => v, + QueryResult::Ok(None) => continue, + }; + + sender + .send(FollowEvent::OperationStorageItems(OperationStorageItems { + operation_id: operation_id.clone(), + items: vec![item], + })).await?; + }, + } + } + + sender + .send(FollowEvent::OperationStorageDone(OperationId { operation_id })) + .await?; + + Ok(()) +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index ee39ec253a30..936117e66f98 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -18,45 +18,34 @@ //! Implementation of the `chainHead_storage` method. -use std::{collections::VecDeque, marker::PhantomData, sync::Arc}; +use std::{marker::PhantomData, sync::Arc}; use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider}; -use sc_utils::mpsc::TracingUnboundedSender; use sp_runtime::traits::Block as BlockT; +use tokio::sync::mpsc; -use crate::{ - chain_head::{ - event::{OperationError, OperationId, OperationStorageItems}, - subscription::BlockGuard, - FollowEvent, - }, - common::{ - events::{StorageQuery, StorageQueryType}, - storage::{IterQueryType, QueryIter, QueryIterResult, Storage}, - }, +use crate::common::{ + events::{StorageQuery, StorageQueryType}, + storage::{IterQueryType, QueryIter, QueryResult, Storage}, }; /// Generates the events of the `chainHead_storage` method. pub struct ChainHeadStorage { /// Storage client. client: Storage, - /// Queue of operations that may require pagination. - iter_operations: VecDeque, - /// The maximum number of items reported by the `chainHead_storage` before - /// pagination is required. - operation_max_storage_items: usize, _phandom: PhantomData<(BE, Block)>, } +impl Clone for ChainHeadStorage { + fn clone(&self) -> Self { + Self { client: self.client.clone(), _phandom: PhantomData } + } +} + impl ChainHeadStorage { /// Constructs a new [`ChainHeadStorage`]. - pub fn new(client: Arc, operation_max_storage_items: usize) -> Self { - Self { - client: Storage::new(client), - iter_operations: VecDeque::new(), - operation_max_storage_items, - _phandom: PhantomData, - } + pub fn new(client: Arc) -> Self { + Self { client: Storage::new(client), _phandom: PhantomData } } } @@ -64,146 +53,71 @@ impl ChainHeadStorage where Block: BlockT + 'static, BE: Backend + 'static, - Client: StorageProvider + 'static, + Client: StorageProvider + Send + Sync + 'static, { - /// Iterate over (key, hash) and (key, value) generating the `WaitingForContinue` event if - /// necessary. - async fn generate_storage_iter_events( - &mut self, - mut block_guard: BlockGuard, - hash: Block::Hash, - child_key: Option, - ) { - let sender = block_guard.response_sender(); - let operation = block_guard.operation(); - - while let Some(query) = self.iter_operations.pop_front() { - if operation.was_stopped() { - return - } - - let result = self.client.query_iter_pagination( - query, - hash, - child_key.as_ref(), - self.operation_max_storage_items, - ); - let (events, maybe_next_query) = match result { - QueryIterResult::Ok(result) => result, - QueryIterResult::Err(error) => { - send_error::(&sender, operation.operation_id(), error.to_string()); - return - }, - }; - - if !events.is_empty() { - // Send back the results of the iteration produced so far. - let _ = sender.unbounded_send(FollowEvent::::OperationStorageItems( - OperationStorageItems { operation_id: operation.operation_id(), items: events }, - )); - } - - if let Some(next_query) = maybe_next_query { - let _ = - sender.unbounded_send(FollowEvent::::OperationWaitingForContinue( - OperationId { operation_id: operation.operation_id() }, - )); - - // The operation might be continued or cancelled only after the - // `OperationWaitingForContinue` is generated above. - operation.wait_for_continue().await; - - // Give a chance for the other items to advance next time. - self.iter_operations.push_back(next_query); - } - } - - if operation.was_stopped() { - return - } - - let _ = - sender.unbounded_send(FollowEvent::::OperationStorageDone(OperationId { - operation_id: operation.operation_id(), - })); - } - /// Generate the block events for the `chainHead_storage` method. pub async fn generate_events( &mut self, - mut block_guard: BlockGuard, hash: Block::Hash, items: Vec>, child_key: Option, - ) { - let sender = block_guard.response_sender(); - let operation = block_guard.operation(); - - let mut storage_results = Vec::with_capacity(items.len()); - for item in items { - match item.query_type { - StorageQueryType::Value => { - match self.client.query_value(hash, &item.key, child_key.as_ref()) { - Ok(Some(value)) => storage_results.push(value), - Ok(None) => continue, - Err(error) => { - send_error::(&sender, operation.operation_id(), error); - return - }, - } - }, - StorageQueryType::Hash => - match self.client.query_hash(hash, &item.key, child_key.as_ref()) { - Ok(Some(value)) => storage_results.push(value), - Ok(None) => continue, - Err(error) => { - send_error::(&sender, operation.operation_id(), error); - return - }, + tx: mpsc::Sender, + ) -> Result<(), tokio::task::JoinError> { + let this = self.clone(); + + tokio::task::spawn_blocking(move || { + for item in items { + match item.query_type { + StorageQueryType::Value => { + let rp = this.client.query_value(hash, &item.key, child_key.as_ref()); + if tx.blocking_send(rp).is_err() { + break; + } }, - StorageQueryType::ClosestDescendantMerkleValue => - match self.client.query_merkle_value(hash, &item.key, child_key.as_ref()) { - Ok(Some(value)) => storage_results.push(value), - Ok(None) => continue, - Err(error) => { - send_error::(&sender, operation.operation_id(), error); - return - }, + StorageQueryType::Hash => { + let rp = this.client.query_hash(hash, &item.key, child_key.as_ref()); + if tx.blocking_send(rp).is_err() { + break; + } }, - StorageQueryType::DescendantsValues => self.iter_operations.push_back(QueryIter { - query_key: item.key, - ty: IterQueryType::Value, - pagination_start_key: None, - }), - StorageQueryType::DescendantsHashes => self.iter_operations.push_back(QueryIter { - query_key: item.key, - ty: IterQueryType::Hash, - pagination_start_key: None, - }), - }; - } - - if !storage_results.is_empty() { - let _ = sender.unbounded_send(FollowEvent::::OperationStorageItems( - OperationStorageItems { - operation_id: operation.operation_id(), - items: storage_results, - }, - )); - } + StorageQueryType::ClosestDescendantMerkleValue => { + let rp = + this.client.query_merkle_value(hash, &item.key, child_key.as_ref()); + if tx.blocking_send(rp).is_err() { + break; + } + }, + StorageQueryType::DescendantsValues => { + let query = QueryIter { + query_key: item.key, + ty: IterQueryType::Value, + pagination_start_key: None, + }; + this.client.query_iter_pagination_with_producer( + query, + hash, + child_key.as_ref(), + &tx, + ) + }, + StorageQueryType::DescendantsHashes => { + let query = QueryIter { + query_key: item.key, + ty: IterQueryType::Hash, + pagination_start_key: None, + }; + this.client.query_iter_pagination_with_producer( + query, + hash, + child_key.as_ref(), + &tx, + ) + }, + } + } + }) + .await?; - self.generate_storage_iter_events(block_guard, hash, child_key).await + Ok(()) } } - -/// Build and send the opaque error back to the `chainHead_follow` method. -fn send_error( - sender: &TracingUnboundedSender>, - operation_id: String, - error: String, -) { - let _ = sender.unbounded_send(FollowEvent::::OperationError(OperationError { - operation_id, - error, - })); -} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs index c9fe19aca2b1..98ddfbbdc63f 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs @@ -42,3 +42,10 @@ pub use event::{ BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent, }; + +/// Follow event sender. +pub(crate) type FollowEventSender = futures::channel::mpsc::Sender>; +/// Follow event receiver. +pub(crate) type FollowEventReceiver = futures::channel::mpsc::Receiver>; +/// Follow event send error. +pub(crate) type FollowEventSendError = futures::channel::mpsc::SendError; diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index d4d616f54dc8..0d88d9572539 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -19,18 +19,25 @@ use futures::channel::oneshot; use parking_lot::Mutex; use sc_client_api::Backend; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, - sync::{atomic::AtomicBool, Arc}, + sync::Arc, time::{Duration, Instant}, }; -use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent}; +use crate::chain_head::{ + subscription::SubscriptionManagementError, FollowEventReceiver, FollowEventSender, +}; + +type NotifyOnDrop = tokio::sync::mpsc::Receiver<()>; +type SharedOperations = Arc>>; -/// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings. -const QUEUE_SIZE_WARNING: usize = 512; +/// The buffer capacity for each subscription +/// +/// Beware of that the JSON-RPC server has a global +/// buffer per connection and this a extra buffer. +const BUF_CAP_PER_SUBSCRIPTION: usize = 16; /// The state machine of a block of a single subscription ID. /// @@ -138,7 +145,7 @@ impl LimitOperations { .try_acquire_many_owned(num_ops.try_into().ok()?) .ok()?; - Some(PermitOperations { num_ops, _permit: permits }) + Some(permits) } } @@ -148,79 +155,36 @@ impl LimitOperations { /// to guarantee the RPC server can execute the number of operations. /// /// The number of reserved items are given back to the [`LimitOperations`] on drop. -struct PermitOperations { - /// The number of operations permitted (reserved). - num_ops: usize, - /// The permit for these operations. - _permit: tokio::sync::OwnedSemaphorePermit, -} +type PermitOperations = tokio::sync::OwnedSemaphorePermit; -/// The state of one operation. -/// -/// This is directly exposed to users via `chain_head_unstable_continue` and -/// `chain_head_unstable_stop_operation`. +/// Stop handle for the operation. #[derive(Clone)] -pub struct OperationState { - /// The shared operation state that holds information about the - /// `waitingForContinue` event and cancellation. - shared_state: Arc, - /// Send notifications when the user calls `chainHead_continue` method. - send_continue: tokio::sync::mpsc::Sender<()>, -} +pub struct StopHandle(tokio::sync::mpsc::Sender<()>); -impl OperationState { - /// Returns true if `chainHead_continue` is called after the - /// `waitingForContinue` event was emitted for the associated - /// operation ID. - pub fn submit_continue(&self) -> bool { - // `waitingForContinue` not generated. - if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) { - return false - } - - // Has enough capacity for 1 message. - // Can fail if the `stop_operation` propagated the stop first. - self.send_continue.try_send(()).is_ok() +impl StopHandle { + pub async fn stopped(&self) { + self.0.closed().await; } - /// Stops the operation if `waitingForContinue` event was emitted for the associated - /// operation ID. - /// - /// Returns nothing in accordance with `chainHead_v1_stopOperation`. - pub fn stop_operation(&self) { - // `waitingForContinue` not generated. - if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) { - return - } - - self.shared_state - .operation_stopped - .store(true, std::sync::atomic::Ordering::Release); - - // Send might not have enough capacity if `submit_continue` was sent first. - // However, the `operation_stopped` boolean was set. - let _ = self.send_continue.try_send(()); + pub fn is_stopped(&self) -> bool { + self.0.is_closed() } } /// The shared operation state between the backend [`RegisteredOperation`] and frontend /// [`RegisteredOperation`]. -struct SharedOperationState { - /// True if the `chainHead` generated `waitingForContinue` event. - requested_continue: AtomicBool, - /// True if the operation was cancelled by the user. - operation_stopped: AtomicBool, +#[derive(Clone)] +pub struct OperationState { + stop: StopHandle, + operations: SharedOperations, + operation_id: String, } -impl SharedOperationState { - /// Constructs a new [`SharedOperationState`]. - /// - /// This is efficiently cloned under a single heap allocation. - fn new() -> Arc { - Arc::new(SharedOperationState { - requested_continue: AtomicBool::new(false), - operation_stopped: AtomicBool::new(false), - }) +impl OperationState { + pub fn stop(&mut self) { + if !self.stop.is_stopped() { + self.operations.lock().remove(&self.operation_id); + } } } @@ -228,59 +192,31 @@ impl SharedOperationState { /// /// This is used internally by the `chainHead` methods. pub struct RegisteredOperation { - /// The shared operation state that holds information about the - /// `waitingForContinue` event and cancellation. - shared_state: Arc, - /// Receive notifications when the user calls `chainHead_continue` method. - recv_continue: tokio::sync::mpsc::Receiver<()>, + /// Stop handle for the operation. + stop_handle: StopHandle, + /// Track the operations ID of this subscription. + operations: SharedOperations, /// The operation ID of the request. operation_id: String, - /// Track the operations ID of this subscription. - operations: Arc>>, /// Permit a number of items to be executed by this operation. - permit: PermitOperations, + _permit: PermitOperations, } impl RegisteredOperation { - /// Wait until the user calls `chainHead_continue` or the operation - /// is cancelled via `chainHead_stopOperation`. - pub async fn wait_for_continue(&mut self) { - self.shared_state - .requested_continue - .store(true, std::sync::atomic::Ordering::Release); - - // The sender part of this channel is around for as long as this object exists, - // because it is stored in the `OperationState` of the `operations` field. - // The sender part is removed from tracking when this object is dropped. - let _ = self.recv_continue.recv().await; - - self.shared_state - .requested_continue - .store(false, std::sync::atomic::Ordering::Release); - } - - /// Returns true if the current operation was stopped. - pub fn was_stopped(&self) -> bool { - self.shared_state.operation_stopped.load(std::sync::atomic::Ordering::Acquire) + /// Stop handle for the operation. + pub fn stop_handle(&self) -> &StopHandle { + &self.stop_handle } /// Get the operation ID. pub fn operation_id(&self) -> String { self.operation_id.clone() } - - /// Returns the number of reserved elements for this permit. - /// - /// This can be smaller than the number of items requested via [`LimitOperations::reserve()`]. - pub fn num_reserved(&self) -> usize { - self.permit.num_ops - } } impl Drop for RegisteredOperation { fn drop(&mut self) { - let mut operations = self.operations.lock(); - operations.remove(&self.operation_id); + self.operations.lock().remove(&self.operation_id); } } @@ -291,7 +227,7 @@ struct Operations { /// Limit the number of ongoing operations. limits: LimitOperations, /// Track the operations ID of this subscription. - operations: Arc>>, + operations: SharedOperations, } impl Operations { @@ -307,25 +243,25 @@ impl Operations { /// Register a new operation. pub fn register_operation(&mut self, to_reserve: usize) -> Option { let permit = self.limits.reserve_at_most(to_reserve)?; - let operation_id = self.next_operation_id(); - // At most one message can be sent. - let (send_continue, recv_continue) = tokio::sync::mpsc::channel(1); - let shared_state = SharedOperationState::new(); - - let state = OperationState { send_continue, shared_state: shared_state.clone() }; - - // Cloned operations for removing the current ID on drop. + let (tx, rx) = tokio::sync::mpsc::channel(1); + let stop_handle = StopHandle(tx); let operations = self.operations.clone(); - operations.lock().insert(operation_id.clone(), state); + operations.lock().insert(operation_id.clone(), (rx, stop_handle.clone())); - Some(RegisteredOperation { shared_state, operation_id, recv_continue, operations, permit }) + Some(RegisteredOperation { stop_handle, operation_id, operations, _permit: permit }) } /// Get the associated operation state with the ID. pub fn get_operation(&self, id: &str) -> Option { - self.operations.lock().get(id).map(|state| state.clone()) + let stop = self.operations.lock().get(id).map(|(_, stop)| stop.clone())?; + + Some(OperationState { + stop, + operations: self.operations.clone(), + operation_id: id.to_string(), + }) } /// Generate the next operation ID for this subscription. @@ -352,7 +288,7 @@ struct SubscriptionState { /// The sender of message responses to the `chainHead_follow` events. /// /// This object is cloned between methods. - response_sender: TracingUnboundedSender>, + response_sender: FollowEventSender, /// The ongoing operations of a subscription. operations: Operations, /// Track the block hashes available for this subscription. @@ -486,7 +422,7 @@ impl SubscriptionState { pub struct BlockGuard> { hash: Block::Hash, with_runtime: bool, - response_sender: TracingUnboundedSender>, + response_sender: FollowEventSender, operation: RegisteredOperation, backend: Arc, } @@ -504,7 +440,7 @@ impl> BlockGuard { fn new( hash: Block::Hash, with_runtime: bool, - response_sender: TracingUnboundedSender>, + response_sender: FollowEventSender, operation: RegisteredOperation, backend: Arc, ) -> Result { @@ -521,7 +457,7 @@ impl> BlockGuard { } /// Send message responses from the `chainHead` methods to `chainHead_follow`. - pub fn response_sender(&self) -> TracingUnboundedSender> { + pub fn response_sender(&self) -> FollowEventSender { self.response_sender.clone() } @@ -543,7 +479,7 @@ pub struct InsertedSubscriptionData { /// Signal that the subscription must stop. pub rx_stop: oneshot::Receiver<()>, /// Receive message responses from the `chainHead` methods. - pub response_receiver: TracingUnboundedReceiver>, + pub response_receiver: FollowEventReceiver, } pub struct SubscriptionsInner> { @@ -594,7 +530,7 @@ impl> SubscriptionsInner { if let Entry::Vacant(entry) = self.subs.entry(sub_id) { let (tx_stop, rx_stop) = oneshot::channel(); let (response_sender, response_receiver) = - tracing_unbounded("chain-head-method-responses", QUEUE_SIZE_WARNING); + futures::channel::mpsc::channel(BUF_CAP_PER_SUBSCRIPTION); let state = SubscriptionState:: { with_runtime, tx_stop: Some(tx_stop), @@ -972,8 +908,7 @@ mod tests { #[test] fn sub_state_register_twice() { - let (response_sender, _response_receiver) = - tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING); + let (response_sender, _response_receiver) = futures::channel::mpsc::channel(1); let mut sub_state = SubscriptionState:: { with_runtime: false, tx_stop: None, @@ -1001,8 +936,7 @@ mod tests { #[test] fn sub_state_register_unregister() { - let (response_sender, _response_receiver) = - tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING); + let (response_sender, _response_receiver) = futures::channel::mpsc::channel(1); let mut sub_state = SubscriptionState:: { with_runtime: false, tx_stop: None, @@ -1349,12 +1283,12 @@ mod tests { // One operation is reserved. let permit_one = ops.reserve_at_most(1).unwrap(); - assert_eq!(permit_one.num_ops, 1); + assert_eq!(permit_one.num_permits(), 1); // Request 2 operations, however there is capacity only for one. let permit_two = ops.reserve_at_most(2).unwrap(); // Number of reserved permits is smaller than provided. - assert_eq!(permit_two.num_ops, 1); + assert_eq!(permit_two.num_permits(), 1); // Try to reserve operations when there's no space. let permit = ops.reserve_at_most(1); @@ -1365,7 +1299,7 @@ mod tests { // Can reserve again let permit_three = ops.reserve_at_most(1).unwrap(); - assert_eq!(permit_three.num_ops, 1); + assert_eq!(permit_three.num_permits(), 1); } #[test] diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index f266c9d8b34f..84d1b8f8f9b7 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -34,7 +34,7 @@ use self::inner::SubscriptionsInner; pub use self::inner::OperationState; pub use error::SubscriptionManagementError; -pub use inner::{BlockGuard, InsertedSubscriptionData}; +pub use inner::{BlockGuard, InsertedSubscriptionData, StopHandle}; /// Manage block pinning / unpinning for subscription IDs. pub struct SubscriptionManagement> { diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index b195e05b6649..8e9ff4a2ee3d 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -33,12 +33,12 @@ use jsonrpsee::{ }; use sc_block_builder::BlockBuilderBuilder; use sc_client_api::ChildInfo; +use sc_rpc::testing::TokioTestExecutor; use sc_service::client::new_in_mem; use sp_blockchain::HeaderBackend; use sp_consensus::BlockOrigin; use sp_core::{ storage::well_known_keys::{self, CODE}, - testing::TaskExecutor, Blake2Hasher, Hasher, }; use sp_runtime::traits::Block as BlockT; @@ -60,7 +60,6 @@ type Block = substrate_test_runtime_client::runtime::Block; const MAX_PINNED_BLOCKS: usize = 32; const MAX_PINNED_SECS: u64 = 60; const MAX_OPERATIONS: usize = 16; -const MAX_PAGINATION_LIMIT: usize = 5; const MAX_LAGGING_DISTANCE: usize = 128; const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4; @@ -80,12 +79,11 @@ pub async fn run_server() -> std::net::SocketAddr { let api = ChainHead::new( client, backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, max_follow_subscriptions_per_connection: 1, max_lagging_distance: MAX_LAGGING_DISTANCE, }, @@ -142,12 +140,11 @@ async fn setup_api() -> ( let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -250,13 +247,11 @@ async fn follow_subscription_produces_blocks() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -321,13 +316,11 @@ async fn follow_with_runtime() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -631,13 +624,11 @@ async fn call_runtime_without_flag() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -1292,13 +1283,11 @@ async fn separate_operation_ids_for_subscriptions() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -1380,13 +1369,11 @@ async fn follow_generates_initial_blocks() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -1538,13 +1525,11 @@ async fn follow_exceeding_pinned_blocks() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: 2, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -1617,13 +1602,11 @@ async fn follow_with_unpin() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: 2, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -1725,13 +1708,11 @@ async fn unpin_duplicate_hashes() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: 3, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -1830,13 +1811,11 @@ async fn follow_with_multiple_unpin_hashes() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -1977,13 +1956,11 @@ async fn follow_prune_best_block() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -2165,13 +2142,11 @@ async fn follow_forks_pruned_block() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -2327,13 +2302,11 @@ async fn follow_report_multiple_pruned_block() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -2566,7 +2539,7 @@ async fn pin_block_references() { genesis_block_builder, None, None, - Box::new(TaskExecutor::new()), + Box::new(TokioTestExecutor::default()), client_config, ) .unwrap(), @@ -2575,13 +2548,11 @@ async fn pin_block_references() { let api = ChainHead::new( client.clone(), backend.clone(), - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: 3, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -2712,13 +2683,11 @@ async fn follow_finalized_before_new_block() { let api = ChainHead::new( client_mock.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -2829,13 +2798,11 @@ async fn ensure_operation_limits_works() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: 1, - operation_max_storage_items: MAX_PAGINATION_LIMIT, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -2887,7 +2854,7 @@ async fn ensure_operation_limits_works() { let operation_id = match response { MethodResponse::Started(started) => { // Check discarded items. - assert_eq!(started.discarded_items.unwrap(), 3); + assert!(started.discarded_items.is_none()); started.operation_id }, MethodResponse::LimitReached => panic!("Expected started response"), @@ -2922,7 +2889,7 @@ async fn ensure_operation_limits_works() { } #[tokio::test] -async fn check_continue_operation() { +async fn storage_is_backpressured() { let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); let builder = TestClientBuilder::new().add_extra_child_storage( &child_info, @@ -2936,13 +2903,11 @@ async fn check_continue_operation() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: 1, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -3021,18 +2986,6 @@ async fn check_continue_operation() { res.items[0].result == StorageResultType::Value(hex_string(b"a")) ); - // Pagination event. - assert_matches!( - get_next_event::>(&mut sub).await, - FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id - ); - - does_not_produce_event::>( - &mut sub, - std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), - ) - .await; - let _res: () = api.call("chainHead_v1_continue", [&sub_id, &operation_id]).await.unwrap(); assert_matches!( get_next_event::>(&mut sub).await, FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && @@ -3041,17 +2994,6 @@ async fn check_continue_operation() { res.items[0].result == StorageResultType::Value(hex_string(b"ab")) ); - // Pagination event. - assert_matches!( - get_next_event::>(&mut sub).await, - FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id - ); - does_not_produce_event::>( - &mut sub, - std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), - ) - .await; - let _res: () = api.call("chainHead_v1_continue", [&sub_id, &operation_id]).await.unwrap(); assert_matches!( get_next_event::>(&mut sub).await, FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && @@ -3060,18 +3002,6 @@ async fn check_continue_operation() { res.items[0].result == StorageResultType::Value(hex_string(b"abcmoD")) ); - // Pagination event. - assert_matches!( - get_next_event::>(&mut sub).await, - FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id - ); - - does_not_produce_event::>( - &mut sub, - std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), - ) - .await; - let _res: () = api.call("chainHead_v1_continue", [&sub_id, &operation_id]).await.unwrap(); assert_matches!( get_next_event::>(&mut sub).await, FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && @@ -3080,17 +3010,6 @@ async fn check_continue_operation() { res.items[0].result == StorageResultType::Value(hex_string(b"abc")) ); - // Pagination event. - assert_matches!( - get_next_event::>(&mut sub).await, - FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id - ); - does_not_produce_event::>( - &mut sub, - std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), - ) - .await; - let _res: () = api.call("chainHead_v1_continue", [&sub_id, &operation_id]).await.unwrap(); assert_matches!( get_next_event::>(&mut sub).await, FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && @@ -3121,13 +3040,11 @@ async fn stop_storage_operation() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: 1, - max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -3203,15 +3120,22 @@ async fn stop_storage_operation() { res.items[0].result == StorageResultType::Value(hex_string(b"a")) ); - // Pagination event. assert_matches!( get_next_event::>(&mut sub).await, - FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":mo") && + res.items[0].result == StorageResultType::Value(hex_string(b"ab")) ); // Stop the operation. let _res: () = api.call("chainHead_v1_stopOperation", [&sub_id, &operation_id]).await.unwrap(); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); + does_not_produce_event::>( &mut sub, std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), @@ -3289,30 +3213,23 @@ async fn storage_closest_merkle_value() { MethodResponse::LimitReached => panic!("Expected started response"), }; - let event = get_next_event::>(&mut sub).await; - let merkle_values: HashMap<_, _> = match event { - FollowEvent::OperationStorageItems(res) => { - assert_eq!(res.operation_id, operation_id); + let mut merkle_values = HashMap::new(); - res.items - .into_iter() - .map(|res| { + loop { + match get_next_event::>(&mut sub).await { + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id => + for res in res.items { let value = match res.result { StorageResultType::ClosestDescendantMerkleValue(value) => value, _ => panic!("Unexpected StorageResultType"), }; - (res.key, value) - }) - .collect() - }, - _ => panic!("Expected OperationStorageItems event"), - }; - - // Finished. - assert_matches!( - get_next_event::>(&mut sub).await, - FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id - ); + merkle_values.insert(res.key, value); + }, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id => + break, + _ => panic!("Unexpected event"), + } + } // Response for AAAA, AAAB, A and AA. assert_eq!(merkle_values.len(), 4); @@ -3420,12 +3337,11 @@ async fn chain_head_stop_all_subscriptions() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, max_lagging_distance: 5, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, @@ -3634,12 +3550,11 @@ async fn chain_head_limit_reached() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: 1, }, @@ -3675,12 +3590,11 @@ async fn follow_unique_pruned_blocks() { let api = ChainHead::new( client.clone(), backend, - Arc::new(TaskExecutor::default()), + Arc::new(TokioTestExecutor::default()), ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, - operation_max_storage_items: MAX_PAGINATION_LIMIT, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, max_lagging_distance: MAX_LAGGING_DISTANCE, }, diff --git a/substrate/client/rpc-spec-v2/src/common/storage.rs b/substrate/client/rpc-spec-v2/src/common/storage.rs index bd249e033f8f..2e24a8da8ca8 100644 --- a/substrate/client/rpc-spec-v2/src/common/storage.rs +++ b/substrate/client/rpc-spec-v2/src/common/storage.rs @@ -22,6 +22,7 @@ use std::{marker::PhantomData, sync::Arc}; use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider}; use sp_runtime::traits::Block as BlockT; +use tokio::sync::mpsc; use super::events::{StorageResult, StorageResultType}; use crate::hex_string; @@ -33,6 +34,12 @@ pub struct Storage { _phandom: PhantomData<(BE, Block)>, } +impl Clone for Storage { + fn clone(&self) -> Self { + Self { client: self.client.clone(), _phandom: PhantomData } + } +} + impl Storage { /// Constructs a new [`Storage`]. pub fn new(client: Arc) -> Self { @@ -41,6 +48,7 @@ impl Storage { } /// Query to iterate over storage. +#[derive(Debug)] pub struct QueryIter { /// The key from which the iteration was started. pub query_key: StorageKey, @@ -51,6 +59,7 @@ pub struct QueryIter { } /// The query type of an iteration. +#[derive(Debug)] pub enum IterQueryType { /// Iterating over (key, value) pairs. Value, @@ -123,7 +132,7 @@ where key: &StorageKey, child_key: Option<&ChildInfo>, ) -> QueryResult { - let result = if let Some(child_key) = child_key { + let result = if let Some(ref child_key) = child_key { self.client.child_closest_merkle_value(hash, child_key, key) } else { self.client.closest_merkle_value(hash, key) @@ -146,6 +155,50 @@ where .unwrap_or_else(|error| QueryResult::Err(error.to_string())) } + /// Iterate over the storage keys and send the results to the provided sender. + /// + /// Because this relies on a bounded channel, it will pause the storage iteration + // if the channel is becomes full which in turn provides backpressure. + pub fn query_iter_pagination_with_producer( + &self, + query: QueryIter, + hash: Block::Hash, + child_key: Option<&ChildInfo>, + tx: &mpsc::Sender, + ) { + let QueryIter { ty, query_key, pagination_start_key } = query; + + let maybe_storage = if let Some(child_key) = child_key { + self.client.child_storage_keys( + hash, + child_key.to_owned(), + Some(&query_key), + pagination_start_key.as_ref(), + ) + } else { + self.client.storage_keys(hash, Some(&query_key), pagination_start_key.as_ref()) + }; + + let keys_iter = match maybe_storage { + Ok(keys_iter) => keys_iter, + Err(error) => { + _ = tx.blocking_send(Err(error.to_string())); + return; + }, + }; + + for key in keys_iter { + let result = match ty { + IterQueryType::Value => self.query_value(hash, &key, child_key), + IterQueryType::Hash => self.query_hash(hash, &key, child_key), + }; + + if tx.blocking_send(result).is_err() { + break; + } + } + } + /// Iterate over at most the provided number of keys. /// /// Returns the storage result with a potential next key to resume iteration.