Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

chainHead: Add support for storage pagination and cancellation #14755

Merged
merged 19 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a97fd42
chainHead/api: Add `chain_head_unstable_continue` method
lexnv Aug 2, 2023
fe3404c
chainHead/subscriptions: Register operations for pagination
lexnv Aug 9, 2023
72b26c3
chainHead/subscriptions: Merge limits with registered operation
lexnv Aug 9, 2023
e28d982
chainHead/subscriptions: Expose the operation state
lexnv Aug 10, 2023
1d717df
chain_head/storage: Generate WaitingForContinue event
lexnv Aug 10, 2023
3bbc9ba
chainHead: Use the continue operation
lexnv Aug 10, 2023
5afe9c0
chainHead/tests: Adjust testing to the new storage interface
lexnv Aug 10, 2023
c8e8ed5
chainHead/config: Make pagination limit configurable
lexnv Aug 10, 2023
60492c7
chainHead/tests: Adjust chainHeadConfig
lexnv Aug 10, 2023
6929ec5
chainHead/tests: Check pagination and continue method
lexnv Aug 10, 2023
2791c90
chainHead/api: Add `chainHead_unstable_stopOperation` method
lexnv Aug 11, 2023
8bda477
chainHead/subscription: Add shared atomic state for efficient alloc
lexnv Aug 11, 2023
382ce98
chainHead: Implement operation stop
lexnv Aug 11, 2023
3a5c12a
chainHead/tests: Check that storage ops can be cancelled
lexnv Aug 11, 2023
6d825a6
chainHead/storage: Change docs for query_storage_iter_pagination
lexnv Aug 11, 2023
cab6202
chainHead/subscriptions: Fix merge conflicts
lexnv Aug 16, 2023
141ca3f
chainHead: Replace `async-channel` with `tokio::sync`
lexnv Aug 23, 2023
7ccef40
chainHead/subscription: Add comment about the sender/recv continue
lexnv Aug 23, 2023
d1b2817
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pag
lexnv Aug 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ tokio = { version = "1.22.0", features = ["sync"] }
array-bytes = "6.1"
log = "0.4.17"
futures-util = { version = "0.3.19", default-features = false }
async-channel = "1.8.0"
Copy link
Member

@niklasad1 niklasad1 Aug 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why async-channel is used here?

You already got the futures-channel and tokio sync in the dependency tree so I don't follow why this dependency is introduced as the both tokio and futures-channel provides an API for the stuff you implemented on top of the channel

EDIT: tokio::sync::mpsc creates a buffer with 1 and futures_channel a buffer with 1 + num senders, so I guess you want exactly one here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense! I can't actually remember if I picked the async-channel here since it doesn't require the receiver part to be mutable 🤔

I'll change a bit the API since it's worth not adding extra dependencies, thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, didn't notice that :D


[dev-dependencies]
serde_json = "1.0"
Expand Down
27 changes: 27 additions & 0 deletions client/rpc-spec-v2/src/chain_head/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,31 @@ pub trait ChainHeadApi<Hash> {
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_unpin", blocking)]
fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>;

/// Resumes a storage fetch started with `chainHead_storage` after it has generated an
/// `operationWaitingForContinue` event.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_continue", blocking)]
fn chain_head_unstable_continue(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()>;

/// Stops an operation started with chainHead_unstable_body, chainHead_unstable_call, or
/// chainHead_unstable_storage. If the operation was still in progress, this interrupts it. If
/// the operation was already finished, this call has no effect.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_stopOperation", blocking)]
fn chain_head_unstable_stop_operation(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()>;
}
69 changes: 59 additions & 10 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ pub struct ChainHeadConfig {
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
pub operation_max_storage_items: usize,
}

/// Maximum pinned blocks across all connections.
Expand All @@ -78,12 +81,17 @@ const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);
/// Note: The lower limit imposed by the spec is 16.
const MAX_ONGOING_OPERATIONS: usize = 16;

/// The maximum number of items the `chainHead_storage` can return
/// before paginations is required.
const MAX_STORAGE_ITER_ITEMS: usize = 5;

impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
}
}
}
Expand All @@ -100,6 +108,9 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
subscriptions: Arc<SubscriptionManagement<Block, BE>>,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
operation_max_storage_items: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
Expand All @@ -124,6 +135,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
config.subscription_max_ongoing_operations,
backend,
)),
operation_max_storage_items: config.operation_max_storage_items,
genesis_hash,
_phantom: PhantomData,
}
Expand Down Expand Up @@ -243,6 +255,7 @@ where
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

let operation = block_guard.operation();
let event = match self.client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block
Expand All @@ -252,7 +265,7 @@ where
.map(|extrinsic| hex_string(&extrinsic.encode()))
.collect();
FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
operation_id: block_guard.operation_id(),
operation_id: operation.operation_id(),
value: extrinsics,
})
},
Expand All @@ -268,14 +281,14 @@ where
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: block_guard.operation_id(),
operation_id: operation.operation_id(),
error: error.to_string(),
}),
};

let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id: block_guard.operation_id(),
operation_id: operation.operation_id(),
discarded_items: None,
}))
}
Expand Down Expand Up @@ -349,17 +362,21 @@ where
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
let operation_id = block_guard.operation_id();
let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(
self.client.clone(),
self.operation_max_storage_items,
);
let operation = block_guard.operation();
let operation_id = operation.operation_id();

// The number of operations we are allowed to execute.
let num_operations = block_guard.num_reserved();
let num_operations = operation.num_reserved();
let discarded = items.len().saturating_sub(num_operations);
let mut items = items;
items.truncate(num_operations);

let fut = async move {
storage_client.generate_events(block_guard, hash, items, child_trie);
storage_client.generate_events(block_guard, hash, items, child_trie).await;
};

self.executor
Expand Down Expand Up @@ -401,26 +418,27 @@ where
.into())
}

let operation = block_guard.operation();
let event = self
.client
.executor()
.call(hash, &function, &call_parameters, CallContext::Offchain)
.map(|result| {
FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
operation_id: block_guard.operation_id(),
operation_id: operation.operation_id(),
output: hex_string(&result),
})
})
.unwrap_or_else(|error| {
FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: block_guard.operation_id(),
operation_id: operation.operation_id(),
error: error.to_string(),
})
});

let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id: block_guard.operation_id(),
operation_id: operation.operation_id(),
discarded_items: None,
}))
}
Expand All @@ -443,4 +461,35 @@ where
Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()),
}
}

fn chain_head_unstable_continue(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()> {
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else {
return Ok(())
};

if !operation.submit_continue() {
// Continue called without generating a `WaitingForContinue` event.
Err(ChainHeadRpcError::InvalidContinue.into())
} else {
Ok(())
}
}

fn chain_head_unstable_stop_operation(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()> {
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else {
return Ok(())
};

operation.stop_operation();

Ok(())
}
}
Loading