From 638a9669dbea20153fd99944d31affe2cace68a0 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 2 Mar 2023 17:02:49 +0300 Subject: [PATCH] Reconnect on-demand clients from MessagesSource::reconnect and MessagesTarget::reconnect (#1927) * reconnect on-demand clients from MessagesSource::reconnect and MessagesTarget::reconnect * add issue reference * fmt --- .../lib-substrate-relay/src/messages_source.rs | 16 +++++++++++++++- .../lib-substrate-relay/src/messages_target.rs | 16 +++++++++++++++- .../lib-substrate-relay/src/on_demand/headers.rs | 10 ++++++++++ .../lib-substrate-relay/src/on_demand/mod.rs | 3 +++ .../src/on_demand/parachains.rs | 9 +++++++++ 5 files changed, 52 insertions(+), 2 deletions(-) diff --git a/bridges/relays/lib-substrate-relay/src/messages_source.rs b/bridges/relays/lib-substrate-relay/src/messages_source.rs index 8d2ac5874fe60..a151af8ee8277 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_source.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_source.rs @@ -129,8 +129,22 @@ impl RelayClient for SubstrateMessagesSource

{ type Error = SubstrateError; async fn reconnect(&mut self) -> Result<(), SubstrateError> { + // since the client calls RPC methods on both sides, we need to reconnect both self.source_client.reconnect().await?; - self.target_client.reconnect().await + self.target_client.reconnect().await?; + + // call reconnect on on-demand headers relay, because we may use different chains there + // and the error that has lead to reconnect may have came from those other chains + // (see `require_target_header_on_source`) + // + // this may lead to multiple reconnects to the same node during the same call and it + // needs to be addressed in the future + // TODO: https://github.com/paritytech/parity-bridges-common/issues/1928 + if let Some(ref mut target_to_source_headers_relay) = self.target_to_source_headers_relay { + target_to_source_headers_relay.reconnect().await?; + } + + Ok(()) } } diff --git a/bridges/relays/lib-substrate-relay/src/messages_target.rs b/bridges/relays/lib-substrate-relay/src/messages_target.rs index 75119a1de9768..7b0d9d63fb786 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_target.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_target.rs @@ -123,8 +123,22 @@ impl RelayClient for SubstrateMessagesTarget

{ type Error = SubstrateError; async fn reconnect(&mut self) -> Result<(), SubstrateError> { + // since the client calls RPC methods on both sides, we need to reconnect both self.target_client.reconnect().await?; - self.source_client.reconnect().await + self.source_client.reconnect().await?; + + // call reconnect on on-demand headers relay, because we may use different chains there + // and the error that has lead to reconnect may have came from those other chains + // (see `require_source_header_on_target`) + // + // this may lead to multiple reconnects to the same node during the same call and it + // needs to be addressed in the future + // TODO: https://github.com/paritytech/parity-bridges-common/issues/1928 + if let Some(ref mut source_to_target_headers_relay) = self.source_to_target_headers_relay { + source_to_target_headers_relay.reconnect().await?; + } + + Ok(()) } } diff --git a/bridges/relays/lib-substrate-relay/src/on_demand/headers.rs b/bridges/relays/lib-substrate-relay/src/on_demand/headers.rs index 75b402aff8732..090019f087f6c 100644 --- a/bridges/relays/lib-substrate-relay/src/on_demand/headers.rs +++ b/bridges/relays/lib-substrate-relay/src/on_demand/headers.rs @@ -60,6 +60,8 @@ pub struct OnDemandHeadersRelay { required_header_number: RequiredHeaderNumberRef, /// Client of the source chain. source_client: Client, + /// Client of the target chain. + target_client: Client, } impl OnDemandHeadersRelay

{ @@ -83,6 +85,7 @@ impl OnDemandHeadersRelay

{ relay_task_name: on_demand_headers_relay_name::(), required_header_number: required_header_number.clone(), source_client: source_client.clone(), + target_client: target_client.clone(), }; async_std::task::spawn(async move { background_task::

( @@ -104,6 +107,13 @@ impl OnDemandHeadersRelay

{ impl OnDemandRelay for OnDemandHeadersRelay

{ + async fn reconnect(&self) -> Result<(), SubstrateError> { + // using clone is fine here (to avoid mut requirement), because clone on Client clones + // internal references + self.source_client.clone().reconnect().await?; + self.target_client.clone().reconnect().await + } + async fn require_more_headers(&self, required_header: BlockNumberOf) { let mut required_header_number = self.required_header_number.lock().await; if required_header > *required_header_number { diff --git a/bridges/relays/lib-substrate-relay/src/on_demand/mod.rs b/bridges/relays/lib-substrate-relay/src/on_demand/mod.rs index eca7d20163d23..00bb33d674093 100644 --- a/bridges/relays/lib-substrate-relay/src/on_demand/mod.rs +++ b/bridges/relays/lib-substrate-relay/src/on_demand/mod.rs @@ -26,6 +26,9 @@ pub mod parachains; /// On-demand headers relay that is relaying finalizing headers only when requested. #[async_trait] pub trait OnDemandRelay: Send + Sync { + /// Reconnect to source and target nodes. + async fn reconnect(&self) -> Result<(), SubstrateError>; + /// Ask relay to relay source header with given number to the target chain. /// /// Depending on implementation, on-demand relay may also relay `required_header` ancestors diff --git a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs index 672530de3a58c..4c205770d4acb 100644 --- a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs +++ b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs @@ -119,6 +119,15 @@ impl OnDemandRelay, { + async fn reconnect(&self) -> Result<(), SubstrateError> { + // using clone is fine here (to avoid mut requirement), because clone on Client clones + // internal references + self.source_relay_client.clone().reconnect().await?; + self.target_client.clone().reconnect().await?; + // we'll probably need to reconnect relay chain relayer clients also + self.on_demand_source_relay_to_target_headers.reconnect().await + } + async fn require_more_headers(&self, required_header: BlockNumberOf) { if let Err(e) = self.required_header_number_sender.send(required_header).await { log::trace!(