Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

chainHead: Add support for storage pagination and cancellation #14755

Merged
merged 19 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a97fd42
chainHead/api: Add `chain_head_unstable_continue` method
lexnv Aug 2, 2023
fe3404c
chainHead/subscriptions: Register operations for pagination
lexnv Aug 9, 2023
72b26c3
chainHead/subscriptions: Merge limits with registered operation
lexnv Aug 9, 2023
e28d982
chainHead/subscriptions: Expose the operation state
lexnv Aug 10, 2023
1d717df
chain_head/storage: Generate WaitingForContinue event
lexnv Aug 10, 2023
3bbc9ba
chainHead: Use the continue operation
lexnv Aug 10, 2023
5afe9c0
chainHead/tests: Adjust testing to the new storage interface
lexnv Aug 10, 2023
c8e8ed5
chainHead/config: Make pagination limit configurable
lexnv Aug 10, 2023
60492c7
chainHead/tests: Adjust chainHeadConfig
lexnv Aug 10, 2023
6929ec5
chainHead/tests: Check pagination and continue method
lexnv Aug 10, 2023
2791c90
chainHead/api: Add `chainHead_unstable_stopOperation` method
lexnv Aug 11, 2023
8bda477
chainHead/subscription: Add shared atomic state for efficient alloc
lexnv Aug 11, 2023
382ce98
chainHead: Implement operation stop
lexnv Aug 11, 2023
3a5c12a
chainHead/tests: Check that storage ops can be cancelled
lexnv Aug 11, 2023
6d825a6
chainHead/storage: Change docs for query_storage_iter_pagination
lexnv Aug 11, 2023
cab6202
chainHead/subscriptions: Fix merge conflicts
lexnv Aug 16, 2023
141ca3f
chainHead: Replace `async-channel` with `tokio::sync`
lexnv Aug 23, 2023
7ccef40
chainHead/subscription: Add comment about the sender/recv continue
lexnv Aug 23, 2023
d1b2817
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pag
lexnv Aug 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions client/rpc-spec-v2/src/chain_head/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,31 @@ pub trait ChainHeadApi<Hash> {
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_unpin", blocking)]
fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>;

/// Resumes a storage fetch started with `chainHead_storage` after it has generated an
/// `operationWaitingForContinue` event.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_continue", blocking)]
fn chain_head_unstable_continue(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()>;

/// Stops an operation started with chainHead_unstable_body, chainHead_unstable_call, or
/// chainHead_unstable_storage. If the operation was still in progress, this interrupts it. If
/// the operation was already finished, this call has no effect.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_stopOperation", blocking)]
fn chain_head_unstable_stop_operation(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()>;
}
82 changes: 63 additions & 19 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ pub struct ChainHeadConfig {
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
pub operation_max_storage_items: usize,
}

/// Maximum pinned blocks across all connections.
Expand All @@ -78,12 +81,17 @@ const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);
/// Note: The lower limit imposed by the spec is 16.
const MAX_ONGOING_OPERATIONS: usize = 16;

/// The maximum number of items the `chainHead_storage` can return
/// before paginations is required.
const MAX_STORAGE_ITER_ITEMS: usize = 5;

impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
}
}
}
Expand All @@ -100,6 +108,9 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
subscriptions: Arc<SubscriptionManagement<Block, BE>>,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
operation_max_storage_items: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
Expand All @@ -124,6 +135,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
config.subscription_max_ongoing_operations,
backend,
)),
operation_max_storage_items: config.operation_max_storage_items,
genesis_hash,
_phantom: PhantomData,
}
Expand Down Expand Up @@ -232,7 +244,7 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<MethodResponse> {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Expand All @@ -243,6 +255,8 @@ where
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

let operation_id = block_guard.operation().operation_id();

let event = match self.client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block
Expand All @@ -252,7 +266,7 @@ where
.map(|extrinsic| hex_string(&extrinsic.encode()))
.collect();
FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
operation_id: block_guard.operation_id(),
operation_id: operation_id.clone(),
value: extrinsics,
})
},
Expand All @@ -268,16 +282,13 @@ where
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: block_guard.operation_id(),
operation_id: operation_id.clone(),
error: error.to_string(),
}),
};

let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id: block_guard.operation_id(),
discarded_items: None,
}))
Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None }))
}

fn chain_head_unstable_header(
Expand Down Expand Up @@ -337,7 +348,7 @@ where
.transpose()?
.map(ChildInfo::new_default_from_vec);

let block_guard =
let mut block_guard =
match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Expand All @@ -349,17 +360,21 @@ where
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
let operation_id = block_guard.operation_id();
let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(
self.client.clone(),
self.operation_max_storage_items,
);
let operation = block_guard.operation();
let operation_id = operation.operation_id();

// The number of operations we are allowed to execute.
let num_operations = block_guard.num_reserved();
let num_operations = operation.num_reserved();
let discarded = items.len().saturating_sub(num_operations);
let mut items = items;
items.truncate(num_operations);

let fut = async move {
storage_client.generate_events(block_guard, hash, items, child_trie);
storage_client.generate_events(block_guard, hash, items, child_trie).await;
};

self.executor
Expand All @@ -379,7 +394,7 @@ where
) -> RpcResult<MethodResponse> {
let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);

let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
Expand All @@ -401,28 +416,26 @@ where
.into())
}

let operation_id = block_guard.operation().operation_id();
let event = self
.client
.executor()
.call(hash, &function, &call_parameters, CallContext::Offchain)
.map(|result| {
FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
operation_id: block_guard.operation_id(),
operation_id: operation_id.clone(),
output: hex_string(&result),
})
})
.unwrap_or_else(|error| {
FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: block_guard.operation_id(),
operation_id: operation_id.clone(),
error: error.to_string(),
})
});

let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id: block_guard.operation_id(),
discarded_items: None,
}))
Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None }))
}

fn chain_head_unstable_unpin(
Expand All @@ -443,4 +456,35 @@ where
Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()),
}
}

fn chain_head_unstable_continue(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()> {
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else {
return Ok(())
};

if !operation.submit_continue() {
// Continue called without generating a `WaitingForContinue` event.
Err(ChainHeadRpcError::InvalidContinue.into())
} else {
Ok(())
}
}

fn chain_head_unstable_stop_operation(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()> {
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else {
return Ok(())
};

operation.stop_operation();

Ok(())
}
}
Loading