diff --git a/bin/light-base/src/json_rpc_service.rs b/bin/light-base/src/json_rpc_service.rs index 23fb9aefe8..61ce356973 100644 --- a/bin/light-base/src/json_rpc_service.rs +++ b/bin/light-base/src/json_rpc_service.rs @@ -602,7 +602,11 @@ impl Background { // malicious. let mut subscribe_all = me .runtime_service - .subscribe_all(32, NonZeroUsize::new(usize::max_value()).unwrap()) + .subscribe_all( + "json-rpc-blocks-cache", + 32, + NonZeroUsize::new(usize::max_value()).unwrap(), + ) .await; cache.subscription_id = Some(subscribe_all.new_blocks.id()); diff --git a/bin/light-base/src/json_rpc_service/chain_head.rs b/bin/light-base/src/json_rpc_service/chain_head.rs index c3ca039090..7ab9142030 100644 --- a/bin/light-base/src/json_rpc_service/chain_head.rs +++ b/bin/light-base/src/json_rpc_service/chain_head.rs @@ -380,7 +380,7 @@ impl Background { let (mut subscribe_all, runtime_subscribe_all) = if runtime_updates { let subscribe_all = self .runtime_service - .subscribe_all(32, NonZeroUsize::new(32).unwrap()) + .subscribe_all("chainHead_follow", 32, NonZeroUsize::new(32).unwrap()) .await; let id = subscribe_all.new_blocks.id(); (either::Left(subscribe_all), Some(id)) diff --git a/bin/light-base/src/json_rpc_service/getters.rs b/bin/light-base/src/json_rpc_service/getters.rs index 2d932ca468..cba435ca8a 100644 --- a/bin/light-base/src/json_rpc_service/getters.rs +++ b/bin/light-base/src/json_rpc_service/getters.rs @@ -38,7 +38,7 @@ impl Background { header::hash_from_scale_encoded_header( &self .runtime_service - .subscribe_all(16, NonZeroUsize::new(24).unwrap()) + .subscribe_all("chain_getFinalizedHead", 16, NonZeroUsize::new(24).unwrap()) .await .finalized_block_scale_encoded_header, ), diff --git a/bin/light-base/src/json_rpc_service/state_chain.rs b/bin/light-base/src/json_rpc_service/state_chain.rs index e8ba11798b..6430cf78de 100644 --- a/bin/light-base/src/json_rpc_service/state_chain.rs +++ b/bin/light-base/src/json_rpc_service/state_chain.rs @@ -417,7 +417,11 @@ impl Background { // malicious. let subscribe_all = me .runtime_service - .subscribe_all(64, NonZeroUsize::new(usize::max_value()).unwrap()) + .subscribe_all( + "chain_subscribeAllHeads", + 64, + NonZeroUsize::new(usize::max_value()).unwrap(), + ) .await; // The existing finalized and already-known blocks aren't reported to the diff --git a/bin/light-base/src/json_rpc_service/state_chain/sub_utils.rs b/bin/light-base/src/json_rpc_service/state_chain/sub_utils.rs index d3a82149e6..c2dffee818 100644 --- a/bin/light-base/src/json_rpc_service/state_chain/sub_utils.rs +++ b/bin/light-base/src/json_rpc_service/state_chain/sub_utils.rs @@ -44,7 +44,7 @@ pub async fn subscribe_runtime_version( ) { let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move { let subscribe_all = runtime_service - .subscribe_all(16, NonZeroUsize::new(24).unwrap()) + .subscribe_all("subscribe-runtime-version", 16, NonZeroUsize::new(24).unwrap()) .await; // Map of runtimes by hash. Contains all non-finalized blocks, plus the current finalized @@ -227,7 +227,7 @@ pub async fn subscribe_finalized( ) -> (Vec, stream::BoxStream<'static, Vec>) { let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move { let subscribe_all = runtime_service - .subscribe_all(16, NonZeroUsize::new(32).unwrap()) + .subscribe_all("subscribe-finalized", 16, NonZeroUsize::new(32).unwrap()) .await; // Map of block headers by hash. Contains all non-finalized blocks headers. @@ -310,7 +310,7 @@ pub async fn subscribe_best( ) -> (Vec, stream::BoxStream<'static, Vec>) { let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move { let subscribe_all = runtime_service - .subscribe_all(16, NonZeroUsize::new(32).unwrap()) + .subscribe_all("subscribe-best", 16, NonZeroUsize::new(32).unwrap()) .await; // Map of block headers by hash. Contains all non-finalized blocks headers, plus the diff --git a/bin/light-base/src/runtime_service.rs b/bin/light-base/src/runtime_service.rs index 2c5881e917..416bd9a703 100644 --- a/bin/light-base/src/runtime_service.rs +++ b/bin/light-base/src/runtime_service.rs @@ -185,6 +185,11 @@ impl RuntimeService { /// This function only returns once the runtime of the current finalized block is known. This /// might take a long time. /// + /// A name must be passed to be used for debugging purposes. At the time of writing of this + /// comment, the `#[must_use]` attribute doesn't work on asynchronous functions, making a name + /// extremely useful. If `#[must_use]` ever works on asynchronous functions, this `name` might + /// be removed. + /// /// Only up to `buffer_size` block notifications are buffered in the channel. If the channel /// is full when a new notification is attempted to be pushed, the channel gets closed. /// @@ -201,6 +206,7 @@ impl RuntimeService { /// See [`SubscribeAll`] for information about the return value. pub async fn subscribe_all( &self, + subscription_name: &'static str, buffer_size: usize, max_pinned_blocks: NonZeroUsize, ) -> SubscribeAll { @@ -326,7 +332,10 @@ impl RuntimeService { 0 | 1 )); - all_blocks_subscriptions.insert(subscription_id, (tx, max_pinned_blocks.get() - 1)); + all_blocks_subscriptions.insert( + subscription_id, + (subscription_name, tx, max_pinned_blocks.get() - 1), + ); SubscribeAll { finalized_block_scale_encoded_header: finalized_block.scale_encoded_header.clone(), @@ -374,23 +383,25 @@ impl RuntimeService { .. } = &mut guarded_lock.tree { - let block_counts_towards_limit = - match pinned_blocks.remove(&(subscription_id.0, *block_hash)) { - Some((_, _, _, to_remove)) => !to_remove, - None => { - // Cold path. - if all_blocks_subscriptions.contains_key(&subscription_id.0) { - panic!("block already unpinned"); - } else { - return; - } + let block_counts_towards_limit = match pinned_blocks + .remove(&(subscription_id.0, *block_hash)) + { + Some((_, _, _, to_remove)) => !to_remove, + None => { + // Cold path. + if let Some((sub_name, _, _)) = all_blocks_subscriptions.get(&subscription_id.0) + { + panic!("block already unpinned for {} subscription", sub_name); + } else { + return; } - }; + } + }; guarded_lock.runtimes.retain(|_, rt| rt.strong_count() > 0); if block_counts_towards_limit { - let (_, finalized_pinned_remaining) = all_blocks_subscriptions + let (_name, _, finalized_pinned_remaining) = all_blocks_subscriptions .get_mut(&subscription_id.0) .unwrap(); *finalized_pinned_remaining += 1; @@ -434,8 +445,10 @@ impl RuntimeService { Some(v) => v.clone(), None => { // Cold path. - if all_blocks_subscriptions.contains_key(&subscription_id.0) { - panic!("block already unpinned"); + if let Some((sub_name, _, _)) = + all_blocks_subscriptions.get(&subscription_id.0) + { + panic!("block already unpinned for subscription {}", sub_name); } else { return Err(PinnedBlockRuntimeLockError::ObsoleteSubscription); } @@ -1001,8 +1014,11 @@ enum GuardedInner { /// finalized or non-canonical blocks remaining for this subscription. /// /// Keys are assigned from [`Guarded::next_subscription_id`]. - all_blocks_subscriptions: - hashbrown::HashMap, usize), fnv::FnvBuildHasher>, + all_blocks_subscriptions: hashbrown::HashMap< + u64, + (&'static str, mpsc::Sender, usize), + fnv::FnvBuildHasher, + >, /// List of pinned blocks. /// @@ -1569,7 +1585,7 @@ impl Background { }; let mut to_remove = Vec::new(); - for (subscription_id, (sender, finalized_pinned_remaining)) in + for (subscription_id, (_, sender, finalized_pinned_remaining)) in all_blocks_subscriptions.iter_mut() { let count_limit = pruned_blocks.len() + 1; @@ -1658,7 +1674,8 @@ impl Background { }); let mut to_remove = Vec::new(); - for (subscription_id, (sender, _)) in all_blocks_subscriptions.iter_mut() { + for (subscription_id, (_, sender, _)) in all_blocks_subscriptions.iter_mut() + { if sender.try_send(notif.clone()).is_ok() { pinned_blocks.insert( (*subscription_id, block_hash), @@ -1698,7 +1715,8 @@ impl Background { let notif = Notification::BestBlockChanged { hash }; let mut to_remove = Vec::new(); - for (subscription_id, (sender, _)) in all_blocks_subscriptions.iter_mut() { + for (subscription_id, (_, sender, _)) in all_blocks_subscriptions.iter_mut() + { if sender.try_send(notif.clone()).is_err() { to_remove.push(*subscription_id); } diff --git a/bin/light-base/src/sync_service/parachain.rs b/bin/light-base/src/sync_service/parachain.rs index be674aa2c9..de7d106d03 100644 --- a/bin/light-base/src/sync_service/parachain.rs +++ b/bin/light-base/src/sync_service/parachain.rs @@ -81,7 +81,11 @@ pub(super) async fn start_parachain( // The maximum number of pinned block is ignored, as this maximum is a way to avoid // malicious behaviors. This code is by definition not considered malicious. let mut relay_chain_subscribe_all = relay_chain_sync - .subscribe_all(32, NonZeroUsize::new(usize::max_value()).unwrap()) + .subscribe_all( + "parachain-sync", + 32, + NonZeroUsize::new(usize::max_value()).unwrap(), + ) .await; log::debug!( target: &log_target, diff --git a/bin/light-base/src/transactions_service.rs b/bin/light-base/src/transactions_service.rs index d377ad01d0..f265837b6b 100644 --- a/bin/light-base/src/transactions_service.rs +++ b/bin/light-base/src/transactions_service.rs @@ -338,7 +338,11 @@ async fn background_task( // malicious behaviors. This code is by definition not considered malicious. let mut subscribe_all = worker .runtime_service - .subscribe_all(32, NonZeroUsize::new(usize::max_value()).unwrap()) + .subscribe_all( + "transactions-service", + 32, + NonZeroUsize::new(usize::max_value()).unwrap(), + ) .await; let initial_finalized_block_hash = header::hash_from_scale_encoded_header( &subscribe_all.finalized_block_scale_encoded_header,