diff --git a/bin/light-base/src/json_rpc_service/state_chain.rs b/bin/light-base/src/json_rpc_service/state_chain.rs index 12ca823f98..4610e15808 100644 --- a/bin/light-base/src/json_rpc_service/state_chain.rs +++ b/bin/light-base/src/json_rpc_service/state_chain.rs @@ -397,82 +397,90 @@ impl Background { ) .await; - let mut new_blocks = { - // The buffer size should be large enough so that, if the CPU is busy, it doesn't - // become full before the execution of the runtime service resumes. - // The maximum number of pinned block is ignored, as this maximum is a way to avoid - // malicious behaviors. This code is by definition not considered malicious. - let subscribe_all = self - .runtime_service - .subscribe_all(32, NonZeroUsize::new(usize::max_value()).unwrap()) - .await; - - // The finalized and already-known blocks aren't reported to the user, but we need - // unpin them on to the runtime service. - subscribe_all - .new_blocks - .unpin_block(&header::hash_from_scale_encoded_header( - &subscribe_all.finalized_block_scale_encoded_header, - )) - .await; - for block in subscribe_all.non_finalized_blocks_ancestry_order { - subscribe_all - .new_blocks - .unpin_block(&header::hash_from_scale_encoded_header( - &block.scale_encoded_header, - )) - .await; - } - - subscribe_all.new_blocks - }; - // Spawn a separate task for the subscription. let task = { let me = self.clone(); async move { loop { - match new_blocks.next().await { - Some(runtime_service::Notification::Block(block)) => { - new_blocks + let mut new_blocks = { + // The buffer size should be large enough so that, if the CPU is busy, it + // doesn't become full before the execution of the runtime service resumes. + // The maximum number of pinned block is ignored, as this maximum is a way + // to avoid malicious behaviors. This code is by definition not considered + // malicious. + let subscribe_all = me + .runtime_service + .subscribe_all(64, NonZeroUsize::new(usize::max_value()).unwrap()) + .await; + + // The existing finalized and already-known blocks aren't reported to the + // user, but we need to unpin them on to the runtime service. + subscribe_all + .new_blocks + .unpin_block(&header::hash_from_scale_encoded_header( + &subscribe_all.finalized_block_scale_encoded_header, + )) + .await; + for block in subscribe_all.non_finalized_blocks_ancestry_order { + subscribe_all + .new_blocks .unpin_block(&header::hash_from_scale_encoded_header( &block.scale_encoded_header, )) .await; + } - let header = match methods::Header::from_scale_encoded_header( - &block.scale_encoded_header, - ) { - Ok(h) => h, - Err(error) => { - log::warn!( - target: &me.log_target, - "`chain_subscribeAllHeads` subscription has skipped \ - block due to undecodable header. Hash: {}. Error: {}", - HashDisplay(&header::hash_from_scale_encoded_header(&block.scale_encoded_header)), - error, - ); - continue; - } - }; + subscribe_all.new_blocks + }; - let _ = me - .requests_subscriptions - .try_push_notification( - &state_machine_subscription, - methods::ServerToClient::chain_newHead { - subscription: (&subscription_id).into(), - result: header, + loop { + match new_blocks.next().await { + Some(runtime_service::Notification::Block(block)) => { + new_blocks + .unpin_block(&header::hash_from_scale_encoded_header( + &block.scale_encoded_header, + )) + .await; + + let header = match methods::Header::from_scale_encoded_header( + &block.scale_encoded_header, + ) { + Ok(h) => h, + Err(error) => { + log::warn!( + target: &me.log_target, + "`chain_subscribeAllHeads` subscription has skipped \ + block due to undecodable header. Hash: {}. Error: {}", + HashDisplay(&header::hash_from_scale_encoded_header(&block.scale_encoded_header)), + error, + ); + continue; } - .to_json_call_object_parameters(None), - ) - .await; - } - Some(runtime_service::Notification::BestBlockChanged { .. }) - | Some(runtime_service::Notification::Finalized { .. }) => {} - None => { - // TODO: must recreate the channel - return; + }; + + // This function call will fail if the queue of notifications to + // the user has too many elements in it. This JSON-RPC function + // unfortunately doesn't provide any mechanism to deal with this + // situation, and we handle it by simply not sending the + // notification. + let _ = me + .requests_subscriptions + .try_push_notification( + &state_machine_subscription, + methods::ServerToClient::chain_newHead { + subscription: (&subscription_id).into(), + result: header, + } + .to_json_call_object_parameters(None), + ) + .await; + } + Some(runtime_service::Notification::BestBlockChanged { .. }) + | Some(runtime_service::Notification::Finalized { .. }) => {} + None => { + // Break from the inner loop in order to recreate the channel. + break; + } } } } diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md index 50f5946bc5..afda08e000 100644 --- a/bin/wasm-node/CHANGELOG.md +++ b/bin/wasm-node/CHANGELOG.md @@ -5,6 +5,7 @@ ### Fixed - Changes in the current best block of a parachain are now taken into account if the new best block had already been reported in the past. ([#2457](https://github.com/paritytech/smoldot/pull/2457)) +- Fix active `chain_subscribeAllHeads` subscriptions silently freezing when the number of non-finalized blocks gets above a certain threshold, which typically happens if Internet connectivity is lost for a long time. ([#2465](https://github.com/paritytech/smoldot/pull/2465)) ## 0.6.21 - 2022-06-30