Skip to content

Commit

Permalink
TransactionInvalidationTracker (#1544)
Browse files Browse the repository at this point in the history
* TransactionInvalidationTracker

* TransacitonInvalidationTracker -> TransactionTracker

* change sign_transaction method

* clippy and spelling

* removed comment

* more transactiontracker tests

* stalls_when_transaction_tracker_returns_error

* remove test code

* remove "impl TransactionTracker for ()"

* enum TrackedTransactionStatus

* test TransactionTracker in on_transaction_status

* do_wait
  • Loading branch information
svyatonik authored and bkchr committed Apr 10, 2024
1 parent 58fe274 commit 70d6e91
Show file tree
Hide file tree
Showing 8 changed files with 514 additions and 145 deletions.
134 changes: 50 additions & 84 deletions bridges/relays/bin-substrate/src/cli/register_parachain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ use polkadot_runtime_common::{
paras_registrar::Call as ParaRegistrarCall, slots::Call as ParaSlotsCall,
};
use polkadot_runtime_parachains::paras::ParaLifecycle;
use relay_substrate_client::{
AccountIdOf, CallOf, Chain, Client, HashOf, SignParam, Subscription, TransactionStatusOf,
UnsignedTransaction,
};
use relay_substrate_client::{AccountIdOf, CallOf, Chain, Client, SignParam, UnsignedTransaction};
use relay_utils::{TrackedTransactionStatus, TransactionTracker};
use rialto_runtime::SudoCall;
use sp_core::{
storage::{well_known_keys::CODE, StorageKey},
Expand Down Expand Up @@ -116,26 +114,30 @@ impl RegisterParachain {
ParaRegistrarCall::reserve {}.into();
let reserve_parachain_signer = relay_sign.clone();
let (spec_version, transaction_version) = relay_client.simple_runtime_version().await?;
wait_until_transaction_is_finalized::<Relaychain>(
relay_client
.submit_and_watch_signed_extrinsic(
relay_sudo_account.clone(),
SignParam::<Relaychain> {
spec_version,
transaction_version,
genesis_hash: relay_genesis_hash,
signer: reserve_parachain_signer,
},
move |_, transaction_nonce| {
Ok(UnsignedTransaction::new(
reserve_parachain_id_call.into(),
transaction_nonce,
))
},
)
.await?,
)
.await?;
let reserve_result = relay_client
.submit_and_watch_signed_extrinsic(
relay_sudo_account.clone(),
SignParam::<Relaychain> {
spec_version,
transaction_version,
genesis_hash: relay_genesis_hash,
signer: reserve_parachain_signer,
},
move |_, transaction_nonce| {
Ok(UnsignedTransaction::new(
reserve_parachain_id_call.into(),
transaction_nonce,
))
},
)
.await?
.wait()
.await;
if reserve_result == TrackedTransactionStatus::Lost {
return Err(anyhow::format_err!(
"Failed to finalize `reserve-parachain-id` transaction"
))
}
log::info!(target: "bridge", "Reserved parachain id: {:?}", para_id);

// step 2: register parathread
Expand All @@ -161,26 +163,30 @@ impl RegisterParachain {
}
.into();
let register_parathread_signer = relay_sign.clone();
wait_until_transaction_is_finalized::<Relaychain>(
relay_client
.submit_and_watch_signed_extrinsic(
relay_sudo_account.clone(),
SignParam::<Relaychain> {
spec_version,
transaction_version,
genesis_hash: relay_genesis_hash,
signer: register_parathread_signer,
},
move |_, transaction_nonce| {
Ok(UnsignedTransaction::new(
register_parathread_call.into(),
transaction_nonce,
))
},
)
.await?,
)
.await?;
let register_result = relay_client
.submit_and_watch_signed_extrinsic(
relay_sudo_account.clone(),
SignParam::<Relaychain> {
spec_version,
transaction_version,
genesis_hash: relay_genesis_hash,
signer: register_parathread_signer,
},
move |_, transaction_nonce| {
Ok(UnsignedTransaction::new(
register_parathread_call.into(),
transaction_nonce,
))
},
)
.await?
.wait()
.await;
if register_result == TrackedTransactionStatus::Lost {
return Err(anyhow::format_err!(
"Failed to finalize `register-parathread` transaction"
))
}
log::info!(target: "bridge", "Registered parachain: {:?}. Waiting for onboarding", para_id);

// wait until parathread is onboarded
Expand Down Expand Up @@ -256,46 +262,6 @@ impl RegisterParachain {
}
}

/// Wait until transaction is included into finalized block.
///
/// Returns the hash of the finalized block with transaction.
pub(crate) async fn wait_until_transaction_is_finalized<C: Chain>(
subscription: Subscription<TransactionStatusOf<C>>,
) -> anyhow::Result<HashOf<C>> {
loop {
let transaction_status = subscription.next().await?;
match transaction_status {
Some(TransactionStatusOf::<C>::FinalityTimeout(_)) |
Some(TransactionStatusOf::<C>::Usurped(_)) |
Some(TransactionStatusOf::<C>::Dropped) |
Some(TransactionStatusOf::<C>::Invalid) |
None =>
return Err(anyhow::format_err!(
"We've been waiting for finalization of {} transaction, but it now has the {:?} status",
C::NAME,
transaction_status,
)),
Some(TransactionStatusOf::<C>::Finalized(block_hash)) => {
log::trace!(
target: "bridge",
"{} transaction has been finalized at block {}",
C::NAME,
block_hash,
);
return Ok(block_hash)
},
_ => {
log::trace!(
target: "bridge",
"Received intermediate status of {} transaction: {:?}",
C::NAME,
transaction_status,
);
},
}
}
}

/// Wait until parachain state is changed.
async fn wait_para_state<Relaychain: Chain>(
relay_client: &Client<Relaychain>,
Expand Down
36 changes: 27 additions & 9 deletions bridges/relays/client-substrate/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::{
SubstrateGrandpaClient, SubstrateStateClient, SubstrateSystemClient,
SubstrateTransactionPaymentClient,
},
ConnectionParams, Error, HashOf, HeaderIdOf, Result, SignParam, TransactionSignScheme,
TransactionStatusOf, UnsignedTransaction,
transaction_stall_timeout, ConnectionParams, Error, HashOf, HeaderIdOf, Result, SignParam,
TransactionSignScheme, TransactionTracker, UnsignedTransaction,
};

use async_std::sync::{Arc, Mutex};
Expand All @@ -40,7 +40,7 @@ use jsonrpsee::{
use num_traits::{Bounded, Zero};
use pallet_balances::AccountData;
use pallet_transaction_payment::InclusionFee;
use relay_utils::relay_loop::RECONNECT_DELAY;
use relay_utils::{relay_loop::RECONNECT_DELAY, STALL_TIMEOUT};
use sp_core::{
storage::{StorageData, StorageKey},
Bytes, Hasher,
Expand All @@ -58,7 +58,7 @@ const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_valida
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;

/// Opaque justifications subscription type.
pub struct Subscription<T>(Mutex<futures::channel::mpsc::Receiver<Option<T>>>);
pub struct Subscription<T>(pub(crate) Mutex<futures::channel::mpsc::Receiver<Option<T>>>);

/// Opaque GRANDPA authorities set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
Expand Down Expand Up @@ -467,14 +467,20 @@ impl<C: Chain> Client<C> {
prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, C::Index) -> Result<UnsignedTransaction<C>>
+ Send
+ 'static,
) -> Result<Subscription<TransactionStatusOf<C>>> {
) -> Result<TransactionTracker<C>> {
let _guard = self.submit_signed_extrinsic_lock.lock().await;
let transaction_nonce = self.next_account_index(extrinsic_signer).await?;
let best_header = self.best_header().await?;
let best_header_id = best_header.id();
let subscription = self
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
let (tracker, subscription) = self
.jsonrpsee_execute(move |client| async move {
let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?;
let stall_timeout = transaction_stall_timeout(
extrinsic.era.mortality_period(),
C::AVERAGE_BLOCK_INTERVAL,
STALL_TIMEOUT,
);
let signed_extrinsic = S::sign_transaction(signing_data, extrinsic)?.encode();
let tx_hash = C::Hasher::hash(&signed_extrinsic);
let subscription = SubstrateAuthorClient::<C>::submit_and_watch_extrinsic(
Expand All @@ -487,17 +493,21 @@ impl<C: Chain> Client<C> {
e
})?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
Ok(subscription)
let tracker = TransactionTracker::new(
stall_timeout,
tx_hash,
Subscription(Mutex::new(receiver)),
);
Ok((tracker, subscription))
})
.await?;
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"extrinsic".into(),
subscription,
sender,
));
Ok(Subscription(Mutex::new(receiver)))
Ok(tracker)
}

/// Returns pending extrinsics from transaction pool.
Expand Down Expand Up @@ -669,6 +679,14 @@ impl<C: Chain> Client<C> {
}

impl<T: DeserializeOwned> Subscription<T> {
/// Consumes subscription and returns future statuses stream.
pub fn into_stream(self) -> impl futures::Stream<Item = T> {
futures::stream::unfold(self, |this| async {
let item = this.0.lock().await.next().await.unwrap_or(None);
item.map(|i| (i, this))
})
}

/// Return next item from the subscription.
pub async fn next(&self) -> Result<Option<T>> {
let mut receiver = self.0.lock().await;
Expand Down
2 changes: 2 additions & 0 deletions bridges/relays/client-substrate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod client;
mod error;
mod rpc;
mod sync_header;
mod transaction_tracker;

pub mod guard;
pub mod metrics;
Expand All @@ -39,6 +40,7 @@ pub use crate::{
client::{ChainRuntimeVersion, Client, OpaqueGrandpaAuthoritiesSet, Subscription},
error::{Error, Result},
sync_header::SyncHeader,
transaction_tracker::TransactionTracker,
};
pub use bp_runtime::{
AccountIdOf, AccountPublicOf, BalanceOf, BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf,
Expand Down
Loading

0 comments on commit 70d6e91

Please sign in to comment.