Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix chain_subscribeAllHeads not recreating the channel if it dies #2465

Merged
merged 4 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 72 additions & 64 deletions bin/light-base/src/json_rpc_service/state_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,82 +397,90 @@ impl<TPlat: Platform> Background<TPlat> {
)
.await;

let mut new_blocks = {
// The buffer size should be large enough so that, if the CPU is busy, it doesn't
// become full before the execution of the runtime service resumes.
// The maximum number of pinned block is ignored, as this maximum is a way to avoid
// malicious behaviors. This code is by definition not considered malicious.
let subscribe_all = self
.runtime_service
.subscribe_all(32, NonZeroUsize::new(usize::max_value()).unwrap())
.await;

// The finalized and already-known blocks aren't reported to the user, but we need
// unpin them on to the runtime service.
subscribe_all
.new_blocks
.unpin_block(&header::hash_from_scale_encoded_header(
&subscribe_all.finalized_block_scale_encoded_header,
))
.await;
for block in subscribe_all.non_finalized_blocks_ancestry_order {
subscribe_all
.new_blocks
.unpin_block(&header::hash_from_scale_encoded_header(
&block.scale_encoded_header,
))
.await;
}

subscribe_all.new_blocks
};

// Spawn a separate task for the subscription.
let task = {
let me = self.clone();
async move {
loop {
match new_blocks.next().await {
Some(runtime_service::Notification::Block(block)) => {
new_blocks
let mut new_blocks = {
// The buffer size should be large enough so that, if the CPU is busy, it
// doesn't become full before the execution of the runtime service resumes.
// The maximum number of pinned block is ignored, as this maximum is a way
// to avoid malicious behaviors. This code is by definition not considered
// malicious.
let subscribe_all = me
.runtime_service
.subscribe_all(64, NonZeroUsize::new(usize::max_value()).unwrap())
.await;

// The existing finalized and already-known blocks aren't reported to the
// user, but we need to unpin them on to the runtime service.
subscribe_all
.new_blocks
.unpin_block(&header::hash_from_scale_encoded_header(
&subscribe_all.finalized_block_scale_encoded_header,
))
.await;
for block in subscribe_all.non_finalized_blocks_ancestry_order {
subscribe_all
.new_blocks
.unpin_block(&header::hash_from_scale_encoded_header(
&block.scale_encoded_header,
))
.await;
}

let header = match methods::Header::from_scale_encoded_header(
&block.scale_encoded_header,
) {
Ok(h) => h,
Err(error) => {
log::warn!(
target: &me.log_target,
"`chain_subscribeAllHeads` subscription has skipped \
block due to undecodable header. Hash: {}. Error: {}",
HashDisplay(&header::hash_from_scale_encoded_header(&block.scale_encoded_header)),
error,
);
continue;
}
};
subscribe_all.new_blocks
};

let _ = me
.requests_subscriptions
.try_push_notification(
&state_machine_subscription,
methods::ServerToClient::chain_newHead {
subscription: (&subscription_id).into(),
result: header,
loop {
match new_blocks.next().await {
Some(runtime_service::Notification::Block(block)) => {
new_blocks
.unpin_block(&header::hash_from_scale_encoded_header(
&block.scale_encoded_header,
))
.await;

let header = match methods::Header::from_scale_encoded_header(
&block.scale_encoded_header,
) {
Ok(h) => h,
Err(error) => {
log::warn!(
target: &me.log_target,
"`chain_subscribeAllHeads` subscription has skipped \
block due to undecodable header. Hash: {}. Error: {}",
HashDisplay(&header::hash_from_scale_encoded_header(&block.scale_encoded_header)),
error,
);
continue;
}
.to_json_call_object_parameters(None),
)
.await;
}
Some(runtime_service::Notification::BestBlockChanged { .. })
| Some(runtime_service::Notification::Finalized { .. }) => {}
None => {
// TODO: must recreate the channel
return;
};

// This function call will fail if the queue of notifications to
// the user has too many elements in it. This JSON-RPC function
// unfortunately doesn't provide any mechanism to deal with this
// situation, and we handle it by simply not sending the
// notification.
let _ = me
.requests_subscriptions
.try_push_notification(
&state_machine_subscription,
methods::ServerToClient::chain_newHead {
subscription: (&subscription_id).into(),
result: header,
}
.to_json_call_object_parameters(None),
)
.await;
}
Some(runtime_service::Notification::BestBlockChanged { .. })
| Some(runtime_service::Notification::Finalized { .. }) => {}
None => {
// Break from the inner loop in order to recreate the channel.
break;
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Fixed

- Changes in the current best block of a parachain are now taken into account if the new best block had already been reported in the past. ([#2457](https://github.com/paritytech/smoldot/pull/2457))
- Fix active `chain_subscribeAllHeads` subscriptions silently freezing when the number of non-finalized blocks gets above a certain threshold, which typically happens if Internet connectivity is lost for a long time. ([#2465](https://github.com/paritytech/smoldot/pull/2465))

## 0.6.21 - 2022-06-30

Expand Down