Skip to content

Commit

Permalink
chainHead: Ensure reasonable distance between leaf and finalized block (
Browse files Browse the repository at this point in the history
#3562)

This PR ensure that the distance between any leaf and the finalized
block is within a reasonable distance.

For a new subscription, the chainHead has to provide all blocks between
the leaves of the chain and the finalized block.
 When the distance between a leaf and the finalized block is large:
 - The tree route is costly to compute
 - We could deliver an unbounded number of blocks (potentially millions)
(For more details see
#3445 (comment))

The configuration of the ChainHead is extended with:
- suspend on lagging distance: When the distance between any leaf and
the finalized block is greater than this number, the subscriptions are
suspended for a given duration.
- All active subscriptions are terminated with the `Stop` event, all
blocks are unpinned and data discarded.
- For incoming subscriptions, until the suspended period expires the
subscriptions will immediately receive the `Stop` event.
    - Defaults to 128 blocks
- suspended duration: The amount of time for which subscriptions are
suspended
    - Defaults to 30 seconds
 
 
 cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
  • Loading branch information
lexnv and skunert authored Apr 3, 2024
1 parent cdacfb9 commit 287b116
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 164 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 21 additions & 4 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ pub struct ChainHeadConfig {
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
pub max_lagging_distance: usize,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
pub operation_max_storage_items: usize,
Expand All @@ -88,6 +91,10 @@ const MAX_ONGOING_OPERATIONS: usize = 16;
/// before paginations is required.
const MAX_STORAGE_ITER_ITEMS: usize = 5;

/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
const MAX_LAGGING_DISTANCE: usize = 128;

/// The maximum number of `chainHead_follow` subscriptions per connection.
const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;

Expand All @@ -97,6 +104,7 @@ impl Default for ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
max_lagging_distance: MAX_LAGGING_DISTANCE,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
}
Expand All @@ -116,6 +124,9 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
operation_max_storage_items: usize,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
Expand All @@ -140,6 +151,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
backend,
),
operation_max_storage_items: config.operation_max_storage_items,
max_lagging_distance: config.max_lagging_distance,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -187,6 +199,7 @@ where
let subscriptions = self.subscriptions.clone();
let backend = self.backend.clone();
let client = self.client.clone();
let max_lagging_distance = self.max_lagging_distance;

let fut = async move {
// Ensure the current connection ID has enough space to accept a new subscription.
Expand All @@ -207,8 +220,8 @@ where
let Some(sub_data) =
reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
// subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
Expand All @@ -222,9 +235,13 @@ where
subscriptions,
with_runtime,
sub_id.clone(),
max_lagging_distance,
);

chain_head_follow.generate_events(sink, sub_data).await;
let result = chain_head_follow.generate_events(sink, sub_data).await;
if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
reserved_subscription.stop_all_subscriptions();
}

debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
};
Expand Down
71 changes: 62 additions & 9 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
SaturatedConversion, Saturating,
};
use std::{
collections::{HashSet, VecDeque},
sync::Arc,
};

/// The maximum number of finalized blocks provided by the
/// `Initialized` event.
const MAX_FINALIZED_BLOCKS: usize = 16;
Expand All @@ -67,6 +69,9 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
sub_id: String,
/// The best reported block by this subscription.
best_block_cache: Option<Block::Hash>,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
Expand All @@ -77,8 +82,17 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
sub_handle: SubscriptionManagement<Block, BE>,
with_runtime: bool,
sub_id: String,
max_lagging_distance: usize,
) -> Self {
Self { client, backend, sub_handle, with_runtime, sub_id, best_block_cache: None }
Self {
client,
backend,
sub_handle,
with_runtime,
sub_id,
best_block_cache: None,
max_lagging_distance,
}
}
}

Expand Down Expand Up @@ -186,6 +200,35 @@ where
}
}

/// Check the distance between the provided blocks does not exceed a
/// a reasonable range.
///
/// When the blocks are too far apart (potentially millions of blocks):
/// - Tree route is expensive to calculate.
/// - The RPC layer will not be able to generate the `NewBlock` events for all blocks.
///
/// This edge-case can happen for parachains where the relay chain syncs slower to
/// the head of the chain than the parachain node that is synced already.
fn distace_within_reason(
&self,
block: Block::Hash,
finalized: Block::Hash,
) -> Result<(), SubscriptionManagementError> {
let Some(block_num) = self.client.number(block)? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
};
let Some(finalized_num) = self.client.number(finalized)? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
};

let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
if distance > self.max_lagging_distance {
return Err(SubscriptionManagementError::BlockDistanceTooLarge);
}

Ok(())
}

/// Get the in-memory blocks of the client, starting from the provided finalized hash.
///
/// The reported blocks are pinned by this function.
Expand All @@ -198,6 +241,13 @@ where
let mut pruned_forks = HashSet::new();
let mut finalized_block_descendants = Vec::new();
let mut unique_descendants = HashSet::new();

// Ensure all leaves are within a reasonable distance from the finalized block,
// before traversing the tree.
for leaf in &leaves {
self.distace_within_reason(*leaf, finalized)?;
}

for leaf in leaves {
let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;

Expand Down Expand Up @@ -542,7 +592,8 @@ where
mut to_ignore: HashSet<Block::Hash>,
sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) where
) -> Result<(), SubscriptionManagementError>
where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
{
let mut stream_item = stream.next();
Expand Down Expand Up @@ -576,7 +627,7 @@ where
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
return Err(err)
},
};

Expand All @@ -591,7 +642,8 @@ where

let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
// No need to propagate this error further, the client disconnected.
return Ok(())
}
}

Expand All @@ -605,14 +657,15 @@ where
// - the client disconnected.
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
Ok(())
}

/// Generate the block events for the `chainHead_follow` method.
pub async fn generate_events(
&mut self,
sink: SubscriptionSink,
sub_data: InsertedSubscriptionData<Block>,
) {
) -> Result<(), SubscriptionManagementError> {
// Register for the new block and finalized notifications.
let stream_import = self
.client
Expand Down Expand Up @@ -640,7 +693,7 @@ where
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
return Err(err)
},
};

Expand All @@ -650,6 +703,6 @@ where
let stream = stream::once(futures::future::ready(initial)).chain(merged);

self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
.await;
.await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub enum SubscriptionManagementError {
/// The unpin method was called with duplicate hashes.
#[error("Duplicate hashes")]
DuplicateHashes,
/// The distance between the leaves and the current finalized block is too large.
#[error("Distance too large")]
BlockDistanceTooLarge,
/// Custom error.
#[error("Subscription error {0}")]
Custom(String),
Expand All @@ -57,6 +60,7 @@ impl PartialEq for SubscriptionManagementError {
(Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) |
(Self::SubscriptionAbsent, Self::SubscriptionAbsent) |
(Self::DuplicateHashes, Self::DuplicateHashes) => true,
(Self::BlockDistanceTooLarge, Self::BlockDistanceTooLarge) => true,
(Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs,
_ => false,
}
Expand Down
Loading

0 comments on commit 287b116

Please sign in to comment.