Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Introduce async runtime calling trait for runtime-api subsystem #5782

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
870f7dd
Implement OverseerRuntimeClient
skunert Mar 15, 2022
262c6f2
blockchainevents
skunert Mar 24, 2022
c2d13f0
Merge branch 'master' into collator-rpc-poc
skunert Apr 8, 2022
0890e08
Update patches
skunert Apr 11, 2022
59d3357
Finish merging rntime-api subsystem
skunert Apr 25, 2022
acb33f2
Merge branch 'cumulus-2022-04-25' into collator-rpc-poc
skunert Apr 25, 2022
ab7dfeb
First version that is able to produce blocks
skunert May 6, 2022
d88094a
Merge branch 'polkadot-cumulus-2022-05-11' into collator-rpc-poc
skunert May 11, 2022
e37cdf9
Make OverseerRuntimeClient async
skunert May 18, 2022
684696f
Move overseer notification stream forwarding to cumulus
skunert May 27, 2022
a2454d5
Merge branch 'cumulus-2022-05-30' into collator-rpc-poc
skunert May 30, 2022
2f4ea76
Remove unused imports
skunert Jun 1, 2022
3a1a586
Add more logging to collator-protocol
skunert Jun 16, 2022
fba6b8f
Lockfile
skunert Jun 20, 2022
6469e3a
Merge branch 'cumulus-2022-06-20' into collator-rpc-poc
skunert Jun 20, 2022
19123f7
Use hashes in OverseerRuntimeClient
skunert Jul 11, 2022
969586b
Merge branch 'cumulus-2022-07-11' into collator-rpc-poc
skunert Jul 11, 2022
accd10d
Move OverseerRuntimeClient into extra module
skunert Jul 12, 2022
71f8bb1
Fix old session info call and make HeadSupportsParachain async
skunert Jul 13, 2022
01b633e
Improve naming of trait
skunert Jul 14, 2022
82c95aa
Merge branch 'master' into runtime-api-async-trait
skunert Jul 14, 2022
d04ec57
Cleanup
skunert Jul 14, 2022
9f5f084
Remove unused From trait implementation
skunert Jul 14, 2022
ed2b7ca
Remove unwanted debug print
skunert Jul 14, 2022
894ab5b
Move trait to polkadot-node-subsystem-types
skunert Jul 15, 2022
18ff03a
Add sections to runtime client
skunert Jul 15, 2022
b1fa616
Reorder methods
skunert Jul 15, 2022
b0d1a25
Fix spelling
skunert Jul 18, 2022
e693405
Fix spacing in Cargo.toml
skunert Jul 18, 2022
56c47df
Remove unused babe methods
skunert Jul 18, 2022
f4868db
Merge branch 'master' into runtime-api-async-trait
skunert Jul 19, 2022
bba2af7
Merge branch 'master' into runtime-api-async-trait
skunert Jul 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/core/approval-voting/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ sp-application-crypto = { git = "https://github.com/paritytech/substrate", branc
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }

