Skip to content

Commit

Permalink
Unify RelayChainInterface error handling and introduce async (parityt…
Browse files Browse the repository at this point in the history
  • Loading branch information
skunert authored Jan 25, 2022
1 parent c70156b commit a963055
Show file tree
Hide file tree
Showing 17 changed files with 532 additions and 414 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
futures = { version = "0.3.8", features = ["compat"] }
codec = { package = "parity-scale-codec", version = "2.3.0", features = [ "derive" ] }
tracing = "0.1.25"
async-trait = "0.1.42"
async-trait = "0.1.52"
dyn-clone = "1.0.4"

[dev-dependencies]
Expand Down
84 changes: 58 additions & 26 deletions client/consensus/common/src/parachain_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use cumulus_relay_chain_interface::RelayChainInterface;
use async_trait::async_trait;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_blockchain::Error as ClientError;
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
generic::BlockId,
Expand All @@ -29,11 +30,14 @@ use sp_runtime::{
use polkadot_primitives::v1::{Block as PBlock, Id as ParaId, OccupiedCoreAssumption};

use codec::Decode;
use futures::{future, select, FutureExt, Stream, StreamExt};
use futures::{select, FutureExt, Stream, StreamExt};

use std::{pin::Pin, sync::Arc};

const LOG_TARGET: &str = "cumulus-consensus";

/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
/// The error type for interacting with the Polkadot client.
type Error: std::fmt::Debug + Send;
Expand All @@ -42,17 +46,17 @@ pub trait RelaychainClient: Clone + 'static {
type HeadStream: Stream<Item = Vec<u8>> + Send + Unpin;

/// Get a stream of new best heads for the given parachain.
fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream;
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;

/// Get a stream of finalized heads for the given parachain.
fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream;
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;

/// Returns the parachain head for the given `para_id` at the given block id.
fn parachain_head_at(
async fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>>;
) -> RelayChainResult<Option<Vec<u8>>>;
}

/// Follow the finalized head of the given parachain.
Expand All @@ -66,7 +70,13 @@ where
R: RelaychainClient,
B: Backend<Block>,
{
let mut finalized_heads = relay_chain.finalized_heads(para_id);
let mut finalized_heads = match relay_chain.finalized_heads(para_id).await {
Ok(finalized_heads_stream) => finalized_heads_stream,
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};

loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
Expand Down Expand Up @@ -165,7 +175,14 @@ async fn follow_new_best<P, R, Block, B>(
R: RelaychainClient,
B: Backend<Block>,
{
let mut new_best_heads = relay_chain.new_best_heads(para_id).fuse();
let mut new_best_heads = match relay_chain.new_best_heads(para_id).await {
Ok(best_heads_stream) => best_heads_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
return
},
};

let mut imported_blocks = parachain.import_notification_stream().fuse();
// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
// block before the parachain block it included. In this case we need to wait for this block to
Expand Down Expand Up @@ -368,6 +385,7 @@ where
}
}

#[async_trait]
impl<RCInterface> RelaychainClient for RCInterface
where
RCInterface: RelayChainInterface + Clone + 'static,
Expand All @@ -376,39 +394,53 @@ where

type HeadStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;

fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream {
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();

self.import_notification_stream()
let new_best_notification_stream = self
.new_best_notification_stream()
.await?
.filter_map(move |n| {
future::ready(if n.is_new_best {
relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten()
} else {
None
})
let relay_chain = relay_chain.clone();
async move {
relay_chain
.parachain_head_at(&BlockId::hash(n.hash()), para_id)
.await
.ok()
.flatten()
}
})
.boxed()
.boxed();
Ok(new_best_notification_stream)
}

fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream {
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();

self.finality_notification_stream()
let finality_notification_stream = self
.finality_notification_stream()
.await?
.filter_map(move |n| {
future::ready(
relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten(),
)
let relay_chain = relay_chain.clone();
async move {
relay_chain
.parachain_head_at(&BlockId::hash(n.hash()), para_id)
.await
.ok()
.flatten()
}
})
.boxed()
.boxed();
Ok(finality_notification_stream)
}

fn parachain_head_at(
async fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>> {
) -> RelayChainResult<Option<Vec<u8>>> {
self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
.await
.map(|s| s.map(|s| s.parent_head.0))
.map_err(Into::into)
}
}
19 changes: 13 additions & 6 deletions client/consensus/common/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

use crate::*;

use async_trait::async_trait;
use codec::Encode;
use cumulus_relay_chain_interface::RelayChainResult;
use cumulus_test_client::{
runtime::{Block, Header},
Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt,
Expand All @@ -26,7 +28,7 @@ use futures_timer::Delay;
use polkadot_primitives::v1::{Block as PBlock, Id as ParaId};
use sc_client_api::UsageProvider;
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_blockchain::Error as ClientError;
use sp_consensus::BlockOrigin;
use sp_runtime::generic::BlockId;
use std::{
Expand Down Expand Up @@ -66,12 +68,13 @@ impl Relaychain {
}
}

#[async_trait]
impl crate::parachain_consensus::RelaychainClient for Relaychain {
type Error = ClientError;

type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;

fn new_best_heads(&self, _: ParaId) -> Self::HeadStream {
async fn new_best_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
let stream = self
.inner
.lock()
Expand All @@ -80,10 +83,10 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
.take()
.expect("Should only be called once");

Box::new(stream.map(|v| v.encode()))
Ok(Box::new(stream.map(|v| v.encode())))
}

fn finalized_heads(&self, _: ParaId) -> Self::HeadStream {
async fn finalized_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
let stream = self
.inner
.lock()
Expand All @@ -92,10 +95,14 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
.take()
.expect("Should only be called once");

Box::new(stream.map(|v| v.encode()))
Ok(Box::new(stream.map(|v| v.encode())))
}

fn parachain_head_at(&self, _: &BlockId<PBlock>, _: ParaId) -> ClientResult<Option<Vec<u8>>> {
async fn parachain_head_at(
&self,
_: &BlockId<PBlock>,
_: ParaId,
) -> RelayChainResult<Option<Vec<u8>>> {
unimplemented!("Not required for tests")
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/relay-chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ where
.propose(
inherent_data,
Default::default(),
//TODO: Fix this.
// TODO: Fix this.
Duration::from_millis(500),
// Set the block limit to 50% of the maximum PoV size.
//
Expand Down
Loading

0 comments on commit a963055

Please sign in to comment.