From 63b51d9a708916d057b9fd83cb5b5740745aa475 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 3 Oct 2022 09:25:48 +0300 Subject: [PATCH] Read extrinsic dispatch result for mined transaction (#1582) * read extrinsic dispatch result for mined transaction * commit for the history * Revert "commit for the history" This reverts commit 99341b04750639db296172cc1432bd70e458ef4b. * Revert "read extrinsic dispatch result for mined transaction" This reverts commit 662b776cbf992be9f1637e52f023b782e8c441d1. * check for successfult transaction in finality relay * check for successful transaction in parachains relay * TrackedTransactionStatus ->TrackedTransactionStatus * check for successful transaction in messages relay * fix compilation * message_lane_loop_is_able_to_recover_from_unsuccessful_transaction * fixed too-complex-type clippy error * aaand compilation --- relays/client-substrate/src/client.rs | 4 +- .../src/transaction_tracker.rs | 124 ++++++++++++--- relays/finality/src/finality_loop.rs | 58 +++++-- relays/finality/src/finality_loop_tests.rs | 22 ++- .../src/finality/target.rs | 2 +- .../src/messages_source.rs | 2 +- .../src/messages_target.rs | 2 +- .../src/parachains/target.rs | 2 +- relays/messages/src/message_lane_loop.rs | 145 +++++++++++++++-- relays/messages/src/message_race_loop.rs | 56 +++++-- relays/parachains/src/parachains_loop.rs | 148 ++++++++++++------ relays/utils/src/lib.rs | 11 +- 12 files changed, 462 insertions(+), 114 deletions(-) diff --git a/relays/client-substrate/src/client.rs b/relays/client-substrate/src/client.rs index ed327e167b06..067d3d89d245 100644 --- a/relays/client-substrate/src/client.rs +++ b/relays/client-substrate/src/client.rs @@ -467,7 +467,8 @@ impl Client { prepare_extrinsic: impl FnOnce(HeaderIdOf, C::Index) -> Result> + Send + 'static, - ) -> Result> { + ) -> Result> { + let self_clone = self.clone(); let _guard = self.submit_signed_extrinsic_lock.lock().await; let transaction_nonce = self.next_account_index(extrinsic_signer).await?; let best_header = self.best_header().await?; @@ -494,6 +495,7 @@ impl Client { })?; log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); let tracker = TransactionTracker::new( + self_clone, stall_timeout, tx_hash, Subscription(Mutex::new(receiver)), diff --git a/relays/client-substrate/src/transaction_tracker.rs b/relays/client-substrate/src/transaction_tracker.rs index a84a46240a4a..44d8b72cb8d4 100644 --- a/relays/client-substrate/src/transaction_tracker.rs +++ b/relays/client-substrate/src/transaction_tracker.rs @@ -16,13 +16,28 @@ //! Helper for tracking transaction invalidation events. -use crate::{Chain, HashOf, Subscription, TransactionStatusOf}; +use crate::{Chain, Client, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf}; use async_trait::async_trait; use futures::{future::Either, Future, FutureExt, Stream, StreamExt}; -use relay_utils::TrackedTransactionStatus; +use relay_utils::{HeaderId, TrackedTransactionStatus}; +use sp_runtime::traits::Header as _; use std::time::Duration; +/// Transaction tracker environment. +#[async_trait] +pub trait Environment: Send + Sync { + /// Returns header id by its hash. + async fn header_id_by_hash(&self, hash: HashOf) -> Result, Error>; +} + +#[async_trait] +impl Environment for Client { + async fn header_id_by_hash(&self, hash: HashOf) -> Result, Error> { + self.header_by_hash(hash).await.map(|h| HeaderId(*h.number(), hash)) + } +} + /// Substrate transaction tracker implementation. /// /// Substrate node provides RPC API to submit and watch for transaction events. This way @@ -43,20 +58,22 @@ use std::time::Duration; /// it is lost. /// /// This struct implements third option as it seems to be the most optimal. -pub struct TransactionTracker { +pub struct TransactionTracker { + environment: E, transaction_hash: HashOf, stall_timeout: Duration, subscription: Subscription>, } -impl TransactionTracker { +impl> TransactionTracker { /// Create transaction tracker. pub fn new( + environment: E, stall_timeout: Duration, transaction_hash: HashOf, subscription: Subscription>, ) -> Self { - Self { stall_timeout, transaction_hash, subscription } + Self { environment, stall_timeout, transaction_hash, subscription } } /// Wait for final transaction status and return it along with last known internal invalidation @@ -65,10 +82,11 @@ impl TransactionTracker { self, wait_for_stall_timeout: impl Future, wait_for_stall_timeout_rest: impl Future, - ) -> (TrackedTransactionStatus, Option) { + ) -> (TrackedTransactionStatus>, Option>>) { // sometimes we want to wait for the rest of the stall timeout even if // `wait_for_invalidation` has been "select"ed first => it is shared - let wait_for_invalidation = watch_transaction_status::( + let wait_for_invalidation = watch_transaction_status::<_, C, _>( + self.environment, self.transaction_hash, self.subscription.into_stream(), ); @@ -86,8 +104,8 @@ impl TransactionTracker { (TrackedTransactionStatus::Lost, None) }, Either::Right((invalidation_status, _)) => match invalidation_status { - InvalidationStatus::Finalized => - (TrackedTransactionStatus::Finalized, Some(invalidation_status)), + InvalidationStatus::Finalized(at_block) => + (TrackedTransactionStatus::Finalized(at_block), Some(invalidation_status)), InvalidationStatus::Invalid => (TrackedTransactionStatus::Lost, Some(invalidation_status)), InvalidationStatus::Lost => { @@ -111,8 +129,10 @@ impl TransactionTracker { } #[async_trait] -impl relay_utils::TransactionTracker for TransactionTracker { - async fn wait(self) -> TrackedTransactionStatus { +impl> relay_utils::TransactionTracker for TransactionTracker { + type HeaderId = HeaderIdOf; + + async fn wait(self) -> TrackedTransactionStatus> { let wait_for_stall_timeout = async_std::task::sleep(self.stall_timeout).shared(); let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone(); self.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await.0 @@ -125,9 +145,9 @@ impl relay_utils::TransactionTracker for TransactionTracker { /// ignored - relay loops are detecting the mining/finalization using their own /// techniques. That's why we're using `InvalidationStatus` here. #[derive(Debug, PartialEq)] -enum InvalidationStatus { - /// Transaction has been included into block and finalized. - Finalized, +enum InvalidationStatus { + /// Transaction has been included into block and finalized at given block. + Finalized(BlockId), /// Transaction has been invalidated. Invalid, /// We have lost track of transaction status. @@ -135,10 +155,15 @@ enum InvalidationStatus { } /// Watch for transaction status until transaction is finalized or we lose track of its status. -async fn watch_transaction_status>>( +async fn watch_transaction_status< + E: Environment, + C: Chain, + S: Stream>, +>( + environment: E, transaction_hash: HashOf, subscription: S, -) -> InvalidationStatus { +) -> InvalidationStatus> { futures::pin_mut!(subscription); loop { @@ -153,7 +178,23 @@ async fn watch_transaction_status header_id, + Err(e) => { + log::error!( + target: "bridge", + "Failed to read header {:?} when watching for {} transaction {:?}: {:?}", + block_hash, + C::NAME, + transaction_hash, + e, + ); + // that's the best option we have here + return InvalidationStatus::Lost + }, + }; + return InvalidationStatus::Finalized(header_id) }, Some(TransactionStatusOf::::Invalid) => { // if node says that the transaction is invalid, there are still chances that @@ -247,11 +288,27 @@ mod tests { use futures::{FutureExt, SinkExt}; use sc_transaction_pool_api::TransactionStatus; + struct TestEnvironment(Result, Error>); + + #[async_trait] + impl Environment for TestEnvironment { + async fn header_id_by_hash( + &self, + _hash: HashOf, + ) -> Result, Error> { + self.0.as_ref().map_err(|_| Error::UninitializedBridgePallet).cloned() + } + } + async fn on_transaction_status( status: TransactionStatus, HashOf>, - ) -> Option<(TrackedTransactionStatus, InvalidationStatus)> { + ) -> Option<( + TrackedTransactionStatus>, + InvalidationStatus>, + )> { let (mut sender, receiver) = futures::channel::mpsc::channel(1); - let tx_tracker = TransactionTracker::::new( + let tx_tracker = TransactionTracker::::new( + TestEnvironment(Ok(HeaderId(0, Default::default()))), Duration::from_secs(0), Default::default(), Subscription(async_std::sync::Mutex::new(receiver)), @@ -270,7 +327,23 @@ mod tests { async fn returns_finalized_on_finalized() { assert_eq!( on_transaction_status(TransactionStatus::Finalized(Default::default())).await, - Some((TrackedTransactionStatus::Finalized, InvalidationStatus::Finalized)), + Some(( + TrackedTransactionStatus::Finalized(Default::default()), + InvalidationStatus::Finalized(Default::default()) + )), + ); + } + + #[async_std::test] + async fn returns_lost_on_finalized_and_environment_error() { + assert_eq!( + watch_transaction_status::<_, TestChain, _>( + TestEnvironment(Err(Error::UninitializedBridgePallet)), + Default::default(), + futures::stream::iter([TransactionStatus::Finalized(Default::default())]) + ) + .now_or_never(), + Some(InvalidationStatus::Lost), ); } @@ -343,8 +416,12 @@ mod tests { #[async_std::test] async fn lost_on_subscription_error() { assert_eq!( - watch_transaction_status::(Default::default(), futures::stream::iter([])) - .now_or_never(), + watch_transaction_status::<_, TestChain, _>( + TestEnvironment(Ok(HeaderId(0, Default::default()))), + Default::default(), + futures::stream::iter([]) + ) + .now_or_never(), Some(InvalidationStatus::Lost), ); } @@ -352,7 +429,8 @@ mod tests { #[async_std::test] async fn lost_on_timeout_when_waiting_for_invalidation_status() { let (_sender, receiver) = futures::channel::mpsc::channel(1); - let tx_tracker = TransactionTracker::::new( + let tx_tracker = TransactionTracker::::new( + TestEnvironment(Ok(HeaderId(0, Default::default()))), Duration::from_secs(0), Default::default(), Subscription(async_std::sync::Mutex::new(receiver)), diff --git a/relays/finality/src/finality_loop.rs b/relays/finality/src/finality_loop.rs index 951edfdde948..a89068604709 100644 --- a/relays/finality/src/finality_loop.rs +++ b/relays/finality/src/finality_loop.rs @@ -290,15 +290,55 @@ pub(crate) async fn run_until_connection_lost( // wait till exit signal, or new source block select! { transaction_status = last_transaction_tracker => { - if transaction_status == TrackedTransactionStatus::Lost { - log::error!( - target: "bridge", - "Finality synchronization from {} to {} has stalled. Going to restart", - P::SOURCE_NAME, - P::TARGET_NAME, - ); - - return Err(FailedClient::Both); + match transaction_status { + TrackedTransactionStatus::Finalized(_) => { + // transaction has been finalized, but it may have been finalized in the "failed" state. So + // let's check if the block number has been actually updated. If it is not, then we are stalled. + // + // please also note that we're restarting the loop if we have failed to read required data + // from the target client - that's the best we can do here to avoid actual stall. + target_client + .best_finalized_source_block_id() + .await + .map_err(|e| format!("failed to read best block from target node: {:?}", e)) + .and_then(|best_id_at_target| { + let last_submitted_header_number = last_submitted_header_number + .expect("always Some when last_transaction_tracker is set;\ + last_transaction_tracker is set;\ + qed"); + if last_submitted_header_number > best_id_at_target.0 { + Err(format!( + "best block at target after tx is {:?} and we've submitted {:?}", + best_id_at_target, + last_submitted_header_number, + )) + } else { + Ok(()) + } + }) + .map_err(|e| { + log::error!( + target: "bridge", + "Failed Finality synchronization from {} to {} has stalled. Transaction failed: {}. \ + Going to restart", + P::SOURCE_NAME, + P::TARGET_NAME, + e, + ); + + FailedClient::Both + })?; + }, + TrackedTransactionStatus::Lost => { + log::error!( + target: "bridge", + "Finality synchronization from {} to {} has stalled. Going to restart", + P::SOURCE_NAME, + P::TARGET_NAME, + ); + + return Err(FailedClient::Both); + }, } }, _ = async_std::task::sleep(next_tick).fuse() => {}, diff --git a/relays/finality/src/finality_loop_tests.rs b/relays/finality/src/finality_loop_tests.rs index 7144ccb0c481..c8d5cefc2277 100644 --- a/relays/finality/src/finality_loop_tests.rs +++ b/relays/finality/src/finality_loop_tests.rs @@ -48,17 +48,19 @@ type TestNumber = u64; type TestHash = u64; #[derive(Clone, Debug)] -struct TestTransactionTracker(TrackedTransactionStatus); +struct TestTransactionTracker(TrackedTransactionStatus>); impl Default for TestTransactionTracker { fn default() -> TestTransactionTracker { - TestTransactionTracker(TrackedTransactionStatus::Finalized) + TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default())) } } #[async_trait] impl TransactionTracker for TestTransactionTracker { - async fn wait(self) -> TrackedTransactionStatus { + type HeaderId = HeaderId; + + async fn wait(self) -> TrackedTransactionStatus> { self.0 } } @@ -224,7 +226,9 @@ fn prepare_test_clients( target_best_block_id: HeaderId(5, 5), target_headers: vec![], - target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized), + target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized( + Default::default(), + )), })); ( TestSourceClient { @@ -581,3 +585,13 @@ fn stalls_when_transaction_tracker_returns_error() { assert_eq!(result, Err(FailedClient::Both)); } + +#[test] +fn stalls_when_transaction_tracker_returns_finalized_but_transaction_fails() { + let (_, result) = run_sync_loop(|data| { + data.target_best_block_id = HeaderId(5, 5); + data.target_best_block_id.0 == 16 + }); + + assert_eq!(result, Err(FailedClient::Both)); +} diff --git a/relays/lib-substrate-relay/src/finality/target.rs b/relays/lib-substrate-relay/src/finality/target.rs index 7bdb77d4ee05..132c33253438 100644 --- a/relays/lib-substrate-relay/src/finality/target.rs +++ b/relays/lib-substrate-relay/src/finality/target.rs @@ -89,7 +89,7 @@ where AccountIdOf: From< as Pair>::Public>, P::TransactionSignScheme: TransactionSignScheme, { - type TransactionTracker = TransactionTracker; + type TransactionTracker = TransactionTracker>; async fn best_finalized_source_block_id(&self) -> Result, Error> { // we can't continue to relay finality if target node is out of sync, because diff --git a/relays/lib-substrate-relay/src/messages_source.rs b/relays/lib-substrate-relay/src/messages_source.rs index ca0c3f54bb56..e34f477e5f0c 100644 --- a/relays/lib-substrate-relay/src/messages_source.rs +++ b/relays/lib-substrate-relay/src/messages_source.rs @@ -144,7 +144,7 @@ where From< as Pair>::Public>, P::SourceTransactionSignScheme: TransactionSignScheme, { - type TransactionTracker = TransactionTracker; + type TransactionTracker = TransactionTracker>; async fn state(&self) -> Result>, SubstrateError> { // we can't continue to deliver confirmations if source node is out of sync, because diff --git a/relays/lib-substrate-relay/src/messages_target.rs b/relays/lib-substrate-relay/src/messages_target.rs index 21a43112f81c..da41dba63a53 100644 --- a/relays/lib-substrate-relay/src/messages_target.rs +++ b/relays/lib-substrate-relay/src/messages_target.rs @@ -145,7 +145,7 @@ where P::TargetTransactionSignScheme: TransactionSignScheme, BalanceOf: TryFrom>, { - type TransactionTracker = TransactionTracker; + type TransactionTracker = TransactionTracker>; async fn state(&self) -> Result>, SubstrateError> { // we can't continue to deliver confirmations if source node is out of sync, because diff --git a/relays/lib-substrate-relay/src/parachains/target.rs b/relays/lib-substrate-relay/src/parachains/target.rs index 34a6a31311dd..8d0d361984f3 100644 --- a/relays/lib-substrate-relay/src/parachains/target.rs +++ b/relays/lib-substrate-relay/src/parachains/target.rs @@ -86,7 +86,7 @@ where P::TransactionSignScheme: TransactionSignScheme, AccountIdOf: From< as Pair>::Public>, { - type TransactionTracker = TransactionTracker; + type TransactionTracker = TransactionTracker>; async fn best_block(&self) -> Result, Self::Error> { let best_header = self.client.best_header().await?; diff --git a/relays/messages/src/message_lane_loop.rs b/relays/messages/src/message_lane_loop.rs index 6e3b02f1cffc..05c157ee7210 100644 --- a/relays/messages/src/message_lane_loop.rs +++ b/relays/messages/src/message_lane_loop.rs @@ -129,7 +129,7 @@ pub struct NoncesSubmitArtifacts { #[async_trait] pub trait SourceClient: RelayClient { /// Transaction tracker to track submitted transactions. - type TransactionTracker: TransactionTracker; + type TransactionTracker: TransactionTracker>; /// Returns state of the client. async fn state(&self) -> Result, Self::Error>; @@ -182,7 +182,7 @@ pub trait SourceClient: RelayClient { #[async_trait] pub trait TargetClient: RelayClient { /// Transaction tracker to track submitted transactions. - type TransactionTracker: TransactionTracker; + type TransactionTracker: TransactionTracker>; /// Returns state of the client. async fn state(&self) -> Result, Self::Error>; @@ -529,17 +529,19 @@ pub(crate) mod tests { } #[derive(Clone, Debug)] - pub struct TestTransactionTracker(TrackedTransactionStatus); + pub struct TestTransactionTracker(TrackedTransactionStatus); impl Default for TestTransactionTracker { fn default() -> TestTransactionTracker { - TestTransactionTracker(TrackedTransactionStatus::Finalized) + TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default())) } } #[async_trait] impl TransactionTracker for TestTransactionTracker { - async fn wait(self) -> TrackedTransactionStatus { + type HeaderId = TestTargetHeaderId; + + async fn wait(self) -> TrackedTransactionStatus { self.0 } } @@ -551,14 +553,14 @@ pub(crate) mod tests { source_state: SourceClientState, source_latest_generated_nonce: MessageNonce, source_latest_confirmed_received_nonce: MessageNonce, - source_tracked_transaction_status: TrackedTransactionStatus, + source_tracked_transaction_status: TrackedTransactionStatus, submitted_messages_receiving_proofs: Vec, is_target_fails: bool, is_target_reconnected: bool, target_state: SourceClientState, target_latest_received_nonce: MessageNonce, target_latest_confirmed_received_nonce: MessageNonce, - target_tracked_transaction_status: TrackedTransactionStatus, + target_tracked_transaction_status: TrackedTransactionStatus, submitted_messages_proofs: Vec, target_to_source_header_required: Option, target_to_source_header_requirements: Vec, @@ -574,14 +576,20 @@ pub(crate) mod tests { source_state: Default::default(), source_latest_generated_nonce: 0, source_latest_confirmed_received_nonce: 0, - source_tracked_transaction_status: TrackedTransactionStatus::Finalized, + source_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId( + 0, + Default::default(), + )), submitted_messages_receiving_proofs: Vec::new(), is_target_fails: false, is_target_reconnected: false, target_state: Default::default(), target_latest_received_nonce: 0, target_latest_confirmed_received_nonce: 0, - target_tracked_transaction_status: TrackedTransactionStatus::Finalized, + target_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId( + 0, + Default::default(), + )), submitted_messages_proofs: Vec::new(), target_to_source_header_required: None, target_to_source_header_requirements: Vec::new(), @@ -595,6 +603,7 @@ pub(crate) mod tests { pub struct TestSourceClient { data: Arc>, tick: Arc, + post_tick: Arc, } impl Default for TestSourceClient { @@ -602,6 +611,7 @@ pub(crate) mod tests { TestSourceClient { data: Arc::new(Mutex::new(TestClientData::default())), tick: Arc::new(|_| {}), + post_tick: Arc::new(|_| {}), } } } @@ -615,6 +625,7 @@ pub(crate) mod tests { let mut data = self.data.lock(); (self.tick)(&mut data); data.is_source_reconnected = true; + (self.post_tick)(&mut data); } Ok(()) } @@ -630,6 +641,7 @@ pub(crate) mod tests { if data.is_source_fails { return Err(TestError) } + (self.post_tick)(&mut data); Ok(data.source_state.clone()) } @@ -642,6 +654,7 @@ pub(crate) mod tests { if data.is_source_fails { return Err(TestError) } + (self.post_tick)(&mut data); Ok((id, data.source_latest_generated_nonce)) } @@ -651,6 +664,7 @@ pub(crate) mod tests { ) -> Result<(SourceHeaderIdOf, MessageNonce), TestError> { let mut data = self.data.lock(); (self.tick)(&mut data); + (self.post_tick)(&mut data); Ok((id, data.source_latest_confirmed_received_nonce)) } @@ -685,6 +699,7 @@ pub(crate) mod tests { > { let mut data = self.data.lock(); (self.tick)(&mut data); + (self.post_tick)(&mut data); Ok(( id, nonces.clone(), @@ -711,6 +726,7 @@ pub(crate) mod tests { data.source_state.best_finalized_self = data.source_state.best_self; data.submitted_messages_receiving_proofs.push(proof); data.source_latest_confirmed_received_nonce = proof; + (self.post_tick)(&mut data); Ok(TestTransactionTracker(data.source_tracked_transaction_status)) } @@ -719,6 +735,7 @@ pub(crate) mod tests { data.target_to_source_header_required = Some(id); data.target_to_source_header_requirements.push(id); (self.tick)(&mut data); + (self.post_tick)(&mut data); } async fn estimate_confirmation_transaction(&self) -> TestSourceChainBalance { @@ -730,6 +747,7 @@ pub(crate) mod tests { pub struct TestTargetClient { data: Arc>, tick: Arc, + post_tick: Arc, } impl Default for TestTargetClient { @@ -737,6 +755,7 @@ pub(crate) mod tests { TestTargetClient { data: Arc::new(Mutex::new(TestClientData::default())), tick: Arc::new(|_| {}), + post_tick: Arc::new(|_| {}), } } } @@ -750,6 +769,7 @@ pub(crate) mod tests { let mut data = self.data.lock(); (self.tick)(&mut data); data.is_target_reconnected = true; + (self.post_tick)(&mut data); } Ok(()) } @@ -765,6 +785,7 @@ pub(crate) mod tests { if data.is_target_fails { return Err(TestError) } + (self.post_tick)(&mut data); Ok(data.target_state.clone()) } @@ -777,6 +798,7 @@ pub(crate) mod tests { if data.is_target_fails { return Err(TestError) } + (self.post_tick)(&mut data); Ok((id, data.target_latest_received_nonce)) } @@ -804,6 +826,7 @@ pub(crate) mod tests { if data.is_target_fails { return Err(TestError) } + (self.post_tick)(&mut data); Ok((id, data.target_latest_confirmed_received_nonce)) } @@ -834,6 +857,7 @@ pub(crate) mod tests { target_latest_confirmed_received_nonce; } data.submitted_messages_proofs.push(proof); + (self.post_tick)(&mut data); Ok(NoncesSubmitArtifacts { nonces, tx_tracker: TestTransactionTracker(data.target_tracked_transaction_status), @@ -845,6 +869,7 @@ pub(crate) mod tests { data.source_to_target_header_required = Some(id); data.source_to_target_header_requirements.push(id); (self.tick)(&mut data); + (self.post_tick)(&mut data); } async fn estimate_delivery_transaction_in_source_tokens( @@ -863,14 +888,24 @@ pub(crate) mod tests { fn run_loop_test( data: TestClientData, source_tick: Arc, + source_post_tick: Arc, target_tick: Arc, + target_post_tick: Arc, exit_signal: impl Future + 'static + Send, ) -> TestClientData { async_std::task::block_on(async { let data = Arc::new(Mutex::new(data)); - let source_client = TestSourceClient { data: data.clone(), tick: source_tick }; - let target_client = TestTargetClient { data: data.clone(), tick: target_tick }; + let source_client = TestSourceClient { + data: data.clone(), + tick: source_tick, + post_tick: source_post_tick, + }; + let target_client = TestTargetClient { + data: data.clone(), + tick: target_tick, + post_tick: target_post_tick, + }; let _ = run( Params { lane: [0, 0, 0, 0], @@ -928,6 +963,7 @@ pub(crate) mod tests { data.is_target_fails = true; } }), + Arc::new(|_| {}), Arc::new(move |data: &mut TestClientData| { if data.is_target_reconnected { data.is_target_fails = false; @@ -942,6 +978,7 @@ pub(crate) mod tests { exit_sender.unbounded_send(()).unwrap(); } }), + Arc::new(|_| {}), exit_receiver.into_future().map(|(_, _)| ()), ); @@ -976,24 +1013,104 @@ pub(crate) mod tests { }, Arc::new(move |data: &mut TestClientData| { if data.is_source_reconnected { - data.source_tracked_transaction_status = TrackedTransactionStatus::Finalized; + data.source_tracked_transaction_status = + TrackedTransactionStatus::Finalized(Default::default()); } if data.is_source_reconnected && data.is_target_reconnected { source_exit_sender.unbounded_send(()).unwrap(); } }), + Arc::new(|_| {}), Arc::new(move |data: &mut TestClientData| { if data.is_target_reconnected { - data.target_tracked_transaction_status = TrackedTransactionStatus::Finalized; + data.target_tracked_transaction_status = + TrackedTransactionStatus::Finalized(Default::default()); } if data.is_source_reconnected && data.is_target_reconnected { target_exit_sender.unbounded_send(()).unwrap(); } }), + Arc::new(|_| {}), + exit_receiver.into_future().map(|(_, _)| ()), + ); + + assert!(result.is_source_reconnected); + } + + #[test] + fn message_lane_loop_is_able_to_recover_from_unsuccessful_transaction() { + // with this configuration, both source and target clients will mine their transactions, but + // their corresponding nonce won't be udpated => reconnect will happen + let (exit_sender, exit_receiver) = unbounded(); + let result = run_loop_test( + TestClientData { + source_state: ClientState { + best_self: HeaderId(0, 0), + best_finalized_self: HeaderId(0, 0), + best_finalized_peer_at_best_self: HeaderId(0, 0), + actual_best_finalized_peer_at_best_self: HeaderId(0, 0), + }, + source_latest_generated_nonce: 1, + target_state: ClientState { + best_self: HeaderId(0, 0), + best_finalized_self: HeaderId(0, 0), + best_finalized_peer_at_best_self: HeaderId(0, 0), + actual_best_finalized_peer_at_best_self: HeaderId(0, 0), + }, + target_latest_received_nonce: 0, + ..Default::default() + }, + Arc::new(move |data: &mut TestClientData| { + // blocks are produced on every tick + data.source_state.best_self = + HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1); + data.source_state.best_finalized_self = data.source_state.best_self; + // syncing target headers -> source chain + if let Some(last_requirement) = data.target_to_source_header_requirements.last() { + if *last_requirement != data.source_state.best_finalized_peer_at_best_self { + data.source_state.best_finalized_peer_at_best_self = *last_requirement; + } + } + }), + Arc::new(move |data: &mut TestClientData| { + // if it is the first time we're submitting delivery proof, let's revert changes + // to source status => then the delivery confirmation transaction is "finalized", + // but the state is not altered + if data.submitted_messages_receiving_proofs.len() == 1 { + data.source_latest_confirmed_received_nonce = 0; + } + }), + Arc::new(move |data: &mut TestClientData| { + // blocks are produced on every tick + data.target_state.best_self = + HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1); + data.target_state.best_finalized_self = data.target_state.best_self; + // syncing source headers -> target chain + if let Some(last_requirement) = data.source_to_target_header_requirements.last() { + if *last_requirement != data.target_state.best_finalized_peer_at_best_self { + data.target_state.best_finalized_peer_at_best_self = *last_requirement; + } + } + // if source has received all messages receiving confirmations => stop + if data.source_latest_confirmed_received_nonce == 1 { + exit_sender.unbounded_send(()).unwrap(); + } + }), + Arc::new(move |data: &mut TestClientData| { + // if it is the first time we're submitting messages proof, let's revert changes + // to target status => then the messages delivery transaction is "finalized", but + // the state is not altered + if data.submitted_messages_proofs.len() == 1 { + data.target_latest_received_nonce = 0; + data.target_latest_confirmed_received_nonce = 0; + } + }), exit_receiver.into_future().map(|(_, _)| ()), ); assert!(result.is_source_reconnected); + assert_eq!(result.submitted_messages_proofs.len(), 2); + assert_eq!(result.submitted_messages_receiving_proofs.len(), 2); } #[test] @@ -1037,6 +1154,7 @@ pub(crate) mod tests { } } }), + Arc::new(|_| {}), Arc::new(move |data: &mut TestClientData| { // blocks are produced on every tick data.target_state.best_self = @@ -1061,6 +1179,7 @@ pub(crate) mod tests { exit_sender.unbounded_send(()).unwrap(); } }), + Arc::new(|_| {}), exit_receiver.into_future().map(|(_, _)| ()), ); diff --git a/relays/messages/src/message_race_loop.rs b/relays/messages/src/message_race_loop.rs index 86306b1c424d..15308f93032b 100644 --- a/relays/messages/src/message_race_loop.rs +++ b/relays/messages/src/message_race_loop.rs @@ -128,7 +128,7 @@ pub trait TargetClient { /// Type of the additional data from the target client, used by the race. type TargetNoncesData: std::fmt::Debug; /// Transaction tracker to track submitted transactions. - type TransactionTracker: TransactionTracker; + type TransactionTracker: TransactionTracker; /// Ask headers relay to relay finalized headers up to (and including) given header /// from race source to race target. @@ -419,17 +419,49 @@ pub async fn run, TC: TargetClient

>( ).fail_if_error(FailedClient::Target).map(|_| true)?; }, target_transaction_status = target_tx_tracker => { - if target_transaction_status == TrackedTransactionStatus::Lost { - log::warn!( - target: "bridge", - "{} -> {} race has stalled. State: {:?}. Strategy: {:?}", - P::source_name(), - P::target_name(), - race_state, - strategy, - ); - - return Err(FailedClient::Both); + match (target_transaction_status, race_state.nonces_submitted.as_ref()) { + (TrackedTransactionStatus::Finalized(at_block), Some(nonces_submitted)) => { + // our transaction has been mined, but was it successful or not? let's check the best + // nonce at the target node. + race_target.nonces(at_block, false) + .await + .map_err(|e| format!("failed to read nonces from target node: {:?}", e)) + .and_then(|(_, nonces_at_target)| { + if nonces_at_target.latest_nonce < *nonces_submitted.end() { + Err(format!( + "best nonce at target after tx is {:?} and we've submitted {:?}", + nonces_at_target.latest_nonce, + nonces_submitted.end(), + )) + } else { + Ok(()) + } + }) + .map_err(|e| { + log::error!( + target: "bridge", + "{} -> {} race has stalled. Transaction failed: {}. Going to restart", + P::source_name(), + P::target_name(), + e, + ); + + FailedClient::Both + })?; + }, + (TrackedTransactionStatus::Lost, _) => { + log::warn!( + target: "bridge", + "{} -> {} race has stalled. State: {:?}. Strategy: {:?}", + P::source_name(), + P::target_name(), + race_state, + strategy, + ); + + return Err(FailedClient::Both); + }, + _ => (), } }, diff --git a/relays/parachains/src/parachains_loop.rs b/relays/parachains/src/parachains_loop.rs index 09e55740cef3..6648f2efc727 100644 --- a/relays/parachains/src/parachains_loop.rs +++ b/relays/parachains/src/parachains_loop.rs @@ -124,7 +124,7 @@ pub trait SourceClient: RelayClient { #[async_trait] pub trait TargetClient: RelayClient { /// Transaction tracker to track submitted transactions. - type TransactionTracker: TransactionTracker; + type TransactionTracker: TransactionTracker>; /// Get best block id. async fn best_block(&self) -> Result, Self::Error>; @@ -260,13 +260,13 @@ where // check if our transaction has been mined if let Some(tracker) = submitted_heads_tracker.take() { - match tracker.update(&heads_at_target).await { + match tracker.update(&best_target_block, &heads_at_target).await { SubmittedHeadsStatus::Waiting(tracker) => { // no news about our transaction and we shall keep waiting submitted_heads_tracker = Some(tracker); continue }, - SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized) => { + SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized(_)) => { // all heads have been updated, we don't need this tracker anymore }, SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost) => { @@ -529,9 +529,24 @@ enum SubmittedHeadsStatus { /// Heads are not yet updated. Waiting(SubmittedHeadsTracker

), /// Heads transaction has either been finalized or lost (i.e. received its "final" status). - Final(TrackedTransactionStatus), + Final(TrackedTransactionStatus>), } +/// Type of the transaction tracker that the `SubmittedHeadsTracker` is using. +/// +/// It needs to be shared because of `poll` macro and our consuming `update` method. +type SharedTransactionTracker

= Shared< + Pin< + Box< + dyn Future< + Output = TrackedTransactionStatus< + HeaderIdOf<

::TargetChain>, + >, + > + Send, + >, + >, +>; + /// Submitted parachain heads transaction. struct SubmittedHeadsTracker { /// Ids of parachains which heads were updated in the tracked transaction. @@ -541,7 +556,7 @@ struct SubmittedHeadsTracker { /// Future that waits for submitted transaction finality or loss. /// /// It needs to be shared because of `poll` macro and our consuming `update` method. - transaction_tracker: Shared + Send>>>, + transaction_tracker: SharedTransactionTracker

, } impl SubmittedHeadsTracker

@@ -552,7 +567,7 @@ where pub fn new( awaiting_update: impl IntoIterator, relay_block_number: BlockNumberOf, - transaction_tracker: impl TransactionTracker + 'static, + transaction_tracker: impl TransactionTracker> + 'static, ) -> Self { SubmittedHeadsTracker { awaiting_update: awaiting_update.into_iter().collect(), @@ -564,6 +579,7 @@ where /// Returns `None` if all submitted parachain heads have been updated. pub async fn update( mut self, + at_target_block: &HeaderIdOf, heads_at_target: &BTreeMap>, ) -> SubmittedHeadsStatus

{ // remove all pending heads that were synced @@ -590,14 +606,23 @@ where // if we have synced all required heads, we are done if self.awaiting_update.is_empty() { - return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized) + return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized( + *at_target_block, + )) } // if underlying transaction tracker has reported that the transaction is lost, we may // then restart our sync let transaction_tracker = self.transaction_tracker.clone(); - if let Poll::Ready(TrackedTransactionStatus::Lost) = poll!(transaction_tracker) { - return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost) + match poll!(transaction_tracker) { + Poll::Ready(TrackedTransactionStatus::Lost) => + return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost), + Poll::Ready(TrackedTransactionStatus::Finalized(_)) => { + // so we are here and our transaction is mined+finalized, but some of heads were not + // updated => we're considering our loop as stalled + return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost) + }, + _ => (), } SubmittedHeadsStatus::Waiting(self) @@ -644,12 +669,17 @@ mod tests { } #[derive(Clone, Debug)] - struct TestTransactionTracker(TrackedTransactionStatus); + struct TestTransactionTracker(Option>>); #[async_trait] impl TransactionTracker for TestTransactionTracker { - async fn wait(self) -> TrackedTransactionStatus { - self.0 + type HeaderId = HeaderIdOf; + + async fn wait(self) -> TrackedTransactionStatus> { + match self.0 { + Some(status) => status, + None => futures::future::pending().await, + } } } @@ -785,7 +815,9 @@ mod tests { if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() { exit_signal_sender.send(()).await.unwrap(); } - Ok(TestTransactionTracker(TrackedTransactionStatus::Finalized)) + Ok(TestTransactionTracker(Some( + TrackedTransactionStatus::Finalized(Default::default()), + ))) } } @@ -938,10 +970,31 @@ mod tests { SubmittedHeadsTracker::new( vec![ParaId(PARA_ID), ParaId(PARA_1_ID)], SOURCE_BLOCK_NUMBER, - TestTransactionTracker(TrackedTransactionStatus::Finalized), + TestTransactionTracker(None), ) } + fn all_expected_tracker_heads() -> BTreeMap> { + vec![ + ( + ParaId(PARA_ID), + Some(BestParaHeadHash { + at_relay_block_number: SOURCE_BLOCK_NUMBER, + head_hash: PARA_0_HASH, + }), + ), + ( + ParaId(PARA_1_ID), + Some(BestParaHeadHash { + at_relay_block_number: SOURCE_BLOCK_NUMBER, + head_hash: PARA_0_HASH, + }), + ), + ] + .into_iter() + .collect() + } + impl From> for Option> { fn from(status: SubmittedHeadsStatus) -> Option> { match status { @@ -955,7 +1008,10 @@ mod tests { async fn tx_tracker_update_when_nothing_is_updated() { assert_eq!( Some(test_tx_tracker().awaiting_update), - test_tx_tracker().update(&vec![].into_iter().collect()).await.into(), + test_tx_tracker() + .update(&HeaderId(0, Default::default()), &vec![].into_iter().collect()) + .await + .into(), ); } @@ -965,6 +1021,7 @@ mod tests { Some(test_tx_tracker().awaiting_update), test_tx_tracker() .update( + &HeaderId(0, Default::default()), &vec![( ParaId(PARA_ID), Some(BestParaHeadHash { @@ -986,6 +1043,7 @@ mod tests { Some(vec![ParaId(PARA_1_ID)].into_iter().collect::>()), test_tx_tracker() .update( + &HeaderId(0, Default::default()), &vec![( ParaId(PARA_ID), Some(BestParaHeadHash { @@ -1006,50 +1064,52 @@ mod tests { assert_eq!( Option::>::None, test_tx_tracker() - .update( - &vec![ - ( - ParaId(PARA_ID), - Some(BestParaHeadHash { - at_relay_block_number: SOURCE_BLOCK_NUMBER, - head_hash: PARA_0_HASH, - }) - ), - ( - ParaId(PARA_1_ID), - Some(BestParaHeadHash { - at_relay_block_number: SOURCE_BLOCK_NUMBER, - head_hash: PARA_0_HASH, - }) - ), - ] - .into_iter() - .collect() - ) + .update(&HeaderId(0, Default::default()), &all_expected_tracker_heads()) .await .into(), ); } #[async_std::test] - async fn tx_tracker_update_when_tx_is_stalled() { + async fn tx_tracker_update_when_tx_is_lost() { let mut tx_tracker = test_tx_tracker(); tx_tracker.transaction_tracker = futures::future::ready(TrackedTransactionStatus::Lost).boxed().shared(); - assert_eq!( - Option::>::None, - tx_tracker.update(&vec![].into_iter().collect()).await.into(), - ); + assert!(matches!( + tx_tracker + .update(&HeaderId(0, Default::default()), &vec![].into_iter().collect()) + .await, + SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost), + )); + } + + #[async_std::test] + async fn tx_tracker_update_when_tx_is_finalized_but_heads_are_not_updated() { + let mut tx_tracker = test_tx_tracker(); + tx_tracker.transaction_tracker = + futures::future::ready(TrackedTransactionStatus::Finalized(Default::default())) + .boxed() + .shared(); + assert!(matches!( + tx_tracker + .update(&HeaderId(0, Default::default()), &vec![].into_iter().collect()) + .await, + SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost), + )); } #[async_std::test] - async fn tx_tracker_update_when_tx_is_finalized() { + async fn tx_tracker_update_when_tx_is_finalized_and_heads_are_updated() { let mut tx_tracker = test_tx_tracker(); tx_tracker.transaction_tracker = - futures::future::ready(TrackedTransactionStatus::Finalized).boxed().shared(); + futures::future::ready(TrackedTransactionStatus::Finalized(Default::default())) + .boxed() + .shared(); assert!(matches!( - tx_tracker.update(&vec![].into_iter().collect()).await, - SubmittedHeadsStatus::Waiting(_), + tx_tracker + .update(&HeaderId(0, Default::default()), &all_expected_tracker_heads()) + .await, + SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized(_)), )); } diff --git a/relays/utils/src/lib.rs b/relays/utils/src/lib.rs index b19b9cd44988..eb3d8ec75250 100644 --- a/relays/utils/src/lib.rs +++ b/relays/utils/src/lib.rs @@ -122,18 +122,21 @@ pub trait MaybeConnectionError { /// Final status of the tracked transaction. #[derive(Debug, Clone, Copy, PartialEq)] -pub enum TrackedTransactionStatus { +pub enum TrackedTransactionStatus { /// Transaction has been lost. Lost, - /// Transaction has been mined and finalized. - Finalized, + /// Transaction has been mined and finalized at given block. + Finalized(BlockId), } /// Transaction tracker. #[async_trait] pub trait TransactionTracker: Send { + /// Header id, used by the chain. + type HeaderId: Clone + Send; + /// Wait until transaction is either finalized or invalidated/lost. - async fn wait(self) -> TrackedTransactionStatus; + async fn wait(self) -> TrackedTransactionStatus; } /// Stringified error that may be either connection-related or not.