Skip to content

Commit

Permalink
Add names to runtime service subscriptions (#2631)
Browse files Browse the repository at this point in the history
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored Aug 14, 2022
1 parent 76abc44 commit a63ac12
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 29 deletions.
6 changes: 5 additions & 1 deletion bin/light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,11 @@ impl<TPlat: Platform> Background<TPlat> {
// malicious.
let mut subscribe_all = me
.runtime_service
.subscribe_all(32, NonZeroUsize::new(usize::max_value()).unwrap())
.subscribe_all(
"json-rpc-blocks-cache",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.await;

cache.subscription_id = Some(subscribe_all.new_blocks.id());
Expand Down
2 changes: 1 addition & 1 deletion bin/light-base/src/json_rpc_service/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl<TPlat: Platform> Background<TPlat> {
let (mut subscribe_all, runtime_subscribe_all) = if runtime_updates {
let subscribe_all = self
.runtime_service
.subscribe_all(32, NonZeroUsize::new(32).unwrap())
.subscribe_all("chainHead_follow", 32, NonZeroUsize::new(32).unwrap())
.await;
let id = subscribe_all.new_blocks.id();
(either::Left(subscribe_all), Some(id))
Expand Down
2 changes: 1 addition & 1 deletion bin/light-base/src/json_rpc_service/getters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<TPlat: Platform> Background<TPlat> {
header::hash_from_scale_encoded_header(
&self
.runtime_service
.subscribe_all(16, NonZeroUsize::new(24).unwrap())
.subscribe_all("chain_getFinalizedHead", 16, NonZeroUsize::new(24).unwrap())
.await
.finalized_block_scale_encoded_header,
),
Expand Down
6 changes: 5 additions & 1 deletion bin/light-base/src/json_rpc_service/state_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,11 @@ impl<TPlat: Platform> Background<TPlat> {
// malicious.
let subscribe_all = me
.runtime_service
.subscribe_all(64, NonZeroUsize::new(usize::max_value()).unwrap())
.subscribe_all(
"chain_subscribeAllHeads",
64,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.await;

// The existing finalized and already-known blocks aren't reported to the
Expand Down
6 changes: 3 additions & 3 deletions bin/light-base/src/json_rpc_service/state_chain/sub_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub async fn subscribe_runtime_version<TPlat: Platform>(
) {
let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move {
let subscribe_all = runtime_service
.subscribe_all(16, NonZeroUsize::new(24).unwrap())
.subscribe_all("subscribe-runtime-version", 16, NonZeroUsize::new(24).unwrap())
.await;

// Map of runtimes by hash. Contains all non-finalized blocks, plus the current finalized
Expand Down Expand Up @@ -227,7 +227,7 @@ pub async fn subscribe_finalized<TPlat: Platform>(
) -> (Vec<u8>, stream::BoxStream<'static, Vec<u8>>) {
let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move {
let subscribe_all = runtime_service
.subscribe_all(16, NonZeroUsize::new(32).unwrap())
.subscribe_all("subscribe-finalized", 16, NonZeroUsize::new(32).unwrap())
.await;

// Map of block headers by hash. Contains all non-finalized blocks headers.
Expand Down Expand Up @@ -310,7 +310,7 @@ pub async fn subscribe_best<TPlat: Platform>(
) -> (Vec<u8>, stream::BoxStream<'static, Vec<u8>>) {
let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move {
let subscribe_all = runtime_service
.subscribe_all(16, NonZeroUsize::new(32).unwrap())
.subscribe_all("subscribe-best", 16, NonZeroUsize::new(32).unwrap())
.await;

// Map of block headers by hash. Contains all non-finalized blocks headers, plus the
Expand Down
58 changes: 38 additions & 20 deletions bin/light-base/src/runtime_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
/// This function only returns once the runtime of the current finalized block is known. This
/// might take a long time.
///
/// A name must be passed to be used for debugging purposes. At the time of writing of this
/// comment, the `#[must_use]` attribute doesn't work on asynchronous functions, making a name
/// extremely useful. If `#[must_use]` ever works on asynchronous functions, this `name` might
/// be removed.
///
/// Only up to `buffer_size` block notifications are buffered in the channel. If the channel
/// is full when a new notification is attempted to be pushed, the channel gets closed.
///
Expand All @@ -201,6 +206,7 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
/// See [`SubscribeAll`] for information about the return value.
pub async fn subscribe_all(
&self,
subscription_name: &'static str,
buffer_size: usize,
max_pinned_blocks: NonZeroUsize,
) -> SubscribeAll<TPlat> {
Expand Down Expand Up @@ -326,7 +332,10 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
0 | 1
));

all_blocks_subscriptions.insert(subscription_id, (tx, max_pinned_blocks.get() - 1));
all_blocks_subscriptions.insert(
subscription_id,
(subscription_name, tx, max_pinned_blocks.get() - 1),
);

SubscribeAll {
finalized_block_scale_encoded_header: finalized_block.scale_encoded_header.clone(),
Expand Down Expand Up @@ -374,23 +383,25 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
..
} = &mut guarded_lock.tree
{
let block_counts_towards_limit =
match pinned_blocks.remove(&(subscription_id.0, *block_hash)) {
Some((_, _, _, to_remove)) => !to_remove,
None => {
// Cold path.
if all_blocks_subscriptions.contains_key(&subscription_id.0) {
panic!("block already unpinned");
} else {
return;
}
let block_counts_towards_limit = match pinned_blocks
.remove(&(subscription_id.0, *block_hash))
{
Some((_, _, _, to_remove)) => !to_remove,
None => {
// Cold path.
if let Some((sub_name, _, _)) = all_blocks_subscriptions.get(&subscription_id.0)
{
panic!("block already unpinned for {} subscription", sub_name);
} else {
return;
}
};
}
};

guarded_lock.runtimes.retain(|_, rt| rt.strong_count() > 0);

if block_counts_towards_limit {
let (_, finalized_pinned_remaining) = all_blocks_subscriptions
let (_name, _, finalized_pinned_remaining) = all_blocks_subscriptions
.get_mut(&subscription_id.0)
.unwrap();
*finalized_pinned_remaining += 1;
Expand Down Expand Up @@ -434,8 +445,10 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
Some(v) => v.clone(),
None => {
// Cold path.
if all_blocks_subscriptions.contains_key(&subscription_id.0) {
panic!("block already unpinned");
if let Some((sub_name, _, _)) =
all_blocks_subscriptions.get(&subscription_id.0)
{
panic!("block already unpinned for subscription {}", sub_name);
} else {
return Err(PinnedBlockRuntimeLockError::ObsoleteSubscription);
}
Expand Down Expand Up @@ -1001,8 +1014,11 @@ enum GuardedInner<TPlat: Platform> {
/// finalized or non-canonical blocks remaining for this subscription.
///
/// Keys are assigned from [`Guarded::next_subscription_id`].
all_blocks_subscriptions:
hashbrown::HashMap<u64, (mpsc::Sender<Notification>, usize), fnv::FnvBuildHasher>,
all_blocks_subscriptions: hashbrown::HashMap<
u64,
(&'static str, mpsc::Sender<Notification>, usize),
fnv::FnvBuildHasher,
>,

/// List of pinned blocks.
///
Expand Down Expand Up @@ -1569,7 +1585,7 @@ impl<TPlat: Platform> Background<TPlat> {
};

let mut to_remove = Vec::new();
for (subscription_id, (sender, finalized_pinned_remaining)) in
for (subscription_id, (_, sender, finalized_pinned_remaining)) in
all_blocks_subscriptions.iter_mut()
{
let count_limit = pruned_blocks.len() + 1;
Expand Down Expand Up @@ -1658,7 +1674,8 @@ impl<TPlat: Platform> Background<TPlat> {
});

let mut to_remove = Vec::new();
for (subscription_id, (sender, _)) in all_blocks_subscriptions.iter_mut() {
for (subscription_id, (_, sender, _)) in all_blocks_subscriptions.iter_mut()
{
if sender.try_send(notif.clone()).is_ok() {
pinned_blocks.insert(
(*subscription_id, block_hash),
Expand Down Expand Up @@ -1698,7 +1715,8 @@ impl<TPlat: Platform> Background<TPlat> {
let notif = Notification::BestBlockChanged { hash };

let mut to_remove = Vec::new();
for (subscription_id, (sender, _)) in all_blocks_subscriptions.iter_mut() {
for (subscription_id, (_, sender, _)) in all_blocks_subscriptions.iter_mut()
{
if sender.try_send(notif.clone()).is_err() {
to_remove.push(*subscription_id);
}
Expand Down
6 changes: 5 additions & 1 deletion bin/light-base/src/sync_service/parachain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ pub(super) async fn start_parachain<TPlat: Platform>(
// The maximum number of pinned block is ignored, as this maximum is a way to avoid
// malicious behaviors. This code is by definition not considered malicious.
let mut relay_chain_subscribe_all = relay_chain_sync
.subscribe_all(32, NonZeroUsize::new(usize::max_value()).unwrap())
.subscribe_all(
"parachain-sync",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.await;
log::debug!(
target: &log_target,
Expand Down
6 changes: 5 additions & 1 deletion bin/light-base/src/transactions_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,11 @@ async fn background_task<TPlat: Platform>(
// malicious behaviors. This code is by definition not considered malicious.
let mut subscribe_all = worker
.runtime_service
.subscribe_all(32, NonZeroUsize::new(usize::max_value()).unwrap())
.subscribe_all(
"transactions-service",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.await;
let initial_finalized_block_hash = header::hash_from_scale_encoded_header(
&subscribe_all.finalized_block_scale_encoded_header,
Expand Down

0 comments on commit a63ac12

Please sign in to comment.