diff --git a/relays/client-substrate/src/lib.rs b/relays/client-substrate/src/lib.rs index b9e489688d3..bd38f3a928f 100644 --- a/relays/client-substrate/src/lib.rs +++ b/relays/client-substrate/src/lib.rs @@ -88,30 +88,3 @@ pub fn transaction_stall_timeout( .map(|mortality_period| average_block_interval.saturating_mul(mortality_period + 1 + 1)) .unwrap_or(default_stall_timeout) } - -/// Returns stall timeout for relay loop that submit transactions to two chains. -/// -/// Bidirectional relay may have two active transactions. Even if one of them has been spoiled, we -/// can't just restart the loop - the other transaction may still be alive and we'll be submitting -/// duplicate transaction, which may result in funds loss. So we'll be selecting maximal mortality -/// for choosing loop stall timeout. -pub fn bidirectional_transaction_stall_timeout( - left_mortality_period: Option, - right_mortality_period: Option, - left_average_block_interval: Duration, - right_average_block_interval: Duration, - default_stall_timeout: Duration, -) -> Duration { - std::cmp::max( - transaction_stall_timeout( - left_mortality_period, - left_average_block_interval, - default_stall_timeout, - ), - transaction_stall_timeout( - right_mortality_period, - right_average_block_interval, - default_stall_timeout, - ), - ) -} diff --git a/relays/lib-substrate-relay/src/messages_lane.rs b/relays/lib-substrate-relay/src/messages_lane.rs index f69ebc347d7..55fcb540ebe 100644 --- a/relays/lib-substrate-relay/src/messages_lane.rs +++ b/relays/lib-substrate-relay/src/messages_lane.rs @@ -164,13 +164,6 @@ where { let source_client = params.source_client; let target_client = params.target_client; - let stall_timeout = relay_substrate_client::bidirectional_transaction_stall_timeout( - params.source_transaction_params.mortality, - params.target_transaction_params.mortality, - P::SourceChain::AVERAGE_BLOCK_INTERVAL, - P::TargetChain::AVERAGE_BLOCK_INTERVAL, - STALL_TIMEOUT, - ); let relayer_id_at_source: 
AccountIdOf = params.source_transaction_params.signer.public().into(); @@ -202,8 +195,7 @@ where Max messages in single transaction: {}\n\t\ Max messages size in single transaction: {}\n\t\ Max messages weight in single transaction: {}\n\t\ - Tx mortality: {:?} (~{}m)/{:?} (~{}m)\n\t\ - Stall timeout: {:?}", + Tx mortality: {:?} (~{}m)/{:?} (~{}m)", P::SourceChain::NAME, P::TargetChain::NAME, P::SourceChain::NAME, @@ -223,7 +215,6 @@ where P::TargetChain::AVERAGE_BLOCK_INTERVAL, STALL_TIMEOUT, ).as_secs_f64() / 60.0f64, - stall_timeout, ); messages_relay::message_lane_loop::run( @@ -232,7 +223,6 @@ where source_tick: P::SourceChain::AVERAGE_BLOCK_INTERVAL, target_tick: P::TargetChain::AVERAGE_BLOCK_INTERVAL, reconnect_delay: relay_utils::relay_loop::RECONNECT_DELAY, - stall_timeout, delivery_params: messages_relay::message_lane_loop::MessageDeliveryParams { max_unrewarded_relayer_entries_at_target: P::SourceChain::MAX_UNREWARDED_RELAYERS_IN_CONFIRMATION_TX, diff --git a/relays/lib-substrate-relay/src/messages_source.rs b/relays/lib-substrate-relay/src/messages_source.rs index 5de9e30dd0e..ca0c3f54bb5 100644 --- a/relays/lib-substrate-relay/src/messages_source.rs +++ b/relays/lib-substrate-relay/src/messages_source.rs @@ -51,7 +51,7 @@ use num_traits::{Bounded, Zero}; use relay_substrate_client::{ AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client, Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra, - TransactionSignScheme, UnsignedTransaction, + TransactionSignScheme, TransactionTracker, UnsignedTransaction, }; use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; use sp_core::{Bytes, Pair}; @@ -144,6 +144,8 @@ where From< as Pair>::Public>, P::SourceTransactionSignScheme: TransactionSignScheme, { + type TransactionTracker = TransactionTracker; + async fn state(&self) -> Result>, SubstrateError> { // we can't continue to deliver confirmations if source node is out of sync, because // it 
may have already received confirmations that we're going to deliver @@ -338,13 +340,13 @@ where &self, _generated_at_block: TargetHeaderIdOf>, proof: as MessageLane>::MessagesReceivingProof, - ) -> Result<(), SubstrateError> { + ) -> Result { let genesis_hash = *self.source_client.genesis_hash(); let transaction_params = self.transaction_params.clone(); let (spec_version, transaction_version) = self.source_client.simple_runtime_version().await?; self.source_client - .submit_signed_extrinsic( + .submit_and_watch_signed_extrinsic( self.transaction_params.signer.public().into(), SignParam:: { spec_version, @@ -362,8 +364,7 @@ where ) }, ) - .await?; - Ok(()) + .await } async fn require_target_header_on_source(&self, id: TargetHeaderIdOf>) { diff --git a/relays/lib-substrate-relay/src/messages_target.rs b/relays/lib-substrate-relay/src/messages_target.rs index 29d80bc2c5e..4e5ba6ae087 100644 --- a/relays/lib-substrate-relay/src/messages_target.rs +++ b/relays/lib-substrate-relay/src/messages_target.rs @@ -39,13 +39,13 @@ use codec::Encode; use frame_support::weights::{Weight, WeightToFee}; use messages_relay::{ message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, - message_lane_loop::{TargetClient, TargetClientState}, + message_lane_loop::{NoncesSubmitArtifacts, TargetClient, TargetClientState}, }; use num_traits::{Bounded, Zero}; use relay_substrate_client::{ AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client, Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra, - TransactionSignScheme, UnsignedTransaction, WeightToFeeOf, + TransactionSignScheme, TransactionTracker, UnsignedTransaction, WeightToFeeOf, }; use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; use sp_core::{Bytes, Pair}; @@ -145,6 +145,8 @@ where P::TargetTransactionSignScheme: TransactionSignScheme, BalanceOf: TryFrom>, { + type TransactionTracker = TransactionTracker; + async fn state(&self) -> Result>, 
SubstrateError> { // we can't continue to deliver confirmations if source node is out of sync, because // it may have already received confirmations that we're going to deliver @@ -245,15 +247,16 @@ where _generated_at_header: SourceHeaderIdOf>, nonces: RangeInclusive, proof: as MessageLane>::MessagesProof, - ) -> Result, SubstrateError> { + ) -> Result, SubstrateError> { let genesis_hash = *self.target_client.genesis_hash(); let transaction_params = self.transaction_params.clone(); let relayer_id_at_source = self.relayer_id_at_source.clone(); let nonces_clone = nonces.clone(); let (spec_version, transaction_version) = self.target_client.simple_runtime_version().await?; - self.target_client - .submit_signed_extrinsic( + let tx_tracker = self + .target_client + .submit_and_watch_signed_extrinsic( self.transaction_params.signer.public().into(), SignParam:: { spec_version, @@ -274,7 +277,7 @@ where }, ) .await?; - Ok(nonces) + Ok(NoncesSubmitArtifacts { nonces, tx_tracker }) } async fn require_source_header_on_target(&self, id: SourceHeaderIdOf>) { diff --git a/relays/messages/src/message_lane_loop.rs b/relays/messages/src/message_lane_loop.rs index bd7a7de8290..6e3b02f1cff 100644 --- a/relays/messages/src/message_lane_loop.rs +++ b/relays/messages/src/message_lane_loop.rs @@ -33,7 +33,7 @@ use bp_messages::{LaneId, MessageNonce, UnrewardedRelayersState, Weight}; use bp_runtime::messages::DispatchFeePayment; use relay_utils::{ interval, metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient, - retry_backoff, FailedClient, + retry_backoff, FailedClient, TransactionTracker, }; use crate::{ @@ -55,8 +55,6 @@ pub struct Params { pub target_tick: Duration, /// Delay between moments when connection error happens and our reconnect attempt. pub reconnect_delay: Duration, - /// The loop will auto-restart if there has been no updates during this period. - pub stall_timeout: Duration, /// Message delivery race parameters. 
pub delivery_params: MessageDeliveryParams, } @@ -119,9 +117,20 @@ pub struct MessageProofParameters { pub dispatch_weight: Weight, } +/// Artifacts of submitting nonces proof. +pub struct NoncesSubmitArtifacts { + /// Submitted nonces range. + pub nonces: RangeInclusive, + /// Submitted transaction tracker. + pub tx_tracker: T, +} + /// Source client trait. #[async_trait] pub trait SourceClient: RelayClient { + /// Transaction tracker to track submitted transactions. + type TransactionTracker: TransactionTracker; + /// Returns state of the client. async fn state(&self) -> Result, Self::Error>; @@ -160,7 +169,7 @@ pub trait SourceClient: RelayClient { &self, generated_at_block: TargetHeaderIdOf
<P>
, proof: P::MessagesReceivingProof, - ) -> Result<(), Self::Error>; + ) -> Result; /// We need given finalized target header on source to continue synchronization. async fn require_target_header_on_source(&self, id: TargetHeaderIdOf
<P>
); @@ -172,6 +181,9 @@ pub trait SourceClient: RelayClient { /// Target client trait. #[async_trait] pub trait TargetClient: RelayClient { + /// Transaction tracker to track submitted transactions. + type TransactionTracker: TransactionTracker; + /// Returns state of the client. async fn state(&self) -> Result, Self::Error>; @@ -205,7 +217,7 @@ pub trait TargetClient: RelayClient { generated_at_header: SourceHeaderIdOf
<P>
, nonces: RangeInclusive, proof: P::MessagesProof, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; /// We need given finalized source header on target to continue synchronization. async fn require_source_header_on_target(&self, id: SourceHeaderIdOf
<P>
); @@ -327,7 +339,6 @@ async fn run_until_connection_lost< delivery_source_state_receiver, target_client.clone(), delivery_target_state_receiver, - params.stall_timeout, metrics_msg.clone(), params.delivery_params, ) @@ -342,7 +353,6 @@ async fn run_until_connection_lost< receiving_source_state_receiver, target_client.clone(), receiving_target_state_receiver, - params.stall_timeout, metrics_msg.clone(), ) .fuse(); @@ -465,7 +475,7 @@ pub(crate) mod tests { use futures::stream::StreamExt; use parking_lot::Mutex; - use relay_utils::{HeaderId, MaybeConnectionError}; + use relay_utils::{HeaderId, MaybeConnectionError, TrackedTransactionStatus}; use crate::relay_strategy::AltruisticStrategy; @@ -518,19 +528,37 @@ pub(crate) mod tests { type TargetHeaderHash = TestTargetHeaderHash; } - #[derive(Debug, Default, Clone)] + #[derive(Clone, Debug)] + pub struct TestTransactionTracker(TrackedTransactionStatus); + + impl Default for TestTransactionTracker { + fn default() -> TestTransactionTracker { + TestTransactionTracker(TrackedTransactionStatus::Finalized) + } + } + + #[async_trait] + impl TransactionTracker for TestTransactionTracker { + async fn wait(self) -> TrackedTransactionStatus { + self.0 + } + } + + #[derive(Debug, Clone)] pub struct TestClientData { is_source_fails: bool, is_source_reconnected: bool, source_state: SourceClientState, source_latest_generated_nonce: MessageNonce, source_latest_confirmed_received_nonce: MessageNonce, + source_tracked_transaction_status: TrackedTransactionStatus, submitted_messages_receiving_proofs: Vec, is_target_fails: bool, is_target_reconnected: bool, target_state: SourceClientState, target_latest_received_nonce: MessageNonce, target_latest_confirmed_received_nonce: MessageNonce, + target_tracked_transaction_status: TrackedTransactionStatus, submitted_messages_proofs: Vec, target_to_source_header_required: Option, target_to_source_header_requirements: Vec, @@ -538,6 +566,31 @@ pub(crate) mod tests { 
source_to_target_header_requirements: Vec, } + impl Default for TestClientData { + fn default() -> TestClientData { + TestClientData { + is_source_fails: false, + is_source_reconnected: false, + source_state: Default::default(), + source_latest_generated_nonce: 0, + source_latest_confirmed_received_nonce: 0, + source_tracked_transaction_status: TrackedTransactionStatus::Finalized, + submitted_messages_receiving_proofs: Vec::new(), + is_target_fails: false, + is_target_reconnected: false, + target_state: Default::default(), + target_latest_received_nonce: 0, + target_latest_confirmed_received_nonce: 0, + target_tracked_transaction_status: TrackedTransactionStatus::Finalized, + submitted_messages_proofs: Vec::new(), + target_to_source_header_required: None, + target_to_source_header_requirements: Vec::new(), + source_to_target_header_required: None, + source_to_target_header_requirements: Vec::new(), + } + } + } + #[derive(Clone)] pub struct TestSourceClient { data: Arc>, @@ -569,6 +622,8 @@ pub(crate) mod tests { #[async_trait] impl SourceClient for TestSourceClient { + type TransactionTracker = TestTransactionTracker; + async fn state(&self) -> Result, TestError> { let mut data = self.data.lock(); (self.tick)(&mut data); @@ -648,7 +703,7 @@ pub(crate) mod tests { &self, _generated_at_block: TargetHeaderIdOf, proof: TestMessagesReceivingProof, - ) -> Result<(), TestError> { + ) -> Result { let mut data = self.data.lock(); (self.tick)(&mut data); data.source_state.best_self = @@ -656,7 +711,7 @@ pub(crate) mod tests { data.source_state.best_finalized_self = data.source_state.best_self; data.submitted_messages_receiving_proofs.push(proof); data.source_latest_confirmed_received_nonce = proof; - Ok(()) + Ok(TestTransactionTracker(data.source_tracked_transaction_status)) } async fn require_target_header_on_source(&self, id: TargetHeaderIdOf) { @@ -702,6 +757,8 @@ pub(crate) mod tests { #[async_trait] impl TargetClient for TestTargetClient { + type TransactionTracker = 
TestTransactionTracker; + async fn state(&self) -> Result, TestError> { let mut data = self.data.lock(); (self.tick)(&mut data); @@ -762,7 +819,7 @@ pub(crate) mod tests { _generated_at_header: SourceHeaderIdOf, nonces: RangeInclusive, proof: TestMessagesProof, - ) -> Result, TestError> { + ) -> Result, TestError> { let mut data = self.data.lock(); (self.tick)(&mut data); if data.is_target_fails { @@ -777,7 +834,10 @@ pub(crate) mod tests { target_latest_confirmed_received_nonce; } data.submitted_messages_proofs.push(proof); - Ok(nonces) + Ok(NoncesSubmitArtifacts { + nonces, + tx_tracker: TestTransactionTracker(data.target_tracked_transaction_status), + }) } async fn require_source_header_on_target(&self, id: SourceHeaderIdOf) { @@ -817,7 +877,6 @@ pub(crate) mod tests { source_tick: Duration::from_millis(100), target_tick: Duration::from_millis(100), reconnect_delay: Duration::from_millis(0), - stall_timeout: Duration::from_millis(60 * 1000), delivery_params: MessageDeliveryParams { max_unrewarded_relayer_entries_at_target: 4, max_unconfirmed_nonces_at_target: 4, @@ -889,6 +948,54 @@ pub(crate) mod tests { assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],); } + #[test] + fn message_lane_loop_is_able_to_recover_from_race_stall() { + // with this configuration, both source and target clients will lose their transactions => + // reconnect will happen + let (source_exit_sender, exit_receiver) = unbounded(); + let target_exit_sender = source_exit_sender.clone(); + let result = run_loop_test( + TestClientData { + source_state: ClientState { + best_self: HeaderId(0, 0), + best_finalized_self: HeaderId(0, 0), + best_finalized_peer_at_best_self: HeaderId(0, 0), + actual_best_finalized_peer_at_best_self: HeaderId(0, 0), + }, + source_latest_generated_nonce: 1, + source_tracked_transaction_status: TrackedTransactionStatus::Lost, + target_state: ClientState { + best_self: HeaderId(0, 0), + best_finalized_self: HeaderId(0, 0), + 
best_finalized_peer_at_best_self: HeaderId(0, 0), + actual_best_finalized_peer_at_best_self: HeaderId(0, 0), + }, + target_latest_received_nonce: 0, + target_tracked_transaction_status: TrackedTransactionStatus::Lost, + ..Default::default() + }, + Arc::new(move |data: &mut TestClientData| { + if data.is_source_reconnected { + data.source_tracked_transaction_status = TrackedTransactionStatus::Finalized; + } + if data.is_source_reconnected && data.is_target_reconnected { + source_exit_sender.unbounded_send(()).unwrap(); + } + }), + Arc::new(move |data: &mut TestClientData| { + if data.is_target_reconnected { + data.target_tracked_transaction_status = TrackedTransactionStatus::Finalized; + } + if data.is_source_reconnected && data.is_target_reconnected { + target_exit_sender.unbounded_send(()).unwrap(); + } + }), + exit_receiver.into_future().map(|(_, _)| ()), + ); + + assert!(result.is_source_reconnected); + } + #[test] fn message_lane_loop_works() { let (exit_sender, exit_receiver) = unbounded(); diff --git a/relays/messages/src/message_race_delivery.rs b/relays/messages/src/message_race_delivery.rs index 85f6d955e7d..e15e08b0423 100644 --- a/relays/messages/src/message_race_delivery.rs +++ b/relays/messages/src/message_race_delivery.rs @@ -13,7 +13,7 @@ //! Message delivery race delivers proof-of-messages from "lane.source" to "lane.target". 
-use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration}; +use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive}; use async_trait::async_trait; use futures::stream::FusedStream; @@ -24,7 +24,7 @@ use relay_utils::FailedClient; use crate::{ message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, message_lane_loop::{ - MessageDeliveryParams, MessageDetailsMap, MessageProofParameters, + MessageDeliveryParams, MessageDetailsMap, MessageProofParameters, NoncesSubmitArtifacts, SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient, TargetClientState, }, @@ -43,7 +43,6 @@ pub async fn run( source_state_updates: impl FusedStream>, target_client: impl MessageLaneTargetClient
<P>
, target_state_updates: impl FusedStream>, - stall_timeout: Duration, metrics_msg: Option, params: MessageDeliveryParams, ) -> Result<(), FailedClient> { @@ -60,7 +59,6 @@ pub async fn run( _phantom: Default::default(), }, target_state_updates, - stall_timeout, MessageDeliveryStrategy:: { lane_source_client: source_client, lane_target_client: target_client, @@ -174,6 +172,7 @@ where { type Error = C::Error; type TargetNoncesData = DeliveryRaceTargetNoncesData; + type TransactionTracker = C::TransactionTracker; async fn require_source_header(&self, id: SourceHeaderIdOf
<P>
) { self.client.require_source_header_on_target(id).await @@ -215,7 +214,7 @@ where generated_at_block: SourceHeaderIdOf
<P>
, nonces: RangeInclusive, proof: P::MessagesProof, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { self.client.submit_messages_proof(generated_at_block, nonces, proof).await } } diff --git a/relays/messages/src/message_race_loop.rs b/relays/messages/src/message_race_loop.rs index a7254f70ee4..a68efbfd193 100644 --- a/relays/messages/src/message_race_loop.rs +++ b/relays/messages/src/message_race_loop.rs @@ -20,7 +20,7 @@ //! associated data - like messages, lane state, etc) to the target node by //! generating and submitting proof. -use crate::message_lane_loop::ClientState; +use crate::message_lane_loop::{ClientState, NoncesSubmitArtifacts}; use async_trait::async_trait; use bp_messages::MessageNonce; @@ -28,7 +28,10 @@ use futures::{ future::FutureExt, stream::{FusedStream, StreamExt}, }; -use relay_utils::{process_future_result, retry_backoff, FailedClient, MaybeConnectionError}; +use relay_utils::{ + process_future_result, retry_backoff, FailedClient, MaybeConnectionError, + TrackedTransactionStatus, TransactionTracker, +}; use std::{ fmt::Debug, ops::RangeInclusive, @@ -124,6 +127,8 @@ pub trait TargetClient { type Error: std::fmt::Debug + MaybeConnectionError; /// Type of the additional data from the target client, used by the race. type TargetNoncesData: std::fmt::Debug; + /// Transaction tracker to track submitted transactions. + type TransactionTracker: TransactionTracker; /// Ask headers relay to relay finalized headers up to (and including) given header /// from race source to race target. @@ -141,7 +146,7 @@ pub trait TargetClient { generated_at_block: P::SourceHeaderId, nonces: RangeInclusive, proof: P::Proof, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; } /// Race strategy. @@ -222,7 +227,6 @@ pub async fn run, TC: TargetClient
<P>
>( race_source_updated: impl FusedStream>, race_target: TC, race_target_updated: impl FusedStream>, - stall_timeout: Duration, mut strategy: impl RaceStrategy< P::SourceHeaderId, P::TargetHeaderId, @@ -234,7 +238,6 @@ pub async fn run, TC: TargetClient
<P>
>( ) -> Result<(), FailedClient> { let mut progress_context = Instant::now(); let mut race_state = RaceState::default(); - let mut stall_countdown = Instant::now(); let mut source_retry_backoff = retry_backoff(); let mut source_client_is_online = true; @@ -250,6 +253,7 @@ pub async fn run, TC: TargetClient
<P>
>( let target_best_nonces = futures::future::Fuse::terminated(); let target_finalized_nonces = futures::future::Fuse::terminated(); let target_submit_proof = futures::future::Fuse::terminated(); + let target_tx_tracker = futures::future::Fuse::terminated(); let target_go_offline_future = futures::future::Fuse::terminated(); futures::pin_mut!( @@ -261,6 +265,7 @@ pub async fn run, TC: TargetClient
<P>
>( target_best_nonces, target_finalized_nonces, target_submit_proof, + target_tx_tracker, target_go_offline_future, ); @@ -343,11 +348,7 @@ pub async fn run, TC: TargetClient
<P>
>( nonces, ); - let prev_best_at_target = strategy.best_at_target(); strategy.best_target_nonces_updated(nonces, &mut race_state); - if strategy.best_at_target() != prev_best_at_target { - stall_countdown = Instant::now(); - } }, &mut target_go_offline_future, async_std::task::sleep, @@ -400,23 +401,37 @@ pub async fn run, TC: TargetClient
<P>
>( target_client_is_online = process_future_result( proof_submit_result, &mut target_retry_backoff, - |nonces_range| { + |artifacts: NoncesSubmitArtifacts| { log::debug!( target: "bridge", "Successfully submitted proof of nonces {:?} to {}", - nonces_range, + artifacts.nonces, P::target_name(), ); race_state.nonces_to_submit = None; - race_state.nonces_submitted = Some(nonces_range); - stall_countdown = Instant::now(); + race_state.nonces_submitted = Some(artifacts.nonces); + target_tx_tracker.set(artifacts.tx_tracker.wait().fuse()); }, &mut target_go_offline_future, async_std::task::sleep, || format!("Error submitting proof {}", P::target_name()), ).fail_if_connection_error(FailedClient::Target)?; }, + target_transaction_status = target_tx_tracker => { + if target_transaction_status == TrackedTransactionStatus::Lost { + log::warn!( + target: "bridge", + "{} -> {} race has stalled. State: {:?}. Strategy: {:?}", + P::source_name(), + P::target_name(), + race_state, + strategy, + ); + + return Err(FailedClient::Both); + } + }, // when we're ready to retry request _ = source_go_offline_future => { @@ -429,24 +444,6 @@ pub async fn run, TC: TargetClient
<P>
>( progress_context = print_race_progress::(progress_context, &strategy); - if stall_countdown.elapsed() > stall_timeout { - log::warn!( - target: "bridge", - "{} -> {} race has stalled. State: {:?}. Strategy: {:?}", - P::source_name(), - P::target_name(), - race_state, - strategy, - ); - - return Err(FailedClient::Both) - } else if race_state.nonces_to_submit.is_none() && - race_state.nonces_submitted.is_none() && - strategy.is_empty() - { - stall_countdown = Instant::now(); - } - if source_client_is_online { source_client_is_online = false; diff --git a/relays/messages/src/message_race_receiving.rs b/relays/messages/src/message_race_receiving.rs index 5aa36cbd9c6..c3d65d0e86a 100644 --- a/relays/messages/src/message_race_receiving.rs +++ b/relays/messages/src/message_race_receiving.rs @@ -16,7 +16,7 @@ use crate::{ message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, message_lane_loop::{ - SourceClient as MessageLaneSourceClient, SourceClientState, + NoncesSubmitArtifacts, SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient, TargetClientState, }, message_race_loop::{ @@ -31,7 +31,7 @@ use async_trait::async_trait; use bp_messages::MessageNonce; use futures::stream::FusedStream; use relay_utils::FailedClient; -use std::{marker::PhantomData, ops::RangeInclusive, time::Duration}; +use std::{marker::PhantomData, ops::RangeInclusive}; /// Message receiving confirmations delivery strategy. type ReceivingConfirmationsBasicStrategy
<P>
= BasicStrategy< @@ -49,7 +49,6 @@ pub async fn run( source_state_updates: impl FusedStream>, target_client: impl MessageLaneTargetClient
<P>
, target_state_updates: impl FusedStream>, - stall_timeout: Duration, metrics_msg: Option, ) -> Result<(), FailedClient> { crate::message_race_loop::run( @@ -65,7 +64,6 @@ pub async fn run( _phantom: Default::default(), }, source_state_updates, - stall_timeout, ReceivingConfirmationsBasicStrategy::
<P>
::new(), ) .await @@ -157,6 +155,7 @@ where { type Error = C::Error; type TargetNoncesData = (); + type TransactionTracker = C::TransactionTracker; async fn require_source_header(&self, id: TargetHeaderIdOf
<P>
) { self.client.require_target_header_on_source(id).await @@ -182,9 +181,10 @@ where generated_at_block: TargetHeaderIdOf
<P>
, nonces: RangeInclusive, proof: P::MessagesReceivingProof, - ) -> Result, Self::Error> { - self.client.submit_messages_receiving_proof(generated_at_block, proof).await?; - Ok(nonces) + ) -> Result, Self::Error> { + let tx_tracker = + self.client.submit_messages_receiving_proof(generated_at_block, proof).await?; + Ok(NoncesSubmitArtifacts { nonces, tx_tracker }) } }