[dev-dependencies]
async-trait = "0.1.56"
parking_lot = "0.12.0"
rand_core = "0.5.1" # should match schnorrkel
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
4 changes: 3 additions & 1 deletion node/core/approval-voting/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use polkadot_primitives::v2::{
use std::time::Duration;

use assert_matches::assert_matches;
use async_trait::async_trait;
use parking_lot::Mutex;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use sp_keystore::CryptoStore;
Expand Down Expand Up @@ -117,8 +118,9 @@ pub mod test_constants {

struct MockSupportsParachains;

#[async_trait]
impl HeadSupportsParachains for MockSupportsParachains {
fn head_supports_parachains(&self, _head: &Hash) -> bool {
async fn head_supports_parachains(&self, _head: &Hash) -> bool {
true
}
}
Expand Down
7 changes: 4 additions & 3 deletions node/core/runtime-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ gum = { package = "tracing-gum", path = "../../gum" }
memory-lru = "0.1.0"
parity-util-mem = { version = "0.11.0", default-features = false }

sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }

polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem = {path = "../../subsystem" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-types = { path = "../../subsystem-types" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }

[dev-dependencies]
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.21", features = ["thread-pool"] }
Expand Down
57 changes: 19 additions & 38 deletions node/core/runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,8 @@ use polkadot_node_subsystem::{
messages::{RuntimeApiMessage, RuntimeApiRequest as Request},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
};
use polkadot_primitives::{
runtime_api::ParachainHost,
v2::{Block, BlockId, Hash},
};

use sp_api::ProvideRuntimeApi;
use sp_authority_discovery::AuthorityDiscoveryApi;
use sp_consensus_babe::BabeApi;
use polkadot_node_subsystem_types::RuntimeApiSubsystemClient;
use polkadot_primitives::v2::Hash;

use cache::{RequestResult, RequestResultCache};
use futures::{channel::oneshot, prelude::*, select, stream::FuturesUnordered};
Expand Down Expand Up @@ -88,8 +82,7 @@ impl<Client> RuntimeApiSubsystem<Client> {
#[overseer::subsystem(RuntimeApi, error = SubsystemError, prefix = self::overseer)]
impl<Client, Context> RuntimeApiSubsystem<Client>
where
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: RuntimeApiSubsystemClient + Send + Sync + 'static,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem { future: run(ctx, self).boxed(), name: "runtime-api-subsystem" }
Expand All @@ -98,8 +91,7 @@ where

impl<Client> RuntimeApiSubsystem<Client>
where
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: RuntimeApiSubsystemClient + Send + 'static + Sync,
{
fn store_cache(&mut self, result: RequestResult) {
use RequestResult::*;
Expand Down Expand Up @@ -282,7 +274,7 @@ where
};

let request = async move {
let result = make_runtime_api_request(client, metrics, relay_parent, request);
let result = make_runtime_api_request(client, metrics, relay_parent, request).await;
let _ = sender.send(result);
}
.boxed();
Expand Down Expand Up @@ -317,8 +309,7 @@ async fn run<Client, Context>(
mut subsystem: RuntimeApiSubsystem<Client>,
) -> SubsystemResult<()>
where
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: RuntimeApiSubsystemClient + Send + Sync + 'static,
{
loop {
// Let's add some back pressure when the subsystem is running at `MAX_PARALLEL_REQUESTS`.
Expand Down Expand Up @@ -348,26 +339,21 @@ where
}
}

fn make_runtime_api_request<Client>(
async fn make_runtime_api_request<Client>(
client: Arc<Client>,
metrics: Metrics,
relay_parent: Hash,
request: Request,
) -> Option<RequestResult>
where
Client: ProvideRuntimeApi<Block>,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: RuntimeApiSubsystemClient + 'static,
{
use sp_api::ApiExt;

let _timer = metrics.time_make_runtime_api_request();

macro_rules! query {
($req_variant:ident, $api_name:ident ($($param:expr),*), ver = $version:literal, $sender:expr) => {{
let sender = $sender;
let api = client.runtime_api();

let runtime_version = api.api_version::<dyn ParachainHost<Block>>(&BlockId::Hash(relay_parent))
let runtime_version = client.api_version_parachain_host(relay_parent).await
.unwrap_or_else(|e| {
gum::warn!(
target: LOG_TARGET,
Expand All @@ -385,7 +371,7 @@ where
});

let res = if runtime_version >= $version {
api.$api_name(&BlockId::Hash(relay_parent) $(, $param.clone() )*)
client.$api_name(relay_parent $(, $param.clone() )*).await
.map_err(|e| RuntimeApiError::Execution {
runtime_api_name: stringify!($api_name),
source: std::sync::Arc::new(e),
Expand All @@ -404,11 +390,7 @@ where

match request {
Request::Version(sender) => {
let api = client.runtime_api();

let runtime_version = match api
.api_version::<dyn ParachainHost<Block>>(&BlockId::Hash(relay_parent))
{
let runtime_version = match client.api_version_parachain_host(relay_parent).await {
Ok(Some(v)) => Ok(v),
Ok(None) => Err(RuntimeApiError::NotSupported { runtime_api_name: "api_version" }),
Err(e) => Err(RuntimeApiError::Execution {
Expand Down Expand Up @@ -465,25 +447,24 @@ where
Request::CandidateEvents(sender) =>
query!(CandidateEvents, candidate_events(), ver = 1, sender),
Request::SessionInfo(index, sender) => {
let api = client.runtime_api();
let block_id = BlockId::Hash(relay_parent);

let api_version = api
.api_version::<dyn ParachainHost<Block>>(&BlockId::Hash(relay_parent))
let api_version = client
.api_version_parachain_host(relay_parent)
.await
.unwrap_or_default()
.unwrap_or_default();

let res = if api_version >= 2 {
let res =
api.session_info(&block_id, index).map_err(|e| RuntimeApiError::Execution {
let res = client.session_info(relay_parent, index).await.map_err(|e| {
RuntimeApiError::Execution {
runtime_api_name: "SessionInfo",
source: std::sync::Arc::new(e),
});
}
});
metrics.on_request(res.is_ok());
res
} else {
#[allow(deprecated)]
let res = api.session_info_before_version_2(&block_id, index).map_err(|e| {
let res = client.session_info_before_version_2(relay_parent, index).await.map_err(|e| {
RuntimeApiError::Execution {
runtime_api_name: "SessionInfo",
source: std::sync::Arc::new(e),
Expand Down
18 changes: 12 additions & 6 deletions node/core/runtime-api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ use ::test_helpers::{dummy_committed_candidate_receipt, dummy_validation_code};
use polkadot_node_primitives::{BabeAllowedSlots, BabeEpoch, BabeEpochConfiguration};
use polkadot_node_subsystem::SpawnGlue;
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_primitives::v2::{
AuthorityDiscoveryId, BlockNumber, CandidateEvent, CandidateHash, CommittedCandidateReceipt,
CoreState, DisputeState, GroupRotationInfo, Id as ParaId, InboundDownwardMessage,
InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement,
ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash,
ValidatorId, ValidatorIndex, ValidatorSignature,
use polkadot_primitives::{
runtime_api::ParachainHost,
v2::{
AuthorityDiscoveryId, Block, BlockNumber, CandidateEvent, CandidateHash,
CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Id as ParaId,
InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption,
PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo,
ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
},
};
use sp_api::ProvideRuntimeApi;
use sp_authority_discovery::AuthorityDiscoveryApi;
use sp_consensus_babe::BabeApi;
use sp_core::testing::TaskExecutor;
use std::{
collections::{BTreeMap, HashMap},
Expand Down
1 change: 1 addition & 0 deletions node/overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ gum = { package = "tracing-gum", path = "../gum" }
lru = "0.7"
parity-util-mem = { version = "0.11.0", default-features = false }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
async-trait = "0.1.56"

[dev-dependencies]
metered = { package = "prioritized-metered-channel", path = "../metered-channel" }
Expand Down
5 changes: 4 additions & 1 deletion node/overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use futures::{channel::oneshot, pending, pin_mut, select, stream, FutureExt, StreamExt};
use futures_timer::Delay;
use orchestra::async_trait;
use std::time::Duration;

use ::test_helpers::{dummy_candidate_descriptor, dummy_hash};
Expand All @@ -34,8 +35,10 @@ use polkadot_overseer::{
use polkadot_primitives::v2::{CandidateReceipt, Hash};

struct AlwaysSupportsParachains;

#[async_trait]
impl HeadSupportsParachains for AlwaysSupportsParachains {
fn head_supports_parachains(&self, _head: &Hash) -> bool {
async fn head_supports_parachains(&self, _head: &Hash) -> bool {
true
}
}
Expand Down
37 changes: 16 additions & 21 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, St
use lru::LruCache;

use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_primitives::{
runtime_api::ParachainHost,
v2::{Block, BlockId, BlockNumber, Hash},
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use polkadot_primitives::v2::{Block, BlockNumber, Hash};

use polkadot_node_subsystem_types::messages::{
ApprovalDistributionMessage, ApprovalVotingMessage, AvailabilityDistributionMessage,
Expand All @@ -89,6 +85,7 @@ use polkadot_node_subsystem_types::messages::{
pub use polkadot_node_subsystem_types::{
errors::{SubsystemError, SubsystemResult},
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
RuntimeApiSubsystemClient,
};

pub mod metrics;
Expand Down Expand Up @@ -157,25 +154,20 @@ impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
}

/// Whether a header supports parachain consensus or not.
#[async_trait::async_trait]
pub trait HeadSupportsParachains {
/// Return true if the given header supports parachain consensus. Otherwise, false.
fn head_supports_parachains(&self, head: &Hash) -> bool;
async fn head_supports_parachains(&self, head: &Hash) -> bool;
}

#[async_trait::async_trait]
impl<Client> HeadSupportsParachains for Arc<Client>
where
Client: ProvideRuntimeApi<Block>,
Client::Api: ParachainHost<Block>,
Client: RuntimeApiSubsystemClient + Sync + Send,
{
fn head_supports_parachains(&self, head: &Hash) -> bool {
let id = BlockId::Hash(*head);
async fn head_supports_parachains(&self, head: &Hash) -> bool {
// Check that the `ParachainHost` runtime api is at least with version 1 present on chain.
self.runtime_api()
.api_version::<dyn ParachainHost<Block>>(&id)
.ok()
.flatten()
.unwrap_or(0) >=
1
self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
}
}

Expand Down Expand Up @@ -421,9 +413,12 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsParachains;
///
/// #[async_trait::async_trait]
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
/// fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// async fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
///
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
/// .unwrap()
Expand Down Expand Up @@ -718,7 +713,7 @@ where
// Notify about active leaves on startup before starting the loop
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
if let Some((span, status)) = self.on_head_activated(&hash, None) {
if let Some((span, status)) = self.on_head_activated(&hash, None).await {
let update =
ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span });
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
Expand Down Expand Up @@ -780,7 +775,7 @@ where
},
};

let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)) {
let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
Some((span, status)) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: block.hash,
number: block.number,
Expand Down Expand Up @@ -837,12 +832,12 @@ where

/// Handles a header activation. If the header's state doesn't support the parachains API,
/// this returns `None`.
fn on_head_activated(
async fn on_head_activated(
&mut self,
hash: &Hash,
parent_hash: Option<Hash>,
) -> Option<(Arc<jaeger::Span>, LeafStatus)> {
if !self.supports_parachains.head_supports_parachains(hash) {
if !self.supports_parachains.head_supports_parachains(hash).await {
return None
}

Expand Down
Loading