diff --git a/.changelog/unreleased/bug-fixes/ibc-relayer/1772-fix-conn-delay-check.md b/.changelog/unreleased/bug-fixes/ibc-relayer/1772-fix-conn-delay-check.md new file mode 100644 index 0000000000..3bbf8ce62f --- /dev/null +++ b/.changelog/unreleased/bug-fixes/ibc-relayer/1772-fix-conn-delay-check.md @@ -0,0 +1,2 @@ +- Fix the connection delay logic to use the timestamp of the host block when the client update header was installed. + ([#1772](https://github.com/informalsystems/ibc-rs/issues/1772)) \ No newline at end of file diff --git a/config.toml b/config.toml index 7fb6f24bff..0ccad6b9af 100644 --- a/config.toml +++ b/config.toml @@ -177,6 +177,7 @@ clock_drift = '5s' # The block time together with the clock drift are added to the source drift to estimate # the maximum clock drift when creating a client on this chain. Default: 10s # For cosmos-SDK chains a good approximation is `timeout_propose` + `timeout_commit` +# Note: This MUST be the same as the `max_expected_time_per_block` genesis parameter for Tendermint chains. max_block_time = '10s' # Specify the amount of time to be used as the light client trusting period. diff --git a/modules/src/core/ics04_channel/context.rs b/modules/src/core/ics04_channel/context.rs index 165cf688ad..005af2529c 100644 --- a/modules/src/core/ics04_channel/context.rs +++ b/modules/src/core/ics04_channel/context.rs @@ -99,13 +99,7 @@ pub trait ChannelReader { fn max_expected_time_per_block(&self) -> Duration; fn block_delay(&self, delay_period_time: Duration) -> u64 { - let expected_time_per_block = self.max_expected_time_per_block(); - if expected_time_per_block.is_zero() { - return 0; - } - - FloatCore::ceil(delay_period_time.as_secs_f64() / expected_time_per_block.as_secs_f64()) - as u64 + calculate_block_delay(delay_period_time, self.max_expected_time_per_block()) } } @@ -279,3 +273,15 @@ pub trait ChannelKeeper { /// Should never fail. 
fn increase_channel_counter(&mut self); } + +pub fn calculate_block_delay( + delay_period_time: Duration, + max_expected_time_per_block: Duration, +) -> u64 { + if max_expected_time_per_block.is_zero() { + return 0; + } + + FloatCore::ceil(delay_period_time.as_secs_f64() / max_expected_time_per_block.as_secs_f64()) + as u64 +} diff --git a/relayer/src/chain.rs b/relayer/src/chain.rs index 3da5d0f0b6..e008a61b65 100644 --- a/relayer/src/chain.rs +++ b/relayer/src/chain.rs @@ -270,6 +270,8 @@ pub trait ChainEndpoint: Sized { request: QueryBlockRequest, ) -> Result<(Vec, Vec), Error>; + fn query_host_consensus_state(&self, height: ICSHeight) -> Result; + // Provable queries fn proven_client_state( &self, diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 496e564cb6..f7adc8fcdf 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -1948,6 +1948,20 @@ impl ChainEndpoint for CosmosSdkChain { } } + fn query_host_consensus_state(&self, height: ICSHeight) -> Result { + let height = Height::try_from(height.revision_height).map_err(Error::invalid_height)?; + + // TODO(hu55a1n1): use the `/header` RPC endpoint instead when we move to tendermint v0.35.x + let rpc_call = match height.value() { + 0 => self.rpc_client.latest_block(), + _ => self.rpc_client.block(height), + }; + let response = self + .block_on(rpc_call) + .map_err(|e| Error::rpc(self.config.rpc_addr.clone(), e))?; + Ok(response.block.header.into()) + } + fn proven_client_state( &self, client_id: &ClientId, diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index 2b91790553..4433063272 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -333,6 +333,11 @@ pub enum ChainRequest { request: QueryBlockRequest, reply_to: ReplyTo<(Vec, Vec)>, }, + + QueryHostConsensusState { + height: Height, + reply_to: ReplyTo, + }, } pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static { @@ -560,4 +565,6 @@ pub trait 
ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static { &self, request: QueryBlockRequest, ) -> Result<(Vec, Vec), Error>; + + fn query_host_consensus_state(&self, height: Height) -> Result; } diff --git a/relayer/src/chain/handle/base.rs b/relayer/src/chain/handle/base.rs index 23b13c8f6f..4a76f77c6f 100644 --- a/relayer/src/chain/handle/base.rs +++ b/relayer/src/chain/handle/base.rs @@ -454,6 +454,10 @@ impl ChainHandle for BaseChainHandle { ) -> Result<(Vec, Vec), Error> { self.send(|reply_to| ChainRequest::QueryPacketEventDataFromBlocks { request, reply_to }) } + + fn query_host_consensus_state(&self, height: Height) -> Result { + self.send(|reply_to| ChainRequest::QueryHostConsensusState { height, reply_to }) + } } impl Serialize for BaseChainHandle { diff --git a/relayer/src/chain/handle/cache.rs b/relayer/src/chain/handle/cache.rs index dac48d1ea0..203aa4d0d8 100644 --- a/relayer/src/chain/handle/cache.rs +++ b/relayer/src/chain/handle/cache.rs @@ -412,4 +412,8 @@ impl ChainHandle for CachingChainHandle { ) -> Result<(Vec, Vec), Error> { self.inner().query_blocks(request) } + + fn query_host_consensus_state(&self, height: Height) -> Result { + self.inner.query_host_consensus_state(height) + } } diff --git a/relayer/src/chain/handle/counting.rs b/relayer/src/chain/handle/counting.rs index 948a24bb11..98afec74fa 100644 --- a/relayer/src/chain/handle/counting.rs +++ b/relayer/src/chain/handle/counting.rs @@ -448,4 +448,8 @@ impl ChainHandle for CountingChainHandle { self.inc_metric("query_blocks"); self.inner().query_blocks(request) } + + fn query_host_consensus_state(&self, height: Height) -> Result { + self.inner.query_host_consensus_state(height) + } } diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index ed6506f79f..a4b6f94c9a 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -302,6 +302,10 @@ impl ChainEndpoint for MockChain { unimplemented!() } + fn query_host_consensus_state(&self, _height: Height) -> 
Result { + unimplemented!() + } + fn proven_client_state( &self, _client_id: &ClientId, diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index 13b5eec28f..24a04534ca 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -411,6 +411,10 @@ where self.query_blocks(request, reply_to)? }, + Ok(ChainRequest::QueryHostConsensusState { height, reply_to }) => { + self.query_host_consensus_state(height, reply_to)? + }, + Err(e) => error!("received error via chain request channel: {}", e), } }, @@ -870,4 +874,19 @@ where Ok(()) } + + fn query_host_consensus_state( + &self, + height: Height, + reply_to: ReplyTo, + ) -> Result<(), Error> { + let result = self + .chain + .query_host_consensus_state(height) + .map(|h| h.wrap_any()); + + reply_to.send(result).map_err(Error::send)?; + + Ok(()) + } } diff --git a/relayer/src/link.rs b/relayer/src/link.rs index ca39bc605f..de11e12e74 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -4,10 +4,8 @@ use ibc::{ ics04_channel::channel::State as ChannelState, ics24_host::identifier::{ChannelId, PortChannelId, PortId}, }, - events::IbcEvent, Height, }; -use tracing::error_span; use crate::chain::counterparty::check_channel_counterparty; use crate::chain::handle::ChainHandle; @@ -15,6 +13,7 @@ use crate::channel::{Channel, ChannelSide}; use crate::link::error::LinkError; use crate::link::relay_path::RelayPath; +pub mod cli; pub mod error; pub mod operational_data; mod pending; @@ -194,56 +193,4 @@ impl Link { // going slowly, but reliably. 
Link::new_from_opts(chain_b, chain_a, opts, with_tx_confirmation) } - - /// Implements the `packet-recv` CLI - pub fn build_and_send_recv_packet_messages(&self) -> Result, LinkError> { - let _span = error_span!( - "PacketRecvCmd", - src_chain = %self.a_to_b.src_chain().id(), - src_port = %self.a_to_b.src_port_id(), - src_channel = %self.a_to_b.src_channel_id(), - dst_chain = %self.a_to_b.dst_chain().id(), - ) - .entered(); - - self.a_to_b.build_recv_packet_and_timeout_msgs(None)?; - - let mut results = vec![]; - - // Block waiting for all of the scheduled data (until `None` is returned) - while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data() { - let mut last_res = self - .a_to_b - .relay_from_operational_data::(odata)?; - results.append(&mut last_res.events); - } - - Ok(results) - } - - /// Implements the `packet-ack` CLI - pub fn build_and_send_ack_packet_messages(&self) -> Result, LinkError> { - let _span = error_span!( - "PacketAckCmd", - src_chain = %self.a_to_b.src_chain().id(), - src_port = %self.a_to_b.src_port_id(), - src_channel = %self.a_to_b.src_channel_id(), - dst_chain = %self.a_to_b.dst_chain().id(), - ) - .entered(); - - self.a_to_b.build_packet_ack_msgs(None)?; - - let mut results = vec![]; - - // Block waiting for all of the scheduled data - while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data() { - let mut last_res = self - .a_to_b - .relay_from_operational_data::(odata)?; - results.append(&mut last_res.events); - } - - Ok(results) - } } diff --git a/relayer/src/link/cli.rs b/relayer/src/link/cli.rs new file mode 100644 index 0000000000..004605a446 --- /dev/null +++ b/relayer/src/link/cli.rs @@ -0,0 +1,154 @@ +use std::convert::TryInto; +use std::thread; +use std::time::{Duration, Instant}; + +use ibc::events::IbcEvent; +use ibc::Height; +use tracing::{error_span, info}; + +use crate::chain::handle::ChainHandle; +use crate::link::error::LinkError; +use crate::link::operational_data::OperationalData; +use 
crate::link::relay_path::RelayPath; +use crate::link::{relay_sender, Link}; + +impl RelayPath { + /// Fetches an operational data that has fulfilled its predefined delay period. May _block_ + /// waiting for the delay period to pass. + /// Returns `Ok(None)` if there is no operational data scheduled. + pub(crate) fn fetch_scheduled_operational_data( + &self, + ) -> Result, LinkError> { + if let Some(odata) = self.src_operational_data.pop_front() { + Ok(Some(wait_for_conn_delay( + odata, + &|| self.src_time_latest(), + &|| self.src_max_block_time(), + &|| self.src_latest_height(), + )?)) + } else if let Some(odata) = self.dst_operational_data.pop_front() { + Ok(Some(wait_for_conn_delay( + odata, + &|| self.dst_time_latest(), + &|| self.dst_max_block_time(), + &|| self.dst_latest_height(), + )?)) + } else { + Ok(None) + } + } +} + +impl Link { + /// Implements the `packet-recv` CLI + pub fn build_and_send_recv_packet_messages(&self) -> Result, LinkError> { + let _span = error_span!( + "PacketRecvCmd", + src_chain = %self.a_to_b.src_chain().id(), + src_port = %self.a_to_b.src_port_id(), + src_channel = %self.a_to_b.src_channel_id(), + dst_chain = %self.a_to_b.dst_chain().id(), + ) + .entered(); + + self.a_to_b.build_recv_packet_and_timeout_msgs(None)?; + + let mut results = vec![]; + + // Block waiting for all of the scheduled data (until `None` is returned) + while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data()? 
{ + let mut last_res = self + .a_to_b + .relay_from_operational_data::(odata)?; + results.append(&mut last_res.events); + } + + Ok(results) + } + + /// Implements the `packet-ack` CLI + pub fn build_and_send_ack_packet_messages(&self) -> Result, LinkError> { + let _span = error_span!( + "PacketAckCmd", + src_chain = %self.a_to_b.src_chain().id(), + src_port = %self.a_to_b.src_port_id(), + src_channel = %self.a_to_b.src_channel_id(), + dst_chain = %self.a_to_b.dst_chain().id(), + ) + .entered(); + + self.a_to_b.build_packet_ack_msgs(None)?; + + let mut results = vec![]; + + // Block waiting for all of the scheduled data + while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data()? { + let mut last_res = self + .a_to_b + .relay_from_operational_data::(odata)?; + results.append(&mut last_res.events); + } + + Ok(results) + } +} + +fn wait_for_conn_delay( + odata: OperationalData, + chain_time: &ChainTime, + max_expected_time_per_block: &MaxBlockTime, + latest_height: &LatestHeight, +) -> Result +where + ChainTime: Fn() -> Result, + MaxBlockTime: Fn() -> Result, + LatestHeight: Fn() -> Result, +{ + let (time_left, blocks_left) = + odata.conn_delay_remaining(chain_time, max_expected_time_per_block, latest_height)?; + + match (time_left, blocks_left) { + (Duration::ZERO, 0) => { + info!( + "ready to fetch a scheduled op. data with batch of size {} targeting {}", + odata.batch.len(), + odata.target, + ); + Ok(odata) + } + (Duration::ZERO, blocks_left) => { + info!( + "waiting ({:?} blocks left) for a scheduled op. data with batch of size {} targeting {}", + blocks_left, + odata.batch.len(), + odata.target, + ); + + let blocks_left: u32 = blocks_left.try_into().expect("blocks_left > u32::MAX"); + + // Wait until the delay period passes + thread::sleep(blocks_left * max_expected_time_per_block()?); + + Ok(odata) + } + (time_left, _) => { + info!( + "waiting ({:?} left) for a scheduled op. 
data with batch of size {} targeting {}", + time_left, + odata.batch.len(), + odata.target, + ); + + // Wait until the delay period passes + thread::sleep(time_left); + + // `blocks_left` may be non-zero, so recurse to recheck that all delays are handled. + wait_for_conn_delay( + odata, + chain_time, + max_expected_time_per_block, + latest_height, + ) + } + } +} diff --git a/relayer/src/link/error.rs b/relayer/src/link/error.rs index 0bdd67fbdf..67b11869bc 100644 --- a/relayer/src/link/error.rs +++ b/relayer/src/link/error.rs @@ -143,7 +143,9 @@ define_error! { e.channel_id, e.chain_id) }, - } + UpdateClientFailed + |_| { "failed to update client" }, + } } impl HasExpiredOrFrozenError for LinkErrorDetail { diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index e7d3a2bbd9..82c1cae72a 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -1,12 +1,14 @@ use alloc::borrow::Cow; use core::fmt; use core::iter; -use std::time::Instant; +use std::time::{Duration, Instant}; use ibc_proto::google::protobuf::Any; use nanoid::nanoid; use tracing::{debug, info}; +use ibc::core::ics02_client::client_state::ClientState; +use ibc::core::ics04_channel::context::calculate_block_delay; use ibc::events::IbcEvent; use ibc::Height; @@ -89,10 +91,9 @@ pub struct OperationalData { pub proofs_height: Height, pub batch: Vec, pub target: OperationalDataTarget, - /// Stores the time when the clients on the target chain has been updated, i.e., when this data - /// was scheduled. Necessary for packet delays. 
- pub scheduled_time: Instant, pub tracking_id: String, + /// Stores `Some(ConnectionDelay)` if the delay is non-zero and `None` otherwise + connection_delay: Option, } impl OperationalData { @@ -100,12 +101,18 @@ impl OperationalData { proofs_height: Height, target: OperationalDataTarget, tracking_id: impl Into, + connection_delay: Duration, ) -> Self { + let connection_delay = if !connection_delay.is_zero() { + Some(ConnectionDelay::new(connection_delay)) + } else { + None + }; OperationalData { proofs_height, batch: vec![], target, - scheduled_time: Instant::now(), + connection_delay, tracking_id: tracking_id.into(), } } @@ -141,7 +148,7 @@ impl OperationalData { relay_path: &RelayPath, ) -> Result { // For zero delay we prepend the client update msgs. - let client_update_msg = if relay_path.zero_delay() { + let client_update_msg = if !self.conn_delay_needed() { let update_height = self.proofs_height.increment(); debug!( @@ -162,7 +169,22 @@ impl OperationalData { client_update_opt.pop() } else { - None + let client_state = match self.target { + OperationalDataTarget::Source => relay_path + .src_chain() + .query_client_state(relay_path.src_client_id(), Height::zero()) + .map_err(|e| LinkError::query(relay_path.src_chain().id(), e))?, + OperationalDataTarget::Destination => relay_path + .dst_chain() + .query_client_state(relay_path.dst_client_id(), Height::zero()) + .map_err(|e| LinkError::query(relay_path.dst_chain().id(), e))?, + }; + + if client_state.is_frozen() { + return Ok(TrackedMsgs::new(vec![], &self.tracking_id)); + } else { + None + } }; let msgs: Vec = match client_update_msg { @@ -178,6 +200,155 @@ impl OperationalData { Ok(tm) } + + /// Returns true iff the batch contains a packet event + fn has_packet_msgs(&self) -> bool { + self.batch.iter().any(|msg| msg.event.packet().is_some()) + } + + /// Returns the `connection_delay` iff the connection delay for this relaying path is non-zero + /// and the `batch` contains packet messages. 
+ fn get_delay_if_needed(&self) -> Option<&ConnectionDelay> { + self.connection_delay + .as_ref() + .filter(|_| self.has_packet_msgs()) + } + + /// Returns `true` iff the connection delay for this relaying path is non-zero and `op_data` + /// contains packet messages. + pub fn conn_delay_needed(&self) -> bool { + self.get_delay_if_needed().is_some() + } + + /// Sets the scheduled time that is used for connection-delay calculations + pub fn set_scheduled_time(&mut self, scheduled_time: Instant) { + if let Some(mut delay) = self.connection_delay.as_mut() { + delay.scheduled_time = scheduled_time; + } + } + + /// Sets the update height that is used for connection-delay calculations + pub fn set_update_height(&mut self, update_height: Height) { + if let Some(mut delay) = self.connection_delay.as_mut() { + delay.update_height = Some(update_height); + } + } + + /// Returns `Ok(remaining-delay)` on success or `LinkError` if the input closure fails. + fn conn_time_delay_remaining( + &self, + chain_time: &ChainTime, + ) -> Result + where + ChainTime: Fn() -> Result, + { + if let Some(delay) = self.get_delay_if_needed() { + Ok(delay.conn_time_delay_remaining(chain_time()?)) + } else { + Ok(Duration::ZERO) + } + } + + /// Returns `Ok(remaining-delay)` on success or `LinkError` if an input closure fails. 
+ fn conn_block_delay_remaining( + &self, + max_expected_time_per_block: &MaxBlockTime, + latest_height: &LatestHeight, + ) -> Result + where + MaxBlockTime: Fn() -> Result, + LatestHeight: Fn() -> Result, + { + if let Some(delay) = self.get_delay_if_needed() { + let block_delay = delay.conn_block_delay(max_expected_time_per_block()?); + Ok(delay.conn_block_delay_remaining(block_delay, latest_height()?)) + } else { + Ok(0) + } + } + + pub fn has_conn_delay_elapsed( + &self, + chain_time: &ChainTime, + max_expected_time_per_block: &MaxBlockTime, + latest_height: &LatestHeight, + ) -> Result + where + ChainTime: Fn() -> Result, + MaxBlockTime: Fn() -> Result, + LatestHeight: Fn() -> Result, + { + Ok(self.conn_time_delay_remaining(chain_time)?.is_zero() + && self.conn_block_delay_remaining(max_expected_time_per_block, latest_height)? == 0) + } + + pub fn conn_delay_remaining( + &self, + chain_time: &ChainTime, + max_expected_time_per_block: &MaxBlockTime, + latest_height: &LatestHeight, + ) -> Result<(Duration, u64), LinkError> + where + ChainTime: Fn() -> Result, + MaxBlockTime: Fn() -> Result, + LatestHeight: Fn() -> Result, + { + Ok(( + self.conn_time_delay_remaining(chain_time)?, + self.conn_block_delay_remaining(max_expected_time_per_block, latest_height)?, + )) + } +} + +/// A struct that holds everything that is required to calculate and deal with the connection-delay +/// feature. +#[derive(Clone)] +struct ConnectionDelay { + delay: Duration, + scheduled_time: Instant, + update_height: Option, +} + +impl ConnectionDelay { + fn new(delay: Duration) -> Self { + Self { + delay, + scheduled_time: Instant::now(), + update_height: None, + } + } + + /// Returns `remaining-delay` if connection-delay hasn't elapsed and `Duration::ZERO` otherwise. 
+ fn conn_time_delay_remaining(&self, chain_time: Instant) -> Duration { + // since chain time instant is relative to relayer's current time, it is possible that + // `scheduled_time` is later (by nano secs) than `chain_time`, hence the call to + // `saturating_duration_since()`. + let elapsed = chain_time.saturating_duration_since(self.scheduled_time); + if elapsed >= self.delay { + Duration::ZERO + } else { + self.delay - elapsed + } + } + + /// Returns `remaining-delay` if connection-delay hasn't elapsed and `0` otherwise. + fn conn_block_delay_remaining(&self, block_delay: u64, latest_height: Height) -> u64 { + let acceptable_height = self + .update_height + .expect("processed height not set") + .add(block_delay); + if latest_height >= acceptable_height { + 0 + } else { + debug_assert!(acceptable_height.revision_number == latest_height.revision_number); + acceptable_height.revision_height - latest_height.revision_height + } + } + + /// Calculates and returns the block-delay based on the `max_expected_time_per_block` + fn conn_block_delay(&self, max_expected_time_per_block: Duration) -> u64 { + calculate_block_delay(self.delay, max_expected_time_per_block) + } } /// A lightweight informational data structure that can be extracted diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 522382942c..dff7dd8a6f 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1,7 +1,7 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; -use std::thread; -use std::time::Instant; +use std::ops::Sub; +use std::time::{Duration, Instant}; use ibc_proto::google::protobuf::Any; use itertools::Itertools; @@ -9,6 +9,11 @@ use tracing::{debug, error, info, span, trace, warn, Level}; use ibc::{ core::{ + ics02_client::{ + client_consensus::QueryClientEventRequest, + events::ClientMisbehaviour as ClientMisbehaviourEvent, + events::UpdateClient as UpdateClientEvent, + }, ics04_channel::{ 
channel::{ChannelEnd, Order, QueryPacketEventDataRequest, State as ChannelState}, events::{SendPacket, WriteAcknowledgement}, @@ -24,7 +29,7 @@ use ibc::{ events::{IbcEvent, PrettyEvents, WithBlockDataType}, query::{QueryBlockRequest, QueryTxRequest}, signer::Signer, - timestamp::ZERO_DURATION, + timestamp::Timestamp, tx_msg::Msg, Height, }; @@ -70,8 +75,8 @@ pub struct RelayPath { // mostly timeout packet messages. // The operational data targeting the destination chain // comprises mostly RecvPacket and Ack msgs. - src_operational_data: Queue, - dst_operational_data: Queue, + pub(crate) src_operational_data: Queue, + pub(crate) dst_operational_data: Queue, // Toggle for the transaction confirmation mechanism. confirm_txes: bool, @@ -177,7 +182,7 @@ impl RelayPath { fn dst_channel(&self, height: Height) -> Result { self.dst_chain() .query_channel(self.dst_port_id(), self.dst_channel_id(), height) - .map_err(|e| LinkError::channel(ChannelError::query(self.src_chain().id(), e))) + .map_err(|e| LinkError::channel(ChannelError::query(self.dst_chain().id(), e))) } fn src_signer(&self) -> Result { @@ -192,12 +197,52 @@ impl RelayPath { .map_err(|e| LinkError::signer(self.dst_chain().id(), e)) } - pub fn dst_latest_height(&self) -> Result { + pub(crate) fn src_latest_height(&self) -> Result { + self.src_chain() + .query_latest_height() + .map_err(|e| LinkError::query(self.src_chain().id(), e)) + } + + pub(crate) fn dst_latest_height(&self) -> Result { self.dst_chain() .query_latest_height() .map_err(|e| LinkError::query(self.dst_chain().id(), e)) } + fn src_time_at_height(&self, height: Height) -> Result { + Self::chain_time_at_height(self.src_chain(), height) + } + + fn dst_time_at_height(&self, height: Height) -> Result { + Self::chain_time_at_height(self.dst_chain(), height) + } + + pub(crate) fn src_time_latest(&self) -> Result { + self.src_time_at_height(Height::zero()) + } + + pub(crate) fn dst_time_latest(&self) -> Result { + 
self.dst_time_at_height(Height::zero()) + } + + pub(crate) fn src_max_block_time(&self) -> Result { + // TODO(hu55a1n1): Ideally, we should get the `max_expected_time_per_block` using the + // `/genesis` endpoint once it is working in tendermint-rs. + Ok(self + .src_chain() + .config() + .map_err(LinkError::relayer)? + .max_block_time) + } + + pub(crate) fn dst_max_block_time(&self) -> Result { + Ok(self + .dst_chain() + .config() + .map_err(LinkError::relayer)? + .max_block_time) + } + fn unordered_channel(&self) -> bool { self.channel.ordering == Order::Unordered } @@ -372,12 +417,14 @@ impl RelayPath { dst_latest_height, OperationalDataTarget::Source, events.tracking_id(), + self.channel.connection_delay, ); // Operational data targeting the destination chain (e.g., SendPacket messages) let mut dst_od = OperationalData::new( src_height, OperationalDataTarget::Destination, events.tracking_id(), + self.channel.connection_delay, ); for event in input { @@ -479,12 +526,7 @@ impl RelayPath { let mut odata = initial_od; for i in 0..MAX_RETRIES { - debug!( - "delayed by: {:?} [try {}/{}]", - odata.scheduled_time.elapsed(), - i + 1, - MAX_RETRIES - ); + debug!("[try {}/{}]", i + 1, MAX_RETRIES); // Consume the operational data by attempting to send its messages match self.send_from_operational_data::(odata.clone()) { @@ -693,103 +735,207 @@ impl RelayPath { self.recv_packet_acknowledged_on_src(&rp.packet) } - /// Returns `true` if the delay for this relaying path is zero. - /// Conversely, returns `false` if the delay is non-zero. 
- pub fn zero_delay(&self) -> bool { - self.channel.connection_delay == ZERO_DURATION + /// Returns the `processed_height` for the consensus state at specified height + fn update_height( + chain: &impl ChainHandle, + client_id: ClientId, + consensus_height: Height, + ) -> Result { + let events = chain + .query_txs(QueryTxRequest::Client(QueryClientEventRequest { + height: Height::zero(), + event_id: WithBlockDataType::UpdateClient, + client_id, + consensus_height, + })) + .map_err(|e| LinkError::query(chain.id(), e))?; + + // The handler may treat redundant updates as no-ops and emit `UpdateClient` events for them + // but the `processed_height` is the height at which the first `UpdateClient` event for this + // consensus state/height was emitted. We expect that these events are received in the exact + // same order in which they were emitted. + match events.first() { + Some(IbcEvent::UpdateClient(event)) => Ok(event.height()), + Some(event) => Err(LinkError::unexpected_event(event.clone())), + None => Err(LinkError::unexpected_event(IbcEvent::default())), + } + } + + /// Loops over `tx_events` and returns a tuple of optional events where the first element is a + /// `ChainError` variant, the second one is an `UpdateClient` variant and the third one is a + /// `ClientMisbehaviour` variant. This function is essentially just an `Iterator::find()` for + /// multiple variants with a single pass. 
+ #[inline] + fn event_per_type( + mut tx_events: Vec, + ) -> ( + Option, + Option, + Option, + ) { + let mut error = None; + let mut update = None; + let mut misbehaviour = None; + + while let Some(event) = tx_events.pop() { + match event { + IbcEvent::ChainError(_) => error = Some(event), + IbcEvent::UpdateClient(event) => update = Some(event), + IbcEvent::ClientMisbehaviour(event) => misbehaviour = Some(event), + _ => {} + } + } + + (error, update, misbehaviour) + } + + /// Returns an instant (in the past) that corresponds to the block timestamp of the chain at + /// specified height (relative to the relayer's current time). If the timestamp is in the future + /// wrt the relayer's current time, we simply return the current relayer time. + fn chain_time_at_height( + chain: &impl ChainHandle, + height: Height, + ) -> Result { + let chain_time = chain + .query_host_consensus_state(height) + .map_err(LinkError::relayer)? + .timestamp(); + let duration = Timestamp::now() + .duration_since(&chain_time) + .unwrap_or_default(); + Ok(Instant::now().sub(duration)) } /// Handles updating the client on the destination chain + /// Returns the height at which the client update was processed fn update_client_dst( &self, src_chain_height: Height, tracking_id: &str, - ) -> Result<(), LinkError> { - // Handle the update on the destination chain - // Check if a consensus state at update_height exists on destination chain already - if self - .dst_chain() - .proven_client_consensus(self.dst_client_id(), src_chain_height, Height::zero()) - .is_ok() - { - return Ok(()); - } - - let mut dst_err_ev = None; - for i in 0..MAX_RETRIES { - let dst_update = self.build_update_client_on_dst(src_chain_height)?; - info!( - "sending updateClient to client hosted on destination chain for height {} [try {}/{}]", - src_chain_height, - i + 1, MAX_RETRIES, - ); - - let tm = TrackedMsgs::new(dst_update, tracking_id); - - let dst_tx_events = self - .dst_chain() - .send_messages_and_wait_commit(tm) - 
.map_err(LinkError::relayer)?; - info!("result: {}", PrettyEvents(&dst_tx_events)); + ) -> Result { + self.do_update_client_dst(src_chain_height, tracking_id, MAX_RETRIES) + } - dst_err_ev = dst_tx_events - .into_iter() - .find(|event| matches!(event, IbcEvent::ChainError(_))); + /// Perform actual update_client_dst with retries. + /// + /// Note that a retry is only performed when a ChainError event is + /// returned, or when no events are returned and the update height + /// cannot be queried. Errors returned from calls such as + /// build_update_client_on_dst are propagated immediately. + fn do_update_client_dst( + &self, + src_chain_height: Height, + tracking_id: &str, + retries_left: usize, + ) -> Result { + info!( "sending update_client to client hosted on source chain for height {} (retries left: {})", src_chain_height, retries_left ); - if dst_err_ev.is_none() { - return Ok(()); + let dst_update = self.build_update_client_on_dst(src_chain_height)?; + let tm = TrackedMsgs::new(dst_update, tracking_id); + let dst_tx_events = self + .dst_chain() + .send_messages_and_wait_commit(tm) + .map_err(LinkError::relayer)?; + info!("result: {}", PrettyEvents(&dst_tx_events)); + + let (error, update, misbehaviour) = Self::event_per_type(dst_tx_events); + match (error, update, misbehaviour) { + // All updates were successful, no errors and no misbehaviour. + (None, Some(update_event), None) => Ok(update_event.height()), + (Some(chain_error), _, _) => { + // At least one chain error, so retry if possible. 
+ if retries_left == 0 { + Err(LinkError::client(ForeignClientError::chain_error_event( + self.dst_chain().id(), + chain_error, + ))) + } else { + self.do_update_client_dst(src_chain_height, tracking_id, retries_left - 1) + } } + (None, None, None) => { + // `tm` was empty and update wasn't required + match Self::update_height( + self.dst_chain(), + self.dst_client_id().clone(), + src_chain_height, + ) { + Ok(update_height) => Ok(update_height), + Err(_) if retries_left > 0 => { + self.do_update_client_dst(src_chain_height, tracking_id, retries_left - 1) + } + _ => Err(LinkError::update_client_failed()), + } + } + // At least one misbehaviour event, so don't retry. + (_, _, Some(_misbehaviour)) => Err(LinkError::update_client_failed()), } - - Err(LinkError::client(ForeignClientError::chain_error_event( - self.dst_chain().id(), - dst_err_ev.unwrap(), - ))) } /// Handles updating the client on the source chain + /// Returns the height at which the client update was processed fn update_client_src( &self, dst_chain_height: Height, tracking_id: &str, - ) -> Result<(), LinkError> { - if self - .src_chain() - .proven_client_consensus(self.src_client_id(), dst_chain_height, Height::zero()) - .is_ok() - { - return Ok(()); - } - - let mut src_err_ev = None; - for _ in 0..MAX_RETRIES { - let src_update = self.build_update_client_on_src(dst_chain_height)?; - info!( - "sending updateClient to client hosted on source chain for height {}", - dst_chain_height, - ); - - let tm = TrackedMsgs::new(src_update, tracking_id); - - let src_tx_events = self - .src_chain() - .send_messages_and_wait_commit(tm) - .map_err(LinkError::relayer)?; - info!("result: {}", PrettyEvents(&src_tx_events)); + ) -> Result { + self.do_update_client_src(dst_chain_height, tracking_id, MAX_RETRIES) + } - src_err_ev = src_tx_events - .into_iter() - .find(|event| matches!(event, IbcEvent::ChainError(_))); + /// Perform actual update_client_src with retries. 
+ /// + /// Note that a retry is only performed when a ChainError event is + /// returned, or when no events are returned and the update height + /// cannot be queried. Errors returned from calls such as + /// build_update_client_on_src are propagated immediately. + fn do_update_client_src( + &self, + dst_chain_height: Height, + tracking_id: &str, + retries_left: usize, + ) -> Result { + info!( "sending update_client to client hosted on source chain for height {} (retries left: {})", dst_chain_height, retries_left ); - if src_err_ev.is_none() { - return Ok(()); + let src_update = self.build_update_client_on_src(dst_chain_height)?; + let tm = TrackedMsgs::new(src_update, tracking_id); + let src_tx_events = self + .src_chain() + .send_messages_and_wait_commit(tm) + .map_err(LinkError::relayer)?; + info!("result: {}", PrettyEvents(&src_tx_events)); + + let (error, update, misbehaviour) = Self::event_per_type(src_tx_events); + match (error, update, misbehaviour) { + // All updates were successful, no errors and no misbehaviour. + (None, Some(update_event), None) => Ok(update_event.height()), + (Some(chain_error), _, _) => { + // At least one chain error, so retry if possible. + if retries_left == 0 { + Err(LinkError::client(ForeignClientError::chain_error_event( + self.src_chain().id(), + chain_error, + ))) + } else { + self.do_update_client_src(dst_chain_height, tracking_id, retries_left - 1) + } } + (None, None, None) => { + // `tm` was empty and update wasn't required + match Self::update_height( + self.src_chain(), + self.src_client_id().clone(), + dst_chain_height, + ) { + Ok(update_height) => Ok(update_height), + Err(_) if retries_left > 0 => { + self.do_update_client_src(dst_chain_height, tracking_id, retries_left - 1) + } + _ => Err(LinkError::update_client_failed()), + } + } + // At least one misbehaviour event, so don't retry. 
+ (_, _, Some(_misbehaviour)) => Err(LinkError::update_client_failed()), } - - Err(LinkError::client(ForeignClientError::chain_error_event( - self.src_chain().id(), - src_err_ev.unwrap(), - ))) } /// Returns relevant packet events for building RecvPacket and timeout messages. @@ -1209,7 +1355,7 @@ impl RelayPath { /// Retains the operational data as pending, and associates it /// with one or more transaction hash(es). pub fn execute_schedule(&self) -> Result<(), LinkError> { - let (src_ods, dst_ods) = self.try_fetch_scheduled_operational_data(); + let (src_ods, dst_ods) = self.try_fetch_scheduled_operational_data()?; for od in dst_ods { let reply = @@ -1320,6 +1466,7 @@ impl RelayPath { dst_current_height, OperationalDataTarget::Source, &odata.tracking_id, + self.channel.connection_delay, ) }) .push(TransitMessage { @@ -1387,22 +1534,32 @@ impl RelayPath { } // Update clients ahead of scheduling the operational data, if the delays are non-zero. - if !self.zero_delay() { - debug!("connection delay is non-zero: updating client"); + // If the connection-delay must be taken into account, set the `scheduled_time` to an + // instant in the past, i.e. when this client update was first processed (`processed_time`) + let scheduled_time = if od.conn_delay_needed() { + debug!("connection delay must be taken into account: updating client"); let target_height = od.proofs_height.increment(); match od.target { OperationalDataTarget::Source => { - self.update_client_src(target_height, &od.tracking_id)? + let update_height = self.update_client_src(target_height, &od.tracking_id)?; + od.set_update_height(update_height); + self.src_time_at_height(update_height)? } OperationalDataTarget::Destination => { - self.update_client_dst(target_height, &od.tracking_id)? + let update_height = self.update_client_dst(target_height, &od.tracking_id)?; + od.set_update_height(update_height); + self.dst_time_at_height(update_height)? 
} - }; + } } else { - debug!("connection delay is zero: client update message will be prepended later"); - } + debug!( + "connection delay need not be taken into account: client update message will be \ + prepended later" + ); + Instant::now() + }; - od.scheduled_time = Instant::now(); + od.set_scheduled_time(scheduled_time); match od.target { OperationalDataTarget::Source => self.src_operational_data.push_back(od), @@ -1413,88 +1570,51 @@ impl RelayPath { } /// Pulls out the operational elements with elapsed delay period and that can - /// now be processed. Does not block: if no OD fulfilled the delay period (or none is - /// scheduled), returns immediately with `vec![]`. + /// now be processed. fn try_fetch_scheduled_operational_data( &self, - ) -> (VecDeque, VecDeque) { + ) -> Result<(VecDeque, VecDeque), LinkError> { // Extracts elements from a Vec when the predicate returns true. // The mutable vector is then updated to the remaining unextracted elements. fn partition( queue: VecDeque, - pred: impl Fn(&T) -> bool, - ) -> (VecDeque, VecDeque) { + pred: impl Fn(&T) -> Result, + ) -> Result<(VecDeque, VecDeque), LinkError> { let mut true_res = VecDeque::new(); let mut false_res = VecDeque::new(); for e in queue.into_iter() { - if pred(&e) { + if pred(&e)? 
{ true_res.push_back(e); } else { false_res.push_back(e); } } - (true_res, false_res) + Ok((true_res, false_res)) } - let connection_delay = self.channel.connection_delay; let (elapsed_src_ods, unelapsed_src_ods) = partition(self.src_operational_data.take(), |op| { - op.scheduled_time.elapsed() > connection_delay - }); - - self.src_operational_data.replace(unelapsed_src_ods); + op.has_conn_delay_elapsed( + &|| self.src_time_latest(), + &|| self.src_max_block_time(), + &|| self.src_latest_height(), + ) + })?; let (elapsed_dst_ods, unelapsed_dst_ods) = partition(self.dst_operational_data.take(), |op| { - op.scheduled_time.elapsed() > connection_delay - }); + op.has_conn_delay_elapsed( + &|| self.dst_time_latest(), + &|| self.dst_max_block_time(), + &|| self.dst_latest_height(), + ) + })?; + self.src_operational_data.replace(unelapsed_src_ods); self.dst_operational_data.replace(unelapsed_dst_ods); - - (elapsed_src_ods, elapsed_dst_ods) - } - - /// Fetches an operational data that has fulfilled its predefined delay period. May _block_ - /// waiting for the delay period to pass. - /// Returns `None` if there is no operational data scheduled. - pub(crate) fn fetch_scheduled_operational_data(&self) -> Option { - let odata = self - .src_operational_data - .pop_front() - .or_else(|| self.dst_operational_data.pop_front()); - - if let Some(odata) = odata { - // Check if the delay period did not completely elapse - let delay_left = self - .channel - .connection_delay - .checked_sub(odata.scheduled_time.elapsed()); - - match delay_left { - None => info!( - "ready to fetch a scheduled op. data with batch of size {} targeting {}", - odata.batch.len(), - odata.target, - ), - Some(delay_left) => { - info!( - "waiting ({:?} left) for a scheduled op. 
data with batch of size {} targeting {}", - delay_left, - odata.batch.len(), - odata.target, - ); - - // Wait until the delay period passes - thread::sleep(delay_left); - } - } - - Some(odata) - } else { - None - } + Ok((elapsed_src_ods, elapsed_dst_ods)) } fn restore_src_client(&self) -> ForeignClient { diff --git a/tools/test-framework/src/relayer/chain.rs b/tools/test-framework/src/relayer/chain.rs index 964b95a1d7..94ee98549e 100644 --- a/tools/test-framework/src/relayer/chain.rs +++ b/tools/test-framework/src/relayer/chain.rs @@ -387,4 +387,8 @@ where ) -> Result<(Vec, Vec), Error> { self.value().query_blocks(request) } + + fn query_host_consensus_state(&self, height: Height) -> Result { + self.value().query_host_consensus_state(height) + } }