From 8f0dbaa105b36e227014ab39dda8c85c5051d6fa Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 24 Feb 2022 01:59:14 +0530 Subject: [PATCH 01/45] Return update height from update_client_{dst,src} methods --- relayer/src/link/relay_path.rs | 98 ++++++++++++++++++++++------------ 1 file changed, 65 insertions(+), 33 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index b61544f870..8e2be19d66 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -9,6 +9,7 @@ use tracing::{debug, error, info, span, trace, warn, Level}; use ibc::{ core::{ + ics02_client::client_consensus::QueryClientEventRequest, ics04_channel::{ channel::{ChannelEnd, Order, QueryPacketEventDataRequest, State as ChannelState}, events::{SendPacket, WriteAcknowledgement}, @@ -699,29 +700,49 @@ impl RelayPath { self.channel.connection_delay == ZERO_DURATION } + fn update_height( + chain: &C, + client_id: ClientId, + consensus_height: Height, + ) -> Option { + if let Ok(mut events) = chain.query_txs(QueryTxRequest::Client(QueryClientEventRequest { + height: Height::zero(), + event_id: WithBlockDataType::UpdateClient, + client_id, + consensus_height, + })) { + if let Some(update_client_event) = events.pop() { + return Some(update_client_event.height()); + } + } + None + } + /// Handles updating the client on the destination chain + /// Returns the height at which the client update was processed fn update_client_dst( &self, src_chain_height: Height, tracking_id: &str, - ) -> Result<(), LinkError> { + ) -> Result { // Handle the update on the destination chain // Check if a consensus state at update_height exists on destination chain already - if self - .dst_chain() - .proven_client_consensus(self.dst_client_id(), src_chain_height, Height::zero()) - .is_ok() - { - return Ok(()); + if let Some(update_height) = Self::update_height( + self.dst_chain(), + self.dst_client_id().clone(), + src_chain_height, + ) { + return Ok(update_height); } let mut dst_err_ev = None; - for i in 0..MAX_RETRIES { + 'retry: for i in 0..MAX_RETRIES { let dst_update = self.build_update_client_on_dst(src_chain_height)?; info!( "sending updateClient to client hosted on destination chain for height {} [try {}/{}]", src_chain_height, - i + 1, MAX_RETRIES, + i + 1, + MAX_RETRIES, ); let tm = TrackedMsgs::new(dst_update, tracking_id); @@ -732,12 +753,15 @@ impl RelayPath { .map_err(LinkError::relayer)?; info!("result: {}", PrettyEvents(&dst_tx_events)); - dst_err_ev = dst_tx_events - .into_iter() - .find(|event| matches!(event, IbcEvent::ChainError(_))); - - if dst_err_ev.is_none() { - return Ok(()); + for event in dst_tx_events { + match event { + IbcEvent::ChainError(_) => { + dst_err_ev = Some(event); + continue 'retry; + } + IbcEvent::UpdateClient(ev) => return Ok(ev.height()), + _ => {} + } } } @@ -748,25 +772,28 @@ impl RelayPath { } /// Handles updating the client on the source chain + /// Returns the height at which the client update was processed fn update_client_src( &self, dst_chain_height: Height, tracking_id: &str, - ) -> Result<(), LinkError> { - if self - .src_chain() - .proven_client_consensus(self.src_client_id(), dst_chain_height, Height::zero()) - .is_ok() - { - return Ok(()); + ) -> Result { + if let Some(update_height) = Self::update_height( + self.src_chain(), + self.src_client_id().clone(), + dst_chain_height, + ) { + return Ok(update_height); } let mut src_err_ev = None; - for _ in 0..MAX_RETRIES { + 'retry: for i in 0..MAX_RETRIES { let src_update = self.build_update_client_on_src(dst_chain_height)?; info!( - "sending updateClient to client hosted on source chain for height {}", + "sending updateClient to client hosted on source chain for height {} [try {}/{}]", dst_chain_height, + i + 1, + MAX_RETRIES, ); let tm = TrackedMsgs::new(src_update, tracking_id); @@ -777,12 +804,15 @@ impl RelayPath { .map_err(LinkError::relayer)?; info!("result: {}", PrettyEvents(&src_tx_events)); - src_err_ev = src_tx_events - .into_iter() - .find(|event| matches!(event, IbcEvent::ChainError(_))); - - if src_err_ev.is_none() { - return Ok(()); + for event in src_tx_events { + match event { + IbcEvent::ChainError(_) => { + src_err_ev = Some(event); + continue 'retry; + } + IbcEvent::UpdateClient(ev) => return Ok(ev.height()), + _ => {} + } } } @@ -1387,10 +1417,10 @@ impl RelayPath { } // Update clients ahead of scheduling the operational data, if the delays are non-zero. - if !self.zero_delay() { + let _header_update_height = if !self.zero_delay() { debug!("connection delay is non-zero: updating client"); let target_height = od.proofs_height.increment(); - match od.target { + let update_height = match od.target { OperationalDataTarget::Source => { self.update_client_src(target_height, &od.tracking_id)? } @@ -1398,9 +1428,11 @@ impl RelayPath { self.update_client_dst(target_height, &od.tracking_id)? } }; + Some(update_height) } else { debug!("connection delay is zero: client update message will be prepended later"); - } + None + }; od.scheduled_time = Instant::now(); From a376a674b39a05ee82d7f40dc31d0299c009d6fc Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 24 Feb 2022 17:52:47 +0530 Subject: [PATCH 02/45] Add query_host_consensus_state() --- relayer/src/chain.rs | 2 ++ relayer/src/chain/cosmos.rs | 9 +++++++++ relayer/src/chain/handle.rs | 7 +++++++ relayer/src/chain/handle/base.rs | 4 ++++ relayer/src/chain/mock.rs | 4 ++++ relayer/src/chain/runtime.rs | 19 +++++++++++++++++++ tools/integration-test/src/relayer/chain.rs | 4 ++++ 7 files changed, 49 insertions(+) diff --git a/relayer/src/chain.rs b/relayer/src/chain.rs index 3da5d0f0b6..e008a61b65 100644 --- a/relayer/src/chain.rs +++ b/relayer/src/chain.rs @@ -270,6 +270,8 @@ pub trait ChainEndpoint: Sized { request: QueryBlockRequest, ) -> Result<(Vec, Vec), Error>; + fn query_host_consensus_state(&self, height: ICSHeight) -> Result; + // Provable queries fn proven_client_state( &self, diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index cc0aedb08c..74bb8b60e8 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -1945,6 +1945,15 @@ impl ChainEndpoint for CosmosSdkChain { } } + fn query_host_consensus_state(&self, height: ICSHeight) -> Result { + let height = Height::try_from(height.revision_height).map_err(Error::invalid_height)?; + + let response = self + .block_on(self.rpc_client.block(height)) + .map_err(|e| Error::rpc(self.config.rpc_addr.clone(), e))?; + Ok(response.block.header.into()) + } + fn proven_client_state( &self, client_id: &ClientId, diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index 2b91790553..2035614483 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -333,6 +333,11 @@ pub enum ChainRequest { request: QueryBlockRequest, reply_to: ReplyTo<(Vec, Vec)>, }, + + QueryHostConsensusState { + height: Height, + reply_to: ReplyTo, + }, } pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static { @@ -560,4 +565,6 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static { &self, request: QueryBlockRequest, ) -> Result<(Vec, Vec), Error>; + + fn query_host_consensus_state(&self, request: Height) -> Result; } diff --git a/relayer/src/chain/handle/base.rs b/relayer/src/chain/handle/base.rs index 23b13c8f6f..4a76f77c6f 100644 --- a/relayer/src/chain/handle/base.rs +++ b/relayer/src/chain/handle/base.rs @@ -454,6 +454,10 @@ impl ChainHandle for BaseChainHandle { ) -> Result<(Vec, Vec), Error> { self.send(|reply_to| ChainRequest::QueryPacketEventDataFromBlocks { request, reply_to }) } + + fn query_host_consensus_state(&self, height: Height) -> Result { + self.send(|reply_to| ChainRequest::QueryHostConsensusState { height, reply_to }) + } } impl Serialize for BaseChainHandle { diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index ed6506f79f..0c1b016c82 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -302,6 +302,10 @@ impl ChainEndpoint for MockChain { unimplemented!() } + fn query_host_consensus_state(&self, _height: Height) -> Result { + unimplemented!() + } + fn proven_client_state( &self, _client_id: &ClientId, diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index 13b5eec28f..24a04534ca 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -411,6 +411,10 @@ where self.query_blocks(request, reply_to)? }, + Ok(ChainRequest::QueryHostConsensusState { height, reply_to }) => { + self.query_host_consensus_state(height, reply_to)? + }, + Err(e) => error!("received error via chain request channel: {}", e), } }, @@ -870,4 +874,19 @@ where Ok(()) } + + fn query_host_consensus_state( + &self, + height: Height, + reply_to: ReplyTo, + ) -> Result<(), Error> { + let result = self + .chain + .query_host_consensus_state(height) + .map(|h| h.wrap_any()); + + reply_to.send(result).map_err(Error::send)?; + + Ok(()) + } } diff --git a/tools/integration-test/src/relayer/chain.rs b/tools/integration-test/src/relayer/chain.rs index d7b7516f54..610e3cf7e6 100644 --- a/tools/integration-test/src/relayer/chain.rs +++ b/tools/integration-test/src/relayer/chain.rs @@ -382,4 +382,8 @@ where ) -> Result<(Vec, Vec), Error> { self.value().query_blocks(request) } + + fn query_host_consensus_state(&self, height: Height) -> Result { + self.value().query_host_consensus_state(height) + } } From 6f799dd475cb08ab6d2c84f9542767e0a03954c8 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 24 Feb 2022 18:24:18 +0530 Subject: [PATCH 03/45] Set scheduled time appropriately --- relayer/src/link/relay_path.rs | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 8e2be19d66..da189202e3 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1,7 +1,8 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; +use std::ops::Add; use std::thread; -use std::time::Instant; +use std::time::{Duration, Instant}; use itertools::Itertools; use prost_types::Any; @@ -52,6 +53,7 @@ use crate::link::relay_sender::{AsyncReply, SubmitReply}; use crate::link::relay_summary::RelaySummary; use crate::link::{pending, relay_sender}; use crate::util::queue::Queue; +use ibc::timestamp::Timestamp; const MAX_RETRIES: usize = 5; @@ -1417,25 +1419,34 @@ impl RelayPath { } // Update clients ahead of scheduling the operational data, if the delays are non-zero. - let _header_update_height = if !self.zero_delay() { + od.scheduled_time = if !self.zero_delay() { debug!("connection delay is non-zero: updating client"); let target_height = od.proofs_height.increment(); - let update_height = match od.target { + let update_time = match od.target { OperationalDataTarget::Source => { - self.update_client_src(target_height, &od.tracking_id)? + let update_height = self.update_client_src(target_height, &od.tracking_id)?; + self.src_chain() + .query_host_consensus_state(update_height) + .map_err(LinkError::relayer)? + .timestamp() } OperationalDataTarget::Destination => { - self.update_client_dst(target_height, &od.tracking_id)? + let update_height = self.update_client_dst(target_height, &od.tracking_id)?; + self.dst_chain() + .query_host_consensus_state(update_height) + .map_err(LinkError::relayer)? + .timestamp() } }; - Some(update_height) + let duration = Timestamp::now() + .duration_since(&update_time) + .unwrap_or(Duration::ZERO); + Instant::now().add(duration) } else { debug!("connection delay is zero: client update message will be prepended later"); - None + Instant::now() }; - od.scheduled_time = Instant::now(); - match od.target { OperationalDataTarget::Source => self.src_operational_data.push_back(od), OperationalDataTarget::Destination => self.dst_operational_data.push_back(od), From c7593ab808fd48aa6769b1a73d41399ad950dd5d Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 24 Feb 2022 18:52:10 +0530 Subject: [PATCH 04/45] Handle connection delay only if batch contains packet events --- relayer/src/link/operational_data.rs | 15 ++++++++++++++- relayer/src/link/relay_path.rs | 9 +++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 8682433c66..c2f400ffc2 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -141,7 +141,7 @@ impl OperationalData { relay_path: &RelayPath, ) -> Result { // For zero delay we prepend the client update msgs. - let client_update_msg = if relay_path.zero_delay() { + let client_update_msg = if !relay_path.conn_delay_needed(self) { let update_height = self.proofs_height.increment(); debug!( @@ -178,6 +178,19 @@ impl OperationalData { Ok(tm) } + + pub fn has_packet_msgs(&self) -> bool { + self.batch.iter().any(|msg| { + matches!( + msg.event, + IbcEvent::ReceivePacket(_) + | IbcEvent::WriteAcknowledgement(_) + | IbcEvent::AcknowledgePacket(_) + | IbcEvent::TimeoutPacket(_) + | IbcEvent::TimeoutOnClosePacket(_) + ) + }) + } } /// A lightweight informational data structure that can be extracted diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index da189202e3..4b08f50262 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -698,10 +698,15 @@ impl RelayPath { /// Returns `true` if the delay for this relaying path is zero. /// Conversely, returns `false` if the delay is non-zero. - pub fn zero_delay(&self) -> bool { + fn zero_delay(&self) -> bool { self.channel.connection_delay == ZERO_DURATION } + /// Returns `true` if the delay for this relaying path is zero. + pub fn conn_delay_needed(&self, op_data: &OperationalData) -> bool { + !self.zero_delay() && op_data.has_packet_msgs() + } + fn update_height( chain: &C, client_id: ClientId, @@ -1419,7 +1424,7 @@ impl RelayPath { } // Update clients ahead of scheduling the operational data, if the delays are non-zero. - od.scheduled_time = if !self.zero_delay() { + od.scheduled_time = if self.conn_delay_needed(&od) { debug!("connection delay is non-zero: updating client"); let target_height = od.proofs_height.increment(); let update_time = match od.target { From 79e09c37503b04350da01276ed47f529214d8d86 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 24 Feb 2022 20:56:58 +0530 Subject: [PATCH 05/45] Apply suggestion --- relayer/src/link/relay_path.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 4b08f50262..d298ac5dcb 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -696,15 +696,15 @@ impl RelayPath { self.recv_packet_acknowledged_on_src(&rp.packet) } - /// Returns `true` if the delay for this relaying path is zero. - /// Conversely, returns `false` if the delay is non-zero. - fn zero_delay(&self) -> bool { - self.channel.connection_delay == ZERO_DURATION + /// Returns `true` if the delay for this relaying path is non-zero. + /// Conversely, returns `false` if the delay is zero. + fn has_delay(&self) -> bool { + self.channel.connection_delay != ZERO_DURATION } /// Returns `true` if the delay for this relaying path is zero. pub fn conn_delay_needed(&self, op_data: &OperationalData) -> bool { - !self.zero_delay() && op_data.has_packet_msgs() + self.has_delay() && op_data.has_packet_msgs() } fn update_height( From f8ed1ba163940506d672c250816ba1b16506795f Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 24 Feb 2022 21:01:04 +0530 Subject: [PATCH 06/45] Fix mock impl --- relayer/src/chain/mock.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index 0c1b016c82..a4b6f94c9a 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -302,7 +302,7 @@ impl ChainEndpoint for MockChain { unimplemented!() } - fn query_host_consensus_state(&self, _height: Height) -> Result { + fn query_host_consensus_state(&self, _height: Height) -> Result { unimplemented!() } From 6877290c7852eac807e385c89ce59b2f9a2449c7 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 24 Feb 2022 22:00:29 +0530 Subject: [PATCH 07/45] Fix schedule_time calculation --- relayer/src/link/relay_path.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index d298ac5dcb..2f48170c15 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1,6 +1,6 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; -use std::ops::Add; +use std::ops::Sub; use std::thread; use std::time::{Duration, Instant}; @@ -1446,7 +1446,7 @@ impl RelayPath { let duration = Timestamp::now() .duration_since(&update_time) .unwrap_or(Duration::ZERO); - Instant::now().add(duration) + Instant::now().sub(duration) } else { debug!("connection delay is zero: client update message will be prepended later"); Instant::now() From 52a4a449653511f353130e718c3eaf5d2ae6ee1a Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 24 Feb 2022 22:07:47 +0530 Subject: [PATCH 08/45] Update comment --- relayer/src/link/relay_path.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 2f48170c15..6adc90bfb6 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1425,7 +1425,7 @@ impl RelayPath { // Update clients ahead of scheduling the operational data, if the delays are non-zero. od.scheduled_time = if self.conn_delay_needed(&od) { - debug!("connection delay is non-zero: updating client"); + debug!("connection delay must be taken into account: updating client"); let target_height = od.proofs_height.increment(); let update_time = match od.target { OperationalDataTarget::Source => { @@ -1448,7 +1448,10 @@ impl RelayPath { .unwrap_or(Duration::ZERO); Instant::now().sub(duration) } else { - debug!("connection delay is zero: client update message will be prepended later"); + debug!( + "connection delay need not be taken into account: client update message will be \ + prepended later" + ); Instant::now() }; From eab69081dd485af7f507fe6619a466b7c9604843 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Fri, 25 Feb 2022 00:28:06 +0530 Subject: [PATCH 09/45] Fix comment --- relayer/src/link/relay_path.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 6adc90bfb6..0da8cf4ea4 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -696,13 +696,14 @@ impl RelayPath { self.recv_packet_acknowledged_on_src(&rp.packet) } - /// Returns `true` if the delay for this relaying path is non-zero. + /// Returns `true` if the connection delay for this relaying path is non-zero. /// Conversely, returns `false` if the delay is zero. fn has_delay(&self) -> bool { self.channel.connection_delay != ZERO_DURATION } - /// Returns `true` if the delay for this relaying path is zero. + /// Returns `true` iff the connection delay for this relaying path is non-zero and `op_data` + /// contains packet messages. pub fn conn_delay_needed(&self, op_data: &OperationalData) -> bool { self.has_delay() && op_data.has_packet_msgs() } From 3fcb91f631d738df04362015cd6dff19840ab54d Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Fri, 25 Feb 2022 00:36:57 +0530 Subject: [PATCH 10/45] Add .changelog entry --- .../bug-fixes/ibc-relayer/1772-fix-conn-delay-check.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .changelog/unreleased/bug-fixes/ibc-relayer/1772-fix-conn-delay-check.md diff --git a/.changelog/unreleased/bug-fixes/ibc-relayer/1772-fix-conn-delay-check.md b/.changelog/unreleased/bug-fixes/ibc-relayer/1772-fix-conn-delay-check.md new file mode 100644 index 0000000000..3bbf8ce62f --- /dev/null +++ b/.changelog/unreleased/bug-fixes/ibc-relayer/1772-fix-conn-delay-check.md @@ -0,0 +1,2 @@ +- Fix the connection delay logic to use the timestamp of the host block when the client update header was installed. + ([#1772](https://github.com/informalsystems/ibc-rs/issues/1772)) \ No newline at end of file From 09b1da6964100762e014942386dd967d119570a6 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Fri, 25 Feb 2022 00:45:04 +0530 Subject: [PATCH 11/45] Improve has_packet_msgs() --- relayer/src/link/operational_data.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index c2f400ffc2..71db7b7a68 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -180,16 +180,7 @@ impl OperationalData { } pub fn has_packet_msgs(&self) -> bool { - self.batch.iter().any(|msg| { - matches!( - msg.event, - IbcEvent::ReceivePacket(_) - | IbcEvent::WriteAcknowledgement(_) - | IbcEvent::AcknowledgePacket(_) - | IbcEvent::TimeoutPacket(_) - | IbcEvent::TimeoutOnClosePacket(_) - ) - }) + self.batch.iter().any(|msg| msg.event.packet().is_some()) } } From 6181fb955d257f8441b067d94660e0ece4655461 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Tue, 8 Mar 2022 18:07:21 +0530 Subject: [PATCH 12/45] Implement query_host_consensus_state() for wrapper chain handles --- relayer/src/chain/handle.rs | 2 +- relayer/src/chain/handle/cache.rs | 4 ++++ relayer/src/chain/handle/counting.rs | 4 ++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index 2035614483..4433063272 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -566,5 +566,5 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static { request: QueryBlockRequest, ) -> Result<(Vec, Vec), Error>; - fn query_host_consensus_state(&self, request: Height) -> Result; + fn query_host_consensus_state(&self, height: Height) -> Result; } diff --git a/relayer/src/chain/handle/cache.rs b/relayer/src/chain/handle/cache.rs index dac48d1ea0..203aa4d0d8 100644 --- a/relayer/src/chain/handle/cache.rs +++ b/relayer/src/chain/handle/cache.rs @@ -412,4 +412,8 @@ impl ChainHandle for CachingChainHandle { ) -> Result<(Vec, Vec), Error> { self.inner().query_blocks(request) } + + fn query_host_consensus_state(&self, height: Height) -> Result { + self.inner.query_host_consensus_state(height) + } } diff --git a/relayer/src/chain/handle/counting.rs b/relayer/src/chain/handle/counting.rs index 948a24bb11..98afec74fa 100644 --- a/relayer/src/chain/handle/counting.rs +++ b/relayer/src/chain/handle/counting.rs @@ -448,4 +448,8 @@ impl ChainHandle for CountingChainHandle { self.inc_metric("query_blocks"); self.inner().query_blocks(request) } + + fn query_host_consensus_state(&self, height: Height) -> Result { + self.inner.query_host_consensus_state(height) + } } From 2b8ef52a444545d06f53d1a39b8a265d3a3b8fa4 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Fri, 11 Mar 2022 00:39:15 +0530 Subject: [PATCH 13/45] Apply suggestion --- relayer/src/link/error.rs | 4 +- relayer/src/link/relay_path.rs | 74 +++++++++++++++++++++------------- 2 files changed, 48 insertions(+), 30 deletions(-) diff --git a/relayer/src/link/error.rs b/relayer/src/link/error.rs index 0bdd67fbdf..67b11869bc 100644 --- a/relayer/src/link/error.rs +++ b/relayer/src/link/error.rs @@ -143,7 +143,9 @@ define_error! { e.channel_id, e.chain_id) }, - } + UpdateClientFailed + |_| { "failed to update client" }, + } } impl HasExpiredOrFrozenError for LinkErrorDetail { diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 0da8cf4ea4..61247d1f37 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -792,42 +792,58 @@ impl RelayPath { dst_chain_height, ) { return Ok(update_height); + } else { + self.do_update_client_src(dst_chain_height, tracking_id, MAX_RETRIES) } + } - let mut src_err_ev = None; - 'retry: for i in 0..MAX_RETRIES { - let src_update = self.build_update_client_on_src(dst_chain_height)?; - info!( - "sending updateClient to client hosted on source chain for height {} [try {}/{}]", - dst_chain_height, - i + 1, - MAX_RETRIES, - ); - - let tm = TrackedMsgs::new(src_update, tracking_id); - - let src_tx_events = self - .src_chain() - .send_messages_and_wait_commit(tm) - .map_err(LinkError::relayer)?; - info!("result: {}", PrettyEvents(&src_tx_events)); + /// Perform actual update_client_src with retries. + /// + /// Note that the retry is only performed in the case when there + /// is a ChainError event. It would return error immediately if + /// there are other errors returned from calls such as + /// build_update_client_on_src. + fn do_update_client_src( + &self, + dst_chain_height: Height, + tracking_id: &str, + retries_left: usize, + ) -> Result { + info!( "sending update_client to client hosted on source chain for height {} (retries left: {})", dst_chain_height, retries_left ); - for event in src_tx_events { - match event { - IbcEvent::ChainError(_) => { - src_err_ev = Some(event); - continue 'retry; + let src_update = self.build_update_client_on_src(dst_chain_height)?; + let tm = TrackedMsgs::new(src_update, tracking_id); + let src_tx_events = self + .src_chain() + .send_messages_and_wait_commit(tm) + .map_err(LinkError::relayer)?; + info!("result: {}", PrettyEvents(&src_tx_events)); + + for event in src_tx_events { + match event { + err_event @ IbcEvent::ChainError(_) => { + if retries_left == 0 { + return Err(LinkError::client(ForeignClientError::chain_error_event( + self.src_chain().id(), + err_event, + ))); + } else { + return self.do_update_client_src( + dst_chain_height, + tracking_id, + retries_left - 1, + ); } - IbcEvent::UpdateClient(ev) => return Ok(ev.height()), - _ => {} } + IbcEvent::UpdateClient(ev) => return Ok(ev.height()), + _ => {} } } - - Err(LinkError::client(ForeignClientError::chain_error_event( - self.src_chain().id(), - src_err_ev.unwrap(), - ))) + if retries_left == 0 { + Err(LinkError::update_client_failed()) + } else { + self.do_update_client_src(dst_chain_height, tracking_id, retries_left - 1) + } } /// Returns relevant packet events for building RecvPacket and timeout messages. From 02e61701c3a690aca5dbf75a5786428cab134514 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Fri, 11 Mar 2022 00:44:19 +0530 Subject: [PATCH 14/45] Fix clippy errors --- relayer/src/link/relay_path.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 61247d1f37..1bd7d4c942 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -791,7 +791,7 @@ impl RelayPath { self.src_client_id().clone(), dst_chain_height, ) { - return Ok(update_height); + Ok(update_height) } else { self.do_update_client_src(dst_chain_height, tracking_id, MAX_RETRIES) } From 9192eecaa2b5582d0277179b7dda984d27d83210 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Mon, 14 Mar 2022 14:47:58 +0530 Subject: [PATCH 15/45] Rework update methods --- relayer/src/link/relay_path.rs | 151 ++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 67 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 1bd7d4c942..ccc35c5ef7 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -10,7 +10,9 @@ use tracing::{debug, error, info, span, trace, warn, Level}; use ibc::{ core::{ - ics02_client::client_consensus::QueryClientEventRequest, + ics02_client::{ + client_consensus::QueryClientEventRequest, events::UpdateClient as UpdateClientEvent, + }, ics04_channel::{ channel::{ChannelEnd, Order, QueryPacketEventDataRequest, State as ChannelState}, events::{SendPacket, WriteAcknowledgement}, @@ -712,18 +714,40 @@ impl RelayPath { chain: &C, client_id: ClientId, consensus_height: Height, - ) -> Option { - if let Ok(mut events) = chain.query_txs(QueryTxRequest::Client(QueryClientEventRequest { - height: Height::zero(), - event_id: WithBlockDataType::UpdateClient, - client_id, - consensus_height, - })) { - if let Some(update_client_event) = events.pop() { - return Some(update_client_event.height()); + ) -> Result { + let mut events = chain + .query_txs(QueryTxRequest::Client(QueryClientEventRequest { + height: Height::zero(), + event_id: WithBlockDataType::UpdateClient, + client_id, + consensus_height, + })) + .map_err(|e| LinkError::query(chain.id(), e))?; + + match events.pop() { + Some(IbcEvent::UpdateClient(event)) => Ok(event.height()), + Some(event) => Err(LinkError::unexpected_event(event)), + None => Err(LinkError::unexpected_event(IbcEvent::default())), + } + } + + #[inline] + fn partition_events( + mut tx_events: Vec, + ) -> (Vec, Vec, Vec) { + let mut errors = Vec::::new(); + let mut updates = Vec::::new(); + let mut others = Vec::::new(); + + while let Some(event) = tx_events.pop() { + match event { + IbcEvent::ChainError(_) => errors.push(event), + IbcEvent::UpdateClient(event) => updates.push(event), + _ => others.push(event), } } - None + + (errors, updates, others) } /// Handles updating the client on the destination chain @@ -733,19 +757,19 @@ impl RelayPath { src_chain_height: Height, tracking_id: &str, ) -> Result { - // Handle the update on the destination chain - // Check if a consensus state at update_height exists on destination chain already - if let Some(update_height) = Self::update_height( - self.dst_chain(), - self.dst_client_id().clone(), - src_chain_height, - ) { - return Ok(update_height); - } - - let mut dst_err_ev = None; - 'retry: for i in 0..MAX_RETRIES { + for i in 0..MAX_RETRIES { let dst_update = self.build_update_client_on_dst(src_chain_height)?; + if dst_update.is_empty() { + match Self::update_height( + self.dst_chain(), + self.dst_client_id().clone(), + src_chain_height, + ) { + Ok(height) => return Ok(height), + Err(_) => continue, + } + } + info!( "sending updateClient to client hosted on destination chain for height {} [try {}/{}]", src_chain_height, @@ -754,29 +778,26 @@ impl RelayPath { ); let tm = TrackedMsgs::new(dst_update, tracking_id); - let dst_tx_events = self .dst_chain() .send_messages_and_wait_commit(tm) .map_err(LinkError::relayer)?; info!("result: {}", PrettyEvents(&dst_tx_events)); - for event in dst_tx_events { - match event { - IbcEvent::ChainError(_) => { - dst_err_ev = Some(event); - continue 'retry; - } - IbcEvent::UpdateClient(ev) => return Ok(ev.height()), - _ => {} - } + if dst_tx_events + .iter() + .any(|e| matches!(e, IbcEvent::ChainError(_))) + { + continue; + } else if let Some(IbcEvent::UpdateClient(event)) = dst_tx_events + .iter() + .find(|e| matches!(e, IbcEvent::UpdateClient(_))) + { + return Ok(event.height()); } } - Err(LinkError::client(ForeignClientError::chain_error_event( - self.dst_chain().id(), - dst_err_ev.unwrap(), - ))) + Err(LinkError::update_client_failed()) } /// Handles updating the client on the source chain @@ -786,15 +807,7 @@ impl RelayPath { dst_chain_height: Height, tracking_id: &str, ) -> Result { - if let Some(update_height) = Self::update_height( - self.src_chain(), - self.src_client_id().clone(), - dst_chain_height, - ) { - Ok(update_height) - } else { - self.do_update_client_src(dst_chain_height, tracking_id, MAX_RETRIES) - } + self.do_update_client_src(dst_chain_height, tracking_id, MAX_RETRIES) } /// Perform actual update_client_src with retries. @@ -819,30 +832,34 @@ impl RelayPath { .map_err(LinkError::relayer)?; info!("result: {}", PrettyEvents(&src_tx_events)); - for event in src_tx_events { - match event { - err_event @ IbcEvent::ChainError(_) => { - if retries_left == 0 { - return Err(LinkError::client(ForeignClientError::chain_error_event( - self.src_chain().id(), - err_event, - ))); - } else { - return self.do_update_client_src( - dst_chain_height, - tracking_id, - retries_left - 1, - ); + let (mut errors, mut updates, mut others) = Self::partition_events(src_tx_events); + match (errors.pop(), updates.pop(), others.pop()) { + (None, Some(update_event), None) => Ok(update_event.height()), + (Some(chain_error), _, _) => { + if retries_left == 0 { + Err(LinkError::client(ForeignClientError::chain_error_event( + self.src_chain().id(), + chain_error, + ))) + } else { + self.do_update_client_src(dst_chain_height, tracking_id, retries_left - 1) + } + } + (None, None, None) => { + // `tm` is empty and update wasn't required + match Self::update_height( + self.src_chain(), + self.src_client_id().clone(), + dst_chain_height, + ) { + Ok(update_height) => Ok(update_height), + Err(_) if retries_left > 0 => { + self.do_update_client_src(dst_chain_height, tracking_id, retries_left - 1) } + _ => Err(LinkError::update_client_failed()), } - IbcEvent::UpdateClient(ev) => return Ok(ev.height()), - _ => {} } - } - if retries_left == 0 { - Err(LinkError::update_client_failed()) - } else { - self.do_update_client_src(dst_chain_height, tracking_id, retries_left - 1) + (None, _, _) => Err(LinkError::update_client_failed()), /* maybe misbehaviour events */ } } From e61c05ef2d5cb86d4541d5c689fa572cd6bce331 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Mon, 14 Mar 2022 16:04:47 +0530 Subject: [PATCH 16/45] Minor refactoring --- relayer/src/link/relay_path.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index ccc35c5ef7..17f06dd0b7 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -2,7 +2,7 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; use std::ops::Sub; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Instant; use itertools::Itertools; use prost_types::Any; @@ -1479,7 +1479,7 @@ impl RelayPath { }; let duration = Timestamp::now() .duration_since(&update_time) - .unwrap_or(Duration::ZERO); + .unwrap_or_default(); Instant::now().sub(duration) } else { debug!( From 6d1a0de1a040573a3266705479c51e7cdcfd8fa3 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Mon, 14 Mar 2022 20:06:47 +0530 Subject: [PATCH 17/45] Avoid partitioning events --- relayer/src/link/relay_path.rs | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 17f06dd0b7..fc101e2bd4 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -732,22 +732,26 @@ impl RelayPath { } #[inline] - fn partition_events( + fn event_per_type( mut tx_events: Vec, - ) -> (Vec, Vec, Vec) { - let mut errors = Vec::::new(); - let mut updates = Vec::::new(); - let mut others = Vec::::new(); + ) -> ( + Option, + Option, + Option, + ) { + let mut error = None; + let mut update = None; + let mut other = None; while let Some(event) = tx_events.pop() { match event { - IbcEvent::ChainError(_) => errors.push(event), - IbcEvent::UpdateClient(event) => updates.push(event), - _ => others.push(event), + IbcEvent::ChainError(_) => error = Some(event), + IbcEvent::UpdateClient(event) => update = Some(event), + _ => other = Some(event), } } - (errors, updates, others) + (error, update, other) } /// Handles updating the client on the destination chain @@ -832,8 +836,8 @@ impl RelayPath { .map_err(LinkError::relayer)?; info!("result: {}", PrettyEvents(&src_tx_events)); - let (mut errors, mut updates, mut others) = Self::partition_events(src_tx_events); - match (errors.pop(), updates.pop(), others.pop()) { + let (error, update, other) = Self::event_per_type(src_tx_events); + match (error, update, other) { (None, Some(update_event), None) => Ok(update_event.height()), (Some(chain_error), _, _) => { if retries_left == 0 { @@ -846,7 +850,7 @@ impl RelayPath { } } (None, None, None) => { - // `tm` is empty and update wasn't required + // `tm` was empty and update wasn't required match Self::update_height( self.src_chain(), self.src_client_id().clone(), From 06919879f0eb1e8591134395b82ce33bd6062422 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Mon, 14 Mar 2022 22:45:28 +0530 Subject: [PATCH 18/45] Handle misbehaviour case explicitly --- relayer/src/link/relay_path.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index fc101e2bd4..3aa07ac58d 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -863,7 +863,13 @@ impl RelayPath { _ => Err(LinkError::update_client_failed()), } } - (None, _, _) => Err(LinkError::update_client_failed()), /* maybe misbehaviour events */ + (_, _, event) => { + if !matches!(event, Some(IbcEvent::ClientMisbehaviour(_))) && retries_left > 0 { + self.do_update_client_src(dst_chain_height, tracking_id, retries_left - 1) + } else { + Err(LinkError::update_client_failed()) + } + } } } From aa799bf05a13977ff2187917797d9133f8a6d0ba Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Tue, 15 Mar 2022 11:43:43 +0100 Subject: [PATCH 19/45] Add connection delay test --- Cargo.lock | 1 + tools/integration-test/Cargo.toml | 1 + .../src/tests/connection_delay.rs | 106 ++++++++++++++++++ tools/integration-test/src/tests/mod.rs | 1 + tools/integration-test/src/util/assert.rs | 11 ++ 5 files changed, 120 insertions(+) create mode 100644 tools/integration-test/src/tests/connection_delay.rs diff --git a/Cargo.lock b/Cargo.lock index 93bb148c24..53648c5fe9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1399,6 +1399,7 @@ dependencies = [ "subtle-encoding", "tendermint", "tendermint-rpc", + "time", "tokio", "toml", "tracing", diff --git a/tools/integration-test/Cargo.toml b/tools/integration-test/Cargo.toml index a21e96a35c..19e54a53cb 100644 --- a/tools/integration-test/Cargo.toml +++ b/tools/integration-test/Cargo.toml @@ -41,6 +41,7 @@ prost = { version = "0.9", default-features = false } prost-types = { version = "0.9", default-features = false } semver = "1.0.6" flex-error = "0.4.4" +time = "0.3" [features] default = [] diff --git a/tools/integration-test/src/tests/connection_delay.rs b/tools/integration-test/src/tests/connection_delay.rs new file mode 100644 index 0000000000..1a913499d6 --- /dev/null +++ b/tools/integration-test/src/tests/connection_delay.rs @@ -0,0 +1,106 @@ +use core::time::Duration; +use time::OffsetDateTime; + +use crate::ibc::denom::derive_ibc_denom; +use crate::prelude::*; +use crate::util::random::random_u64_range; + +const CONNECTION_DELAY: Duration = Duration::from_secs(10); + +#[test] +fn test_connection_delay() -> Result<(), Error> { + run_binary_channel_test(&ConnectionDelayTest) +} + +pub struct ConnectionDelayTest; + +impl TestOverrides for ConnectionDelayTest { + fn connection_delay(&self) -> Duration { + CONNECTION_DELAY + } +} + +impl BinaryChannelTest for ConnectionDelayTest { + fn run( + &self, + _config: &TestConfig, + relayer: RelayerDriver, + chains: ConnectedChains, + channel: ConnectedChannel, + ) -> Result<(), Error> { + relayer.with_supervisor(|| { + let denom_a = chains.node_a.denom(); + + let wallet_a = chains.node_a.wallets().user1().cloned(); + let wallet_b = chains.node_b.wallets().user1().cloned(); + + let balance_a = chains + .node_a + .chain_driver() + .query_balance(&wallet_a.address(), &denom_a)?; + + let a_to_b_amount = random_u64_range(1000, 5000); + + info!( + "Sending IBC transfer from chain {} to chain {} with amount of {} {}", + chains.chain_id_a(), + chains.chain_id_b(), + a_to_b_amount, + denom_a + ); + + chains.node_a.chain_driver().transfer_token( + &channel.port_a.as_ref(), + &channel.channel_id_a.as_ref(), + &wallet_a.address(), + &wallet_b.address(), + a_to_b_amount, + &denom_a, + )?; + + let time1 = OffsetDateTime::now_utc(); + + let denom_b = derive_ibc_denom( + &channel.port_b.as_ref(), + &channel.channel_id_b.as_ref(), + &denom_a, + )?; + + info!( + "Waiting for user on chain B to receive IBC transferred amount of {} {}", + a_to_b_amount, denom_b + ); + + chains.node_a.chain_driver().assert_eventual_wallet_amount( + &wallet_a.as_ref(), + balance_a - a_to_b_amount, + &denom_a, + )?; + + chains.node_b.chain_driver().assert_eventual_wallet_amount( + &wallet_b.as_ref(), + a_to_b_amount, + &denom_b.as_ref(), + )?; + + info!( + "successfully performed IBC transfer from chain {} to chain {}", + chains.chain_id_a(), + chains.chain_id_b(), + ); + + let time2 = OffsetDateTime::now_utc(); + + assert_gt( + &format!( + "Expect IBC transfer to only be successfull after {}s", + CONNECTION_DELAY.as_secs() + ), + &(time2 - time1).try_into().unwrap(), + &CONNECTION_DELAY, + )?; + + Ok(()) + }) + } +} diff --git a/tools/integration-test/src/tests/mod.rs b/tools/integration-test/src/tests/mod.rs index 101834c2b9..d03b378777 100644 --- a/tools/integration-test/src/tests/mod.rs +++ b/tools/integration-test/src/tests/mod.rs @@ -7,6 +7,7 @@ pub mod clear_packet; pub mod client_expiration; +pub mod connection_delay; pub mod memo; pub mod supervisor; pub mod ternary_transfer; diff --git a/tools/integration-test/src/util/assert.rs b/tools/integration-test/src/util/assert.rs index 28cf0f4a0d..17dc85c1df 100644 --- a/tools/integration-test/src/util/assert.rs +++ b/tools/integration-test/src/util/assert.rs @@ -23,3 +23,14 @@ pub fn assert_not_eq(message: &str, left: &T, right: &T) -> Resul ))) } } + +pub fn assert_gt(message: &str, left: &T, right: &T) -> Result<(), Error> { + if left > right { + Ok(()) + } else { + Err(Error::assertion(format!( + "expect left ({:?}) to be greater than right ({:?}): {}", + left, right, message + ))) + } +} From a9abda4f4582ab666310ba0ab5e347ef853f30f2 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 04:03:55 +0530 Subject: [PATCH 20/45] Use first UpdateClient event to determine processed_height --- relayer/src/link/relay_path.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 3aa07ac58d..f87964175b 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -710,12 +710,13 @@ impl RelayPath { self.has_delay() && op_data.has_packet_msgs() } + // Returns the `processed_height` for the consensus state at specified height fn update_height( chain: &C, client_id: ClientId, consensus_height: Height, ) -> Result { - let mut events = chain + let events = chain .query_txs(QueryTxRequest::Client(QueryClientEventRequest { height: Height::zero(), event_id: WithBlockDataType::UpdateClient, @@ -724,9 +725,13 @@ impl RelayPath { })) .map_err(|e| LinkError::query(chain.id(), e))?; - match events.pop() { + // The handler may treat redundant updates as no-ops and emit `UpdateClient` events for them + // but the `processed_height` is the height at which the first `UpdateClient` event for this + // consensus state/height was emitted. We expect that these events are received in the exact + // same order in which they were emitted. + match events.first() { Some(IbcEvent::UpdateClient(event)) => Ok(event.height()), - Some(event) => Err(LinkError::unexpected_event(event)), + Some(event) => Err(LinkError::unexpected_event(event.clone())), None => Err(LinkError::unexpected_event(IbcEvent::default())), } } From 54281b67a42d2b40c825c77098a36d53a2617b7d Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 04:12:52 +0530 Subject: [PATCH 21/45] Add comment --- relayer/src/link/relay_path.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index f87964175b..b54788a26f 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1492,6 +1492,9 @@ impl RelayPath { .timestamp() } }; + + // set the `scheduled_time` to an instant in the past, i.e. when this client update was + // first processed (`processed_time`) let duration = Timestamp::now() .duration_since(&update_time) .unwrap_or_default(); From fc5c8666fd01ae4521c5877e6c09d4e81f2db7f0 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 04:49:19 +0530 Subject: [PATCH 22/45] Check for frozen clients in case of misbehavior during client update --- relayer/src/link/operational_data.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 71db7b7a68..9037eecc62 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -7,6 +7,7 @@ use nanoid::nanoid; use prost_types::Any; use tracing::{debug, info}; +use ibc::core::ics02_client::client_state::ClientState; use ibc::events::IbcEvent; use ibc::Height; @@ -162,7 +163,22 @@ impl OperationalData { client_update_opt.pop() } else { - None + let client_state = match self.target { + OperationalDataTarget::Source => relay_path + .src_chain() + .query_client_state(relay_path.src_client_id(), Height::zero()) + .map_err(|e| LinkError::query(relay_path.src_chain().id(), e))?, + OperationalDataTarget::Destination => relay_path + .dst_chain() + .query_client_state(relay_path.dst_client_id(), Height::zero()) + .map_err(|e| LinkError::query(relay_path.dst_chain().id(), e))?, + }; + + if client_state.is_frozen() { + return Ok(TrackedMsgs::new(vec![], &self.tracking_id)); + } else { + None + } }; let msgs: Vec = match client_update_msg { From 792011e8eb9d1d5b4149e69f56c83c7f66204432 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 04:52:58 +0530 Subject: [PATCH 23/45] Adjust connection delay for avg block time --- relayer/src/link/relay_path.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index b54788a26f..875d9c90fb 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1,8 +1,8 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; -use std::ops::Sub; +use std::ops::{Add, Sub}; use std::thread; -use std::time::Instant; +use std::time::{Duration, Instant}; use itertools::Itertools; use prost_types::Any; @@ -28,7 +28,7 @@ use ibc::{ events::{IbcEvent, PrettyEvents, WithBlockDataType}, query::{QueryBlockRequest, QueryTxRequest}, signer::Signer, - timestamp::ZERO_DURATION, + timestamp::{Timestamp, ZERO_DURATION}, tx_msg::Msg, Height, }; @@ -55,9 +55,9 @@ use crate::link::relay_sender::{AsyncReply, SubmitReply}; use crate::link::relay_summary::RelaySummary; use crate::link::{pending, relay_sender}; use crate::util::queue::Queue; -use ibc::timestamp::Timestamp; const MAX_RETRIES: usize = 5; +const AVG_BLOCK_TIME_SECS: u64 = 5; pub struct RelayPath { channel: Channel, @@ -1541,7 +1541,16 @@ impl RelayPath { (true_res, false_res) } - let connection_delay = self.channel.connection_delay; + let connection_delay = { + match self.channel.connection_delay { + Duration::ZERO => self.channel.connection_delay, + _ => self + .channel + .connection_delay + .add(Duration::from_secs(AVG_BLOCK_TIME_SECS)), + } + }; + let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { op.scheduled_time.elapsed() > connection_delay From 75d7501ef6c39be447298a7f077222a36a7cd66e Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 14:44:50 +0530 Subject: [PATCH 24/45] Add TODO for moving to /header endpoint --- relayer/src/chain/cosmos.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 74bb8b60e8..63ccd76862 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -1948,6 +1948,7 @@ impl ChainEndpoint for CosmosSdkChain { fn query_host_consensus_state(&self, height: ICSHeight) -> Result { let height = Height::try_from(height.revision_height).map_err(Error::invalid_height)?; + // TODO(hu55a1n1): use the `/header` RPC endpoint instead when we move to tendermint v0.35.x let response = self .block_on(self.rpc_client.block(height)) .map_err(|e| Error::rpc(self.config.rpc_addr.clone(), e))?; From bf799652a579700a736c423eec210df869f59dc4 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 15:03:51 +0530 Subject: [PATCH 25/45] Make update_client_dst() similar to update_client_src() --- relayer/src/link/relay_path.rs | 85 +++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 875d9c90fb..6811df338b 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -182,7 +182,7 @@ impl RelayPath { fn dst_channel(&self, height: Height) -> Result { self.dst_chain() .query_channel(self.dst_port_id(), self.dst_channel_id(), height) - .map_err(|e| LinkError::channel(ChannelError::query(self.src_chain().id(), e))) + .map_err(|e| LinkError::channel(ChannelError::query(self.dst_chain().id(), e))) } fn src_signer(&self) -> Result { @@ -766,47 +766,66 @@ impl RelayPath { src_chain_height: Height, tracking_id: &str, ) -> Result { - for i in 0..MAX_RETRIES { - let dst_update = self.build_update_client_on_dst(src_chain_height)?; - if dst_update.is_empty() { + self.do_update_client_dst(src_chain_height, tracking_id, MAX_RETRIES) + } + + /// Perform actual update_client_dst with retries. + /// + /// Note that the retry is only performed in the case when there + /// is a ChainError event. It would return error immediately if + /// there are other errors returned from calls such as + /// build_update_client_on_dst. + fn do_update_client_dst( + &self, + src_chain_height: Height, + tracking_id: &str, + retries_left: usize, + ) -> Result { + info!( "sending update_client to client hosted on source chain for height {} (retries left: {})", src_chain_height, retries_left ); + + let dst_update = self.build_update_client_on_dst(src_chain_height)?; + let tm = TrackedMsgs::new(dst_update, tracking_id); + let dst_tx_events = self + .dst_chain() + .send_messages_and_wait_commit(tm) + .map_err(LinkError::relayer)?; + info!("result: {}", PrettyEvents(&dst_tx_events)); + + let (error, update, other) = Self::event_per_type(dst_tx_events); + match (error, update, other) { + (None, Some(update_event), None) => Ok(update_event.height()), + (Some(chain_error), _, _) => { + if retries_left == 0 { + Err(LinkError::client(ForeignClientError::chain_error_event( + self.dst_chain().id(), + chain_error, + ))) + } else { + self.do_update_client_dst(src_chain_height, tracking_id, retries_left - 1) + } + } + (None, None, None) => { + // `tm` was empty and update wasn't required match Self::update_height( self.dst_chain(), self.dst_client_id().clone(), src_chain_height, ) { - Ok(height) => return Ok(height), - Err(_) => continue, + Ok(update_height) => Ok(update_height), + Err(_) if retries_left > 0 => { + self.do_update_client_dst(src_chain_height, tracking_id, retries_left - 1) + } + _ => Err(LinkError::update_client_failed()), } } - - info!( - "sending updateClient to client hosted on destination chain for height {} [try {}/{}]", - src_chain_height, - i + 1, - MAX_RETRIES, - ); - - let tm = TrackedMsgs::new(dst_update, tracking_id); - let dst_tx_events = self - .dst_chain() - .send_messages_and_wait_commit(tm) - .map_err(LinkError::relayer)?; - info!("result: {}", PrettyEvents(&dst_tx_events)); - - if dst_tx_events - .iter() - .any(|e| matches!(e, IbcEvent::ChainError(_))) - { - continue; - } else if let Some(IbcEvent::UpdateClient(event)) = dst_tx_events - .iter() - .find(|e| matches!(e, IbcEvent::UpdateClient(_))) - { - return Ok(event.height()); + (_, _, event) => { + if !matches!(event, Some(IbcEvent::ClientMisbehaviour(_))) && retries_left > 0 { + self.do_update_client_dst(src_chain_height, tracking_id, retries_left - 1) + } else { + Err(LinkError::update_client_failed()) + } } } - - Err(LinkError::update_client_failed()) } /// Handles updating the client on the source chain From 009d4f4ad947afa3b35778dc331e017ede4f6a02 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 15:12:57 +0530 Subject: [PATCH 26/45] Fix integration test --- tools/integration-test/src/tests/connection_delay.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/integration-test/src/tests/connection_delay.rs b/tools/integration-test/src/tests/connection_delay.rs index 1a913499d6..91697338f7 100644 --- a/tools/integration-test/src/tests/connection_delay.rs +++ b/tools/integration-test/src/tests/connection_delay.rs @@ -1,9 +1,9 @@ use core::time::Duration; use time::OffsetDateTime; -use crate::ibc::denom::derive_ibc_denom; -use crate::prelude::*; -use crate::util::random::random_u64_range; +use ibc_test_framework::ibc::denom::derive_ibc_denom; +use ibc_test_framework::prelude::*; +use ibc_test_framework::util::random::random_u64_range; const CONNECTION_DELAY: Duration = Duration::from_secs(10); From 4df249a22f9754a343293df805d3804853da814d Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 16:09:33 +0530 Subject: [PATCH 27/45] Compare scheduled time against latest chain time --- relayer/src/link/relay_path.rs | 72 ++++++++++++++++------------------ 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 6811df338b..b306f29ce2 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1,8 +1,8 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; -use std::ops::{Add, Sub}; +use std::ops::Sub; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Instant; use itertools::Itertools; use prost_types::Any; @@ -57,7 +57,6 @@ use crate::link::{pending, relay_sender}; use crate::util::queue::Queue; const MAX_RETRIES: usize = 5; -const AVG_BLOCK_TIME_SECS: u64 = 5; pub struct RelayPath { channel: Channel, @@ -1314,7 +1313,7 @@ impl RelayPath { /// Retains the operational data as pending, and associates it /// with one or more transaction hash(es). pub fn execute_schedule(&self) -> Result<(), LinkError> { - let (src_ods, dst_ods) = self.try_fetch_scheduled_operational_data(); + let (src_ods, dst_ods) = self.try_fetch_scheduled_operational_data()?; for od in dst_ods { let reply = @@ -1477,6 +1476,23 @@ impl RelayPath { Ok(()) } + // Returns an instant (in the past) that corresponds to the block timestamp of the chain at + // specified height. If the timestamp is in the future wrt the relayer's current time, we simply + // return the current timestamp. + fn chain_time_at_height( + chain: &impl ChainHandle, + height: Height, + ) -> Result { + let chain_time = chain + .query_host_consensus_state(height) + .map_err(LinkError::relayer)? + .timestamp(); + let duration = Timestamp::now() + .duration_since(&chain_time) + .unwrap_or_default(); + Ok(Instant::now().sub(duration)) + } + /// Adds a new operational data item for this relaying path to process later. /// If the relaying path has non-zero packet delays, this method also updates the client on the /// target chain with the appropriate headers. @@ -1492,32 +1508,21 @@ impl RelayPath { } // Update clients ahead of scheduling the operational data, if the delays are non-zero. + // If the connection-delay must be taken into account, set the `scheduled_time` to an + // instant in the past, i.e. when this client update was first processed (`processed_time`) od.scheduled_time = if self.conn_delay_needed(&od) { debug!("connection delay must be taken into account: updating client"); let target_height = od.proofs_height.increment(); - let update_time = match od.target { + match od.target { OperationalDataTarget::Source => { let update_height = self.update_client_src(target_height, &od.tracking_id)?; - self.src_chain() - .query_host_consensus_state(update_height) - .map_err(LinkError::relayer)? - .timestamp() + Self::chain_time_at_height(self.src_chain(), update_height)? } OperationalDataTarget::Destination => { let update_height = self.update_client_dst(target_height, &od.tracking_id)?; - self.dst_chain() - .query_host_consensus_state(update_height) - .map_err(LinkError::relayer)? - .timestamp() + Self::chain_time_at_height(self.dst_chain(), update_height)? } - }; - - // set the `scheduled_time` to an instant in the past, i.e. when this client update was - // first processed (`processed_time`) - let duration = Timestamp::now() - .duration_since(&update_time) - .unwrap_or_default(); - Instant::now().sub(duration) + } } else { debug!( "connection delay need not be taken into account: client update message will be \ @@ -1539,7 +1544,7 @@ impl RelayPath { /// scheduled), returns immediately with `vec![]`. fn try_fetch_scheduled_operational_data( &self, - ) -> (VecDeque, VecDeque) { + ) -> Result<(VecDeque, VecDeque), LinkError> { // Extracts elements from a Vec when the predicate returns true. // The mutable vector is then updated to the remaining unextracted elements. fn partition( @@ -1560,31 +1565,22 @@ impl RelayPath { (true_res, false_res) } - let connection_delay = { - match self.channel.connection_delay { - Duration::ZERO => self.channel.connection_delay, - _ => self - .channel - .connection_delay - .add(Duration::from_secs(AVG_BLOCK_TIME_SECS)), - } - }; + let connection_delay = self.channel.connection_delay; + let src_chain_time = Self::chain_time_at_height(self.src_chain(), Height::zero())?; + let dst_chain_time = Self::chain_time_at_height(self.dst_chain(), Height::zero())?; let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { - op.scheduled_time.elapsed() > connection_delay + op.scheduled_time.duration_since(src_chain_time) > connection_delay }); - - self.src_operational_data.replace(unelapsed_src_ods); - let (elapsed_dst_ods, unelapsed_dst_ods) = partition(self.dst_operational_data.take(), |op| { - op.scheduled_time.elapsed() > connection_delay + op.scheduled_time.duration_since(dst_chain_time) > connection_delay }); - + self.src_operational_data.replace(unelapsed_src_ods); self.dst_operational_data.replace(unelapsed_dst_ods); - (elapsed_src_ods, elapsed_dst_ods) + Ok((elapsed_src_ods, elapsed_dst_ods)) } /// Fetches an operational data that has fulfilled its predefined delay period. May _block_ From a42d8305e92d46278f6f6d73082d07c1e1398cac Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 16:41:15 +0530 Subject: [PATCH 28/45] Allow query_host_consensus_state to return latest state when Height is zero --- relayer/src/chain/cosmos.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 63ccd76862..5edfad4591 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -1949,8 +1949,12 @@ impl ChainEndpoint for CosmosSdkChain { let height = Height::try_from(height.revision_height).map_err(Error::invalid_height)?; // TODO(hu55a1n1): use the `/header` RPC endpoint instead when we move to tendermint v0.35.x + let rpc_call = match height.value() { + 0 => self.rpc_client.latest_block(), + _ => self.rpc_client.block(height), + }; let response = self - .block_on(self.rpc_client.block(height)) + .block_on(rpc_call) .map_err(|e| Error::rpc(self.config.rpc_addr.clone(), e))?; Ok(response.block.header.into()) } From c98829c520d5bd6c1dd34d169418d05806bb5d7f Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 19:10:49 +0530 Subject: [PATCH 29/45] Fix conn delay elapsed calculation --- relayer/src/link/relay_path.rs | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index b306f29ce2..6ef1e1b0c3 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -709,9 +709,21 @@ impl RelayPath { self.has_delay() && op_data.has_packet_msgs() } + #[inline] + fn conn_delay_elapsed(&self, op: &OperationalData, chain_time: Instant) -> bool { + if self.conn_delay_needed(op) { + // since chain time instant is relative to relayer's current time, it is possible that + // `op.scheduled_time` is later (by nano secs) than `chain_time`, hence the call to + // `saturating_duration_since()`. + chain_time.saturating_duration_since(op.scheduled_time) > self.channel.connection_delay + } else { + true + } + } + // Returns the `processed_height` for the consensus state at specified height - fn update_height( - chain: &C, + fn update_height( + chain: &impl ChainHandle, client_id: ClientId, consensus_height: Height, ) -> Result { @@ -1565,21 +1577,20 @@ impl RelayPath { (true_res, false_res) } - let connection_delay = self.channel.connection_delay; let src_chain_time = Self::chain_time_at_height(self.src_chain(), Height::zero())?; - let dst_chain_time = Self::chain_time_at_height(self.dst_chain(), Height::zero())?; - let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { - op.scheduled_time.duration_since(src_chain_time) > connection_delay + self.conn_delay_elapsed(op, src_chain_time) }); + + let dst_chain_time = Self::chain_time_at_height(self.dst_chain(), Height::zero())?; let (elapsed_dst_ods, unelapsed_dst_ods) = partition(self.dst_operational_data.take(), |op| { - op.scheduled_time.duration_since(dst_chain_time) > connection_delay + self.conn_delay_elapsed(op, dst_chain_time) }); + self.src_operational_data.replace(unelapsed_src_ods); self.dst_operational_data.replace(unelapsed_dst_ods); - Ok((elapsed_src_ods, elapsed_dst_ods)) } From 68fa5d237a63ef0a40c9e81411e1453590d04ddc Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 19:28:14 +0530 Subject: [PATCH 30/45] Cleanup --- relayer/src/link/relay_path.rs | 71 +++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 6ef1e1b0c3..d8746a6058 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -202,6 +202,26 @@ impl RelayPath { .map_err(|e| LinkError::query(self.dst_chain().id(), e)) } + #[inline] + fn src_time_at_height(&self, height: Height) -> Result { + Self::chain_time_at_height(self.src_chain(), height) + } + + #[inline] + fn dst_time_at_height(&self, height: Height) -> Result { + Self::chain_time_at_height(self.dst_chain(), height) + } + + #[inline] + fn src_time_latest(&self) -> Result { + self.src_time_at_height(Height::zero()) + } + + #[inline] + fn dst_time_latest(&self) -> Result { + self.dst_time_at_height(Height::zero()) + } + fn unordered_channel(&self) -> bool { self.channel.ordering == Order::Unordered } @@ -699,12 +719,14 @@ impl RelayPath { /// Returns `true` if the connection delay for this relaying path is non-zero. /// Conversely, returns `false` if the delay is zero. + #[inline] fn has_delay(&self) -> bool { self.channel.connection_delay != ZERO_DURATION } /// Returns `true` iff the connection delay for this relaying path is non-zero and `op_data` /// contains packet messages. + #[inline] pub fn conn_delay_needed(&self, op_data: &OperationalData) -> bool { self.has_delay() && op_data.has_packet_msgs() } @@ -748,6 +770,10 @@ impl RelayPath { } #[inline] + // Loops over `tx_events` and returns a tuple of optional events where the first element is a + // `ChainError` variant, the second one is an `UpdateClient` variant and the third one is any + // other variant (usually `ClientMisbehaviour`). This function is essentially just an + // `Iterator::find()` for multiple variants with a single pass. fn event_per_type( mut tx_events: Vec, ) -> ( @@ -770,6 +796,23 @@ impl RelayPath { (error, update, other) } + // Returns an instant (in the past) that corresponds to the block timestamp of the chain at + // specified height (relative to the relayer's current time). If the timestamp is in the future + // wrt the relayer's current time, we simply return the current relayer time. + fn chain_time_at_height( + chain: &impl ChainHandle, + height: Height, + ) -> Result { + let chain_time = chain + .query_host_consensus_state(height) + .map_err(LinkError::relayer)? + .timestamp(); + let duration = Timestamp::now() + .duration_since(&chain_time) + .unwrap_or_default(); + Ok(Instant::now().sub(duration)) + } + /// Handles updating the client on the destination chain /// Returns the height at which the client update was processed fn update_client_dst( @@ -1488,23 +1531,6 @@ impl RelayPath { Ok(()) } - // Returns an instant (in the past) that corresponds to the block timestamp of the chain at - // specified height. If the timestamp is in the future wrt the relayer's current time, we simply - // return the current timestamp. - fn chain_time_at_height( - chain: &impl ChainHandle, - height: Height, - ) -> Result { - let chain_time = chain - .query_host_consensus_state(height) - .map_err(LinkError::relayer)? - .timestamp(); - let duration = Timestamp::now() - .duration_since(&chain_time) - .unwrap_or_default(); - Ok(Instant::now().sub(duration)) - } - /// Adds a new operational data item for this relaying path to process later. /// If the relaying path has non-zero packet delays, this method also updates the client on the /// target chain with the appropriate headers. @@ -1528,11 +1554,11 @@ impl RelayPath { match od.target { OperationalDataTarget::Source => { let update_height = self.update_client_src(target_height, &od.tracking_id)?; - Self::chain_time_at_height(self.src_chain(), update_height)? + self.src_time_at_height(update_height)? } OperationalDataTarget::Destination => { let update_height = self.update_client_dst(target_height, &od.tracking_id)?; - Self::chain_time_at_height(self.dst_chain(), update_height)? + self.dst_time_at_height(update_height)? } } } else { @@ -1552,8 +1578,7 @@ impl RelayPath { } /// Pulls out the operational elements with elapsed delay period and that can - /// now be processed. Does not block: if no OD fulfilled the delay period (or none is - /// scheduled), returns immediately with `vec![]`. + /// now be processed. fn try_fetch_scheduled_operational_data( &self, ) -> Result<(VecDeque, VecDeque), LinkError> { @@ -1577,13 +1602,13 @@ impl RelayPath { (true_res, false_res) } - let src_chain_time = Self::chain_time_at_height(self.src_chain(), Height::zero())?; + let src_chain_time = self.src_time_latest()?; let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { self.conn_delay_elapsed(op, src_chain_time) }); - let dst_chain_time = Self::chain_time_at_height(self.dst_chain(), Height::zero())?; + let dst_chain_time = self.dst_time_latest()?; let (elapsed_dst_ods, unelapsed_dst_ods) = partition(self.dst_operational_data.take(), |op| { self.conn_delay_elapsed(op, dst_chain_time) From f3c4a9382cc9cb5b9e5e27abe1849233ea36e703 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 23:47:19 +0530 Subject: [PATCH 31/45] Check for block delay --- modules/src/core/ics04_channel/context.rs | 20 +++++--- relayer/src/link/operational_data.rs | 2 + relayer/src/link/relay_path.rs | 61 +++++++++++++++++++++-- 3 files changed, 73 insertions(+), 10 deletions(-) diff --git a/modules/src/core/ics04_channel/context.rs b/modules/src/core/ics04_channel/context.rs index 165cf688ad..756eee922c 100644 --- a/modules/src/core/ics04_channel/context.rs +++ b/modules/src/core/ics04_channel/context.rs @@ -99,13 +99,7 @@ pub trait ChannelReader { fn max_expected_time_per_block(&self) -> Duration; fn block_delay(&self, delay_period_time: Duration) -> u64 { - let expected_time_per_block = self.max_expected_time_per_block(); - if expected_time_per_block.is_zero() { - return 0; - } - - FloatCore::ceil(delay_period_time.as_secs_f64() / expected_time_per_block.as_secs_f64()) - as u64 + calculate_block_delay(delay_period_time, self.max_expected_time_per_block()) } } @@ -279,3 +273,15 @@ pub trait ChannelKeeper { /// Should never fail. fn increase_channel_counter(&mut self); } + +pub fn calculate_block_delay( + delay_period_time: Duration, + max_expected_time_per_block: Duration, +) -> u64 { + let expected_time_per_block = max_expected_time_per_block; + if expected_time_per_block.is_zero() { + return 0; + } + + FloatCore::ceil(delay_period_time.as_secs_f64() / expected_time_per_block.as_secs_f64()) as u64 +} diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 9037eecc62..a8f17c168b 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -88,6 +88,7 @@ pub struct TransitMessage { #[derive(Clone)] pub struct OperationalData { pub proofs_height: Height, + pub update_processed_height: Option, pub batch: Vec, pub target: OperationalDataTarget, /// Stores the time when the clients on the target chain has been updated, i.e., when this data @@ -104,6 +105,7 @@ impl OperationalData { ) -> Self { OperationalData { proofs_height, + update_processed_height: None, batch: vec![], target, scheduled_time: Instant::now(), diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index d8746a6058..bd5c28b601 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -15,6 +15,7 @@ use ibc::{ }, ics04_channel::{ channel::{ChannelEnd, Order, QueryPacketEventDataRequest, State as ChannelState}, + context::calculate_block_delay, events::{SendPacket, WriteAcknowledgement}, msgs::{ acknowledgement::MsgAcknowledgement, chan_close_confirm::MsgChannelCloseConfirm, @@ -196,6 +197,12 @@ impl RelayPath { .map_err(|e| LinkError::signer(self.dst_chain().id(), e)) } + pub fn src_latest_height(&self) -> Result { + self.src_chain() + .query_latest_height() + .map_err(|e| LinkError::query(self.src_chain().id(), e)) + } + pub fn dst_latest_height(&self) -> Result { self.dst_chain() .query_latest_height() @@ -222,6 +229,16 @@ impl RelayPath { self.dst_time_at_height(Height::zero()) } + #[inline] + fn src_conn_block_delay(&self) -> Result { + self.conn_block_delay(self.src_chain()) + } + + #[inline] + fn dst_conn_block_delay(&self) -> Result { + self.conn_block_delay(self.dst_chain()) + } + fn unordered_channel(&self) -> bool { self.channel.ordering == Order::Unordered } @@ -732,7 +749,7 @@ impl RelayPath { } #[inline] - fn conn_delay_elapsed(&self, op: &OperationalData, chain_time: Instant) -> bool { + fn conn_time_delay_elapsed(&self, op: &OperationalData, chain_time: Instant) -> bool { if self.conn_delay_needed(op) { // since chain time instant is relative to relayer's current time, it is possible that // `op.scheduled_time` is later (by nano secs) than `chain_time`, hence the call to @@ -743,6 +760,35 @@ impl RelayPath { } } + #[inline] + fn conn_block_delay_elapsed( + &self, + op: &OperationalData, + block_delay: u64, + latest_height: Height, + ) -> bool { + if self.conn_delay_needed(op) { + latest_height + >= op + .update_processed_height + .expect("processed height not set") + .add(block_delay) + } else { + true + } + } + + fn conn_block_delay(&self, chain: &impl ChainHandle) -> Result { + // TODO(hu55a1n1): Ideally, we should get the `max_expected_time_per_block` using the + // `/genesis` endpoint once it is working in tendermint-rs. + let max_expected_time_per_block = + chain.config().map_err(LinkError::relayer)?.max_block_time; + Ok(calculate_block_delay( + self.channel.connection_delay, + max_expected_time_per_block, + )) + } + // Returns the `processed_height` for the consensus state at specified height fn update_height( chain: &impl ChainHandle, @@ -1554,10 +1600,12 @@ impl RelayPath { match od.target { OperationalDataTarget::Source => { let update_height = self.update_client_src(target_height, &od.tracking_id)?; + od.update_processed_height = Some(update_height); self.src_time_at_height(update_height)? } OperationalDataTarget::Destination => { let update_height = self.update_client_dst(target_height, &od.tracking_id)?; + od.update_processed_height = Some(update_height); self.dst_time_at_height(update_height)? } } @@ -1603,15 +1651,21 @@ impl RelayPath { } let src_chain_time = self.src_time_latest()?; + let src_block_delay = self.src_conn_block_delay()?; + let src_latest_height = self.src_latest_height()?; let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { - self.conn_delay_elapsed(op, src_chain_time) + self.conn_time_delay_elapsed(op, src_chain_time) + && self.conn_block_delay_elapsed(op, src_block_delay, src_latest_height) }); let dst_chain_time = self.dst_time_latest()?; + let dst_block_delay = self.dst_conn_block_delay()?; + let dst_latest_height = self.dst_latest_height()?; let (elapsed_dst_ods, unelapsed_dst_ods) = partition(self.dst_operational_data.take(), |op| { - self.conn_delay_elapsed(op, dst_chain_time) + self.conn_time_delay_elapsed(op, dst_chain_time) + && self.conn_block_delay_elapsed(op, dst_block_delay, dst_latest_height) }); self.src_operational_data.replace(unelapsed_src_ods); @@ -1629,6 +1683,7 @@ impl RelayPath { .or_else(|| self.dst_operational_data.pop_front()); if let Some(odata) = odata { + // TODO // Check if the delay period did not completely elapse let delay_left = self .channel From 111fb68fb6548d5ebb907d050436dcc83b81f5e9 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 16 Mar 2022 23:53:27 +0530 Subject: [PATCH 32/45] Add config comment for max_expected_time_per_block --- config.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/config.toml b/config.toml index 95f121a0e3..8cd254ef7c 100644 --- a/config.toml +++ b/config.toml @@ -177,6 +177,7 @@ clock_drift = '5s' # The block time together with the clock drift are added to the source drift to estimate # the maximum clock drift when creating a client on this chain. Default: 10s # For cosmos-SDK chains a good approximation is `timeout_propose` + `timeout_commit` +# Note: This MUST be the same as the `max_expected_time_per_block` genesis parameter for Tendermint chains. max_block_time = '10s' # Specify the amount of time to be used as the light client trusting period. From 8526a1fd26cb6aeedcabfe9c4eb158b61b8fec38 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 17 Mar 2022 00:44:09 +0530 Subject: [PATCH 33/45] Wait for conn delay --- relayer/src/link.rs | 4 +- relayer/src/link/relay_path.rs | 102 ++++++++++++++++++++++----------- 2 files changed, 70 insertions(+), 36 deletions(-) diff --git a/relayer/src/link.rs b/relayer/src/link.rs index ca39bc605f..e4ef1b4e4c 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -211,7 +211,7 @@ impl Link { let mut results = vec![]; // Block waiting for all of the scheduled data (until `None` is returned) - while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data() { + while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data()? { let mut last_res = self .a_to_b .relay_from_operational_data::(odata)?; @@ -237,7 +237,7 @@ impl Link { let mut results = vec![]; // Block waiting for all of the scheduled data - while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data() { + while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data()? { let mut last_res = self .a_to_b .relay_from_operational_data::(odata)?; diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index bd5c28b601..a96baf5abc 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -2,7 +2,7 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; use std::ops::Sub; use std::thread; -use std::time::Instant; +use std::time::{Duration, Instant}; use itertools::Itertools; use prost_types::Any; @@ -749,14 +749,23 @@ impl RelayPath { } #[inline] - fn conn_time_delay_elapsed(&self, op: &OperationalData, chain_time: Instant) -> bool { + fn conn_time_delay_elapsed( + &self, + op: &OperationalData, + chain_time: Instant, + ) -> Result<(), Duration> { if self.conn_delay_needed(op) { // since chain time instant is relative to relayer's current time, it is possible that // `op.scheduled_time` is later (by nano secs) than `chain_time`, hence the call to // `saturating_duration_since()`. - chain_time.saturating_duration_since(op.scheduled_time) > self.channel.connection_delay + let elapsed = chain_time.saturating_duration_since(op.scheduled_time); + if elapsed > self.channel.connection_delay { + Ok(()) + } else { + Err(elapsed) + } } else { - true + Ok(()) } } @@ -766,15 +775,20 @@ impl RelayPath { op: &OperationalData, block_delay: u64, latest_height: Height, - ) -> bool { + ) -> Result<(), u64> { if self.conn_delay_needed(op) { - latest_height - >= op - .update_processed_height - .expect("processed height not set") - .add(block_delay) + let acceptable_height = op + .update_processed_height + .expect("processed height not set") + .add(block_delay); + if latest_height >= acceptable_height { + Ok(()) + } else { + debug_assert!(acceptable_height.revision_number == latest_height.revision_number); + Err(acceptable_height.revision_height - latest_height.revision_height) + } } else { - true + Ok(()) } } @@ -1655,8 +1669,10 @@ impl RelayPath { let src_latest_height = self.src_latest_height()?; let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { - self.conn_time_delay_elapsed(op, src_chain_time) - && self.conn_block_delay_elapsed(op, src_block_delay, src_latest_height) + self.conn_time_delay_elapsed(op, src_chain_time).is_ok() + && self + .conn_block_delay_elapsed(op, src_block_delay, src_latest_height) + .is_ok() }); let dst_chain_time = self.dst_time_latest()?; @@ -1664,8 +1680,10 @@ impl RelayPath { let dst_latest_height = self.dst_latest_height()?; let (elapsed_dst_ods, unelapsed_dst_ods) = partition(self.dst_operational_data.take(), |op| { - self.conn_time_delay_elapsed(op, dst_chain_time) - && self.conn_block_delay_elapsed(op, dst_block_delay, dst_latest_height) + self.conn_time_delay_elapsed(op, dst_chain_time).is_ok() + && self + .conn_block_delay_elapsed(op, dst_block_delay, dst_latest_height) + .is_ok() }); self.src_operational_data.replace(unelapsed_src_ods); @@ -1676,42 +1694,58 @@ impl RelayPath { /// Fetches an operational data that has fulfilled its predefined delay period. May _block_ /// waiting for the delay period to pass. /// Returns `None` if there is no operational data scheduled. - pub(crate) fn fetch_scheduled_operational_data(&self) -> Option { - let odata = self - .src_operational_data - .pop_front() - .or_else(|| self.dst_operational_data.pop_front()); + pub(crate) fn fetch_scheduled_operational_data( + &self, + ) -> Result, LinkError> { + let (delay_time_left, delay_blocks_left, odata) = + if let Some(odata) = self.src_operational_data.pop_front() { + let src_chain_time = self.src_time_latest()?; + let src_block_delay = self.src_conn_block_delay()?; + let src_latest_height = self.src_latest_height()?; + ( + self.conn_time_delay_elapsed(&odata, src_chain_time).err(), + self.conn_block_delay_elapsed(&odata, src_block_delay, src_latest_height) + .err(), + Some(odata), + ) + } else if let Some(odata) = self.dst_operational_data.pop_front() { + let dst_chain_time = self.dst_time_latest()?; + let dst_block_delay = self.dst_conn_block_delay()?; + let dst_latest_height = self.dst_latest_height()?; + ( + self.conn_time_delay_elapsed(&odata, dst_chain_time).err(), + self.conn_block_delay_elapsed(&odata, dst_block_delay, dst_latest_height) + .err(), + Some(odata), + ) + } else { + (None, None, None) + }; if let Some(odata) = odata { - // TODO - // Check if the delay period did not completely elapse - let delay_left = self - .channel - .connection_delay - .checked_sub(odata.scheduled_time.elapsed()); - - match delay_left { - None => info!( + match (delay_time_left, delay_blocks_left) { + (None, None) => info!( "ready to fetch a scheduled op. data with batch of size {} targeting {}", odata.batch.len(), odata.target, ), - Some(delay_left) => { + (Some(delay_time), _) => { info!( "waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", - delay_left, + delay_time, odata.batch.len(), odata.target, ); // Wait until the delay period passes - thread::sleep(delay_left); + thread::sleep(delay_time); } + (None, Some(_blocks)) => {} } - Some(odata) + Ok(Some(odata)) } else { - None + Ok(None) } } From 52684a33e6c4202255f9e38e4357d015cc12ac1c Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 17 Mar 2022 14:30:16 +0530 Subject: [PATCH 34/45] More comments --- relayer/src/link/relay_path.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index a96baf5abc..c05028de20 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -748,6 +748,7 @@ impl RelayPath { self.has_delay() && op_data.has_packet_msgs() } + /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. #[inline] fn conn_time_delay_elapsed( &self, @@ -769,6 +770,7 @@ impl RelayPath { } } + /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. #[inline] fn conn_block_delay_elapsed( &self, @@ -792,6 +794,7 @@ impl RelayPath { } } + /// Calculates and returns the block-delay based on the `max_expected_time_per_block` fn conn_block_delay(&self, chain: &impl ChainHandle) -> Result { // TODO(hu55a1n1): Ideally, we should get the `max_expected_time_per_block` using the // `/genesis` endpoint once it is working in tendermint-rs. @@ -803,7 +806,7 @@ impl RelayPath { )) } - // Returns the `processed_height` for the consensus state at specified height + /// Returns the `processed_height` for the consensus state at specified height fn update_height( chain: &impl ChainHandle, client_id: ClientId, @@ -829,11 +832,11 @@ impl RelayPath { } } + /// Loops over `tx_events` and returns a tuple of optional events where the first element is a + /// `ChainError` variant, the second one is an `UpdateClient` variant and the third one is any + /// other variant (usually `ClientMisbehaviour`). This function is essentially just an + /// `Iterator::find()` for multiple variants with a single pass. #[inline] - // Loops over `tx_events` and returns a tuple of optional events where the first element is a - // `ChainError` variant, the second one is an `UpdateClient` variant and the third one is any - // other variant (usually `ClientMisbehaviour`). This function is essentially just an - // `Iterator::find()` for multiple variants with a single pass. fn event_per_type( mut tx_events: Vec, ) -> ( @@ -856,9 +859,9 @@ impl RelayPath { (error, update, other) } - // Returns an instant (in the past) that corresponds to the block timestamp of the chain at - // specified height (relative to the relayer's current time). If the timestamp is in the future - // wrt the relayer's current time, we simply return the current relayer time. + /// Returns an instant (in the past) that corresponds to the block timestamp of the chain at + /// specified height (relative to the relayer's current time). If the timestamp is in the future + /// wrt the relayer's current time, we simply return the current relayer time. fn chain_time_at_height( chain: &impl ChainHandle, height: Height, From 287e3a9feffb083378f585cfa181354a4790ff50 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Fri, 18 Mar 2022 22:30:53 +0530 Subject: [PATCH 35/45] Extract all connection-delay logic from RelayPath --- relayer/src/link/operational_data.rs | 125 ++++++++++++++++++++-- relayer/src/link/relay_path.rs | 151 ++++++++------------------- 2 files changed, 158 insertions(+), 118 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index a8f17c168b..49bab9a2fe 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -1,13 +1,14 @@ use alloc::borrow::Cow; use core::fmt; use core::iter; -use std::time::Instant; +use std::time::{Duration, Instant}; use nanoid::nanoid; use prost_types::Any; use tracing::{debug, info}; use ibc::core::ics02_client::client_state::ClientState; +use ibc::core::ics04_channel::context::calculate_block_delay; use ibc::events::IbcEvent; use ibc::Height; @@ -88,13 +89,11 @@ pub struct TransitMessage { #[derive(Clone)] pub struct OperationalData { pub proofs_height: Height, - pub update_processed_height: Option, pub batch: Vec, pub target: OperationalDataTarget, - /// Stores the time when the clients on the target chain has been updated, i.e., when this data - /// was scheduled. Necessary for packet delays. - pub scheduled_time: Instant, pub tracking_id: String, + // Stores `Some(non-zero connection-delay)` or `None` + connection_delay: Option, } impl OperationalData { @@ -102,13 +101,18 @@ impl OperationalData { proofs_height: Height, target: OperationalDataTarget, tracking_id: impl Into, + connection_delay: Duration, ) -> Self { + let connection_delay = if !connection_delay.is_zero() { + Some(ConnectionDelay::new(connection_delay)) + } else { + None + }; OperationalData { proofs_height, - update_processed_height: None, batch: vec![], target, - scheduled_time: Instant::now(), + connection_delay, tracking_id: tracking_id.into(), } } @@ -144,7 +148,7 @@ impl OperationalData { relay_path: &RelayPath, ) -> Result { // For zero delay we prepend the client update msgs. - let client_update_msg = if !relay_path.conn_delay_needed(self) { + let client_update_msg = if !self.conn_delay_needed() { let update_height = self.proofs_height.increment(); debug!( @@ -197,9 +201,112 @@ impl OperationalData { Ok(tm) } - pub fn has_packet_msgs(&self) -> bool { + /// Returns true iff the batch contains a packet event + fn has_packet_msgs(&self) -> bool { self.batch.iter().any(|msg| msg.event.packet().is_some()) } + + /// Returns the `connection_delay` iff the connection delay for this relaying path is non-zero + /// and the `batch` contains packet messages. + fn get_delay_if_needed(&self) -> Option<&ConnectionDelay> { + self.connection_delay + .as_ref() + .filter(|_| self.has_packet_msgs()) + } + + /// Returns `true` iff the connection delay for this relaying path is non-zero and `op_data` + /// contains packet messages. + pub fn conn_delay_needed(&self) -> bool { + self.get_delay_if_needed().is_some() + } + + /// Sets the scheduled time that is used for connection-delay calculations + pub fn set_scheduled_time(&mut self, scheduled_time: Instant) { + if let Some(mut delay) = self.connection_delay.as_mut() { + delay.scheduled_time = scheduled_time; + } + } + + /// Sets the update height that is used for connection-delay calculations + pub fn set_update_height(&mut self, update_height: Height) { + if let Some(mut delay) = self.connection_delay.as_mut() { + delay.update_height = Some(update_height); + } + } + + /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. + pub fn conn_time_delay_elapsed(&self, chain_time: Instant) -> Result<(), Duration> { + if let Some(delay) = self.get_delay_if_needed() { + delay.conn_time_delay_elapsed(chain_time) + } else { + Ok(()) + } + } + + /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. + pub fn conn_block_delay_elapsed( + &self, + max_expected_time_per_block: Duration, + latest_height: Height, + ) -> Result<(), u64> { + if let Some(delay) = self.get_delay_if_needed() { + let block_delay = delay.conn_block_delay(max_expected_time_per_block); + delay.conn_block_delay_elapsed(block_delay, latest_height) + } else { + Ok(()) + } + } +} + +/// A struct that holds everything that is required to calculate and deal with the connection-delay +/// feature. +#[derive(Clone)] +struct ConnectionDelay { + delay: Duration, + scheduled_time: Instant, + update_height: Option, +} + +impl ConnectionDelay { + fn new(delay: Duration) -> Self { + Self { + delay, + scheduled_time: Instant::now(), + update_height: None, + } + } + + /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. + fn conn_time_delay_elapsed(&self, chain_time: Instant) -> Result<(), Duration> { + // since chain time instant is relative to relayer's current time, it is possible that + // `scheduled_time` is later (by nano secs) than `chain_time`, hence the call to + // `saturating_duration_since()`. + let elapsed = chain_time.saturating_duration_since(self.scheduled_time); + if elapsed > self.delay { + Ok(()) + } else { + Err(elapsed) + } + } + + /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. + fn conn_block_delay_elapsed(&self, block_delay: u64, latest_height: Height) -> Result<(), u64> { + let acceptable_height = self + .update_height + .expect("processed height not set") + .add(block_delay); + if latest_height >= acceptable_height { + Ok(()) + } else { + debug_assert!(acceptable_height.revision_number == latest_height.revision_number); + Err(acceptable_height.revision_height - latest_height.revision_height) + } + } + + /// Calculates and returns the block-delay based on the `max_expected_time_per_block` + fn conn_block_delay(&self, max_expected_time_per_block: Duration) -> u64 { + calculate_block_delay(self.delay, max_expected_time_per_block) + } } /// A lightweight informational data structure that can be extracted diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index c05028de20..5594afeba1 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -15,7 +15,6 @@ use ibc::{ }, ics04_channel::{ channel::{ChannelEnd, Order, QueryPacketEventDataRequest, State as ChannelState}, - context::calculate_block_delay, events::{SendPacket, WriteAcknowledgement}, msgs::{ acknowledgement::MsgAcknowledgement, chan_close_confirm::MsgChannelCloseConfirm, @@ -29,7 +28,7 @@ use ibc::{ events::{IbcEvent, PrettyEvents, WithBlockDataType}, query::{QueryBlockRequest, QueryTxRequest}, signer::Signer, - timestamp::{Timestamp, ZERO_DURATION}, + timestamp::Timestamp, tx_msg::Msg, Height, }; @@ -209,34 +208,38 @@ impl RelayPath { .map_err(|e| LinkError::query(self.dst_chain().id(), e)) } - #[inline] fn src_time_at_height(&self, height: Height) -> Result { Self::chain_time_at_height(self.src_chain(), height) } - #[inline] fn dst_time_at_height(&self, height: Height) -> Result { Self::chain_time_at_height(self.dst_chain(), height) } - #[inline] fn src_time_latest(&self) -> Result { self.src_time_at_height(Height::zero()) } - #[inline] fn dst_time_latest(&self) -> Result { self.dst_time_at_height(Height::zero()) } - #[inline] - fn src_conn_block_delay(&self) -> Result { - self.conn_block_delay(self.src_chain()) + fn src_max_block_time(&self) -> Result { + // TODO(hu55a1n1): Ideally, we should get the `max_expected_time_per_block` using the + // `/genesis` endpoint once it is working in tendermint-rs. + Ok(self + .src_chain() + .config() + .map_err(LinkError::relayer)? + .max_block_time) } - #[inline] - fn dst_conn_block_delay(&self) -> Result { - self.conn_block_delay(self.dst_chain()) + fn dst_max_block_time(&self) -> Result { + Ok(self + .dst_chain() + .config() + .map_err(LinkError::relayer)? + .max_block_time) } fn unordered_channel(&self) -> bool { @@ -413,12 +416,14 @@ impl RelayPath { dst_latest_height, OperationalDataTarget::Source, events.tracking_id(), + self.channel.connection_delay, ); // Operational data targeting the destination chain (e.g., SendPacket messages) let mut dst_od = OperationalData::new( src_height, OperationalDataTarget::Destination, events.tracking_id(), + self.channel.connection_delay, ); for event in input { @@ -520,12 +525,7 @@ impl RelayPath { let mut odata = initial_od; for i in 0..MAX_RETRIES { - debug!( - "delayed by: {:?} [try {}/{}]", - odata.scheduled_time.elapsed(), - i + 1, - MAX_RETRIES - ); + debug!("[try {}/{}]", i + 1, MAX_RETRIES); // Consume the operational data by attempting to send its messages match self.send_from_operational_data::(odata.clone()) { @@ -734,78 +734,6 @@ impl RelayPath { self.recv_packet_acknowledged_on_src(&rp.packet) } - /// Returns `true` if the connection delay for this relaying path is non-zero. - /// Conversely, returns `false` if the delay is zero. - #[inline] - fn has_delay(&self) -> bool { - self.channel.connection_delay != ZERO_DURATION - } - - /// Returns `true` iff the connection delay for this relaying path is non-zero and `op_data` - /// contains packet messages. - #[inline] - pub fn conn_delay_needed(&self, op_data: &OperationalData) -> bool { - self.has_delay() && op_data.has_packet_msgs() - } - - /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. - #[inline] - fn conn_time_delay_elapsed( - &self, - op: &OperationalData, - chain_time: Instant, - ) -> Result<(), Duration> { - if self.conn_delay_needed(op) { - // since chain time instant is relative to relayer's current time, it is possible that - // `op.scheduled_time` is later (by nano secs) than `chain_time`, hence the call to - // `saturating_duration_since()`. - let elapsed = chain_time.saturating_duration_since(op.scheduled_time); - if elapsed > self.channel.connection_delay { - Ok(()) - } else { - Err(elapsed) - } - } else { - Ok(()) - } - } - - /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. - #[inline] - fn conn_block_delay_elapsed( - &self, - op: &OperationalData, - block_delay: u64, - latest_height: Height, - ) -> Result<(), u64> { - if self.conn_delay_needed(op) { - let acceptable_height = op - .update_processed_height - .expect("processed height not set") - .add(block_delay); - if latest_height >= acceptable_height { - Ok(()) - } else { - debug_assert!(acceptable_height.revision_number == latest_height.revision_number); - Err(acceptable_height.revision_height - latest_height.revision_height) - } - } else { - Ok(()) - } - } - - /// Calculates and returns the block-delay based on the `max_expected_time_per_block` - fn conn_block_delay(&self, chain: &impl ChainHandle) -> Result { - // TODO(hu55a1n1): Ideally, we should get the `max_expected_time_per_block` using the - // `/genesis` endpoint once it is working in tendermint-rs. - let max_expected_time_per_block = - chain.config().map_err(LinkError::relayer)?.max_block_time; - Ok(calculate_block_delay( - self.channel.connection_delay, - max_expected_time_per_block, - )) - } - /// Returns the `processed_height` for the consensus state at specified height fn update_height( chain: &impl ChainHandle, @@ -1542,6 +1470,7 @@ impl RelayPath { dst_current_height, OperationalDataTarget::Source, &odata.tracking_id, + self.channel.connection_delay, ) }) .push(TransitMessage { @@ -1611,18 +1540,18 @@ impl RelayPath { // Update clients ahead of scheduling the operational data, if the delays are non-zero. // If the connection-delay must be taken into account, set the `scheduled_time` to an // instant in the past, i.e. when this client update was first processed (`processed_time`) - od.scheduled_time = if self.conn_delay_needed(&od) { + let scheduled_time = if od.conn_delay_needed() { debug!("connection delay must be taken into account: updating client"); let target_height = od.proofs_height.increment(); match od.target { OperationalDataTarget::Source => { let update_height = self.update_client_src(target_height, &od.tracking_id)?; - od.update_processed_height = Some(update_height); + od.set_update_height(update_height); self.src_time_at_height(update_height)? } OperationalDataTarget::Destination => { let update_height = self.update_client_dst(target_height, &od.tracking_id)?; - od.update_processed_height = Some(update_height); + od.set_update_height(update_height); self.dst_time_at_height(update_height)? } } @@ -1634,6 +1563,8 @@ impl RelayPath { Instant::now() }; + od.set_scheduled_time(scheduled_time); + match od.target { OperationalDataTarget::Source => self.src_operational_data.push_back(od), OperationalDataTarget::Destination => self.dst_operational_data.push_back(od), @@ -1668,24 +1599,24 @@ impl RelayPath { } let src_chain_time = self.src_time_latest()?; - let src_block_delay = self.src_conn_block_delay()?; + let src_max_block_time = self.src_max_block_time()?; let src_latest_height = self.src_latest_height()?; let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { - self.conn_time_delay_elapsed(op, src_chain_time).is_ok() - && self - .conn_block_delay_elapsed(op, src_block_delay, src_latest_height) + op.conn_time_delay_elapsed(src_chain_time).is_ok() + && op + .conn_block_delay_elapsed(src_max_block_time, src_latest_height) .is_ok() }); let dst_chain_time = self.dst_time_latest()?; - let dst_block_delay = self.dst_conn_block_delay()?; + let dst_max_block_time = self.dst_max_block_time()?; let dst_latest_height = self.dst_latest_height()?; let (elapsed_dst_ods, unelapsed_dst_ods) = partition(self.dst_operational_data.take(), |op| { - self.conn_time_delay_elapsed(op, dst_chain_time).is_ok() - && self - .conn_block_delay_elapsed(op, dst_block_delay, dst_latest_height) + op.conn_time_delay_elapsed(dst_chain_time).is_ok() + && op + .conn_block_delay_elapsed(dst_max_block_time, dst_latest_height) .is_ok() }); @@ -1700,24 +1631,26 @@ impl RelayPath { pub(crate) fn fetch_scheduled_operational_data( &self, ) -> Result, LinkError> { - let (delay_time_left, delay_blocks_left, odata) = + let (time_left, blocks_left, odata) = if let Some(odata) = self.src_operational_data.pop_front() { let src_chain_time = self.src_time_latest()?; - let src_block_delay = self.src_conn_block_delay()?; + let src_block_time = self.src_max_block_time()?; let src_latest_height = self.src_latest_height()?; ( - self.conn_time_delay_elapsed(&odata, src_chain_time).err(), - self.conn_block_delay_elapsed(&odata, src_block_delay, src_latest_height) + odata.conn_time_delay_elapsed(src_chain_time).err(), + odata + .conn_block_delay_elapsed(src_block_time, src_latest_height) .err(), Some(odata), ) } else if let Some(odata) = self.dst_operational_data.pop_front() { let dst_chain_time = self.dst_time_latest()?; - let dst_block_delay = self.dst_conn_block_delay()?; + let dst_block_time = self.dst_max_block_time()?; let dst_latest_height = self.dst_latest_height()?; ( - self.conn_time_delay_elapsed(&odata, dst_chain_time).err(), - self.conn_block_delay_elapsed(&odata, dst_block_delay, dst_latest_height) + odata.conn_time_delay_elapsed(dst_chain_time).err(), + odata + .conn_block_delay_elapsed(dst_block_time, dst_latest_height) .err(), Some(odata), ) @@ -1726,7 +1659,7 @@ impl RelayPath { }; if let Some(odata) = odata { - match (delay_time_left, delay_blocks_left) { + match (time_left, blocks_left) { (None, None) => info!( "ready to fetch a scheduled op. data with batch of size {} targeting {}", odata.batch.len(), From d1b712b9dbcc847bbd10133bef0064653a5f3617 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Thu, 24 Mar 2022 18:23:02 +0530 Subject: [PATCH 36/45] Address review feedback --- relayer/src/link/operational_data.rs | 2 +- relayer/src/link/relay_path.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 49bab9a2fe..dd11d6fb8b 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -92,7 +92,7 @@ pub struct OperationalData { pub batch: Vec, pub target: OperationalDataTarget, pub tracking_id: String, - // Stores `Some(non-zero connection-delay)` or `None` + /// Stores `Some(ConnectionDelay)` if the delay is non-zero and `None` otherwise connection_delay: Option, } diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 5594afeba1..d36164b1eb 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1627,7 +1627,7 @@ impl RelayPath { /// Fetches an operational data that has fulfilled its predefined delay period. May _block_ /// waiting for the delay period to pass. - /// Returns `None` if there is no operational data scheduled. + /// Returns `Ok(None)` if there is no operational data scheduled. pub(crate) fn fetch_scheduled_operational_data( &self, ) -> Result, LinkError> { From ef95dd2b5350ddd88f192c7932f133ddbf23c817 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 30 Mar 2022 14:48:12 +0530 Subject: [PATCH 37/45] Address review feedback --- modules/src/core/ics04_channel/context.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/src/core/ics04_channel/context.rs b/modules/src/core/ics04_channel/context.rs index 756eee922c..005af2529c 100644 --- a/modules/src/core/ics04_channel/context.rs +++ b/modules/src/core/ics04_channel/context.rs @@ -278,10 +278,10 @@ pub fn calculate_block_delay( delay_period_time: Duration, max_expected_time_per_block: Duration, ) -> u64 { - let expected_time_per_block = max_expected_time_per_block; - if expected_time_per_block.is_zero() { + if max_expected_time_per_block.is_zero() { return 0; } - FloatCore::ceil(delay_period_time.as_secs_f64() / expected_time_per_block.as_secs_f64()) as u64 + FloatCore::ceil(delay_period_time.as_secs_f64() / max_expected_time_per_block.as_secs_f64()) + as u64 } From beb9fd9678e1b895a685b575c94359f10e66fe63 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 30 Mar 2022 14:49:55 +0530 Subject: [PATCH 38/45] Closures for conn-delay specific lazy eval --- relayer/src/link/operational_data.rs | 45 ++++++++++-------- relayer/src/link/relay_path.rs | 71 +++++++++++++--------------- 2 files changed, 56 insertions(+), 60 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 004ef7f7fd..0ce93739d5 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -234,26 +234,29 @@ impl OperationalData { } } - /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. - pub fn conn_time_delay_elapsed(&self, chain_time: Instant) -> Result<(), Duration> { + /// Returns `Ok(remaining-delay)` on success or `LinkError` if the input closure fails. + pub fn conn_time_delay_remaining( + &self, + chain_time: impl FnOnce() -> Result, + ) -> Result { if let Some(delay) = self.get_delay_if_needed() { - delay.conn_time_delay_elapsed(chain_time) + Ok(delay.conn_time_delay_remaining(chain_time()?)) } else { - Ok(()) + Ok(Duration::ZERO) } } - /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. - pub fn conn_block_delay_elapsed( + /// Returns `Ok(remaining-delay)` on success or `LinkError` if an input closure fails. + pub fn conn_block_delay_remaining( &self, - max_expected_time_per_block: Duration, - latest_height: Height, - ) -> Result<(), u64> { + max_expected_time_per_block: impl FnOnce() -> Result, + latest_height: impl FnOnce() -> Result, + ) -> Result { if let Some(delay) = self.get_delay_if_needed() { - let block_delay = delay.conn_block_delay(max_expected_time_per_block); - delay.conn_block_delay_elapsed(block_delay, latest_height) + let block_delay = delay.conn_block_delay(max_expected_time_per_block()?); + Ok(delay.conn_block_delay_remaining(block_delay, latest_height()?)) } else { - Ok(()) + Ok(0) } } } @@ -276,30 +279,30 @@ impl ConnectionDelay { } } - /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. - fn conn_time_delay_elapsed(&self, chain_time: Instant) -> Result<(), Duration> { + /// Returns `remaining-delay` if connection-delay hasn't elapsed and `Duration::ZERO` otherwise. + fn conn_time_delay_remaining(&self, chain_time: Instant) -> Duration { // since chain time instant is relative to relayer's current time, it is possible that // `scheduled_time` is later (by nano secs) than `chain_time`, hence the call to // `saturating_duration_since()`. let elapsed = chain_time.saturating_duration_since(self.scheduled_time); - if elapsed > self.delay { - Ok(()) + if elapsed >= self.delay { + Duration::ZERO } else { - Err(elapsed) + self.delay - elapsed } } - /// Returns `Ok(())` if the connection-delay has elapsed and `Err(remaining-delay)` otherwise. - fn conn_block_delay_elapsed(&self, block_delay: u64, latest_height: Height) -> Result<(), u64> { + /// Returns `remaining-delay` if connection-delay hasn't elapsed and `0` otherwise. + fn conn_block_delay_remaining(&self, block_delay: u64, latest_height: Height) -> u64 { let acceptable_height = self .update_height .expect("processed height not set") .add(block_delay); if latest_height >= acceptable_height { - Ok(()) + 0 } else { debug_assert!(acceptable_height.revision_number == latest_height.revision_number); - Err(acceptable_height.revision_height - latest_height.revision_height) + acceptable_height.revision_height - latest_height.revision_height } } diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 8df6703893..c90c70fddb 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1582,43 +1582,39 @@ impl RelayPath { // The mutable vector is then updated to the remaining unextracted elements. fn partition( queue: VecDeque, - pred: impl Fn(&T) -> bool, - ) -> (VecDeque, VecDeque) { + pred: impl Fn(&T) -> Result, + ) -> Result<(VecDeque, VecDeque), LinkError> { let mut true_res = VecDeque::new(); let mut false_res = VecDeque::new(); for e in queue.into_iter() { - if pred(&e) { + if pred(&e)? { true_res.push_back(e); } else { false_res.push_back(e); } } - (true_res, false_res) + Ok((true_res, false_res)) } - let src_chain_time = self.src_time_latest()?; - let src_max_block_time = self.src_max_block_time()?; - let src_latest_height = self.src_latest_height()?; + let src_chain_time = || self.src_time_latest(); + let src_max_block_time = || self.src_max_block_time(); + let src_latest_height = || self.src_latest_height(); let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { - op.conn_time_delay_elapsed(src_chain_time).is_ok() - && op - .conn_block_delay_elapsed(src_max_block_time, src_latest_height) - .is_ok() - }); - - let dst_chain_time = self.dst_time_latest()?; - let dst_max_block_time = self.dst_max_block_time()?; - let dst_latest_height = self.dst_latest_height()?; + Ok(op.conn_time_delay_remaining(src_chain_time)?.is_zero() + && op.conn_block_delay_remaining(src_max_block_time, src_latest_height)? == 0) + })?; + + let dst_chain_time = || self.dst_time_latest(); + let dst_max_block_time = || self.dst_max_block_time(); + let dst_latest_height = || self.dst_latest_height(); let (elapsed_dst_ods, unelapsed_dst_ods) = partition(self.dst_operational_data.take(), |op| { - op.conn_time_delay_elapsed(dst_chain_time).is_ok() - && op - .conn_block_delay_elapsed(dst_max_block_time, dst_latest_height) - .is_ok() - }); + Ok(op.conn_time_delay_remaining(dst_chain_time)?.is_zero() + && op.conn_block_delay_remaining(dst_max_block_time, dst_latest_height)? == 0) + })?; self.src_operational_data.replace(unelapsed_src_ods); self.dst_operational_data.replace(unelapsed_dst_ods); @@ -1633,39 +1629,35 @@ impl RelayPath { ) -> Result, LinkError> { let (time_left, blocks_left, odata) = if let Some(odata) = self.src_operational_data.pop_front() { - let src_chain_time = self.src_time_latest()?; - let src_block_time = self.src_max_block_time()?; - let src_latest_height = self.src_latest_height()?; + let src_chain_time = || self.src_time_latest(); + let src_block_time = || self.src_max_block_time(); + let src_latest_height = || self.src_latest_height(); ( - odata.conn_time_delay_elapsed(src_chain_time).err(), - odata - .conn_block_delay_elapsed(src_block_time, src_latest_height) - .err(), + odata.conn_time_delay_remaining(src_chain_time)?, + odata.conn_block_delay_remaining(src_block_time, src_latest_height)?, Some(odata), ) } else if let Some(odata) = self.dst_operational_data.pop_front() { - let dst_chain_time = self.dst_time_latest()?; - let dst_block_time = self.dst_max_block_time()?; - let dst_latest_height = self.dst_latest_height()?; + let dst_chain_time = || self.dst_time_latest(); + let dst_block_time = || self.dst_max_block_time(); + let dst_latest_height = || self.dst_latest_height(); ( - odata.conn_time_delay_elapsed(dst_chain_time).err(), - odata - .conn_block_delay_elapsed(dst_block_time, dst_latest_height) - .err(), + odata.conn_time_delay_remaining(dst_chain_time)?, + odata.conn_block_delay_remaining(dst_block_time, dst_latest_height)?, Some(odata), ) } else { - (None, None, None) + (Duration::ZERO, 0, None) }; if let Some(odata) = odata { match (time_left, blocks_left) { - (None, None) => info!( + (Duration::ZERO, 0) => info!( "ready to fetch a scheduled op. data with batch of size {} targeting {}", odata.batch.len(), odata.target, ), - (Some(delay_time), _) => { + (delay_time, 0) => { info!( "waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", delay_time, @@ -1676,7 +1668,8 @@ impl RelayPath { // Wait until the delay period passes thread::sleep(delay_time); } - (None, Some(_blocks)) => {} + (Duration::ZERO, _delay_blocks) => {} + (_delay_time, _delay_blocks) => {} } Ok(Some(odata)) From ec076a7719417873c4bf3ee724fb1056c65ccaa5 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 30 Mar 2022 15:13:17 +0530 Subject: [PATCH 39/45] Minor refactoring --- relayer/src/link/operational_data.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 0ce93739d5..552156db36 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -235,10 +235,13 @@ impl OperationalData { } /// Returns `Ok(remaining-delay)` on success or `LinkError` if the input closure fails. - pub fn conn_time_delay_remaining( + pub fn conn_time_delay_remaining( &self, - chain_time: impl FnOnce() -> Result, - ) -> Result { + chain_time: ChainTime, + ) -> Result + where + ChainTime: FnOnce() -> Result, + { if let Some(delay) = self.get_delay_if_needed() { Ok(delay.conn_time_delay_remaining(chain_time()?)) } else { @@ -247,11 +250,15 @@ impl OperationalData { } /// Returns `Ok(remaining-delay)` on success or `LinkError` if an input closure fails. - pub fn conn_block_delay_remaining( + pub fn conn_block_delay_remaining( &self, - max_expected_time_per_block: impl FnOnce() -> Result, - latest_height: impl FnOnce() -> Result, - ) -> Result { + max_expected_time_per_block: MaxBlockTime, + latest_height: LatestHeight, + ) -> Result + where + MaxBlockTime: FnOnce() -> Result, + LatestHeight: FnOnce() -> Result, + { if let Some(delay) = self.get_delay_if_needed() { let block_delay = delay.conn_block_delay(max_expected_time_per_block()?); Ok(delay.conn_block_delay_remaining(block_delay, latest_height()?)) From afff3c4be1bbb2901d5b9a23a992c55fdfd5327c Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 30 Mar 2022 15:46:44 +0530 Subject: [PATCH 40/45] Add helpers has_conn_delay_elapsed() and conn_delay_remaining() --- relayer/src/link/operational_data.rs | 32 +++++++++++++++++ relayer/src/link/relay_path.rs | 54 ++++++++++++++-------------- 2 files changed, 59 insertions(+), 27 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 552156db36..0a1ad80368 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -266,6 +266,38 @@ impl OperationalData { Ok(0) } } + + pub fn has_conn_delay_elapsed( + &self, + chain_time: ChainTime, + max_expected_time_per_block: MaxBlockTime, + latest_height: LatestHeight, + ) -> Result + where + ChainTime: FnOnce() -> Result, + MaxBlockTime: FnOnce() -> Result, + LatestHeight: FnOnce() -> Result, + { + Ok(self.conn_time_delay_remaining(chain_time)?.is_zero() + && self.conn_block_delay_remaining(max_expected_time_per_block, latest_height)? == 0) + } + + pub fn conn_delay_remaining( + &self, + chain_time: ChainTime, + max_expected_time_per_block: MaxBlockTime, + latest_height: LatestHeight, + ) -> Result<(Duration, u64), LinkError> + where + ChainTime: FnOnce() -> Result, + MaxBlockTime: FnOnce() -> Result, + LatestHeight: FnOnce() -> Result, + { + Ok(( + self.conn_time_delay_remaining(chain_time)?, + self.conn_block_delay_remaining(max_expected_time_per_block, latest_height)?, + )) + } } /// A struct that holds everything that is required to calculate and deal with the connection-delay diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index c90c70fddb..c0b55822a0 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1598,22 +1598,22 @@ impl RelayPath { Ok((true_res, false_res)) } - let src_chain_time = || self.src_time_latest(); - let src_max_block_time = || self.src_max_block_time(); - let src_latest_height = || self.src_latest_height(); let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { - Ok(op.conn_time_delay_remaining(src_chain_time)?.is_zero() - && op.conn_block_delay_remaining(src_max_block_time, src_latest_height)? == 0) + op.has_conn_delay_elapsed( + || self.src_time_latest(), + || self.src_max_block_time(), + || self.src_latest_height(), + ) })?; - let dst_chain_time = || self.dst_time_latest(); - let dst_max_block_time = || self.dst_max_block_time(); - let dst_latest_height = || self.dst_latest_height(); let (elapsed_dst_ods, unelapsed_dst_ods) = partition(self.dst_operational_data.take(), |op| { - Ok(op.conn_time_delay_remaining(dst_chain_time)?.is_zero() - && op.conn_block_delay_remaining(dst_max_block_time, dst_latest_height)? == 0) + op.has_conn_delay_elapsed( + || self.dst_time_latest(), + || self.dst_max_block_time(), + || self.dst_latest_height(), + ) })?; self.src_operational_data.replace(unelapsed_src_ods); @@ -1627,27 +1627,27 @@ impl RelayPath { pub(crate) fn fetch_scheduled_operational_data( &self, ) -> Result, LinkError> { - let (time_left, blocks_left, odata) = + let ((time_left, blocks_left), odata) = if let Some(odata) = self.src_operational_data.pop_front() { - let src_chain_time = || self.src_time_latest(); - let src_block_time = || self.src_max_block_time(); - let src_latest_height = || self.src_latest_height(); ( - odata.conn_time_delay_remaining(src_chain_time)?, - odata.conn_block_delay_remaining(src_block_time, src_latest_height)?, + odata.conn_delay_remaining( + || self.src_time_latest(), + || self.src_max_block_time(), + || self.src_latest_height(), + )?, Some(odata), ) } else if let Some(odata) = self.dst_operational_data.pop_front() { - let dst_chain_time = || self.dst_time_latest(); - let dst_block_time = || self.dst_max_block_time(); - let dst_latest_height = || self.dst_latest_height(); ( - odata.conn_time_delay_remaining(dst_chain_time)?, - odata.conn_block_delay_remaining(dst_block_time, dst_latest_height)?, + odata.conn_delay_remaining( + || self.dst_time_latest(), + || self.dst_max_block_time(), + || self.dst_latest_height(), + )?, Some(odata), ) } else { - (Duration::ZERO, 0, None) + ((Duration::ZERO, 0), None) }; if let Some(odata) = odata { @@ -1657,19 +1657,19 @@ impl RelayPath { odata.batch.len(), odata.target, ), - (delay_time, 0) => { + (time_left, 0) => { info!( "waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", - delay_time, + time_left, odata.batch.len(), odata.target, ); // Wait until the delay period passes - thread::sleep(delay_time); + thread::sleep(time_left); } - (Duration::ZERO, _delay_blocks) => {} - (_delay_time, _delay_blocks) => {} + (Duration::ZERO, _blocks_left) => {} + (_time_left, _blocks_left) => {} } Ok(Some(odata)) From 8b32f5190821a6228888f33592098a3e31a7f02f Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 30 Mar 2022 16:28:36 +0530 Subject: [PATCH 41/45] Handle connection block delay --- relayer/src/link/operational_data.rs | 36 ++++---- relayer/src/link/relay_path.rs | 130 ++++++++++++++++----------- 2 files changed, 96 insertions(+), 70 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index 0a1ad80368..ee27116cc9 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -237,10 +237,10 @@ impl OperationalData { /// Returns `Ok(remaining-delay)` on success or `LinkError` if the input closure fails. pub fn conn_time_delay_remaining( &self, - chain_time: ChainTime, + chain_time: &ChainTime, ) -> Result where - ChainTime: FnOnce() -> Result, + ChainTime: Fn() -> Result, { if let Some(delay) = self.get_delay_if_needed() { Ok(delay.conn_time_delay_remaining(chain_time()?)) @@ -252,12 +252,12 @@ impl OperationalData { /// Returns `Ok(remaining-delay)` on success or `LinkError` if an input closure fails. pub fn conn_block_delay_remaining( &self, - max_expected_time_per_block: MaxBlockTime, - latest_height: LatestHeight, + max_expected_time_per_block: &MaxBlockTime, + latest_height: &LatestHeight, ) -> Result where - MaxBlockTime: FnOnce() -> Result, - LatestHeight: FnOnce() -> Result, + MaxBlockTime: Fn() -> Result, + LatestHeight: Fn() -> Result, { if let Some(delay) = self.get_delay_if_needed() { let block_delay = delay.conn_block_delay(max_expected_time_per_block()?); @@ -269,14 +269,14 @@ impl OperationalData { pub fn has_conn_delay_elapsed( &self, - chain_time: ChainTime, - max_expected_time_per_block: MaxBlockTime, - latest_height: LatestHeight, + chain_time: &ChainTime, + max_expected_time_per_block: &MaxBlockTime, + latest_height: &LatestHeight, ) -> Result where - ChainTime: FnOnce() -> Result, - MaxBlockTime: FnOnce() -> Result, - LatestHeight: FnOnce() -> Result, + ChainTime: Fn() -> Result, + MaxBlockTime: Fn() -> Result, + LatestHeight: Fn() -> Result, { Ok(self.conn_time_delay_remaining(chain_time)?.is_zero() && self.conn_block_delay_remaining(max_expected_time_per_block, latest_height)? == 0) @@ -284,14 +284,14 @@ impl OperationalData { pub fn conn_delay_remaining( &self, - chain_time: ChainTime, - max_expected_time_per_block: MaxBlockTime, - latest_height: LatestHeight, + chain_time: &ChainTime, + max_expected_time_per_block: &MaxBlockTime, + latest_height: &LatestHeight, ) -> Result<(Duration, u64), LinkError> where - ChainTime: FnOnce() -> Result, - MaxBlockTime: FnOnce() -> Result, - LatestHeight: FnOnce() -> Result, + ChainTime: Fn() -> Result, + MaxBlockTime: Fn() -> Result, + LatestHeight: Fn() -> Result, { Ok(( self.conn_time_delay_remaining(chain_time)?, diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index c0b55822a0..1a7f7d154a 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1601,18 +1601,18 @@ impl RelayPath { let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { op.has_conn_delay_elapsed( - || self.src_time_latest(), - || self.src_max_block_time(), - || self.src_latest_height(), + &|| self.src_time_latest(), + &|| self.src_max_block_time(), + &|| self.src_latest_height(), ) })?; let (elapsed_dst_ods, unelapsed_dst_ods) = partition(self.dst_operational_data.take(), |op| { op.has_conn_delay_elapsed( - || self.dst_time_latest(), - || self.dst_max_block_time(), - || self.dst_latest_height(), + &|| self.dst_time_latest(), + &|| self.dst_max_block_time(), + &|| self.dst_latest_height(), ) })?; @@ -1621,58 +1621,84 @@ impl RelayPath { Ok((elapsed_src_ods, elapsed_dst_ods)) } - /// Fetches an operational data that has fulfilled its predefined delay period. May _block_ - /// waiting for the delay period to pass. - /// Returns `Ok(None)` if there is no operational data scheduled. - pub(crate) fn fetch_scheduled_operational_data( - &self, - ) -> Result, LinkError> { - let ((time_left, blocks_left), odata) = - if let Some(odata) = self.src_operational_data.pop_front() { - ( - odata.conn_delay_remaining( - || self.src_time_latest(), - || self.src_max_block_time(), - || self.src_latest_height(), - )?, - Some(odata), - ) - } else if let Some(odata) = self.dst_operational_data.pop_front() { - ( - odata.conn_delay_remaining( - || self.dst_time_latest(), - || self.dst_max_block_time(), - || self.dst_latest_height(), - )?, - Some(odata), - ) - } else { - ((Duration::ZERO, 0), None) - }; - - if let Some(odata) = odata { - match (time_left, blocks_left) { - (Duration::ZERO, 0) => info!( + fn wait_for_conn_delay( + odata: OperationalData, + chain_time: &ChainTime, + max_expected_time_per_block: &MaxBlockTime, + latest_height: &LatestHeight, + ) -> Result + where + ChainTime: Fn() -> Result, + MaxBlockTime: Fn() -> Result, + LatestHeight: Fn() -> Result, + { + let (time_left, blocks_left) = + odata.conn_delay_remaining(chain_time, max_expected_time_per_block, latest_height)?; + + match (time_left, blocks_left) { + (Duration::ZERO, 0) => { + info!( "ready to fetch a scheduled op. data with batch of size {} targeting {}", odata.batch.len(), odata.target, - ), - (time_left, 0) => { - info!( - "waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", - time_left, - odata.batch.len(), - odata.target, - ); + ); + Ok(odata) + } + (Duration::ZERO, blocks_left) => { + info!( + "waiting ({:?} blocks left) for a scheduled op. data with batch of size {} targeting {}", + blocks_left, + odata.batch.len(), + odata.target, + ); - // Wait until the delay period passes - thread::sleep(time_left); - } - (Duration::ZERO, _blocks_left) => {} - (_time_left, _blocks_left) => {} + // Wait until the delay period passes + thread::sleep(blocks_left * max_expected_time_per_block()?); + + Ok(odata) + } + (time_left, _) => { + info!( + "waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", + time_left, + odata.batch.len(), + odata.target, + ); + + // Wait until the delay period passes + thread::sleep(time_left); + + // `blocks_left` maybe non-zero, so recurse to recheck that all delays are handled. + Self::wait_for_conn_delay( + odata, + chain_time, + max_expected_time_per_block, + latest_height, + ) } + } + } - Ok(Some(odata)) + /// Fetches an operational data that has fulfilled its predefined delay period. May _block_ + /// waiting for the delay period to pass. + /// Returns `Ok(None)` if there is no operational data scheduled. + pub(crate) fn fetch_scheduled_operational_data( + &self, + ) -> Result, LinkError> { + if let Some(odata) = self.src_operational_data.pop_front() { + Ok(Some(Self::wait_for_conn_delay( + odata, + &|| self.src_time_latest(), + &|| self.src_max_block_time(), + &|| self.src_latest_height(), + )?)) + } else if let Some(odata) = self.dst_operational_data.pop_front() { + Ok(Some(Self::wait_for_conn_delay( + odata, + &|| self.dst_time_latest(), + &|| self.dst_max_block_time(), + &|| self.dst_latest_height(), + )?)) } else { Ok(None) } From 2f5679edfd8c8025b622e51f8d07c06037565e08 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Wed, 30 Mar 2022 16:44:10 +0530 Subject: [PATCH 42/45] Cast block delay to u32 --- relayer/src/link/relay_path.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 1a7f7d154a..8570de4310 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1,5 +1,6 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; +use std::convert::TryInto; use std::ops::Sub; use std::thread; use std::time::{Duration, Instant}; @@ -1652,6 +1653,8 @@ impl RelayPath { odata.target, ); + let blocks_left: u32 = blocks_left.try_into().expect("blocks_left > u32::MAX"); + // Wait until the delay period passes thread::sleep(blocks_left * max_expected_time_per_block()?); From e0583586d24dfca4e2da9aeb5ced05d653843ecf Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Fri, 1 Apr 2022 02:20:50 +0530 Subject: [PATCH 43/45] Extract out CLI specific link code --- relayer/src/link.rs | 55 +----------- relayer/src/link/cli.rs | 154 +++++++++++++++++++++++++++++++++ relayer/src/link/relay_path.rs | 103 ++-------------------- 3 files changed, 163 insertions(+), 149 deletions(-) create mode 100644 relayer/src/link/cli.rs diff --git a/relayer/src/link.rs b/relayer/src/link.rs index e4ef1b4e4c..de11e12e74 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -4,10 +4,8 @@ use ibc::{ ics04_channel::channel::State as ChannelState, ics24_host::identifier::{ChannelId, PortChannelId, PortId}, }, - events::IbcEvent, Height, }; -use tracing::error_span; use crate::chain::counterparty::check_channel_counterparty; use crate::chain::handle::ChainHandle; @@ -15,6 +13,7 @@ use crate::channel::{Channel, ChannelSide}; use crate::link::error::LinkError; use crate::link::relay_path::RelayPath; +pub mod cli; pub mod error; pub mod operational_data; mod pending; @@ -194,56 +193,4 @@ impl Link { // going slowly, but reliably. Link::new_from_opts(chain_b, chain_a, opts, with_tx_confirmation) } - - /// Implements the `packet-recv` CLI - pub fn build_and_send_recv_packet_messages(&self) -> Result, LinkError> { - let _span = error_span!( - "PacketRecvCmd", - src_chain = %self.a_to_b.src_chain().id(), - src_port = %self.a_to_b.src_port_id(), - src_channel = %self.a_to_b.src_channel_id(), - dst_chain = %self.a_to_b.dst_chain().id(), - ) - .entered(); - - self.a_to_b.build_recv_packet_and_timeout_msgs(None)?; - - let mut results = vec![]; - - // Block waiting for all of the scheduled data (until `None` is returned) - while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data()? { - let mut last_res = self - .a_to_b - .relay_from_operational_data::(odata)?; - results.append(&mut last_res.events); - } - - Ok(results) - } - - /// Implements the `packet-ack` CLI - pub fn build_and_send_ack_packet_messages(&self) -> Result, LinkError> { - let _span = error_span!( - "PacketAckCmd", - src_chain = %self.a_to_b.src_chain().id(), - src_port = %self.a_to_b.src_port_id(), - src_channel = %self.a_to_b.src_channel_id(), - dst_chain = %self.a_to_b.dst_chain().id(), - ) - .entered(); - - self.a_to_b.build_packet_ack_msgs(None)?; - - let mut results = vec![]; - - // Block waiting for all of the scheduled data - while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data()? { - let mut last_res = self - .a_to_b - .relay_from_operational_data::(odata)?; - results.append(&mut last_res.events); - } - - Ok(results) - } } diff --git a/relayer/src/link/cli.rs b/relayer/src/link/cli.rs new file mode 100644 index 0000000000..004605a446 --- /dev/null +++ b/relayer/src/link/cli.rs @@ -0,0 +1,154 @@ +use std::convert::TryInto; +use std::thread; +use std::time::{Duration, Instant}; + +use ibc::events::IbcEvent; +use ibc::Height; +use tracing::{error_span, info}; + +use crate::chain::handle::ChainHandle; +use crate::link::error::LinkError; +use crate::link::operational_data::OperationalData; +use crate::link::relay_path::RelayPath; +use crate::link::{relay_sender, Link}; + +impl RelayPath { + /// Fetches an operational data that has fulfilled its predefined delay period. May _block_ + /// waiting for the delay period to pass. + /// Returns `Ok(None)` if there is no operational data scheduled. + pub(crate) fn fetch_scheduled_operational_data( + &self, + ) -> Result, LinkError> { + if let Some(odata) = self.src_operational_data.pop_front() { + Ok(Some(wait_for_conn_delay( + odata, + &|| self.src_time_latest(), + &|| self.src_max_block_time(), + &|| self.src_latest_height(), + )?)) + } else if let Some(odata) = self.dst_operational_data.pop_front() { + Ok(Some(wait_for_conn_delay( + odata, + &|| self.dst_time_latest(), + &|| self.dst_max_block_time(), + &|| self.dst_latest_height(), + )?)) + } else { + Ok(None) + } + } +} + +impl Link { + /// Implements the `packet-recv` CLI + pub fn build_and_send_recv_packet_messages(&self) -> Result, LinkError> { + let _span = error_span!( + "PacketRecvCmd", + src_chain = %self.a_to_b.src_chain().id(), + src_port = %self.a_to_b.src_port_id(), + src_channel = %self.a_to_b.src_channel_id(), + dst_chain = %self.a_to_b.dst_chain().id(), + ) + .entered(); + + self.a_to_b.build_recv_packet_and_timeout_msgs(None)?; + + let mut results = vec![]; + + // Block waiting for all of the scheduled data (until `None` is returned) + while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data()? { + let mut last_res = self + .a_to_b + .relay_from_operational_data::(odata)?; + results.append(&mut last_res.events); + } + + Ok(results) + } + + /// Implements the `packet-ack` CLI + pub fn build_and_send_ack_packet_messages(&self) -> Result, LinkError> { + let _span = error_span!( + "PacketAckCmd", + src_chain = %self.a_to_b.src_chain().id(), + src_port = %self.a_to_b.src_port_id(), + src_channel = %self.a_to_b.src_channel_id(), + dst_chain = %self.a_to_b.dst_chain().id(), + ) + .entered(); + + self.a_to_b.build_packet_ack_msgs(None)?; + + let mut results = vec![]; + + // Block waiting for all of the scheduled data + while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data()? { + let mut last_res = self + .a_to_b + .relay_from_operational_data::(odata)?; + results.append(&mut last_res.events); + } + + Ok(results) + } +} + +fn wait_for_conn_delay( + odata: OperationalData, + chain_time: &ChainTime, + max_expected_time_per_block: &MaxBlockTime, + latest_height: &LatestHeight, +) -> Result +where + ChainTime: Fn() -> Result, + MaxBlockTime: Fn() -> Result, + LatestHeight: Fn() -> Result, +{ + let (time_left, blocks_left) = + odata.conn_delay_remaining(chain_time, max_expected_time_per_block, latest_height)?; + + match (time_left, blocks_left) { + (Duration::ZERO, 0) => { + info!( + "ready to fetch a scheduled op. data with batch of size {} targeting {}", + odata.batch.len(), + odata.target, + ); + Ok(odata) + } + (Duration::ZERO, blocks_left) => { + info!( + "waiting ({:?} blocks left) for a scheduled op. data with batch of size {} targeting {}", + blocks_left, + odata.batch.len(), + odata.target, + ); + + let blocks_left: u32 = blocks_left.try_into().expect("blocks_left > u32::MAX"); + + // Wait until the delay period passes + thread::sleep(blocks_left * max_expected_time_per_block()?); + + Ok(odata) + } + (time_left, _) => { + info!( + "waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", + time_left, + odata.batch.len(), + odata.target, + ); + + // Wait until the delay period passes + thread::sleep(time_left); + + // `blocks_left` maybe non-zero, so recurse to recheck that all delays are handled. + wait_for_conn_delay( + odata, + chain_time, + max_expected_time_per_block, + latest_height, + ) + } + } +} diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 8570de4310..343ec03692 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1,8 +1,6 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; -use std::convert::TryInto; use std::ops::Sub; -use std::thread; use std::time::{Duration, Instant}; use ibc_proto::google::protobuf::Any; @@ -75,8 +73,8 @@ pub struct RelayPath { // mostly timeout packet messages. // The operational data targeting the destination chain // comprises mostly RecvPacket and Ack msgs. - src_operational_data: Queue, - dst_operational_data: Queue, + pub(crate) src_operational_data: Queue, + pub(crate) dst_operational_data: Queue, // Toggle for the transaction confirmation mechanism. confirm_txes: bool, @@ -197,13 +195,13 @@ impl RelayPath { .map_err(|e| LinkError::signer(self.dst_chain().id(), e)) } - pub fn src_latest_height(&self) -> Result { + pub(crate) fn src_latest_height(&self) -> Result { self.src_chain() .query_latest_height() .map_err(|e| LinkError::query(self.src_chain().id(), e)) } - pub fn dst_latest_height(&self) -> Result { + pub(crate) fn dst_latest_height(&self) -> Result { self.dst_chain() .query_latest_height() .map_err(|e| LinkError::query(self.dst_chain().id(), e)) @@ -217,15 +215,15 @@ impl RelayPath { Self::chain_time_at_height(self.dst_chain(), height) } - fn src_time_latest(&self) -> Result { + pub(crate) fn src_time_latest(&self) -> Result { self.src_time_at_height(Height::zero()) } - fn dst_time_latest(&self) -> Result { + pub(crate) fn dst_time_latest(&self) -> Result { self.dst_time_at_height(Height::zero()) } - fn src_max_block_time(&self) -> Result { + pub(crate) fn src_max_block_time(&self) -> Result { // TODO(hu55a1n1): Ideally, we should get the `max_expected_time_per_block` using the // `/genesis` endpoint once it is working in tendermint-rs. Ok(self @@ -235,7 +233,7 @@ impl RelayPath { .max_block_time) } - fn dst_max_block_time(&self) -> Result { + pub(crate) fn dst_max_block_time(&self) -> Result { Ok(self .dst_chain() .config() @@ -1622,91 +1620,6 @@ impl RelayPath { Ok((elapsed_src_ods, elapsed_dst_ods)) } - fn wait_for_conn_delay( - odata: OperationalData, - chain_time: &ChainTime, - max_expected_time_per_block: &MaxBlockTime, - latest_height: &LatestHeight, - ) -> Result - where - ChainTime: Fn() -> Result, - MaxBlockTime: Fn() -> Result, - LatestHeight: Fn() -> Result, - { - let (time_left, blocks_left) = - odata.conn_delay_remaining(chain_time, max_expected_time_per_block, latest_height)?; - - match (time_left, blocks_left) { - (Duration::ZERO, 0) => { - info!( - "ready to fetch a scheduled op. data with batch of size {} targeting {}", - odata.batch.len(), - odata.target, - ); - Ok(odata) - } - (Duration::ZERO, blocks_left) => { - info!( - "waiting ({:?} blocks left) for a scheduled op. data with batch of size {} targeting {}", - blocks_left, - odata.batch.len(), - odata.target, - ); - - let blocks_left: u32 = blocks_left.try_into().expect("blocks_left > u32::MAX"); - - // Wait until the delay period passes - thread::sleep(blocks_left * max_expected_time_per_block()?); - - Ok(odata) - } - (time_left, _) => { - info!( - "waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", - time_left, - odata.batch.len(), - odata.target, - ); - - // Wait until the delay period passes - thread::sleep(time_left); - - // `blocks_left` maybe non-zero, so recurse to recheck that all delays are handled. - Self::wait_for_conn_delay( - odata, - chain_time, - max_expected_time_per_block, - latest_height, - ) - } - } - } - - /// Fetches an operational data that has fulfilled its predefined delay period. May _block_ - /// waiting for the delay period to pass. - /// Returns `Ok(None)` if there is no operational data scheduled. - pub(crate) fn fetch_scheduled_operational_data( - &self, - ) -> Result, LinkError> { - if let Some(odata) = self.src_operational_data.pop_front() { - Ok(Some(Self::wait_for_conn_delay( - odata, - &|| self.src_time_latest(), - &|| self.src_max_block_time(), - &|| self.src_latest_height(), - )?)) - } else if let Some(odata) = self.dst_operational_data.pop_front() { - Ok(Some(Self::wait_for_conn_delay( - odata, - &|| self.dst_time_latest(), - &|| self.dst_max_block_time(), - &|| self.dst_latest_height(), - )?)) - } else { - Ok(None) - } - } - fn restore_src_client(&self) -> ForeignClient { ForeignClient::restore( self.src_client_id().clone(), From f8f6a94e7ba4228fc5c3b902ea9a94ad6c3b7b73 Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Fri, 1 Apr 2022 17:12:53 +0530 Subject: [PATCH 44/45] Make opdata methods priv --- relayer/src/link/operational_data.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index ee27116cc9..82c1cae72a 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -235,7 +235,7 @@ impl OperationalData { } /// Returns `Ok(remaining-delay)` on success or `LinkError` if the input closure fails. - pub fn conn_time_delay_remaining( + fn conn_time_delay_remaining( &self, chain_time: &ChainTime, ) -> Result @@ -250,7 +250,7 @@ impl OperationalData { } /// Returns `Ok(remaining-delay)` on success or `LinkError` if an input closure fails. - pub fn conn_block_delay_remaining( + fn conn_block_delay_remaining( &self, max_expected_time_per_block: &MaxBlockTime, latest_height: &LatestHeight, From 14dcb2ae44e47c17a0b03a56aa32d1a74840ee3e Mon Sep 17 00:00:00 2001 From: Shoaib Ahmed Date: Fri, 1 Apr 2022 18:02:16 +0530 Subject: [PATCH 45/45] Apply suggestions from review --- relayer/src/link/relay_path.rs | 49 ++++++++++++++++------------------ 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 343ec03692..dff7dd8a6f 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -10,7 +10,9 @@ use tracing::{debug, error, info, span, trace, warn, Level}; use ibc::{ core::{ ics02_client::{ - client_consensus::QueryClientEventRequest, events::UpdateClient as UpdateClientEvent, + client_consensus::QueryClientEventRequest, + events::ClientMisbehaviour as ClientMisbehaviourEvent, + events::UpdateClient as UpdateClientEvent, }, ics04_channel::{ channel::{ChannelEnd, Order, QueryPacketEventDataRequest, State as ChannelState}, @@ -760,30 +762,31 @@ impl RelayPath { } /// Loops over `tx_events` and returns a tuple of optional events where the first element is a - /// `ChainError` variant, the second one is an `UpdateClient` variant and the third one is any - /// other variant (usually `ClientMisbehaviour`). This function is essentially just an - /// `Iterator::find()` for multiple variants with a single pass. + /// `ChainError` variant, the second one is an `UpdateClient` variant and the third one is a + /// `ClientMisbehaviour` variant. This function is essentially just an `Iterator::find()` for + /// multiple variants with a single pass. #[inline] fn event_per_type( mut tx_events: Vec, ) -> ( Option, Option, - Option, + Option, ) { let mut error = None; let mut update = None; - let mut other = None; + let mut misbehaviour = None; while let Some(event) = tx_events.pop() { match event { IbcEvent::ChainError(_) => error = Some(event), IbcEvent::UpdateClient(event) => update = Some(event), - _ => other = Some(event), + IbcEvent::ClientMisbehaviour(event) => misbehaviour = Some(event), + _ => {} } } - (error, update, other) + (error, update, misbehaviour) } /// Returns an instant (in the past) that corresponds to the block timestamp of the chain at @@ -835,10 +838,12 @@ impl RelayPath { .map_err(LinkError::relayer)?; info!("result: {}", PrettyEvents(&dst_tx_events)); - let (error, update, other) = Self::event_per_type(dst_tx_events); - match (error, update, other) { + let (error, update, misbehaviour) = Self::event_per_type(dst_tx_events); + match (error, update, misbehaviour) { + // All updates were successful, no errors and no misbehaviour. (None, Some(update_event), None) => Ok(update_event.height()), (Some(chain_error), _, _) => { + // Atleast one chain-error so retry if possible. if retries_left == 0 { Err(LinkError::client(ForeignClientError::chain_error_event( self.dst_chain().id(), @@ -862,13 +867,8 @@ impl RelayPath { _ => Err(LinkError::update_client_failed()), } } - (_, _, event) => { - if !matches!(event, Some(IbcEvent::ClientMisbehaviour(_))) && retries_left > 0 { - self.do_update_client_dst(src_chain_height, tracking_id, retries_left - 1) - } else { - Err(LinkError::update_client_failed()) - } - } + // Atleast one misbehaviour event, so don't retry. + (_, _, Some(_misbehaviour)) => Err(LinkError::update_client_failed()), } } @@ -904,10 +904,12 @@ impl RelayPath { .map_err(LinkError::relayer)?; info!("result: {}", PrettyEvents(&src_tx_events)); - let (error, update, other) = Self::event_per_type(src_tx_events); - match (error, update, other) { + let (error, update, misbehaviour) = Self::event_per_type(src_tx_events); + match (error, update, misbehaviour) { + // All updates were successful, no errors and no misbehaviour. (None, Some(update_event), None) => Ok(update_event.height()), (Some(chain_error), _, _) => { + // Atleast one chain-error so retry if possible. if retries_left == 0 { Err(LinkError::client(ForeignClientError::chain_error_event( self.src_chain().id(), @@ -931,13 +933,8 @@ impl RelayPath { _ => Err(LinkError::update_client_failed()), } } - (_, _, event) => { - if !matches!(event, Some(IbcEvent::ClientMisbehaviour(_))) && retries_left > 0 { - self.do_update_client_src(dst_chain_height, tracking_id, retries_left - 1) - } else { - Err(LinkError::update_client_failed()) - } - } + // Atleast one misbehaviour event, so don't retry. + (_, _, Some(_misbehaviour)) => Err(LinkError::update_client_failed()), } }